Skip to content

Commit

Permalink
update 4
Browse files Browse the repository at this point in the history
  • Loading branch information
XuPeng-SH committed Sep 19, 2024
1 parent 0c84c51 commit 2a31778
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 5 deletions.
13 changes: 10 additions & 3 deletions pkg/vm/engine/disttae/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

const (
Expand All @@ -44,6 +45,7 @@ const (

func NewRemoteDataSource(
ctx context.Context,
proc *process.Process,
fs fileservice.FileService,
snapshotTS timestamp.Timestamp,
relData engine.RelData,
Expand Down Expand Up @@ -110,7 +112,8 @@ func NewLocalDataSource(
// --------------------------------------------------------------------------------

type RemoteDataSource struct {
ctx context.Context
ctx context.Context
proc *process.Process

fs fileservice.FileService
ts types.TS
Expand Down Expand Up @@ -141,6 +144,10 @@ func (rs *RemoteDataSource) Next(
}

func (rs *RemoteDataSource) batchPrefetch(seqNums []uint16) {
// TODO: remove proc and don't GetService
if rs.proc == nil {
return
}
if rs.batchPrefetchCursor >= rs.data.DataCnt() ||
rs.cursor < rs.batchPrefetchCursor {
return
Expand All @@ -160,14 +167,14 @@ func (rs *RemoteDataSource) batchPrefetch(seqNums []uint16) {
}

err := blockio.Prefetch(
"", rs.fs, blks[0].MetaLocation())
rs.proc.GetService(), rs.fs, blks[0].MetaLocation())
if err != nil {
logutil.Errorf("pefetch block data: %s", err.Error())
}

tombstoner := rs.data.GetTombstones()
if tombstoner != nil {
rs.data.GetTombstones().PrefetchTombstones("", rs.fs, bids)
rs.data.GetTombstones().PrefetchTombstones(rs.proc.GetService(), rs.fs, bids)
}

rs.batchPrefetchCursor = end
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ func (e *Engine) BuildBlockReaders(
}
ds := NewRemoteDataSource(
ctx,
proc,
fs,
ts,
shard)
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/disttae/reader_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func SimpleObjectReader(
) engine.Reader {
relData := NewBlockListRelationDataOfObject(obj, false)
ds := NewRemoteDataSource(
ctx, fs, ts, relData,
ctx, nil, fs, ts, relData,
)
return NewSimpleReader(
ctx, ds, fs, ts, opts...,
Expand Down Expand Up @@ -119,7 +119,7 @@ func SimpleMultiObjectsReader(
relData := NewBlockListRelationData(0)
relData.SetBlockList(slice)
ds := NewRemoteDataSource(
ctx, fs, ts, relData,
ctx, nil, fs, ts, relData,
)
return NewSimpleReader(
ctx, ds, fs, ts, opts...,
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,7 @@ func buildRemoteDS(

source = NewRemoteDataSource(
ctx,
tbl.proc.Load(),
tbl.getTxn().engine.fs,
tbl.db.op.SnapshotTS(),
newRelData,
Expand Down

0 comments on commit 2a31778

Please sign in to comment.