Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor merge task host #18883

Merged
merged 8 commits into from
Sep 20, 2024
77 changes: 33 additions & 44 deletions pkg/vm/engine/disttae/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ import (
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

var gTaskID atomic.Uint64
Expand All @@ -47,20 +45,15 @@ type cnMergeTask struct {
// txn
snapshot types.TS // start ts, fixed
ds engine.DataSource
proc *process.Process
mp *mpool.MPool

// schema
version uint32 // version
colseqnums []uint16 // no rowid column
coltypes []types.Type // no rowid column
colattrs []string // no rowid column
sortkeyPos int // (composite) primary key, cluster by etc. -1 meas no sort key
colattrs []string // no rowid column
sortkeyPos int // (composite) primary key, cluster by etc. -1 meas no sort key
sortkeyIsPK bool

doTransfer bool

// targets
targets []logtailreplay.ObjectInfo
targets []objectio.ObjectStats

// commit things
commitEntry *api.MergeCommitEntry
Expand All @@ -81,7 +74,7 @@ func newCNMergeTask(
snapshot types.TS,
sortkeyPos int,
sortkeyIsPK bool,
targets []logtailreplay.ObjectInfo,
targets []objectio.ObjectStats,
targetObjSize uint32,
) (*cnMergeTask, error) {
relData := NewBlockListRelationData(1)
Expand All @@ -95,45 +88,41 @@ func newCNMergeTask(
return nil, err
}

proc := tbl.proc.Load()
attrs := make([]string, 0, len(tbl.seqnums))
for i := 0; i < len(tbl.tableDef.Cols)-1; i++ {
for i := range len(tbl.tableDef.Cols) - 1 {
attrs = append(attrs, tbl.tableDef.Cols[i].Name)
}

proc := tbl.proc.Load()
fs := proc.Base.FileService

blkCnts := make([]int, len(targets))
blkIters := make([]*StatsBlkIter, len(targets))
for i, objInfo := range targets {
blkCnts[i] = int(objInfo.BlkCnt())
for i, objStats := range targets {
blkCnts[i] = int(objStats.BlkCnt())

loc := objInfo.ObjectLocation()
loc := objStats.ObjectLocation()
meta, err := objectio.FastLoadObjectMeta(ctx, &loc, false, fs)
if err != nil {
return nil, err
}

blkIters[i] = NewStatsBlkIter(&objInfo.ObjectStats, meta.MustDataMeta())
blkIters[i] = NewStatsBlkIter(&objStats, meta.MustDataMeta())
}
return &cnMergeTask{
taskId: gTaskID.Add(1),
host: tbl,
snapshot: snapshot,
ds: source,
proc: proc,
version: tbl.version,
colseqnums: tbl.seqnums,
coltypes: tbl.typs,
colattrs: attrs,
sortkeyPos: sortkeyPos,
sortkeyIsPK: sortkeyIsPK,
targets: targets,
fs: fs,
blkCnts: blkCnts,
blkIters: blkIters,

taskId: gTaskID.Add(1),
host: tbl,
snapshot: snapshot,
ds: source,
mp: proc.GetMPool(),
colattrs: attrs,
sortkeyPos: sortkeyPos,
sortkeyIsPK: sortkeyIsPK,
targets: targets,
fs: fs,
blkCnts: blkCnts,
blkIters: blkIters,
targetObjSize: targetObjSize,
doTransfer: !strings.Contains(tbl.comment, catalog.MO_COMMENT_NO_DEL_HINT),
}, nil
}

Expand All @@ -142,7 +131,7 @@ func (t *cnMergeTask) Name() string {
}

func (t *cnMergeTask) DoTransfer() bool {
return t.doTransfer
return !strings.Contains(t.host.comment, catalog.MO_COMMENT_NO_DEL_HINT)
}
func (t *cnMergeTask) GetObjectCnt() int {
return len(t.targets)
Expand Down Expand Up @@ -176,7 +165,7 @@ func (t *cnMergeTask) GetTargetObjSize() uint32 {

func (t *cnMergeTask) GetSortKeyType() types.Type {
if t.sortkeyPos >= 0 {
return t.coltypes[t.sortkeyPos]
return t.host.typs[t.sortkeyPos]
}
return types.Type{}
}
Expand All @@ -187,7 +176,7 @@ func (t *cnMergeTask) LoadNextBatch(ctx context.Context, objIdx uint32) (*batch.
blk := iter.Entry()
// update delta location
obj := t.targets[objIdx]
blk.SetFlagByObjStats(&obj.ObjectStats)
blk.SetFlagByObjStats(&obj)
return t.readblock(ctx, &blk)
}
return nil, nil, nil, mergesort.ErrNoMoreBlocks
Expand All @@ -214,11 +203,11 @@ func (t *cnMergeTask) GetTransferMaps() api.TransferMaps {
// impl DisposableVecPool
func (t *cnMergeTask) GetVector(typ *types.Type) (*vector.Vector, func()) {
v := vector.NewVec(*typ)
return v, func() { v.Free(t.proc.GetMPool()) }
return v, func() { v.Free(t.mp) }
}

func (t *cnMergeTask) GetMPool() *mpool.MPool {
return t.proc.GetMPool()
return t.mp
}

func (t *cnMergeTask) HostHintName() string { return "CN" }
Expand Down Expand Up @@ -246,23 +235,23 @@ func (t *cnMergeTask) prepareCommitEntry() *api.MergeCommitEntry {
commitEntry.TableName = t.host.tableName
commitEntry.StartTs = t.snapshot.ToTimestamp()
for _, o := range t.targets {
commitEntry.MergedObjs = append(commitEntry.MergedObjs, o.ObjectStats.Clone().Marshal())
commitEntry.MergedObjs = append(commitEntry.MergedObjs, o.Clone().Marshal())
}
t.commitEntry = commitEntry
// leave mapping to ReadMergeAndWrite
return commitEntry
}

func (t *cnMergeTask) PrepareNewWriter() *blockio.BlockWriter {
return mergesort.GetNewWriter(t.fs, t.version, t.colseqnums, t.sortkeyPos, t.sortkeyIsPK, false) // TODO obj.isTombstone
return mergesort.GetNewWriter(t.fs, t.host.version, t.host.seqnums, t.sortkeyPos, t.sortkeyIsPK, false) // TODO obj.isTombstone
}

// readblock reads block data. there is no rowid column, no ablk
func (t *cnMergeTask) readblock(ctx context.Context, info *objectio.BlockInfo) (bat *batch.Batch, dels *nulls.Nulls, release func(), err error) {
// read data
bat, dels, release, err = blockio.BlockDataReadNoCopy(
ctx, info, t.ds, t.colseqnums, t.coltypes,
t.snapshot, fileservice.Policy(0), t.proc.GetMPool(), t.fs)
ctx, info, t.ds, t.host.seqnums, t.host.typs,
t.snapshot, fileservice.Policy(0), t.mp, t.fs)
if err != nil {
logutil.Infof("read block data failed: %v", err.Error())
return
Expand Down
10 changes: 3 additions & 7 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2058,26 +2058,22 @@ func (tbl *txnTable) MergeObjects(
}

sortKeyPos, sortKeyIsPK := tbl.getSortKeyPosAndSortKeyIsPK()
objInfos := make([]logtailreplay.ObjectInfo, 0, len(objStats))

// check object visibility
for _, objstat := range objStats {
info, exist := state.GetObject(*objstat.ObjectShortName())
if !exist || (!info.DeleteTime.IsEmpty() && info.DeleteTime.LessEq(&snapshot)) {
logutil.Errorf("object not visible: %s", info.String())
return nil, moerr.NewInternalErrorNoCtxf("object %s not exist", objstat.ObjectName().String())
}
objInfos = append(objInfos, info)
}

if len(objInfos) < 2 {
return nil, moerr.NewInternalErrorNoCtx("no matching objects")
}

tbl.ensureSeqnumsAndTypesExpectRowid()

taskHost, err := newCNMergeTask(
ctx, tbl, snapshot, // context
sortKeyPos, sortKeyIsPK, // schema
objInfos, // targets
objStats, // targets
targetObjSize)
if err != nil {
return nil, err
Expand Down
Loading
Loading