diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 814818c537ed..314cb036f131 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -183,3 +183,7 @@ func BuildQueryResultMetaPath(accountName, statementId string) string { func BuildProfilePath(serviceTyp string, nodeId string, typ, name string) string { return fmt.Sprintf("%s/%s_%s_%s_%s", ProfileDir, serviceTyp, nodeId, typ, name) } + +func IsFakePkName(name string) bool { + return name == FakePrimaryKeyColName +} diff --git a/pkg/objectio/cache.go b/pkg/objectio/cache.go index 29583182f3be..7e2aea3b8bbc 100644 --- a/pkg/objectio/cache.go +++ b/pkg/objectio/cache.go @@ -15,9 +15,7 @@ package objectio import ( - "bytes" "context" - "fmt" "sync" "github.com/matrixorigin/matrixone/pkg/logutil" @@ -74,8 +72,6 @@ type mataCacheKey [cacheKeyLen]byte var metaCache *fifocache.Cache[mataCacheKey, []byte] var onceInit sync.Once -var metaCacheStats hitStats -var metaCacheHitStats hitStats func metaCacheSize() int64 { v, err := mem.VirtualMemory() @@ -117,21 +113,6 @@ func encodeCacheKey(name ObjectNameShort, cacheKeyType uint16) mataCacheKey { return key } -func ExportCacheStats() string { - var buf bytes.Buffer - hw, hwt := metaCacheHitStats.ExportW() - ht, htt := metaCacheHitStats.Export() - w, wt := metaCacheStats.ExportW() - t, tt := metaCacheStats.Export() - - fmt.Fprintf( - &buf, - "MetaCacheWindow: %d/%d | %d/%d, MetaCacheTotal: %d/%d | %d/%d", hw, hwt, w, wt, ht, htt, t, tt, - ) - - return buf.String() -} - func LoadObjectMetaByExtent( ctx context.Context, name *ObjectName, @@ -149,10 +130,6 @@ func LoadObjectMetaByExtent( return } meta = obj.(ObjectMeta) - // metaCacheStats.Record(1, 1) - // if !prefetch { - // metaCacheHitStats.Record(1, 1) - // } return } if extent.Length() == 0 { @@ -170,10 +147,6 @@ func LoadObjectMetaByExtent( } meta = obj.(ObjectMeta) metaCache.Set(key, v[:], int64(len(v))) - // metaCacheStats.Record(0, 1) - // if !prefetch { - // metaCacheHitStats.Record(0, 1) - // } return } @@ -186,7 +159,6 @@ func FastLoadBF( key := encodeCacheKey(*location.ShortName(), cacheKeyTypeBloomFilter) v, ok := metaCache.Get(key) if ok { - // metaCacheStats.Record(1, 1) return v, nil } meta, err := FastLoadObjectMeta(ctx, &location, isPrefetch, fs) @@ -205,7 +177,6 @@ func LoadBFWithMeta( key := encodeCacheKey(*location.ShortName(), cacheKeyTypeBloomFilter) v, ok := metaCache.Get(key) if ok { - // metaCacheStats.Record(1, 1) return v, nil } extent := meta.BlockHeader().BFExtent() @@ -214,7 +185,6 @@ func LoadBFWithMeta( return nil, err } metaCache.Set(key, bf, int64(len(bf))) - // metaCacheStats.Record(0, 1) return bf, nil } diff --git a/pkg/objectio/stats.go b/pkg/objectio/stats.go index 4ed2428f913a..d21407e7920f 100644 --- a/pkg/objectio/stats.go +++ b/pkg/objectio/stats.go @@ -15,10 +15,6 @@ package objectio import ( - "bytes" - "fmt" - "time" - "github.com/matrixorigin/matrixone/pkg/util/metric/stats" ) @@ -43,111 +39,3 @@ func (s *hitStats) ExportW() (hit, total int64) { total = s.total.SwapW(0) return } - -func (s *hitStats) ExportAll() (whit, wtotal int64, hit, total int64) { - whit = s.hit.SwapW(0) - wtotal = s.total.SwapW(0) - hit = s.hit.Swap(0) - total = s.total.Swap(0) - return -} - -type Stats struct { - blockSelectivity hitStats - columnSelectivity hitStats - readFilterSelectivity hitStats - readDelCnt stats.Counter - readDelOpTotal stats.Counter - readDelRead stats.Counter - readDelBisect stats.Counter -} - -func NewStats() *Stats { - return &Stats{} -} - -func (s *Stats) RecordReadFilterSelectivity(hit, total int) { - s.readFilterSelectivity.Record(hit, total) -} - -func (s *Stats) ExportReadFilterSelectivity() ( - whit, wtotal int64, hit, total int64, -) { - whit, wtotal = s.readFilterSelectivity.ExportW() - if wtotal == 0 { - whit = 0 - } - hit, total = s.readFilterSelectivity.Export() - return -} - -func (s *Stats) RecordBlockSelectivity(hit, total int) { - s.blockSelectivity.Record(hit, total) -} - -func (s *Stats) ExportBlockSelectivity() ( - whit, wtotal int64, -) { - whit, wtotal, _, _ = s.blockSelectivity.ExportAll() - if wtotal == 0 { - whit = 0 - } - return -} - -func (s *Stats) RecordColumnSelectivity(hit, total int) { - s.columnSelectivity.Record(hit, total) -} - -func (s *Stats) ExportColumnSelctivity() ( - hit, total int64, -) { - hit, total, _, _ = s.columnSelectivity.ExportAll() - if total == 0 { - hit = 0 - } - return -} - -func (s *Stats) RecordReadDel(total, read, bisect time.Duration) { - s.readDelOpTotal.Add(int64(total)) - s.readDelRead.Add(int64(read)) - s.readDelBisect.Add(int64(bisect)) - s.readDelCnt.Add(1) -} - -func (s *Stats) ExportReadDel() (total, read, bisect time.Duration, cnt int64) { - total = time.Duration(s.readDelOpTotal.SwapW(0)) - read = time.Duration(s.readDelRead.SwapW(0)) - bisect = time.Duration(s.readDelBisect.SwapW(0)) - cnt = s.readDelCnt.SwapW(0) - return -} - -func (s *Stats) ExportString() string { - var w bytes.Buffer - whit, wtotal := s.ExportBlockSelectivity() - wrate, rate := 0.0, 0.0 - if wtotal != 0 { - wrate = float64(whit) / float64(wtotal) - } - fmt.Fprintf(&w, "SelectivityStats: BLK[%d/%d=%0.4f] ", whit, wtotal, wrate) - whit, wtotal = s.ExportColumnSelctivity() - wrate = 0.0 - if wtotal != 0 { - wrate = float64(whit) / float64(wtotal) - } - fmt.Fprintf(&w, "COL[%d/%d=%0.4f] ", whit, wtotal, wrate) - whit, wtotal, hit, total := s.ExportReadFilterSelectivity() - wrate = 0.0 - if wtotal != 0 { - wrate = float64(whit) / float64(wtotal) - } - if total != 0 { - rate = float64(hit) / float64(total) - } - fmt.Fprintf(&w, "RDF[%d/%d=%0.4f,%d/%d=%0.4f]", whit, wtotal, wrate, hit, total, rate) - rtotal, rread, rbisect, rcnt := s.ExportReadDel() - fmt.Fprintf(&w, "RDD[%v/%v/%v/%v]", rtotal, rread, rbisect, rcnt) - return w.String() -} diff --git a/pkg/vm/engine/disttae/merge.go b/pkg/vm/engine/disttae/merge.go index 2052dfc20f44..6d9e0d5662d1 100644 --- a/pkg/vm/engine/disttae/merge.go +++ b/pkg/vm/engine/disttae/merge.go @@ -257,7 +257,7 @@ func (t *cnMergeTask) PrepareNewWriter() *blockio.BlockWriter { func (t *cnMergeTask) readblock(ctx context.Context, info *objectio.BlockInfo) (bat *batch.Batch, dels *nulls.Nulls, release func(), err error) { // read data bat, dels, release, err = blockio.BlockDataReadNoCopy( - ctx, "", info, t.ds, t.colseqnums, t.coltypes, + ctx, info, t.ds, t.colseqnums, t.coltypes, t.snapshot, fileservice.Policy(0), t.proc.GetMPool(), t.fs) if err != nil { logutil.Infof("read block data failed: %v", err.Error()) diff --git a/pkg/vm/engine/disttae/reader.go b/pkg/vm/engine/disttae/reader.go index 72806c9bbb6f..cdc094725b06 100644 --- a/pkg/vm/engine/disttae/reader.go +++ b/pkg/vm/engine/disttae/reader.go @@ -46,7 +46,6 @@ import ( // ----------------------------------------------------------------- func (mixin *withFilterMixin) reset() { - mixin.filterState.evaluated = false mixin.filterState.filter = objectio.BlockReadFilter{} mixin.columns.pkPos = -1 mixin.columns.indexOfFirstSortedColumn = -1 @@ -71,7 +70,6 @@ func (mixin *withFilterMixin) tryUpdateColumns(cols []string) { chit, ctotal := len(cols), len(mixin.tableDef.Cols) v2.TaskSelColumnTotal.Add(float64(ctotal)) v2.TaskSelColumnHit.Add(float64(ctotal - chit)) - blockio.RecordColumnSelectivity(mixin.proc.GetService(), chit, ctotal) mixin.columns.seqnums = make([]uint16, len(cols)) mixin.columns.colTypes = make([]types.Type, len(cols)) @@ -107,13 +105,8 @@ func (mixin *withFilterMixin) tryUpdateColumns(cols []string) { // use the search function to find the offset of the primary key. // it returns the offset of the primary key in the pk vector. // if the primary key is not found, it returns empty slice - mixin.filterState.evaluated = true - //mixin.filterState.filter = filter mixin.filterState.seqnums = []uint16{mixin.columns.seqnums[mixin.columns.pkPos]} mixin.filterState.colTypes = mixin.columns.colTypes[mixin.columns.pkPos : mixin.columns.pkPos+1] - - // records how many blks one reader needs to read when having filter - //objectio.BlkReadStats.BlksByReaderStats.Record(1, blkCnt) } } @@ -271,7 +264,7 @@ func NewReader( } blockFilter, err := engine_util.ConstructBlockPKFilter( - tableDef.Pkey.PkeyColName, + catalog.IsFakePkName(tableDef.Pkey.PkeyColName), baseFilter, ) if err != nil { @@ -371,7 +364,6 @@ func (r *reader) Read( err = blockio.BlockDataRead( statsCtx, - r.withFilterMixin.proc.GetService(), blkInfo, r.source, r.columns.seqnums, diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 6c43ae57c8e2..d00cd5c1e3cd 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -836,7 +836,6 @@ func (tbl *txnTable) rangesOnePart( bhit, btotal := outBlocks.Len()-1, int(s3BlkCnt) v2.TaskSelBlockTotal.Add(float64(btotal)) v2.TaskSelBlockHit.Add(float64(btotal - bhit)) - blockio.RecordBlockSelectivity(proc.GetService(), bhit, btotal) if btotal > 0 { v2.TxnRangesSlowPathLoadObjCntHistogram.Observe(float64(loadObjCnt)) v2.TxnRangesSlowPathSelectedBlockCntHistogram.Observe(float64(bhit)) @@ -1919,7 +1918,10 @@ func (tbl *txnTable) PKPersistedBetween( return nil, err } - blockReadPKFilter, err := engine_util.ConstructBlockPKFilter(tbl.tableDef.Pkey.PkeyColName, basePKFilter) + blockReadPKFilter, err := engine_util.ConstructBlockPKFilter( + catalog.IsFakePkName(tbl.tableDef.Pkey.PkeyColName), + basePKFilter, + ) if err != nil { return nil, err } diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 1bcaf6d72a6a..3172493b83fb 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -851,21 +851,17 @@ type withFilterMixin struct { columns struct { seqnums []uint16 colTypes []types.Type - // colNulls []bool pkPos int // -1 means no primary key in columns indexOfFirstSortedColumn int } filterState struct { - evaluated bool //point select for primary key expr *plan.Expr filter objectio.BlockReadFilter seqnums []uint16 // seqnums of the columns in the filter colTypes []types.Type - hasNull bool - record bool } } diff --git a/pkg/vm/engine/engine_util/expr_filter.go b/pkg/vm/engine/engine_util/expr_filter.go index f546063da33a..889437d8f900 100644 --- a/pkg/vm/engine/engine_util/expr_filter.go +++ b/pkg/vm/engine/engine_util/expr_filter.go @@ -860,12 +860,12 @@ func CompileFilterExpr( return can, ok, nil } if isPK { - blkBf := bf.GetBloomFilter(uint32(blkIdx)) - blkBfIdx := index.NewEmptyBloomFilter() - if err := index.DecodeBloomFilter(blkBfIdx, blkBf); err != nil { + var blkBF index.BloomFilter + buf := bf.GetBloomFilter(uint32(blkIdx)) + if err := blkBF.Unmarshal(buf); err != nil { return false, false, err } - exist, err := blkBfIdx.MayContainsKey(vals[0]) + exist, err := blkBF.MayContainsKey(vals[0]) if err != nil || !exist { return false, false, err } diff --git a/pkg/vm/engine/engine_util/filter_test.go b/pkg/vm/engine/engine_util/filter_test.go index 5e84bf87fa85..7ded3ca51e7a 100644 --- a/pkg/vm/engine/engine_util/filter_test.go +++ b/pkg/vm/engine/engine_util/filter_test.go @@ -791,7 +791,10 @@ func TestConstructBlockPKFilter(t *testing.T) { } for i := range basePKFilters { - blkPKFilter, err := ConstructBlockPKFilter("a", basePKFilters[i]) + blkPKFilter, err := ConstructBlockPKFilter( + false, + basePKFilters[i], + ) require.NoError(t, err) require.True(t, blkPKFilter.Valid) diff --git a/pkg/vm/engine/engine_util/filter_util.go b/pkg/vm/engine/engine_util/filter_util.go new file mode 100644 index 000000000000..adb61d13652c --- /dev/null +++ b/pkg/vm/engine/engine_util/filter_util.go @@ -0,0 +1,169 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package engine_util + +/* DONT remove me, will be used later + +import ( + "context" + "sort" + + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" +) + +// Either len(val) == 0 or vec == nil +// inVec should be sorted +func CompilePrimaryKeyEqualFilter( + ctx context.Context, + val []byte, + inVec *vector.Vector, + colSeqnum uint16, + isFakePK bool, + skipBloomFilter bool, + fs fileservice.FileService, +) ( + fastFilterOp FastFilterOp, + loadOp LoadOp, + objectFilterOp ObjectFilterOp, + blockFilterOp BlockFilterOp, + seekOp SeekFirstBlockOp, + err error, +) { + if skipBloomFilter { + loadOp = loadMetadataOnlyOpFactory(fs) + } else { + loadOp = loadMetadataAndBFOpFactory(fs) + } + + // Here process the pk in filter + if inVec != nil { + if !isFakePK { + fastFilterOp = func(obj objectio.ObjectStats) (bool, error) { + if obj.ZMIsEmpty() { + return true, nil + } + return obj.SortKeyZoneMap().AnyIn(inVec), nil + } + } + + objectFilterOp = func(meta objectio.ObjectMeta, _ objectio.BloomFilter) (bool, error) { + if !isFakePK { + return true, nil + } + dataMeta := meta.MustDataMeta() + return dataMeta.MustGetColumn(colSeqnum).ZoneMap().AnyIn(inVec), nil + } + + blockFilterOp = func( + blkIdx int, blkMeta objectio.BlockObject, bf objectio.BloomFilter, + ) (bool, bool, error) { + // TODO: support skipFollowing + zm := blkMeta.MustGetColumn(colSeqnum).ZoneMap() + if !zm.AnyIn(inVec) { + return false, false, nil + } + + if skipBloomFilter || bf.Size() == 0 { + return false, true, nil + } + + buf := bf.GetBloomFilter(uint32(blkIdx)) + var blkBF index.BloomFilter + if err := blkBF.Unmarshal(buf); err != nil { + return false, false, err + } + lb, ub := zm.SubVecIn(inVec) + if exist := blkBF.MayContainsAny( + inVec, lb, ub, + ); !exist { + return false, false, nil + } + return false, true, nil + } + + return + } + + // Here process the pk equal filter + + // for non-fake PK, we can use the object stats sort key zone map + // to filter as the fastFilterOp + if !isFakePK { + fastFilterOp = func(obj objectio.ObjectStats) (bool, error) { + if obj.ZMIsEmpty() { + return true, nil + } + return obj.SortKeyZoneMap().ContainsKey(val), nil + } + } + + objectFilterOp = func(meta objectio.ObjectMeta, _ objectio.BloomFilter) (bool, error) { + if !isFakePK { + return true, nil + } + dataMeta := meta.MustDataMeta() + return dataMeta.MustGetColumn(colSeqnum).ZoneMap().ContainsKey(val), nil + } + + blockFilterOp = func( + blkIdx int, blkMeta objectio.BlockObject, bf objectio.BloomFilter, + ) (bool, bool, error) { + var ( + skipFollowing, ok bool + ) + zm := blkMeta.MustGetColumn(colSeqnum).ZoneMap() + if isFakePK { + skipFollowing = false + ok = zm.ContainsKey(val) + } else { + skipFollowing = !zm.AnyLEByValue(val) + if skipFollowing { + ok = false + } else { + ok = zm.ContainsKey(val) + } + } + if !ok || skipBloomFilter || bf.Size() == 0 { + return skipFollowing, ok, nil + } + + // check bloom filter here + blkBFBuf := bf.GetBloomFilter(uint32(blkIdx)) + var blkBF index.BloomFilter + if err := blkBF.Unmarshal(blkBFBuf); err != nil { + return false, false, err + } + exist, err := blkBF.MayContainsKey(val) + if err != nil || !exist { + return false, false, err + } + + return false, true, nil + } + if !isFakePK { + seekOp = func(meta objectio.ObjectDataMeta) int { + blockCnt := int(meta.BlockCount()) + blkIdx := sort.Search(blockCnt, func(i int) bool { + return meta.GetBlockMeta(uint32(i)).MustGetColumn(colSeqnum).ZoneMap().AnyGEByValue(val) + }) + return blkIdx + } + } + return +} +*/ diff --git a/pkg/vm/engine/engine_util/pk_filter.go b/pkg/vm/engine/engine_util/pk_filter.go index 379319f2a066..55eb72bc646b 100644 --- a/pkg/vm/engine/engine_util/pk_filter.go +++ b/pkg/vm/engine/engine_util/pk_filter.go @@ -17,7 +17,6 @@ package engine_util import ( "bytes" - "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" @@ -25,8 +24,30 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/process" ) +/* Don't remove me. will be used lated +func DirectConstructBlockPKFilter( + isFakePK bool, + val []byte, + inVec *vector.Vector, + oid types.T, +) (f objectio.BlockReadFilter, err error) { + var base BasePKFilter + if inVec != nil { + base.Op = function.IN + base.Vec = inVec + base.Oid = oid + } else { + base.Op = function.EQUAL + base.LB = val + base.Oid = oid + } + base.Valid = true + return ConstructBlockPKFilter(isFakePK, base) +} +*/ + func ConstructBlockPKFilter( - pkName string, + isFakePK bool, basePKFilter BasePKFilter, ) (f objectio.BlockReadFilter, err error) { if !basePKFilter.Valid { @@ -36,7 +57,7 @@ func ConstructBlockPKFilter( var readFilter objectio.BlockReadFilter var sortedSearchFunc, unSortedSearchFunc func(*vector.Vector) []int64 - readFilter.HasFakePK = pkName == catalog.FakePrimaryKeyColName + readFilter.HasFakePK = isFakePK switch basePKFilter.Op { case function.EQUAL: diff --git a/pkg/vm/engine/tae/blockio/pipeline.go b/pkg/vm/engine/tae/blockio/pipeline.go index 68cfdbd4168d..37928889c44c 100644 --- a/pkg/vm/engine/tae/blockio/pipeline.go +++ b/pkg/vm/engine/tae/blockio/pipeline.go @@ -234,7 +234,6 @@ type IoPipeline struct { } stats struct { - selectivityStats *objectio.Stats prefetchDropStats stats.Counter } printer *stopper.Stopper @@ -286,10 +285,6 @@ func (p *IoPipeline) fillDefaults() { p.jobFactory = jobFactory } - if p.stats.selectivityStats == nil { - p.stats.selectivityStats = objectio.NewStats() - } - if p.sensors.prefetchDepth == nil { name := utils.MakeSensorName("IO", "PrefetchDepth") sensor := utils.NewNumericSensor[int64]( @@ -481,13 +476,6 @@ func (p *IoPipeline) onWait(jobs ...any) { func (p *IoPipeline) crontask(ctx context.Context) { hb := w.NewHeartBeaterWithFunc(time.Second*10, func() { - logutil.Info(p.stats.selectivityStats.ExportString()) - // logutil.Info(p.sensors.prefetchDepth.String()) - // wdrops := p.stats.prefetchDropStats.SwapW(0) - // if wdrops > 0 { - // logutil.Infof("PrefetchDropStats: %d", wdrops) - // } - // logutil.Info(objectio.ExportCacheStats()) }, nil) hb.Start() <-ctx.Done() diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index d3c628b1f93b..766ee00a3c69 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -55,7 +55,6 @@ func removeIf[T any](data []T, pred func(t T) bool) []T { func ReadDataByFilter( ctx context.Context, tableName string, - sid string, info *objectio.BlockInfo, ds engine.DataSource, columns []uint16, @@ -90,7 +89,6 @@ func ReadDataByFilter( // BlockDataReadNoCopy only read block data from storage, don't apply deletes. func BlockDataReadNoCopy( ctx context.Context, - sid string, info *objectio.BlockInfo, ds engine.DataSource, columns []uint16, @@ -154,7 +152,6 @@ func BlockDataReadNoCopy( // BlockDataRead only read block data from storage, don't apply deletes. func BlockDataRead( ctx context.Context, - sid string, info *objectio.BlockInfo, ds engine.DataSource, columns []uint16, @@ -187,17 +184,14 @@ func BlockDataRead( if searchFunc != nil { if sels, err = ReadDataByFilter( - ctx, tableName, sid, info, ds, filterSeqnums, filterColTypes, + ctx, tableName, info, ds, filterSeqnums, filterColTypes, types.TimestampToTS(ts), searchFunc, mp, fs, ); err != nil { return err } v2.TaskSelReadFilterTotal.Inc() if len(sels) == 0 { - RecordReadFilterSelectivity(sid, 1, 1) v2.TaskSelReadFilterHit.Inc() - } else { - RecordReadFilterSelectivity(sid, 0, 1) } if len(sels) == 0 { @@ -206,7 +200,7 @@ func BlockDataRead( } err = BlockDataReadInner( - ctx, sid, info, ds, columns, colTypes, + ctx, info, ds, columns, colTypes, types.TimestampToTS(ts), sels, policy, bat, mp, fs, ) if err != nil { @@ -270,7 +264,6 @@ func windowCNBatch(bat *batch.Batch, start, end uint64) error { func BlockDataReadBackup( ctx context.Context, - sid string, info *objectio.BlockInfo, ds engine.DataSource, ts types.TS, @@ -315,7 +308,6 @@ func BlockDataReadBackup( // BlockDataReadInner only read data,don't apply deletes. func BlockDataReadInner( ctx context.Context, - sid string, info *objectio.BlockInfo, ds engine.DataSource, columns []uint16, @@ -612,38 +604,6 @@ func EvalDeleteRowsByTimestampForDeletesPersistedByCN( return } -func RecordReadDel( - sid string, - total, read, bisect time.Duration, -) { - MustGetPipeline(sid).stats.selectivityStats.RecordReadDel(total, read, bisect) -} - -func RecordReadFilterSelectivity( - sid string, - hit, total int, -) { - MustGetPipeline(sid).stats.selectivityStats.RecordReadFilterSelectivity(hit, total) -} - -func RecordBlockSelectivity( - sid string, - hit, total int, -) { - MustGetPipeline(sid).stats.selectivityStats.RecordBlockSelectivity(hit, total) -} - -func RecordColumnSelectivity( - sid string, - hit, total int, -) { - MustGetPipeline(sid).stats.selectivityStats.RecordColumnSelectivity(hit, total) -} - -func ExportSelectivityString(sid string) string { - return MustGetPipeline(sid).stats.selectivityStats.ExportString() -} - func FindIntervalForBlock(rowids []types.Rowid, id *types.Blockid) (start int, end int) { lowRowid := objectio.NewRowid(id, 0) highRowid := objectio.NewRowid(id, math.MaxUint32) diff --git a/pkg/vm/engine/tae/catalog/schema.go b/pkg/vm/engine/tae/catalog/schema.go index 870ba5b839b7..6e4bee7e749e 100644 --- a/pkg/vm/engine/tae/catalog/schema.go +++ b/pkg/vm/engine/tae/catalog/schema.go @@ -42,10 +42,6 @@ func i82bool(v int8) bool { return v == 1 } -func IsFakePkName(name string) bool { - return name == pkgcatalog.FakePrimaryKeyColName -} - const ( COLIDX_COMMITS = objectio.SEQNUM_COMMITTS ) @@ -913,7 +909,7 @@ func (s *Schema) Finalize(withoutPhyAddr bool) (err error) { } names[def.Name] = true // Fake pk - if IsFakePkName(def.Name) { + if pkgcatalog.IsFakePkName(def.Name) { def.FakePK = true def.SortKey = false def.SortIdx = -1 diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 3421b0eb1f57..1335c50f1e89 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -4233,7 +4233,7 @@ func TestBlockRead(t *testing.T) { } b1 := buildBatch(colTyps) err = blockio.BlockDataReadInner( - context.Background(), "", info, ds, colIdxs, colTyps, + context.Background(), info, ds, colIdxs, colTyps, beforeDel, nil, fileservice.Policy(0), b1, pool, fs, ) assert.NoError(t, err) @@ -4244,7 +4244,7 @@ func TestBlockRead(t *testing.T) { b2 := buildBatch(colTyps) err = blockio.BlockDataReadInner( - context.Background(), "", info, ds, colIdxs, colTyps, + context.Background(), info, ds, colIdxs, colTyps, afterFirstDel, nil, fileservice.Policy(0), b2, pool, fs, ) assert.NoError(t, err) @@ -4254,7 +4254,7 @@ func TestBlockRead(t *testing.T) { b3 := buildBatch(colTyps) err = blockio.BlockDataReadInner( - context.Background(), "", info, ds, colIdxs, colTyps, + context.Background(), info, ds, colIdxs, colTyps, afterSecondDel, nil, fileservice.Policy(0), b3, pool, fs, ) assert.NoError(t, err) @@ -4263,7 +4263,7 @@ func TestBlockRead(t *testing.T) { // read rowid column only b4 := buildBatch([]types.Type{types.T_Rowid.ToType()}) err = blockio.BlockDataReadInner( - context.Background(), "", info, + context.Background(), info, ds, []uint16{2}, []types.Type{types.T_Rowid.ToType()}, @@ -4277,7 +4277,7 @@ func TestBlockRead(t *testing.T) { //info.Appendable = false b5 := buildBatch([]types.Type{types.T_Rowid.ToType()}) err = blockio.BlockDataReadInner( - context.Background(), "", info, + context.Background(), info, ds, []uint16{2}, []types.Type{types.T_Rowid.ToType()}, afterSecondDel, nil, fileservice.Policy(0), b5, pool, fs, diff --git a/pkg/vm/engine/tae/index/filter.go b/pkg/vm/engine/tae/index/filter.go index c932e6e86eaa..54d37e18b2e9 100644 --- a/pkg/vm/engine/tae/index/filter.go +++ b/pkg/vm/engine/tae/index/filter.go @@ -49,6 +49,8 @@ func NewEmptyBloomFilterWithType(t uint8) StaticFilter { } } +type BloomFilter = bloomFilter + func NewEmptyBloomFilter() StaticFilter { return &bloomFilter{} } diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index 513b7173f19e..17989b1e65da 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -569,7 +569,7 @@ func ReWriteCheckpointAndBlockFromKey( BlockID: *objectio.BuildObjectBlockid(name, uint16(0)), MetaLoc: objectio.ObjectLocation(metaLoc), } - bat, sortKey, err := blockio.BlockDataReadBackup(ctx, sid, &blk, ds, ts, fs) + bat, sortKey, err := blockio.BlockDataReadBackup(ctx, &blk, ds, ts, fs) if err != nil { return true, err } diff --git a/pkg/vm/engine/tae/logtail/snapshot.go b/pkg/vm/engine/tae/logtail/snapshot.go index b5925a415434..3b343b8542a9 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -484,7 +484,7 @@ func (sm *SnapshotMeta) GetSnapshot(ctx context.Context, sid string, fs fileserv bat := buildBatch() defer bat.Clean(mp) err := blockio.BlockDataRead( - ctx, sid, &blk, ds, idxes, colTypes, checkpointTS.ToTimestamp(), + ctx, &blk, ds, idxes, colTypes, checkpointTS.ToTimestamp(), nil, nil, objectio.BlockReadFilter{}, fileservice.Policy(0), "", bat, mp, fs, ) if err != nil {