From 48304fdb79f5d6486706f9a4cc965b83781c9c01 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 16 Sep 2024 20:18:15 +0800 Subject: [PATCH] improve eval deletes (#18816) improve eval deletes Approved by: @LeftHandCold, @triump2020 --- pkg/objectio/const.go | 26 ++++++- pkg/objectio/utils.go | 2 + pkg/vm/engine/disttae/datasource.go | 26 +++++-- pkg/vm/engine/tae/blockio/funcs.go | 9 +-- pkg/vm/engine/tae/blockio/read.go | 100 ++++++++++++++++++++------ pkg/vm/engine/tae/blockio/utils.go | 55 +++++++++++++- pkg/vm/engine/tae/compute/compare.go | 6 +- pkg/vm/engine/tae/index/zm.go | 8 +++ pkg/vm/engine/tae/logtail/snapshot.go | 6 +- 9 files changed, 199 insertions(+), 39 deletions(-) diff --git a/pkg/objectio/const.go b/pkg/objectio/const.go index 8a7256bbaba2..aae60c1954ca 100644 --- a/pkg/objectio/const.go +++ b/pkg/objectio/const.go @@ -15,6 +15,7 @@ package objectio import ( + "fmt" "math" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" @@ -33,8 +34,29 @@ const ( TombstoneAttr_Rowid_Idx = 0 TombstoneAttr_PK_Idx = 1 - // Tombstones created by CN has no commit ts column - TombstoneAttr_CommitTs_Idx = 2 + // Appendable + TombstoneAttr_A_PhyAddr_Idx = 2 + TombstoneAttr_A_CommitTs_Idx = 3 + TombstoneAttr_A_Abort_Idx = 4 + + // non-Appendable tn created + TombstoneAttr_NA_CommitTs_Idx = 2 + + TombstoneAttr_Rowid_SeqNum = TombstoneAttr_Rowid_Idx + TombstoneAttr_PK_SeqNum = TombstoneAttr_PK_Idx + + TombstoneAttr_A_PhyAddr_SeqNum = TombstoneAttr_A_PhyAddr_Idx + TombstoneAttr_CommitTs_SeqNum = SEQNUM_COMMITTS + TombstoneAttr_Abort_SeqNum = SEQNUM_ABORT ) const ZoneMapSize = index.ZMSize + +func GetTombstoneCommitTSAttrIdx(columnCnt uint16) uint16 { + if columnCnt == 3 { + return TombstoneAttr_NA_CommitTs_Idx + } else if columnCnt == 4 { + return TombstoneAttr_A_CommitTs_Idx + } + panic(fmt.Sprintf("invalid tombstone column count %d", columnCnt)) +} diff --git a/pkg/objectio/utils.go b/pkg/objectio/utils.go index 86e0bbbc5f12..64cadac9d734 100644 --- 
a/pkg/objectio/utils.go +++ b/pkg/objectio/utils.go @@ -31,10 +31,12 @@ import ( var ( RowidType types.Type + TSType types.Type ) func init() { RowidType = types.T_Rowid.ToType() + TSType = types.T_TS.ToType() } type CreateObjOpt struct { diff --git a/pkg/vm/engine/disttae/datasource.go b/pkg/vm/engine/disttae/datasource.go index 354d30b85648..ef91f8549aa9 100644 --- a/pkg/vm/engine/disttae/datasource.go +++ b/pkg/vm/engine/disttae/datasource.go @@ -1084,11 +1084,6 @@ func (ls *LocalDataSource) applyPStateTombstoneObjects( offsets []int64, deletedRows *nulls.Nulls, ) ([]int64, error) { - - //if ls.rc.SkipPStateDeletes { - // return offsets, nil - //} - if ls.pState.ApproxTombstoneObjectsNum() == 0 { return offsets, nil } @@ -1115,6 +1110,25 @@ func (ls *LocalDataSource) applyPStateTombstoneObjects( } }() + // PXU TODO: handle len(offsets) < 10 or 20, 30? + if len(offsets) == 1 { + rowid := objectio.NewRowid(&bid, uint32(offsets[0])) + deleted, err := blockio.IsRowDeleted( + ls.ctx, + &ls.snapshotTS, + rowid, + getTombstone, + ls.fs, + ) + if err != nil { + return nil, err + } + if deleted { + return nil, nil + } + return offsets, nil + } + if deletedRows == nil { deletedRows = &nulls.Nulls{} deletedRows.InitWithSize(8192) @@ -1220,7 +1234,7 @@ func (ls *LocalDataSource) batchApplyTombstoneObjects( for idx := 0; idx < int(obj.BlkCnt()) && len(rowIds) > len(deleted); idx++ { location = obj.ObjectStats.BlockLocation(uint16(idx), objectio.BlockMaxRows) - if loaded, release, err = blockio.ReadDeletes(ls.ctx, location, ls.fs, obj.GetCNCreated()); err != nil { + if loaded, _, release, err = blockio.ReadDeletes(ls.ctx, location, ls.fs, obj.GetCNCreated()); err != nil { return nil, err } diff --git a/pkg/vm/engine/tae/blockio/funcs.go b/pkg/vm/engine/tae/blockio/funcs.go index 2eb81c6fb980..d3ddd5213c21 100644 --- a/pkg/vm/engine/tae/blockio/funcs.go +++ b/pkg/vm/engine/tae/blockio/funcs.go @@ -37,14 +37,14 @@ func LoadColumnsData( location objectio.Location, m 
*mpool.MPool, policy fileservice.Policy, -) (bat *batch.Batch, release func(), err error) { +) (bat *batch.Batch, dataMeta objectio.ObjectDataMeta, release func(), err error) { name := location.Name() var meta objectio.ObjectMeta var ioVectors *fileservice.IOVector if meta, err = objectio.FastLoadObjectMeta(ctx, &location, false, fs); err != nil { return } - dataMeta := meta.MustGetMeta(metaType) + dataMeta = meta.MustGetMeta(metaType) if ioVectors, err = objectio.ReadOneBlock(ctx, &dataMeta, name.String(), location.ID(), cols, typs, m, fs, policy); err != nil { return } @@ -140,7 +140,7 @@ func LoadTombstoneColumns( location objectio.Location, m *mpool.MPool, policy fileservice.Policy, -) (bat *batch.Batch, release func(), err error) { +) (bat *batch.Batch, meta objectio.ObjectDataMeta, release func(), err error) { return LoadColumnsData(ctx, objectio.SchemaTombstone, cols, typs, fs, location, m, policy) } @@ -153,7 +153,8 @@ func LoadColumns( m *mpool.MPool, policy fileservice.Policy, ) (bat *batch.Batch, release func(), err error) { - return LoadColumnsData(ctx, objectio.SchemaData, cols, typs, fs, location, m, policy) + bat, _, release, err = LoadColumnsData(ctx, objectio.SchemaData, cols, typs, fs, location, m, policy) + return } // LoadColumns2 load columns data from file service for TN diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index fd299200670e..22776bce6d82 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -17,6 +17,7 @@ package blockio import ( "context" "math" + "sort" "time" "go.uber.org/zap" @@ -537,43 +538,62 @@ func ReadDeletes( deltaLoc objectio.Location, fs fileservice.FileService, isPersistedByCN bool, -) (bat *batch.Batch, release func(), err error) { +) (*batch.Batch, objectio.ObjectDataMeta, func(), error) { var cols []uint16 var typs []types.Type if isPersistedByCN { - cols = []uint16{0} - typs = []types.Type{types.T_Rowid.ToType()} + cols = 
[]uint16{objectio.TombstoneAttr_Rowid_SeqNum} + typs = []types.Type{objectio.RowidType} } else { - cols = []uint16{0, objectio.SEQNUM_COMMITTS} - typs = []types.Type{types.T_Rowid.ToType(), types.T_TS.ToType()} + cols = []uint16{objectio.TombstoneAttr_Rowid_SeqNum, objectio.TombstoneAttr_CommitTs_SeqNum} + typs = []types.Type{objectio.RowidType, objectio.TSType} } - bat, release, err = LoadTombstoneColumns(ctx, cols, typs, fs, deltaLoc, nil, fileservice.Policy(0)) - return + return LoadTombstoneColumns( + ctx, cols, typs, fs, deltaLoc, nil, fileservice.Policy(0), + ) } func EvalDeleteMaskFromDNCreatedTombstones( - deletes *batch.Batch, ts types.TS, blockid *types.Blockid, + deletes *batch.Batch, + meta objectio.BlockObject, + ts types.TS, + blockid *types.Blockid, ) (rows *nulls.Bitmap) { if deletes == nil { return } rowids := vector.MustFixedColWithTypeCheck[types.Rowid](deletes.Vecs[0]) - tss := vector.MustFixedColWithTypeCheck[types.TS](deletes.Vecs[1]) - //aborts := deletes.Vecs[3] - start, end := FindStartEndOfBlockFromSortedRowids(rowids, blockid) - for i := end - 1; i >= start; i-- { - if tss[i].Greater(&ts) { - continue + noTSCheck := false + if end-start > 10 { + // take the fast path when the commit-ts zone map's max is less than or equal to the snapshotTS + // this means that all the tombstone rows between start and end are visible + idx := objectio.GetTombstoneCommitTSAttrIdx(meta.GetMetaColumnCount()) + noTSCheck = meta.MustGetColumn(idx).ZoneMap().FastLEValue(ts[:], 0) + } + if noTSCheck { + for i := end - 1; i >= start; i-- { + row := rowids[i].GetRowOffset() + if rows == nil { + rows = nulls.NewWithSize(int(row) + 1) + } + rows.Add(uint64(row)) } - row := rowids[i].GetRowOffset() - if rows == nil { - rows = nulls.NewWithSize(int(row) + 1) + } else { + tss := vector.MustFixedColWithTypeCheck[types.TS](deletes.Vecs[1]) + for i := end - 1; i >= start; i-- { + if tss[i].Greater(&ts) { + continue + } + row := rowids[i].GetRowOffset() + if rows == nil { + rows = nulls.NewWithSize(int(row) + 1) + } 
+ rows.Add(uint64(row)) } - rows.Add(uint64(row)) } return @@ -629,6 +649,43 @@ func FindStartEndOfBlockFromSortedRowids(rowids []types.Rowid, id *types.Blockid return } +func IsRowDeletedByLocation( + ctx context.Context, + snapshotTS *types.TS, + row *objectio.Rowid, + location objectio.Location, + fs fileservice.FileService, + createdByCN bool, +) (deleted bool, err error) { + data, _, release, err := ReadDeletes(ctx, location, fs, createdByCN) + if err != nil { + return + } + defer release() + if data.RowCount() == 0 { + return + } + rowids := vector.MustFixedColNoTypeCheck[types.Rowid](data.Vecs[0]) + idx := sort.Search(len(rowids), func(i int) bool { + return rowids[i].GE(row) + }) + if createdByCN { + deleted = idx < len(rowids) && rowids[idx].EQ(row) + } else { + tss := vector.MustFixedColNoTypeCheck[types.TS](data.Vecs[1]) + for i := idx; i < len(rowids); i++ { + if !rowids[i].EQ(row) { + break + } + if tss[i].LessEq(snapshotTS) { + deleted = true + break + } + } + } + return +} + func FillBlockDeleteMask( ctx context.Context, snapshotTS types.TS, @@ -641,10 +698,11 @@ func FillBlockDeleteMask( rows *nulls.Nulls release func() persistedDeletes *batch.Batch + meta objectio.ObjectDataMeta ) if !location.IsEmpty() { - if persistedDeletes, release, err = ReadDeletes(ctx, location, fs, createdByCN); err != nil { + if persistedDeletes, meta, release, err = ReadDeletes(ctx, location, fs, createdByCN); err != nil { return nil, err } defer release() @@ -652,7 +710,9 @@ func FillBlockDeleteMask( if createdByCN { rows = EvalDeleteMaskFromCNCreatedTombstones(blockId, persistedDeletes) } else { - rows = EvalDeleteMaskFromDNCreatedTombstones(persistedDeletes, snapshotTS, &blockId) + rows = EvalDeleteMaskFromDNCreatedTombstones( + persistedDeletes, meta.GetBlockMeta(uint32(location.ID())), snapshotTS, &blockId, + ) } if rows != nil { diff --git a/pkg/vm/engine/tae/blockio/utils.go b/pkg/vm/engine/tae/blockio/utils.go index 66f4f7bc1890..630b3205b8bd 100644 --- 
a/pkg/vm/engine/tae/blockio/utils.go +++ b/pkg/vm/engine/tae/blockio/utils.go @@ -26,6 +26,56 @@ import ( v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" ) +func IsRowDeleted( + ctx context.Context, + ts *types.TS, + row *types.Rowid, + getTombstoneFileFn func() (*objectio.ObjectStats, error), + fs fileservice.FileService, +) (bool, error) { + var isDeleted bool + loadedBlkCnt := 0 + onBlockSelectedFn := func(tombstoneObject *objectio.ObjectStats, pos int) (bool, error) { + if isDeleted { + return false, nil + } + var err error + var location objectio.ObjectLocation + tombstoneObject.BlockLocationTo(uint16(pos), objectio.BlockMaxRows, location[:]) + deleted, err := IsRowDeletedByLocation( + ctx, ts, row, location[:], fs, tombstoneObject.GetCNCreated(), + ) + if err != nil { + return false, err + } + loadedBlkCnt++ + // if deleted, stop searching + if deleted { + isDeleted = true + return false, nil + } + return true, nil + } + + tombstoneObjectCnt, skipObjectCnt, totalBlkCnt, err := CheckTombstoneFile( + ctx, row[:], getTombstoneFileFn, onBlockSelectedFn, fs, + ) + if err != nil { + return false, err + } + + v2.TxnReaderEachBLKLoadedTombstoneHistogram.Observe(float64(loadedBlkCnt)) + v2.TxnReaderScannedTotalTombstoneHistogram.Observe(float64(tombstoneObjectCnt)) + if tombstoneObjectCnt > 0 { + v2.TxnReaderTombstoneZMSelectivityHistogram.Observe(float64(skipObjectCnt) / float64(tombstoneObjectCnt)) + } + if totalBlkCnt > 0 { + v2.TxnReaderTombstoneBLSelectivityHistogram.Observe(float64(loadedBlkCnt) / float64(totalBlkCnt)) + } + + return isDeleted, nil +} + func GetTombstonesByBlockId( ctx context.Context, ts types.TS, @@ -36,9 +86,10 @@ func GetTombstonesByBlockId( ) (err error) { loadedBlkCnt := 0 onBlockSelectedFn := func(tombstoneObject *objectio.ObjectStats, pos int) (bool, error) { - location := tombstoneObject.BlockLocation(uint16(pos), objectio.BlockMaxRows) + var location objectio.ObjectLocation + tombstoneObject.BlockLocationTo(uint16(pos), 
objectio.BlockMaxRows, location[:]) if mask, err := FillBlockDeleteMask( - ctx, ts, blockId, location, fs, tombstoneObject.GetCNCreated(), + ctx, ts, blockId, location[:], fs, tombstoneObject.GetCNCreated(), ); err != nil { return false, err } else { diff --git a/pkg/vm/engine/tae/compute/compare.go b/pkg/vm/engine/tae/compute/compare.go index e1fa7561d255..393fe69dadfc 100644 --- a/pkg/vm/engine/tae/compute/compare.go +++ b/pkg/vm/engine/tae/compute/compare.go @@ -99,9 +99,9 @@ func Compare(a, b []byte, t types.T, scale1, scale2 int32) int { case types.T_enum: return CompareOrdered(types.DecodeEnum(a), types.DecodeEnum(b)) case types.T_TS: - // PXU FIXME Done - aa, bb := types.TS(a), types.TS(b) - return aa.Compare(&bb) + aa := (*types.TS)(unsafe.Pointer(&a[0])) + bb := (*types.TS)(unsafe.Pointer(&b[0])) + return aa.Compare(bb) case types.T_Rowid: // Row id is very special. it is not valena type but always be diff --git a/pkg/vm/engine/tae/index/zm.go b/pkg/vm/engine/tae/index/zm.go index 3a3100d477a4..3d9e1933e9ad 100644 --- a/pkg/vm/engine/tae/index/zm.go +++ b/pkg/vm/engine/tae/index/zm.go @@ -583,6 +583,14 @@ func (zm ZM) AnyBetween(lb, ub ZM) (res bool, ok bool) { return } +// FastLEValue reports whether the zone map's max value is <= v (max <= v). +// It does no init check on the zone map, +// and no type check on v; scale is passed to compute.Compare as v's scale. +func (zm ZM) FastLEValue(v []byte, scale int32) (res bool) { + t := zm.GetType() + return compute.Compare(zm.GetMaxBuf(), v, t, zm.GetScale(), scale) <= 0 +} + func (zm ZM) FastIntersect(o ZM) (res bool) { t := zm.GetType() // zm.max >= o.min && zm.min <= v2.max diff --git a/pkg/vm/engine/tae/logtail/snapshot.go b/pkg/vm/engine/tae/logtail/snapshot.go index 384634df972c..c8e782cd5296 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -227,7 +227,7 @@ func (d *DeltaLocDataSource) getAndApplyTombstones( return nil, nil } logutil.Infof("deltaLoc: %v, id is %d", deltaLoc.String(), bid.Sequence()) - deletes, release, err := blockio.ReadDeletes(ctx, deltaLoc, d.fs, false) + deletes, meta, release, err 
:= blockio.ReadDeletes(ctx, deltaLoc, d.fs, false) if err != nil { return nil, err } @@ -235,7 +235,9 @@ func (d *DeltaLocDataSource) getAndApplyTombstones( if ts.IsEmpty() { ts = d.ts } - return blockio.EvalDeleteMaskFromDNCreatedTombstones(deletes, ts, &bid), nil + return blockio.EvalDeleteMaskFromDNCreatedTombstones( + deletes, meta.GetBlockMeta(uint32(deltaLoc.ID())), ts, &bid, + ), nil } func (d *DeltaLocDataSource) SetOrderBy(orderby []*plan.OrderBySpec) {