Skip to content

Commit

Permalink
some code refactor (#18633)
Browse files Browse the repository at this point in the history
1. refactor some code for the future code reuse
2. remove dummy code
3. remove stale stats related code

Approved by: @LeftHandCold, @triump2020
  • Loading branch information
XuPeng-SH committed Sep 8, 2024
1 parent d1d05f1 commit aa13822
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 232 deletions.
4 changes: 4 additions & 0 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,7 @@ func BuildQueryResultMetaPath(accountName, statementId string) string {
func BuildProfilePath(serviceTyp string, nodeId string, typ, name string) string {
return fmt.Sprintf("%s/%s_%s_%s_%s", ProfileDir, serviceTyp, nodeId, typ, name)
}

func IsFakePkName(name string) bool {
return name == FakePrimaryKeyColName
}
30 changes: 0 additions & 30 deletions pkg/objectio/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package objectio

import (
"bytes"
"context"
"fmt"
"sync"

"github.com/matrixorigin/matrixone/pkg/logutil"
Expand Down Expand Up @@ -74,8 +72,6 @@ type mataCacheKey [cacheKeyLen]byte

var metaCache *fifocache.Cache[mataCacheKey, []byte]
var onceInit sync.Once
var metaCacheStats hitStats
var metaCacheHitStats hitStats

func metaCacheSize() int64 {
v, err := mem.VirtualMemory()
Expand Down Expand Up @@ -117,21 +113,6 @@ func encodeCacheKey(name ObjectNameShort, cacheKeyType uint16) mataCacheKey {
return key
}

func ExportCacheStats() string {
var buf bytes.Buffer
hw, hwt := metaCacheHitStats.ExportW()
ht, htt := metaCacheHitStats.Export()
w, wt := metaCacheStats.ExportW()
t, tt := metaCacheStats.Export()

fmt.Fprintf(
&buf,
"MetaCacheWindow: %d/%d | %d/%d, MetaCacheTotal: %d/%d | %d/%d", hw, hwt, w, wt, ht, htt, t, tt,
)

return buf.String()
}

func LoadObjectMetaByExtent(
ctx context.Context,
name *ObjectName,
Expand All @@ -149,10 +130,6 @@ func LoadObjectMetaByExtent(
return
}
meta = obj.(ObjectMeta)
// metaCacheStats.Record(1, 1)
// if !prefetch {
// metaCacheHitStats.Record(1, 1)
// }
return
}
if extent.Length() == 0 {
Expand All @@ -170,10 +147,6 @@ func LoadObjectMetaByExtent(
}
meta = obj.(ObjectMeta)
metaCache.Set(key, v[:], int64(len(v)))
// metaCacheStats.Record(0, 1)
// if !prefetch {
// metaCacheHitStats.Record(0, 1)
// }
return
}

Expand All @@ -186,7 +159,6 @@ func FastLoadBF(
key := encodeCacheKey(*location.ShortName(), cacheKeyTypeBloomFilter)
v, ok := metaCache.Get(key)
if ok {
// metaCacheStats.Record(1, 1)
return v, nil
}
meta, err := FastLoadObjectMeta(ctx, &location, isPrefetch, fs)
Expand All @@ -205,7 +177,6 @@ func LoadBFWithMeta(
key := encodeCacheKey(*location.ShortName(), cacheKeyTypeBloomFilter)
v, ok := metaCache.Get(key)
if ok {
// metaCacheStats.Record(1, 1)
return v, nil
}
extent := meta.BlockHeader().BFExtent()
Expand All @@ -214,7 +185,6 @@ func LoadBFWithMeta(
return nil, err
}
metaCache.Set(key, bf, int64(len(bf)))
// metaCacheStats.Record(0, 1)
return bf, nil
}

Expand Down
112 changes: 0 additions & 112 deletions pkg/objectio/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
package objectio

import (
"bytes"
"fmt"
"time"

"github.com/matrixorigin/matrixone/pkg/util/metric/stats"
)

Expand All @@ -43,111 +39,3 @@ func (s *hitStats) ExportW() (hit, total int64) {
total = s.total.SwapW(0)
return
}

func (s *hitStats) ExportAll() (whit, wtotal int64, hit, total int64) {
whit = s.hit.SwapW(0)
wtotal = s.total.SwapW(0)
hit = s.hit.Swap(0)
total = s.total.Swap(0)
return
}

type Stats struct {
blockSelectivity hitStats
columnSelectivity hitStats
readFilterSelectivity hitStats
readDelCnt stats.Counter
readDelOpTotal stats.Counter
readDelRead stats.Counter
readDelBisect stats.Counter
}

func NewStats() *Stats {
return &Stats{}
}

func (s *Stats) RecordReadFilterSelectivity(hit, total int) {
s.readFilterSelectivity.Record(hit, total)
}

func (s *Stats) ExportReadFilterSelectivity() (
whit, wtotal int64, hit, total int64,
) {
whit, wtotal = s.readFilterSelectivity.ExportW()
if wtotal == 0 {
whit = 0
}
hit, total = s.readFilterSelectivity.Export()
return
}

func (s *Stats) RecordBlockSelectivity(hit, total int) {
s.blockSelectivity.Record(hit, total)
}

func (s *Stats) ExportBlockSelectivity() (
whit, wtotal int64,
) {
whit, wtotal, _, _ = s.blockSelectivity.ExportAll()
if wtotal == 0 {
whit = 0
}
return
}

func (s *Stats) RecordColumnSelectivity(hit, total int) {
s.columnSelectivity.Record(hit, total)
}

func (s *Stats) ExportColumnSelctivity() (
hit, total int64,
) {
hit, total, _, _ = s.columnSelectivity.ExportAll()
if total == 0 {
hit = 0
}
return
}

func (s *Stats) RecordReadDel(total, read, bisect time.Duration) {
s.readDelOpTotal.Add(int64(total))
s.readDelRead.Add(int64(read))
s.readDelBisect.Add(int64(bisect))
s.readDelCnt.Add(1)
}

func (s *Stats) ExportReadDel() (total, read, bisect time.Duration, cnt int64) {
total = time.Duration(s.readDelOpTotal.SwapW(0))
read = time.Duration(s.readDelRead.SwapW(0))
bisect = time.Duration(s.readDelBisect.SwapW(0))
cnt = s.readDelCnt.SwapW(0)
return
}

func (s *Stats) ExportString() string {
var w bytes.Buffer
whit, wtotal := s.ExportBlockSelectivity()
wrate, rate := 0.0, 0.0
if wtotal != 0 {
wrate = float64(whit) / float64(wtotal)
}
fmt.Fprintf(&w, "SelectivityStats: BLK[%d/%d=%0.4f] ", whit, wtotal, wrate)
whit, wtotal = s.ExportColumnSelctivity()
wrate = 0.0
if wtotal != 0 {
wrate = float64(whit) / float64(wtotal)
}
fmt.Fprintf(&w, "COL[%d/%d=%0.4f] ", whit, wtotal, wrate)
whit, wtotal, hit, total := s.ExportReadFilterSelectivity()
wrate = 0.0
if wtotal != 0 {
wrate = float64(whit) / float64(wtotal)
}
if total != 0 {
rate = float64(hit) / float64(total)
}
fmt.Fprintf(&w, "RDF[%d/%d=%0.4f,%d/%d=%0.4f]", whit, wtotal, wrate, hit, total, rate)
rtotal, rread, rbisect, rcnt := s.ExportReadDel()
fmt.Fprintf(&w, "RDD[%v/%v/%v/%v]", rtotal, rread, rbisect, rcnt)
return w.String()
}
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (t *cnMergeTask) PrepareNewWriter() *blockio.BlockWriter {
func (t *cnMergeTask) readblock(ctx context.Context, info *objectio.BlockInfo) (bat *batch.Batch, dels *nulls.Nulls, release func(), err error) {
// read data
bat, dels, release, err = blockio.BlockDataReadNoCopy(
ctx, "", info, t.ds, t.colseqnums, t.coltypes,
ctx, info, t.ds, t.colseqnums, t.coltypes,
t.snapshot, fileservice.Policy(0), t.proc.GetMPool(), t.fs)
if err != nil {
logutil.Infof("read block data failed: %v", err.Error())
Expand Down
10 changes: 1 addition & 9 deletions pkg/vm/engine/disttae/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
// -----------------------------------------------------------------

func (mixin *withFilterMixin) reset() {
mixin.filterState.evaluated = false
mixin.filterState.filter = objectio.BlockReadFilter{}
mixin.columns.pkPos = -1
mixin.columns.indexOfFirstSortedColumn = -1
Expand All @@ -71,7 +70,6 @@ func (mixin *withFilterMixin) tryUpdateColumns(cols []string) {
chit, ctotal := len(cols), len(mixin.tableDef.Cols)
v2.TaskSelColumnTotal.Add(float64(ctotal))
v2.TaskSelColumnHit.Add(float64(ctotal - chit))
blockio.RecordColumnSelectivity(mixin.proc.GetService(), chit, ctotal)

mixin.columns.seqnums = make([]uint16, len(cols))
mixin.columns.colTypes = make([]types.Type, len(cols))
Expand Down Expand Up @@ -107,13 +105,8 @@ func (mixin *withFilterMixin) tryUpdateColumns(cols []string) {
// use the search function to find the offset of the primary key.
// it returns the offset of the primary key in the pk vector.
// if the primary key is not found, it returns empty slice
mixin.filterState.evaluated = true
//mixin.filterState.filter = filter
mixin.filterState.seqnums = []uint16{mixin.columns.seqnums[mixin.columns.pkPos]}
mixin.filterState.colTypes = mixin.columns.colTypes[mixin.columns.pkPos : mixin.columns.pkPos+1]

// records how many blks one reader needs to read when having filter
//objectio.BlkReadStats.BlksByReaderStats.Record(1, blkCnt)
}
}

Expand Down Expand Up @@ -271,7 +264,7 @@ func NewReader(
}

blockFilter, err := engine_util.ConstructBlockPKFilter(
tableDef.Pkey.PkeyColName,
catalog.IsFakePkName(tableDef.Pkey.PkeyColName),
baseFilter,
)
if err != nil {
Expand Down Expand Up @@ -371,7 +364,6 @@ func (r *reader) Read(

err = blockio.BlockDataRead(
statsCtx,
r.withFilterMixin.proc.GetService(),
blkInfo,
r.source,
r.columns.seqnums,
Expand Down
6 changes: 4 additions & 2 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,6 @@ func (tbl *txnTable) rangesOnePart(
bhit, btotal := outBlocks.Len()-1, int(s3BlkCnt)
v2.TaskSelBlockTotal.Add(float64(btotal))
v2.TaskSelBlockHit.Add(float64(btotal - bhit))
blockio.RecordBlockSelectivity(proc.GetService(), bhit, btotal)
if btotal > 0 {
v2.TxnRangesSlowPathLoadObjCntHistogram.Observe(float64(loadObjCnt))
v2.TxnRangesSlowPathSelectedBlockCntHistogram.Observe(float64(bhit))
Expand Down Expand Up @@ -1919,7 +1918,10 @@ func (tbl *txnTable) PKPersistedBetween(
return nil, err
}

blockReadPKFilter, err := engine_util.ConstructBlockPKFilter(tbl.tableDef.Pkey.PkeyColName, basePKFilter)
blockReadPKFilter, err := engine_util.ConstructBlockPKFilter(
catalog.IsFakePkName(tbl.tableDef.Pkey.PkeyColName),
basePKFilter,
)
if err != nil {
return nil, err
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,21 +851,17 @@ type withFilterMixin struct {
columns struct {
seqnums []uint16
colTypes []types.Type
// colNulls []bool

pkPos int // -1 means no primary key in columns
indexOfFirstSortedColumn int
}

filterState struct {
evaluated bool
//point select for primary key
expr *plan.Expr
filter objectio.BlockReadFilter
seqnums []uint16 // seqnums of the columns in the filter
colTypes []types.Type
hasNull bool
record bool
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/vm/engine/engine_util/expr_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,12 +860,12 @@ func CompileFilterExpr(
return can, ok, nil
}
if isPK {
blkBf := bf.GetBloomFilter(uint32(blkIdx))
blkBfIdx := index.NewEmptyBloomFilter()
if err := index.DecodeBloomFilter(blkBfIdx, blkBf); err != nil {
var blkBF index.BloomFilter
buf := bf.GetBloomFilter(uint32(blkIdx))
if err := blkBF.Unmarshal(buf); err != nil {
return false, false, err
}
exist, err := blkBfIdx.MayContainsKey(vals[0])
exist, err := blkBF.MayContainsKey(vals[0])
if err != nil || !exist {
return false, false, err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/vm/engine/engine_util/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,10 @@ func TestConstructBlockPKFilter(t *testing.T) {
}

for i := range basePKFilters {
blkPKFilter, err := ConstructBlockPKFilter("a", basePKFilters[i])
blkPKFilter, err := ConstructBlockPKFilter(
false,
basePKFilters[i],
)
require.NoError(t, err)

require.True(t, blkPKFilter.Valid)
Expand Down
Loading

0 comments on commit aa13822

Please sign in to comment.