Skip to content

Commit

Permalink
Show file tree
Hide file tree
Showing 94 changed files with 16,644 additions and 4,594 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ path_to_file
tester-log/
**/scratch/
pkg/util/trace/impl/motrace/pprof/
**/.tmp.*

# bvt generate files
test/distributed/resources/into_outfile/outfile_*.csv
Expand Down
3 changes: 3 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
# pkg/catalog
/pkg/catalog @XuPeng-SH

# pkg/cdc
/pkg/cdc @daviszhen @ck89119

# pkg/cnservice
/pkg/cnservice @reusee @daviszhen

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
github.com/spkg/bom v1.0.0
github.com/stretchr/testify v1.9.0
github.com/ti-mo/conntrack v0.5.1
github.com/ti-mo/netfilter v0.5.2
github.com/tidwall/btree v1.6.0
github.com/tidwall/pretty v1.2.1
go.uber.org/automaxprocs v1.5.3
Expand Down Expand Up @@ -115,7 +116,6 @@ require (
github.com/segmentio/asm v1.1.3 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tetratelabs/wazero v1.7.3 // indirect
github.com/ti-mo/netfilter v0.5.2 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
22 changes: 22 additions & 0 deletions pkg/bootstrap/versions/v1_3_0/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var tenantUpgEntries = []versions.UpgradeEntry{
upg_systemMetrics_server_snapshot_usage,
upg_mo_snapshots,
upg_mo_retention,
upg_mo_cdc_task,
upg_mo_cdc_watermark,
}

const viewServerSnapshotUsage = "server_snapshot_usage"
Expand Down Expand Up @@ -70,3 +72,23 @@ var upg_mo_retention = versions.UpgradeEntry{
return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_RETENTION)
},
}

// upg_mo_cdc_task creates the mo_cdc_task catalog table during the
// v1.3.0 tenant upgrade. CheckFunc reports whether the table already
// exists with the expected definition, so the DDL is only applied once.
var upg_mo_cdc_task = versions.UpgradeEntry{
	Schema:    catalog.MO_CATALOG,
	TableName: catalog.MO_CDC_TASK,
	UpgType:   versions.CREATE_NEW_TABLE,
	UpgSql:    frontend.MoCatalogMoCdcTaskDDL,
	CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
		return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_CDC_TASK)
	},
}

// upg_mo_cdc_watermark creates the mo_cdc_watermark catalog table during
// the v1.3.0 tenant upgrade. CheckFunc reports whether the table already
// exists with the expected definition, so the DDL is only applied once.
var upg_mo_cdc_watermark = versions.UpgradeEntry{
	Schema:    catalog.MO_CATALOG,
	TableName: catalog.MO_CDC_WATERMARK,
	UpgType:   versions.CREATE_NEW_TABLE,
	UpgSql:    frontend.MoCatalogMoCdcWatermarkDDL,
	CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
		return versions.CheckTableDefinition(txn, accountId, catalog.MO_CATALOG, catalog.MO_CDC_WATERMARK)
	},
}
3 changes: 3 additions & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ const (
MO_PITR = "mo_pitr"

MO_RETENTION = "mo_retention"

MO_CDC_TASK = "mo_cdc_task"
MO_CDC_WATERMARK = "mo_cdc_watermark"
)

