DiskCleaner replay add Init snapshot table info op (#15824)
After a cluster upgrade, the first startup needs to collect historical table information from existing checkpoints.

Approved by: @XuPeng-SH
LeftHandCold committed May 3, 2024
1 parent e1475c1 commit 09d98cb
Showing 2 changed files with 104 additions and 30 deletions.
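
In short: on replay, the cleaner now falls back to rebuilding the account-table index from snapshot checkpoints whenever no persisted table-info file is found. Below is a minimal, hedged sketch of that decision using stand-in types rather than the real checkpoint/logtail ones; only `acctFile`, `ReadTableInfo`-style loading, and `InitTableInfo` mirror names from the diff that follows.

```go
// Stand-alone model of the new Replay fallback; the real code lives in
// pkg/vm/engine/tae/db/gc/checkpoint.go and operates on checkpoint/logtail types.
package main

import "fmt"

// ckpData stands in for *logtail.CheckpointData.
type ckpData struct{ tableIDs []uint64 }

// snapshotMeta stands in for logtail.SnapshotMeta.
type snapshotMeta struct{ tables map[uint64]struct{} }

// initTableInfo models SnapshotMeta.InitTableInfo: fold one checkpoint's
// table batch into the in-memory index.
func (sm *snapshotMeta) initTableInfo(data ckpData) {
	for _, tid := range data.tableIDs {
		sm.tables[tid] = struct{}{}
	}
}

// replay models checkpointCleaner.Replay's new branch: prefer the persisted
// account-table file; otherwise rebuild from every snapshot checkpoint.
func replay(acctFile string, checkpoints []ckpData, sm *snapshotMeta) {
	if acctFile != "" {
		fmt.Println("read table info from", acctFile) // the ReadTableInfo path
		return
	}
	// New or upgraded cluster: no account metadata on disk yet.
	for _, data := range checkpoints {
		sm.initTableInfo(data)
	}
	fmt.Printf("table info initialized from %d checkpoints, %d tables\n",
		len(checkpoints), len(sm.tables))
}

func main() {
	sm := &snapshotMeta{tables: map[uint64]struct{}{}}
	replay("", []ckpData{{tableIDs: []uint64{1000, 1001}}, {tableIDs: []uint64{1001}}}, sm)
}
```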
63 changes: 51 additions & 12 deletions pkg/vm/engine/tae/db/gc/checkpoint.go
@@ -29,6 +29,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"sync"
"sync/atomic"
"time"
)

type checkpointCleaner struct {
@@ -216,16 +217,38 @@ func (c *checkpointCleaner) Replay() error {
return err
}
}
ckp := checkpoint.NewCheckpointEntry(maxConsumedStart, maxConsumedEnd, checkpoint.ET_Incremental)
c.updateMaxConsumed(ckp)
ckp = checkpoint.NewCheckpointEntry(minMergedStart, minMergedEnd, checkpoint.ET_Incremental)
c.updateMinMerged(ckp)

if acctFile != "" {
err = c.snapshotMeta.ReadTableInfo(c.ctx, GCMetaDir+acctFile, c.fs.Service)
if err != nil {
return err
}
} else {
// No account table information: this may be a new cluster or an upgraded cluster,
// so the table information needs to be initialized from the checkpoints.
maxConsumed := c.maxConsumed.Load()
checkpointEntries, err := checkpoint.ListSnapshotCheckpoint(c.ctx, c.fs.Service, ckp.GetEnd(), 0, checkpoint.SpecifiedCheckpoint)
if err != nil {
logutil.Warnf("list checkpoint failed, err[%v]", err)
}
if len(checkpointEntries) == 0 {
return nil
}
for _, entry := range checkpointEntries {
logutil.Infof("load checkpoint: %s, consumedEnd: %s", entry.String(), maxConsumed.String())
ckpData, err := c.collectCkpData(entry)
if err != nil {
logutil.Warnf("load checkpoint data failed, err[%v]", err)
continue
}
c.snapshotMeta.InitTableInfo(ckpData)
}
logutil.Infof("table info initialized: %s", c.snapshotMeta.TableInfoString())
}
ckp := checkpoint.NewCheckpointEntry(maxConsumedStart, maxConsumedEnd, checkpoint.ET_Incremental)
c.updateMaxConsumed(ckp)
ckp = checkpoint.NewCheckpointEntry(minMergedStart, minMergedEnd, checkpoint.ET_Incremental)
c.updateMinMerged(ckp)
return nil

}
@@ -342,7 +365,7 @@ func (c *checkpointCleaner) mergeGCFile() error {
} else {
*file = GCMetaDir + name
max = ts
logutil.Debugf("mergeSnapAcctFile: %v, max: %v", name, max.ToString())
logutil.Infof("mergeSnapAcctFile: %v, max: %v", name, max.ToString())
}
return nil
}
@@ -509,7 +532,7 @@ func (c *checkpointCleaner) tryGC(data *logtail.CheckpointData, gckp *checkpoint
gcTable.UpdateTable(data)
snapshots, err := c.GetSnapshots()
if err != nil {
logutil.Errorf("GetSnapshots failed: %v", err.Error())
logutil.Errorf("[DiskCleaner] GetSnapshots failed: %v", err.Error())
return nil
}
defer logtail.CloseSnapshotList(snapshots)
@@ -638,6 +661,7 @@ func (c *checkpointCleaner) Process() {
if !c.isEnableGC() {
return
}

maxConsumed := c.maxConsumed.Load()
if maxConsumed != nil {
ts = maxConsumed.GetEnd()
@@ -662,7 +686,7 @@
var input *GCTable
var err error
if input, err = c.createNewInput(candidates); err != nil {
logutil.Errorf("processing clean %s: %v", candidates[0].String(), err)
logutil.Errorf("[DiskCleaner] processing clean %s: %v", candidates[0].String(), err)
// TODO
return
}
@@ -680,15 +704,19 @@
}
maxEnd := maxGlobalCKP.GetEnd()
if maxGlobalCKP != nil && compareTS.Less(&maxEnd) {
logutil.Infof("maxGlobalCKP is %v, compareTS is %v", maxGlobalCKP.String(), compareTS.ToString())
logutil.Info("[DiskCleaner]", common.OperationField("Try GC"),
common.AnyField("maxGlobalCKP :", maxGlobalCKP.String()),
common.AnyField("compareTS :", compareTS.ToString()))
data, err := c.collectGlobalCkpData(maxGlobalCKP)
if err != nil {
c.inputs.RUnlock()
logutil.Errorf("[DiskCleaner] processing clean %s: %v", candidates[0].String(), err)
return
}
defer data.Close()
err = c.tryGC(data, maxGlobalCKP)
if err != nil {
logutil.Errorf("[DiskCleaner] processing clean %s: %v", candidates[0].String(), err)
return
}
}
@@ -725,6 +753,17 @@ func (c *checkpointCleaner) AddChecker(checker func(item any) bool) {

func (c *checkpointCleaner) createNewInput(
ckps []*checkpoint.CheckpointEntry) (input *GCTable, err error) {
now := time.Now()
var snapSize, tableSize uint32
logutil.Info("[DiskCleaner]", common.OperationField("Consume-Start"),
common.AnyField("entry count :", len(ckps)))
defer func() {
logutil.Info("[DiskCleaner]", common.OperationField("Consume-End"),
common.AnyField("cost :", time.Since(now).String()),
common.AnyField("snap meta size :", snapSize),
common.AnyField("table meta size :", tableSize),
common.OperandField(c.snapshotMeta.String()))
}()
input = NewGCTable()
var data *logtail.CheckpointData
for _, candidate := range ckps {
@@ -740,16 +779,16 @@
}
name := blockio.EncodeSnapshotMetadataFileName(GCMetaDir,
PrefixSnapMeta, ckps[0].GetStart(), ckps[len(ckps)-1].GetEnd())
err = c.snapshotMeta.SaveMeta(name, c.fs.Service)
snapSize, err = c.snapshotMeta.SaveMeta(name, c.fs.Service)
if err != nil {
logutil.Infof("SaveMeta is failed")
logutil.Errorf("SaveMeta is failed")
return
}
name = blockio.EncodeTableMetadataFileName(GCMetaDir,
PrefixAcctMeta, ckps[0].GetStart(), ckps[len(ckps)-1].GetEnd())
err = c.snapshotMeta.SaveTableInfo(name, c.fs.Service)
tableSize, err = c.snapshotMeta.SaveTableInfo(name, c.fs.Service)
if err != nil {
logutil.Infof("SaveTableInfo is failed")
logutil.Errorf("SaveTableInfo is failed")
return
}
files := c.GetAndClearOutputs()
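
The `createNewInput` changes above pair a Consume-Start log with a deferred Consume-End log that reports elapsed time plus the sizes now returned by `SaveMeta`/`SaveTableInfo`. Here is a minimal sketch of that deferred-logging pattern; the `consume` function and its size values are hypothetical stand-ins, and only the shape mirrors the diff.

```go
package main

import (
	"log"
	"time"
)

// consume mirrors createNewInput's instrumentation: log a start marker, then
// defer an end marker that reads variables filled in by the body. Because the
// deferred closure captures snapSize/tableSize (not copies of them), it sees
// the values assigned after the saves complete.
func consume(entryCount int) {
	now := time.Now()
	var snapSize, tableSize uint32
	log.Printf("[DiskCleaner] Consume-Start, entry count: %d", entryCount)
	defer func() {
		log.Printf("[DiskCleaner] Consume-End, cost: %s, snap meta size: %d, table meta size: %d",
			time.Since(now), snapSize, tableSize)
	}()
	// ... consume checkpoint data; the meta writers now report origin sizes:
	snapSize = 128  // stand-in for the size returned by snapshotMeta.SaveMeta
	tableSize = 256 // stand-in for the size returned by snapshotMeta.SaveTableInfo
}

func main() { consume(3) }
```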
71 changes: 53 additions & 18 deletions pkg/vm/engine/tae/logtail/snapshot.go
@@ -15,7 +15,9 @@
package logtail

import (
"bytes"
"context"
"fmt"
catalog2 "github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -95,7 +97,7 @@ var (
snapshotSchemaTypes = []types.Type{
types.New(types.T_uint64, 0, 0),
types.New(types.T_varchar, types.MaxVarcharLen, 0),
types.New(types.T_TS, types.MaxVarcharLen, 0),
types.New(types.T_int64, 0, 0),
types.New(types.T_enum, 0, 0),
types.New(types.T_varchar, types.MaxVarcharLen, 0),
types.New(types.T_varchar, types.MaxVarcharLen, 0),
@@ -135,10 +137,7 @@ func NewSnapshotMeta() *SnapshotMeta {
}
}

func (sm *SnapshotMeta) Update(data *CheckpointData) *SnapshotMeta {
sm.Lock()
defer sm.Unlock()

func (sm *SnapshotMeta) updateTableInfo(data *CheckpointData) {
insTable, _, _, _, delTableTxn := data.GetTblBatchs()
insAccIDs := vector.MustFixedCol[uint32](insTable.GetVectorByName(catalog2.SystemColAttr_AccID).GetDownstreamVector())
insTIDs := vector.MustFixedCol[uint64](insTable.GetVectorByName(catalog2.SystemRelAttr_ID).GetDownstreamVector())
@@ -191,6 +190,13 @@ func (sm *SnapshotMeta) Update(data *CheckpointData) *SnapshotMeta {
sm.acctIndexes[tid] = table
sm.tables[table.accID][tid] = table
}
}

func (sm *SnapshotMeta) Update(data *CheckpointData) *SnapshotMeta {
sm.Lock()
defer sm.Unlock()

sm.updateTableInfo(data)
if sm.tid == 0 {
return sm
}
@@ -281,7 +287,7 @@ func (sm *SnapshotMeta) GetSnapshot(ctx context.Context, fs fileservice.FileServ
if snapshotType == SnapshotTypeCluster {
for account := range sm.tables {
if snapshotList[account] == nil {
snapshotList[account] = containers.MakeVector(colTypes[0], mp)
snapshotList[account] = containers.MakeVector(types.T_TS.ToType(), mp)
}
err = vector.AppendFixed[types.TS](snapshotList[account].GetDownstreamVector(), snapTs, false, mp)
if err != nil {
@@ -292,7 +298,7 @@
}
id := uint32(acct)
if snapshotList[id] == nil {
snapshotList[id] = containers.MakeVector(colTypes[0], mp)
snapshotList[id] = containers.MakeVector(types.T_TS.ToType(), mp)
}
logutil.Infof("GetSnapshot: id %d, ts %v", id, snapTs.ToString())
err = vector.AppendFixed[types.TS](snapshotList[id].GetDownstreamVector(), snapTs, false, mp)
@@ -312,9 +318,9 @@
sm.tid = tid
}

func (sm *SnapshotMeta) SaveMeta(name string, fs fileservice.FileService) error {
func (sm *SnapshotMeta) SaveMeta(name string, fs fileservice.FileService) (uint32, error) {
if len(sm.objects) == 0 {
return nil
return 0, nil
}
bat := containers.NewBatch()
for i, attr := range objectInfoSchemaAttr {
@@ -349,25 +355,26 @@ func (sm *SnapshotMeta) SaveMeta(name string, fs fileservice.FileService) error
defer deltaBat.Close()
writer, err := objectio.NewObjectWriterSpecial(objectio.WriterGC, name, fs)
if err != nil {
return err
return 0, err
}
if _, err = writer.WriteWithoutSeqnum(containers.ToCNBatch(bat)); err != nil {
return err
return 0, err
}
if deltaBat.Length() > 0 {
logutil.Infof("deltaBat length is %d", deltaBat.Length())
if _, err = writer.WriteWithoutSeqnum(containers.ToCNBatch(deltaBat)); err != nil {
return err
return 0, err
}
}

_, err = writer.WriteEnd(context.Background())
return err
size := writer.GetObjectStats()[0].OriginSize()
return size, err
}

func (sm *SnapshotMeta) SaveTableInfo(name string, fs fileservice.FileService) error {
func (sm *SnapshotMeta) SaveTableInfo(name string, fs fileservice.FileService) (uint32, error) {
if len(sm.tables) == 0 {
return nil
return 0, nil
}
bat := containers.NewBatch()
for i, attr := range tableInfoSchemaAttr {
@@ -395,14 +402,15 @@ func (sm *SnapshotMeta) SaveTableInfo(name string, fs fileservice.FileService) e
defer bat.Close()
writer, err := objectio.NewObjectWriterSpecial(objectio.WriterGC, name, fs)
if err != nil {
return err
return 0, err
}
if _, err = writer.WriteWithoutSeqnum(containers.ToCNBatch(bat)); err != nil {
return err
return 0, err
}

_, err = writer.WriteEnd(context.Background())
return err
size := writer.GetObjectStats()[0].OriginSize()
return size, err
}

func (sm *SnapshotMeta) RebuildTableInfo(ins *containers.Batch) {
@@ -572,6 +580,26 @@ func (sm *SnapshotMeta) ReadTableInfo(ctx context.Context, name string, fs files
return nil
}

func (sm *SnapshotMeta) InitTableInfo(data *CheckpointData) {
sm.Lock()
defer sm.Unlock()
sm.updateTableInfo(data)
}

func (sm *SnapshotMeta) TableInfoString() string {
sm.RLock()
defer sm.RUnlock()
var buf bytes.Buffer
for accID, tables := range sm.tables {
buf.WriteString(fmt.Sprintf("accountID: %d\n", accID))
for tid, table := range tables {
buf.WriteString(fmt.Sprintf("tableID: %d, create: %s, deleteAt: %s\n",
tid, table.createAt.ToString(), table.deleteAt.ToString()))
}
}
return buf.String()
}

func (sm *SnapshotMeta) GetSnapshotList(SnapshotList map[uint32][]types.TS, tid uint64) []types.TS {
sm.RLock()
defer sm.RUnlock()
@@ -608,6 +636,13 @@ func (sm *SnapshotMeta) MergeTableInfo(SnapshotList map[uint32][]types.TS) error
return nil
}

func (sm *SnapshotMeta) String() string {
sm.RLock()
defer sm.RUnlock()
return fmt.Sprintf("account count: %d, table count: %d, object count: %d",
len(sm.tables), len(sm.acctIndexes), len(sm.objects))
}

func isSnapshotRefers(table *TableInfo, snapVec []types.TS) bool {
if len(snapVec) == 0 {
return false
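
A pattern worth noting in snapshot.go: the table-scanning body of `Update` was extracted into a lock-free helper, `updateTableInfo`, so the new `InitTableInfo` entry point can take the same mutex without double-locking. A simplified sketch with stand-in fields follows; the real helper consumes `*CheckpointData` batches rather than individual IDs.

```go
package main

import (
	"fmt"
	"sync"
)

type snapshotMeta struct {
	sync.RWMutex
	tables map[uint64]string
}

// updateTableInfo assumes the caller already holds the write lock.
func (sm *snapshotMeta) updateTableInfo(tid uint64, name string) {
	sm.tables[tid] = name
}

// Update keeps its original locking and now delegates to the helper.
func (sm *snapshotMeta) Update(tid uint64, name string) {
	sm.Lock()
	defer sm.Unlock()
	sm.updateTableInfo(tid, name)
	// ... the real code continues with snapshot-object bookkeeping here.
}

// InitTableInfo is the replay entry point added by this commit.
func (sm *snapshotMeta) InitTableInfo(tid uint64, name string) {
	sm.Lock()
	defer sm.Unlock()
	sm.updateTableInfo(tid, name)
}

func main() {
	sm := &snapshotMeta{tables: map[uint64]string{}}
	sm.InitTableInfo(1000, "t1") // hypothetical table ID and name
	fmt.Println(sm.tables)
}
```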
