Skip to content

Commit

Permalink
improve eval deletes (#18816)
Browse files Browse the repository at this point in the history
improve eval deletes

Approved by: @LeftHandCold, @triump2020
  • Loading branch information
XuPeng-SH committed Sep 16, 2024
1 parent 1bf8b00 commit 48304fd
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 39 deletions.
26 changes: 24 additions & 2 deletions pkg/objectio/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package objectio

import (
"fmt"
"math"

"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
Expand All @@ -33,8 +34,29 @@ const (
TombstoneAttr_Rowid_Idx = 0
TombstoneAttr_PK_Idx = 1

// Tombstones created by CN has no commit ts column
TombstoneAttr_CommitTs_Idx = 2
// Appendable
TombstoneAttr_A_PhyAddr_Idx = 2
TombstoneAttr_A_CommitTs_Idx = 3
TombstoneAttr_A_Abort_Idx = 4

// non-Appendable tn created
TombstoneAttr_NA_CommitTs_Idx = 2

TombstoneAttr_Rowid_SeqNum = TombstoneAttr_Rowid_Idx
TombstoneAttr_PK_SeqNum = TombstoneAttr_PK_Idx

TombstoneAttr_A_PhyAddr_SeqNum = TombstoneAttr_A_PhyAddr_Idx
TombstoneAttr_CommitTs_SeqNum = SEQNUM_COMMITTS
TombstoneAttr_Abort_SeqNum = SEQNUM_ABORT
)

const ZoneMapSize = index.ZMSize

func GetTombstoneCommitTSAttrIdx(columnCnt uint16) uint16 {
if columnCnt == 3 {
return TombstoneAttr_NA_CommitTs_Idx
} else if columnCnt == 4 {
return TombstoneAttr_A_CommitTs_Idx
}
panic(fmt.Sprintf("invalid tombstone column count %d", columnCnt))
}
2 changes: 2 additions & 0 deletions pkg/objectio/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (

var (
RowidType types.Type
TSType types.Type
)

func init() {
RowidType = types.T_Rowid.ToType()
TSType = types.T_TS.ToType()
}

type CreateObjOpt struct {
Expand Down
26 changes: 20 additions & 6 deletions pkg/vm/engine/disttae/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,11 +1084,6 @@ func (ls *LocalDataSource) applyPStateTombstoneObjects(
offsets []int64,
deletedRows *nulls.Nulls,
) ([]int64, error) {

//if ls.rc.SkipPStateDeletes {
// return offsets, nil
//}

if ls.pState.ApproxTombstoneObjectsNum() == 0 {
return offsets, nil
}
Expand All @@ -1115,6 +1110,25 @@ func (ls *LocalDataSource) applyPStateTombstoneObjects(
}
}()

// PXU TODO: handle len(offsets) < 10 or 20, 30?
if len(offsets) == 1 {
rowid := objectio.NewRowid(&bid, uint32(offsets[0]))
deleted, err := blockio.IsRowDeleted(
ls.ctx,
&ls.snapshotTS,
rowid,
getTombstone,
ls.fs,
)
if err != nil {
return nil, err
}
if deleted {
return nil, nil
}
return offsets, nil
}

if deletedRows == nil {
deletedRows = &nulls.Nulls{}
deletedRows.InitWithSize(8192)
Expand Down Expand Up @@ -1220,7 +1234,7 @@ func (ls *LocalDataSource) batchApplyTombstoneObjects(
for idx := 0; idx < int(obj.BlkCnt()) && len(rowIds) > len(deleted); idx++ {
location = obj.ObjectStats.BlockLocation(uint16(idx), objectio.BlockMaxRows)

if loaded, release, err = blockio.ReadDeletes(ls.ctx, location, ls.fs, obj.GetCNCreated()); err != nil {
if loaded, _, release, err = blockio.ReadDeletes(ls.ctx, location, ls.fs, obj.GetCNCreated()); err != nil {
return nil, err
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/vm/engine/tae/blockio/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ func LoadColumnsData(
location objectio.Location,
m *mpool.MPool,
policy fileservice.Policy,
) (bat *batch.Batch, release func(), err error) {
) (bat *batch.Batch, dataMeta objectio.ObjectDataMeta, release func(), err error) {
name := location.Name()
var meta objectio.ObjectMeta
var ioVectors *fileservice.IOVector
if meta, err = objectio.FastLoadObjectMeta(ctx, &location, false, fs); err != nil {
return
}
dataMeta := meta.MustGetMeta(metaType)
dataMeta = meta.MustGetMeta(metaType)
if ioVectors, err = objectio.ReadOneBlock(ctx, &dataMeta, name.String(), location.ID(), cols, typs, m, fs, policy); err != nil {
return
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func LoadTombstoneColumns(
location objectio.Location,
m *mpool.MPool,
policy fileservice.Policy,
) (bat *batch.Batch, release func(), err error) {
) (bat *batch.Batch, meta objectio.ObjectDataMeta, release func(), err error) {
return LoadColumnsData(ctx, objectio.SchemaTombstone, cols, typs, fs, location, m, policy)
}

Expand All @@ -153,7 +153,8 @@ func LoadColumns(
m *mpool.MPool,
policy fileservice.Policy,
) (bat *batch.Batch, release func(), err error) {
return LoadColumnsData(ctx, objectio.SchemaData, cols, typs, fs, location, m, policy)
bat, _, release, err = LoadColumnsData(ctx, objectio.SchemaData, cols, typs, fs, location, m, policy)
return
}

// LoadColumns2 load columns data from file service for TN
Expand Down
100 changes: 80 additions & 20 deletions pkg/vm/engine/tae/blockio/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package blockio
import (
"context"
"math"
"sort"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -537,43 +538,62 @@ func ReadDeletes(
deltaLoc objectio.Location,
fs fileservice.FileService,
isPersistedByCN bool,
) (bat *batch.Batch, release func(), err error) {
) (*batch.Batch, objectio.ObjectDataMeta, func(), error) {

var cols []uint16
var typs []types.Type

if isPersistedByCN {
cols = []uint16{0}
typs = []types.Type{types.T_Rowid.ToType()}
cols = []uint16{objectio.TombstoneAttr_Rowid_SeqNum}
typs = []types.Type{objectio.RowidType}
} else {
cols = []uint16{0, objectio.SEQNUM_COMMITTS}
typs = []types.Type{types.T_Rowid.ToType(), types.T_TS.ToType()}
cols = []uint16{objectio.TombstoneAttr_Rowid_SeqNum, objectio.TombstoneAttr_CommitTs_SeqNum}
typs = []types.Type{objectio.RowidType, objectio.TSType}
}
bat, release, err = LoadTombstoneColumns(ctx, cols, typs, fs, deltaLoc, nil, fileservice.Policy(0))
return
return LoadTombstoneColumns(
ctx, cols, typs, fs, deltaLoc, nil, fileservice.Policy(0),
)
}

func EvalDeleteMaskFromDNCreatedTombstones(
deletes *batch.Batch, ts types.TS, blockid *types.Blockid,
deletes *batch.Batch,
meta objectio.BlockObject,
ts types.TS,
blockid *types.Blockid,
) (rows *nulls.Bitmap) {
if deletes == nil {
return
}
rowids := vector.MustFixedColWithTypeCheck[types.Rowid](deletes.Vecs[0])
tss := vector.MustFixedColWithTypeCheck[types.TS](deletes.Vecs[1])
//aborts := deletes.Vecs[3]

start, end := FindStartEndOfBlockFromSortedRowids(rowids, blockid)

for i := end - 1; i >= start; i-- {
if tss[i].Greater(&ts) {
continue
noTSCheck := false
if end-start > 10 {
// fast path is true if the maxTS is less than the snapshotTS
// this means that all the rows between start and end are visible
idx := objectio.GetTombstoneCommitTSAttrIdx(meta.GetMetaColumnCount())
noTSCheck = meta.MustGetColumn(idx).ZoneMap().FastLEValue(ts[:], 0)
}
if noTSCheck {
for i := end - 1; i >= start; i-- {
row := rowids[i].GetRowOffset()
if rows == nil {
rows = nulls.NewWithSize(int(row) + 1)
}
rows.Add(uint64(row))
}
row := rowids[i].GetRowOffset()
if rows == nil {
rows = nulls.NewWithSize(int(row) + 1)
} else {
tss := vector.MustFixedColWithTypeCheck[types.TS](deletes.Vecs[1])
for i := end - 1; i >= start; i-- {
if tss[i].Greater(&ts) {
continue
}
row := rowids[i].GetRowOffset()
if rows == nil {
rows = nulls.NewWithSize(int(row) + 1)
}
rows.Add(uint64(row))
}
rows.Add(uint64(row))
}

return
Expand Down Expand Up @@ -629,6 +649,43 @@ func FindStartEndOfBlockFromSortedRowids(rowids []types.Rowid, id *types.Blockid
return
}

func IsRowDeletedByLocation(
ctx context.Context,
snapshotTS *types.TS,
row *objectio.Rowid,
location objectio.Location,
fs fileservice.FileService,
createdByCN bool,
) (deleted bool, err error) {
data, _, release, err := ReadDeletes(ctx, location, fs, createdByCN)
if err != nil {
return
}
defer release()
if data.RowCount() == 0 {
return
}
rowids := vector.MustFixedColNoTypeCheck[types.Rowid](data.Vecs[0])
idx := sort.Search(len(rowids), func(i int) bool {
return rowids[i].GE(row)
})
if createdByCN {
deleted = idx < len(rowids)
} else {
tss := vector.MustFixedColNoTypeCheck[types.TS](data.Vecs[1])
for i := idx; i < len(rowids); i++ {
if !rowids[i].EQ(row) {
break
}
if tss[i].LessEq(snapshotTS) {
deleted = true
break
}
}
}
return
}

func FillBlockDeleteMask(
ctx context.Context,
snapshotTS types.TS,
Expand All @@ -641,18 +698,21 @@ func FillBlockDeleteMask(
rows *nulls.Nulls
release func()
persistedDeletes *batch.Batch
meta objectio.ObjectDataMeta
)

if !location.IsEmpty() {
if persistedDeletes, release, err = ReadDeletes(ctx, location, fs, createdByCN); err != nil {
if persistedDeletes, meta, release, err = ReadDeletes(ctx, location, fs, createdByCN); err != nil {
return nil, err
}
defer release()

if createdByCN {
rows = EvalDeleteMaskFromCNCreatedTombstones(blockId, persistedDeletes)
} else {
rows = EvalDeleteMaskFromDNCreatedTombstones(persistedDeletes, snapshotTS, &blockId)
rows = EvalDeleteMaskFromDNCreatedTombstones(
persistedDeletes, meta.GetBlockMeta(uint32(location.ID())), snapshotTS, &blockId,
)
}

if rows != nil {
Expand Down
55 changes: 53 additions & 2 deletions pkg/vm/engine/tae/blockio/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,56 @@ import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
)

func IsRowDeleted(
ctx context.Context,
ts *types.TS,
row *types.Rowid,
getTombstoneFileFn func() (*objectio.ObjectStats, error),
fs fileservice.FileService,
) (bool, error) {
var isDeleted bool
loadedBlkCnt := 0
onBlockSelectedFn := func(tombstoneObject *objectio.ObjectStats, pos int) (bool, error) {
if isDeleted {
return false, nil
}
var err error
var location objectio.ObjectLocation
tombstoneObject.BlockLocationTo(uint16(pos), objectio.BlockMaxRows, location[:])
deleted, err := IsRowDeletedByLocation(
ctx, ts, row, location[:], fs, tombstoneObject.GetCNCreated(),
)
if err != nil {
return false, err
}
loadedBlkCnt++
// if deleted, stop searching
if deleted {
isDeleted = true
return false, nil
}
return true, nil
}

tombstoneObjectCnt, skipObjectCnt, totalBlkCnt, err := CheckTombstoneFile(
ctx, row[:], getTombstoneFileFn, onBlockSelectedFn, fs,
)
if err != nil {
return false, err
}

v2.TxnReaderEachBLKLoadedTombstoneHistogram.Observe(float64(loadedBlkCnt))
v2.TxnReaderScannedTotalTombstoneHistogram.Observe(float64(tombstoneObjectCnt))
if tombstoneObjectCnt > 0 {
v2.TxnReaderTombstoneZMSelectivityHistogram.Observe(float64(skipObjectCnt) / float64(tombstoneObjectCnt))
}
if totalBlkCnt > 0 {
v2.TxnReaderTombstoneBLSelectivityHistogram.Observe(float64(loadedBlkCnt) / float64(totalBlkCnt))
}

return isDeleted, nil
}

func GetTombstonesByBlockId(
ctx context.Context,
ts types.TS,
Expand All @@ -36,9 +86,10 @@ func GetTombstonesByBlockId(
) (err error) {
loadedBlkCnt := 0
onBlockSelectedFn := func(tombstoneObject *objectio.ObjectStats, pos int) (bool, error) {
location := tombstoneObject.BlockLocation(uint16(pos), objectio.BlockMaxRows)
var location objectio.ObjectLocation
tombstoneObject.BlockLocationTo(uint16(pos), objectio.BlockMaxRows, location[:])
if mask, err := FillBlockDeleteMask(
ctx, ts, blockId, location, fs, tombstoneObject.GetCNCreated(),
ctx, ts, blockId, location[:], fs, tombstoneObject.GetCNCreated(),
); err != nil {
return false, err
} else {
Expand Down
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/compute/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func Compare(a, b []byte, t types.T, scale1, scale2 int32) int {
case types.T_enum:
return CompareOrdered(types.DecodeEnum(a), types.DecodeEnum(b))
case types.T_TS:
// PXU FIXME Done
aa, bb := types.TS(a), types.TS(b)
return aa.Compare(&bb)
aa := (*types.TS)(unsafe.Pointer(&a[0]))
bb := (*types.TS)(unsafe.Pointer(&b[0]))
return aa.Compare(bb)

case types.T_Rowid:
// Row id is very special. it is not valena type but always be
Expand Down
8 changes: 8 additions & 0 deletions pkg/vm/engine/tae/index/zm.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,14 @@ func (zm ZM) AnyBetween(lb, ub ZM) (res bool, ok bool) {
return
}

// no init check
// no type check
// max <= v
func (zm ZM) FastLEValue(v []byte, scale int32) (res bool) {
t := zm.GetType()
return compute.Compare(zm.GetMaxBuf(), v, t, zm.GetScale(), scale) <= 0
}

func (zm ZM) FastIntersect(o ZM) (res bool) {
t := zm.GetType()
// zm.max >= o.min && zm.min <= v2.max
Expand Down
Loading

0 comments on commit 48304fd

Please sign in to comment.