const (
Expand Down
285 changes: 285 additions & 0 deletions pkg/cdc/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cdc

import (
"context"
"errors"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
)

// compile-time check that *tableReader satisfies the Reader interface.
var _ Reader = new(tableReader)

// tableReader periodically collects the changes (inserts/deletes) of one
// source table and forwards them to its Sinker. One reader serves exactly
// one table (identified by info).
type tableReader struct {
	cnTxnClient client.TxnClient              // used to open a txn per read round
	cnEngine    engine.Engine                 // storage engine the changes are collected from
	mp          *mpool.MPool                  // memory pool for batches
	packerPool  *fileservice.Pool[*types.Packer] // pooled packers for encoding composite pks
	info        *DbTableInfo                  // identity of the source table
	sinker      Sinker                        // downstream consumer of the collected changes
	wMarkUpdater *WatermarkUpdater            // supplies the last-synced watermark (read lower bound)
	tick        *time.Ticker                  // drives the periodic read loop (200ms)
	restartFunc func(*DbTableInfo) error      // invoked to restart this reader on stale-read errors

	// column positions inside the batches returned by the engine:
	// insert (data) batch: user cols | cpk (if composite) | commit-ts
	insTsColIdx, insCompositedPkColIdx int
	// delete (tombstone) batch: pk/cpk | commit-ts
	delTsColIdx, delCompositedPkColIdx int
}

// NewTableReader builds a Reader that pulls the changes of one source
// table every 200ms and hands them to sinker. tableDef is used only to
// locate the commit-ts and (composite) primary-key columns inside the
// batches the engine returns.
func NewTableReader(
	cnTxnClient client.TxnClient,
	cnEngine engine.Engine,
	mp *mpool.MPool,
	packerPool *fileservice.Pool[*types.Packer],
	info *DbTableInfo,
	sinker Sinker,
	wMarkUpdater *WatermarkUpdater,
	tableDef *plan.TableDef,
	restartFunc func(*DbTableInfo) error,
) Reader {
	// batch columns layout:
	// 1. data: user defined cols | cpk (if need) | commit-ts
	// 2. tombstone: pk/cpk | commit-ts
	colCnt := len(tableDef.Cols)
	insTs, insPk := colCnt-1, colCnt-2
	// if single col pk, there's no additional cpk col
	if len(tableDef.Pkey.Names) == 1 {
		insPk = int(tableDef.Name2ColIndex[tableDef.Pkey.Names[0]])
	}

	return &tableReader{
		cnTxnClient:           cnTxnClient,
		cnEngine:              cnEngine,
		mp:                    mp,
		packerPool:            packerPool,
		info:                  info,
		sinker:                sinker,
		wMarkUpdater:          wMarkUpdater,
		tick:                  time.NewTicker(200 * time.Millisecond),
		restartFunc:           restartFunc,
		insTsColIdx:           insTs,
		insCompositedPkColIdx: insPk,
		delTsColIdx:           1,
		delCompositedPkColIdx: 0,
	}
}

// Close releases the reader's resources. Stopping the ticker lets the
// runtime reclaim the underlying timer; it does not close tick.C, so a
// concurrent Run blocked in its select still exits via ctx/ar.Cancel.
func (reader *tableReader) Close() {
	reader.tick.Stop()
}

// Run is the reader's main loop: on every tick it reads the table's
// pending changes, until the context is done or the routine is
// cancelled. On a stale-read error it asks restartFunc to re-create the
// reader and keeps looping; any other read error terminates the loop.
func (reader *tableReader) Run(
	ctx context.Context,
	ar *ActiveRoutine) {
	logutil.Infof("^^^^^ tableReader(%s).Run: start", reader.info.SourceTblName)
	defer func() {
		logutil.Infof("^^^^^ tableReader(%s).Run: end", reader.info.SourceTblName)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ar.Cancel:
			return
		case <-reader.tick.C:
		}

		err := reader.readTable(ctx, ar)
		if err == nil {
			continue
		}
		logutil.Errorf("reader %v failed err:%v", reader.info, err)

		// if stale read, restart reader
		var moErr *moerr.Error
		if !errors.As(err, &moErr) || moErr.ErrorCode() != moerr.ErrStaleRead {
			// non-recoverable error: stop the loop
			return
		}
		if err = reader.restartFunc(reader.info); err != nil {
			logutil.Errorf("reader %v restart failed, err:%v", reader.info, err)
			return
		}
	}
}

// readTable performs one read round: it opens a fresh transaction,
// borrows a packer from the pool, and collects the table's changes
// within that transaction via readTableWithTxn.
//
// NOTE: err is a named result on purpose — the deferred FinishTxnOp
// reads its final value to decide how to finish (commit/rollback) the
// txn, so err must not be shadowed inside this function.
func (reader *tableReader) readTable(
	ctx context.Context,
	ar *ActiveRoutine) (err error) {

	var txnOp client.TxnOperator
	//step1 : create an txnOp
	txnOp, err = GetTxnOp(ctx, reader.cnEngine, reader.cnTxnClient, "readMultipleTables")
	if err != nil {
		return err
	}
	// finish the txn according to the final value of err at return time
	defer func() {
		FinishTxnOp(ctx, err, txnOp, reader.cnEngine)
	}()
	err = GetTxn(ctx, reader.cnEngine, txnOp)
	if err != nil {
		return err
	}

	// borrow a packer for composite-pk encoding; returned to the pool on exit
	var packer *types.Packer
	put := reader.packerPool.Get(&packer)
	defer put.Put()

	//step2 : read table
	err = reader.readTableWithTxn(
		ctx,
		txnOp,
		packer,
		ar)

	return
}

