From 09d98cb65f8cf34a488ceff348402ba67958f785 Mon Sep 17 00:00:00 2001 From: GreatRiver <2552853833@qq.com> Date: Fri, 3 May 2024 15:29:43 +0800 Subject: [PATCH] DiskCleaner replay add Init snapshot table info op (#15824) After the cluster is upgraded, the first startup needs to collect historical table information Approved by: @XuPeng-SH --- pkg/vm/engine/tae/db/gc/checkpoint.go | 63 +++++++++++++++++++----- pkg/vm/engine/tae/logtail/snapshot.go | 71 ++++++++++++++++++++------- 2 files changed, 104 insertions(+), 30 deletions(-) diff --git a/pkg/vm/engine/tae/db/gc/checkpoint.go b/pkg/vm/engine/tae/db/gc/checkpoint.go index 8e29170dc4de..e79565b635a4 100644 --- a/pkg/vm/engine/tae/db/gc/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/checkpoint.go @@ -29,6 +29,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" "sync" "sync/atomic" + "time" ) type checkpointCleaner struct { @@ -216,16 +217,38 @@ func (c *checkpointCleaner) Replay() error { return err } } + ckp := checkpoint.NewCheckpointEntry(maxConsumedStart, maxConsumedEnd, checkpoint.ET_Incremental) + c.updateMaxConsumed(ckp) + ckp = checkpoint.NewCheckpointEntry(minMergedStart, minMergedEnd, checkpoint.ET_Incremental) + c.updateMinMerged(ckp) + if acctFile != "" { err = c.snapshotMeta.ReadTableInfo(c.ctx, GCMetaDir+acctFile, c.fs.Service) if err != nil { return err } + } else { + //No account table information, it may be a new cluster or an upgraded cluster, + //and the table information needs to be initialized from the checkpoint + maxConsumed := c.maxConsumed.Load() + checkpointEntries, err := checkpoint.ListSnapshotCheckpoint(c.ctx, c.fs.Service, ckp.GetEnd(), 0, checkpoint.SpecifiedCheckpoint) + if err != nil { + logutil.Warnf("list checkpoint failed, err[%v]", err) + } + if len(checkpointEntries) == 0 { + return nil + } + for _, entry := range checkpointEntries { + logutil.Infof("load checkpoint: %s, consumedEnd: %s", entry.String(), maxConsumed.String()) + ckpData, err := c.collectCkpData(entry) + if err != nil { + logutil.Warnf("load checkpoint data failed, err[%v]", err) + continue + } + c.snapshotMeta.InitTableInfo(ckpData) + } + logutil.Infof("table info initialized: %s", c.snapshotMeta.TableInfoString()) } - ckp := checkpoint.NewCheckpointEntry(maxConsumedStart, maxConsumedEnd, checkpoint.ET_Incremental) - c.updateMaxConsumed(ckp) - ckp = checkpoint.NewCheckpointEntry(minMergedStart, minMergedEnd, checkpoint.ET_Incremental) - c.updateMinMerged(ckp) return nil } @@ -342,7 +365,7 @@ func (c *checkpointCleaner) mergeGCFile() error { } else { *file = GCMetaDir + name max = ts - logutil.Debugf("mergeSnapAcctFile: %v, max: %v", name, max.ToString()) + logutil.Infof("mergeSnapAcctFile: %v, max: %v", name, max.ToString()) } return nil } @@ -509,7 +532,7 @@ func (c *checkpointCleaner) tryGC(data *logtail.CheckpointData, gckp *checkpoint gcTable.UpdateTable(data) snapshots, err := c.GetSnapshots() if err != nil { - logutil.Errorf("GetSnapshots failed: %v", err.Error()) + logutil.Errorf("[DiskCleaner] GetSnapshots failed: %v", err.Error()) return nil } defer logtail.CloseSnapshotList(snapshots) @@ -638,6 +661,7 @@ func (c *checkpointCleaner) Process() { if !c.isEnableGC() { return } + maxConsumed := c.maxConsumed.Load() if maxConsumed != nil { ts = maxConsumed.GetEnd() @@ -662,7 +686,7 @@ func (c *checkpointCleaner) Process() { var input *GCTable var err error if input, err = c.createNewInput(candidates); err != nil { - logutil.Errorf("processing clean %s: %v", candidates[0].String(), err) + logutil.Errorf("[DiskCleaner] processing clean %s: %v", candidates[0].String(), err) // TODO return } @@ -680,15 +704,19 @@ func (c *checkpointCleaner) Process() { } maxEnd := maxGlobalCKP.GetEnd() if maxGlobalCKP != nil && compareTS.Less(&maxEnd) { - logutil.Infof("maxGlobalCKP is %v, compareTS is %v", maxGlobalCKP.String(), compareTS.ToString()) + logutil.Info("[DiskCleaner]", common.OperationField("Try GC"), + common.AnyField("maxGlobalCKP :", maxGlobalCKP.String()), + common.AnyField("compareTS :", compareTS.ToString())) data, err := c.collectGlobalCkpData(maxGlobalCKP) if err != nil { c.inputs.RUnlock() + logutil.Errorf("[DiskCleaner] processing clean %s: %v", candidates[0].String(), err) return } defer data.Close() err = c.tryGC(data, maxGlobalCKP) if err != nil { + logutil.Errorf("[DiskCleaner] processing clean %s: %v", candidates[0].String(), err) return } } @@ -725,6 +753,17 @@ func (c *checkpointCleaner) AddChecker(checker func(item any) bool) { func (c *checkpointCleaner) createNewInput( ckps []*checkpoint.CheckpointEntry) (input *GCTable, err error) { + now := time.Now() + var snapSize, tableSize uint32 + logutil.Info("[DiskCleaner]", common.OperationField("Consume-Start"), + common.AnyField("entry count :", len(ckps))) + defer func() { + logutil.Info("[DiskCleaner]", common.OperationField("Consume-End"), + common.AnyField("cost :", time.Since(now).String()), + common.AnyField("snap meta size :", snapSize), + common.AnyField("table meta size :", tableSize), + common.OperandField(c.snapshotMeta.String())) + }() input = NewGCTable() var data *logtail.CheckpointData for _, candidate := range ckps { @@ -740,16 +779,16 @@ func (c *checkpointCleaner) createNewInput( } name := blockio.EncodeSnapshotMetadataFileName(GCMetaDir, PrefixSnapMeta, ckps[0].GetStart(), ckps[len(ckps)-1].GetEnd()) - err = c.snapshotMeta.SaveMeta(name, c.fs.Service) + snapSize, err = c.snapshotMeta.SaveMeta(name, c.fs.Service) if err != nil { - logutil.Infof("SaveMeta is failed") + logutil.Errorf("SaveMeta is failed") return } name = blockio.EncodeTableMetadataFileName(GCMetaDir, PrefixAcctMeta, ckps[0].GetStart(), ckps[len(ckps)-1].GetEnd()) - err = c.snapshotMeta.SaveTableInfo(name, c.fs.Service) + tableSize, err = c.snapshotMeta.SaveTableInfo(name, c.fs.Service) if err != nil { - logutil.Infof("SaveTableInfo is failed") + logutil.Errorf("SaveTableInfo is failed") return } files := c.GetAndClearOutputs() diff --git a/pkg/vm/engine/tae/logtail/snapshot.go b/pkg/vm/engine/tae/logtail/snapshot.go index b8b5fdcf4521..e9fdd22dbaeb 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -15,7 +15,9 @@ package logtail import ( + "bytes" "context" + "fmt" catalog2 "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -95,7 +97,7 @@ var ( snapshotSchemaTypes = []types.Type{ types.New(types.T_uint64, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), - types.New(types.T_TS, types.MaxVarcharLen, 0), + types.New(types.T_int64, 0, 0), types.New(types.T_enum, 0, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), types.New(types.T_varchar, types.MaxVarcharLen, 0), @@ -135,10 +137,7 @@ func NewSnapshotMeta() *SnapshotMeta { } } -func (sm *SnapshotMeta) Update(data *CheckpointData) *SnapshotMeta { - sm.Lock() - defer sm.Unlock() - +func (sm *SnapshotMeta) updateTableInfo(data *CheckpointData) { insTable, _, _, _, delTableTxn := data.GetTblBatchs() insAccIDs := vector.MustFixedCol[uint32](insTable.GetVectorByName(catalog2.SystemColAttr_AccID).GetDownstreamVector()) insTIDs := vector.MustFixedCol[uint64](insTable.GetVectorByName(catalog2.SystemRelAttr_ID).GetDownstreamVector()) @@ -191,6 +190,13 @@ func (sm *SnapshotMeta) Update(data *CheckpointData) *SnapshotMeta { sm.acctIndexes[tid] = table sm.tables[table.accID][tid] = table } +} + +func (sm *SnapshotMeta) Update(data *CheckpointData) *SnapshotMeta { + sm.Lock() + defer sm.Unlock() + + sm.updateTableInfo(data) if sm.tid == 0 { return sm } @@ -281,7 +287,7 @@ func (sm *SnapshotMeta) GetSnapshot(ctx context.Context, fs fileservice.FileServ if snapshotType == SnapshotTypeCluster { for account := range sm.tables { if snapshotList[account] == nil { - snapshotList[account] = containers.MakeVector(colTypes[0], mp) + snapshotList[account] = containers.MakeVector(types.T_TS.ToType(), mp) } err = vector.AppendFixed[types.TS](snapshotList[account].GetDownstreamVector(), snapTs, false, mp) if err != nil { @@ -292,7 +298,7 @@ func (sm *SnapshotMeta) GetSnapshot(ctx context.Context, fs fileservice.FileServ } id := uint32(acct) if snapshotList[id] == nil { - snapshotList[id] = containers.MakeVector(colTypes[0], mp) + snapshotList[id] = containers.MakeVector(types.T_TS.ToType(), mp) } logutil.Infof("GetSnapshot: id %d, ts %v", id, snapTs.ToString()) err = vector.AppendFixed[types.TS](snapshotList[id].GetDownstreamVector(), snapTs, false, mp) @@ -312,9 +318,9 @@ func (sm *SnapshotMeta) SetTid(tid uint64) { sm.tid = tid } -func (sm *SnapshotMeta) SaveMeta(name string, fs fileservice.FileService) error { +func (sm *SnapshotMeta) SaveMeta(name string, fs fileservice.FileService) (uint32, error) { if len(sm.objects) == 0 { - return nil + return 0, nil } bat := containers.NewBatch() for i, attr := range objectInfoSchemaAttr { @@ -349,25 +355,26 @@ func (sm *SnapshotMeta) SaveMeta(name string, fs fileservice.FileService) error defer deltaBat.Close() writer, err := objectio.NewObjectWriterSpecial(objectio.WriterGC, name, fs) if err != nil { - return err + return 0, err } if _, err = writer.WriteWithoutSeqnum(containers.ToCNBatch(bat)); err != nil { - return err + return 0, err } if deltaBat.Length() > 0 { logutil.Infof("deltaBat length is %d", deltaBat.Length()) if _, err = writer.WriteWithoutSeqnum(containers.ToCNBatch(deltaBat)); err != nil { - return err + return 0, err } } _, err = writer.WriteEnd(context.Background()) - return err + size := writer.GetObjectStats()[0].OriginSize() + return size, err } -func (sm *SnapshotMeta) SaveTableInfo(name string, fs fileservice.FileService) error { +func (sm *SnapshotMeta) SaveTableInfo(name string, fs fileservice.FileService) (uint32, error) { if len(sm.tables) == 0 { - return nil + return 0, nil } bat := containers.NewBatch() for i, attr := range tableInfoSchemaAttr { @@ -395,14 +402,15 @@ func (sm *SnapshotMeta) SaveTableInfo(name string, fs fileservice.FileService) e defer bat.Close() writer, err := objectio.NewObjectWriterSpecial(objectio.WriterGC, name, fs) if err != nil { - return err + return 0, err } if _, err = writer.WriteWithoutSeqnum(containers.ToCNBatch(bat)); err != nil { - return err + return 0, err } _, err = writer.WriteEnd(context.Background()) - return err + size := writer.GetObjectStats()[0].OriginSize() + return size, err } func (sm *SnapshotMeta) RebuildTableInfo(ins *containers.Batch) { @@ -572,6 +580,26 @@ func (sm *SnapshotMeta) ReadTableInfo(ctx context.Context, name string, fs files return nil } +func (sm *SnapshotMeta) InitTableInfo(data *CheckpointData) { + sm.Lock() + defer sm.Unlock() + sm.updateTableInfo(data) +} + +func (sm *SnapshotMeta) TableInfoString() string { + sm.RLock() + defer sm.RUnlock() + var buf bytes.Buffer + for accID, tables := range sm.tables { + buf.WriteString(fmt.Sprintf("accountID: %d\n", accID)) + for tid, table := range tables { + buf.WriteString(fmt.Sprintf("tableID: %d, create: %s, deleteAt: %s\n", + tid, table.createAt.ToString(), table.deleteAt.ToString())) + } + } + return buf.String() +} + func (sm *SnapshotMeta) GetSnapshotList(SnapshotList map[uint32][]types.TS, tid uint64) []types.TS { sm.RLock() defer sm.RUnlock() @@ -608,6 +636,13 @@ func (sm *SnapshotMeta) MergeTableInfo(SnapshotList map[uint32][]types.TS) error return nil } +func (sm *SnapshotMeta) String() string { + sm.RLock() + defer sm.RUnlock() + return fmt.Sprintf("account count: %d, table count: %d, object count: %d", + len(sm.tables), len(sm.acctIndexes), len(sm.objects)) +} + func isSnapshotRefers(table *TableInfo, snapVec []types.TS) bool { if len(snapVec) == 0 { return false