// readTableWithTxn pulls all changes of the table in the time range
// (last watermark, txn snapshot ts] and pushes them to the sinker.
//
// The engine hands data back incrementally via changes.Next, each batch
// tagged with a hint:
//   - Snapshot:  full data, forwarded immediately as a checkpoint output;
//   - Tail_wip:  partial tail, accumulated into atomic batches;
//   - Tail_done: final tail piece, accumulated then flushed to the sinker.
//
// Ownership of the accumulated atomic batches is transferred to the
// sinker on Sink; the local references are reset to nil afterwards.
func (reader *tableReader) readTableWithTxn(
	ctx context.Context,
	txnOp client.TxnOperator,
	packer *types.Packer,
	ar *ActiveRoutine) (err error) {
	var rel engine.Relation
	var changes engine.ChangesHandle
	//step1 : get relation
	_, _, rel, err = GetRelationById(ctx, reader.cnEngine, txnOp, reader.info.SourceTblId)
	if err != nil {
		return
	}

	//step2 : define time range
	// from = last wmark
	// to = txn operator snapshot ts
	fromTs := reader.wMarkUpdater.GetFromMem(reader.info.SourceTblId)
	toTs := types.TimestampToTS(GetSnapshotTS(txnOp))
	//fmt.Fprintln(os.Stderr, reader.info, "from", fromTs.ToString(), "to", toTs.ToString())
	changes, err = CollectChanges(ctx, rel, fromTs, toTs, reader.mp)
	if err != nil {
		return
	}
	defer changes.Close()

	//step3: pull data
	var insertData, deleteData *batch.Batch
	var insertAtmBatch, deleteAtmBatch *AtomicBatch

	// lazily create an AtomicBatch the first time tail data arrives
	allocateAtomicBatchIfNeed := func(atmBatch *AtomicBatch) *AtomicBatch {
		if atmBatch == nil {
			atmBatch = NewAtomicBatch(
				reader.mp,
				fromTs, toTs,
			)
		}
		return atmBatch
	}

	//batchSize := func(bat *batch.Batch) int {
	//	if bat == nil {
	//		return 0
	//	}
	//	return bat.Vecs[0].Length()
	//}

	var curHint engine.ChangesHandle_Hint
	for {
		// bail out promptly on shutdown/cancel between pulls
		select {
		case <-ctx.Done():
			return
		case <-ar.Cancel:
			return
		default:
		}

		if insertData, deleteData, curHint, err = changes.Next(ctx, reader.mp); err != nil {
			return
		}

		//_, _ = fmt.Fprintf(os.Stderr, "^^^^^ Reader: [%s, %s), "+
		//	"curHint: %v, insertData is nil: %v, deleteData is nil: %v, "+
		//	"insertDataSize() = %d, deleteDataSize() = %d\n",
		//	fromTs.ToString(), toTs.ToString(),
		//	curHint, insertData == nil, deleteData == nil,
		//	batchSize(insertData), batchSize(deleteData),
		//)

		//both nil denote no more data
		if insertData == nil && deleteData == nil {
			//FIXME: it is engine's bug
			//Tail_wip does not finished with Tail_done
			// flush any tail data that was never closed by a Tail_done hint
			if insertAtmBatch != nil || deleteAtmBatch != nil {
				err = reader.sinker.Sink(ctx, &DecoderOutput{
					outputTyp:      OutputTypeTailDone,
					insertAtmBatch: insertAtmBatch,
					deleteAtmBatch: deleteAtmBatch,
					fromTs:         fromTs,
					toTs:           toTs,
				})
				if err != nil {
					return err
				}

				// ownership moved to the sinker
				insertAtmBatch = nil
				deleteAtmBatch = nil
			}

			// heartbeat
			err = reader.sinker.Sink(ctx, &DecoderOutput{
				noMoreData: true,
				fromTs:     fromTs,
				toTs:       toTs,
			})
			return
		}

		switch curHint {
		case engine.ChangesHandle_Snapshot:
			// transform into insert instantly
			err = reader.sinker.Sink(ctx, &DecoderOutput{
				outputTyp:     OutputTypeCheckpoint,
				checkpointBat: insertData,
				fromTs:        fromTs,
				toTs:          toTs,
			})
		case engine.ChangesHandle_Tail_wip:
			// accumulate; nothing is sent until Tail_done (or end of data)
			insertAtmBatch = allocateAtomicBatchIfNeed(insertAtmBatch)
			deleteAtmBatch = allocateAtomicBatchIfNeed(deleteAtmBatch)
			insertAtmBatch.Append(packer, insertData, reader.insTsColIdx, reader.insCompositedPkColIdx)
			deleteAtmBatch.Append(packer, deleteData, reader.delTsColIdx, reader.delCompositedPkColIdx)
		case engine.ChangesHandle_Tail_done:
			// append the final piece, then hand everything to the sinker
			insertAtmBatch = allocateAtomicBatchIfNeed(insertAtmBatch)
			deleteAtmBatch = allocateAtomicBatchIfNeed(deleteAtmBatch)
			insertAtmBatch.Append(packer, insertData, reader.insTsColIdx, reader.insCompositedPkColIdx)
			deleteAtmBatch.Append(packer, deleteData, reader.delTsColIdx, reader.delCompositedPkColIdx)
			err = reader.sinker.Sink(ctx, &DecoderOutput{
				outputTyp:      OutputTypeTailDone,
				insertAtmBatch: insertAtmBatch,
				deleteAtmBatch: deleteAtmBatch,
				fromTs:         fromTs,
				toTs:           toTs,
			})
			// ownership moved to the sinker
			insertAtmBatch = nil
			deleteAtmBatch = nil
		}

		if err != nil {
			return
		}
	}
}
Loading

0 comments on commit 1bf8b00

Please sign in to comment.