From d1d05f1180c904b05c493318aad3a00ac1b4bfe6 Mon Sep 17 00:00:00 2001 From: gouhongshen Date: Sun, 8 Sep 2024 14:05:11 +0800 Subject: [PATCH] adding flag to block info and object info (#18527) flag: 1. dependable 2. sorted 3. by cn created. Approved by: @daviszhen, @ouyuanning, @aunjgr, @m-schen, @XuPeng-SH, @LeftHandCold, @triump2020 --- pkg/objectio/block.go | 9 ++- pkg/objectio/block_info.go | 61 +++++++++++++------ pkg/objectio/block_info_test.go | 29 +++++---- pkg/objectio/object_stats.go | 59 +++++++++++++----- pkg/objectio/object_stats_test.go | 17 ++++++ pkg/objectio/writer.go | 26 +++++--- pkg/objectio/writer_test.go | 4 +- pkg/sql/colexec/mergeblock/mergeblock_test.go | 6 +- pkg/sql/colexec/s3util.go | 18 ++---- pkg/sql/compile/compile.go | 2 +- pkg/vm/engine/disttae/datasource.go | 12 ++-- pkg/vm/engine/disttae/datasource_test.go | 3 +- .../disttae/logtailreplay/change_handle.go | 2 +- .../disttae/logtailreplay/partition_state.go | 16 ++--- pkg/vm/engine/disttae/logtailreplay/types.go | 10 +-- pkg/vm/engine/disttae/merge.go | 3 +- pkg/vm/engine/disttae/reader.go | 2 +- pkg/vm/engine/disttae/txn_table.go | 11 ++-- pkg/vm/engine/engine_util/exec_util.go | 3 +- pkg/vm/engine/tae/blockio/read.go | 58 +++++------------- pkg/vm/engine/tae/blockio/utils.go | 2 +- pkg/vm/engine/tae/blockio/writer.go | 7 +-- pkg/vm/engine/tae/db/gc/v2/table.go | 3 +- pkg/vm/engine/tae/db/test/db_test.go | 26 +++++--- pkg/vm/engine/tae/db/testutil/funcs.go | 3 +- pkg/vm/engine/tae/logtail/backup.go | 5 +- pkg/vm/engine/tae/logtail/snapshot.go | 8 ++- pkg/vm/engine/tae/mergesort/merger.go | 6 +- pkg/vm/engine/tae/mergesort/reshaper.go | 7 +-- pkg/vm/engine/tae/rpc/rpc_test.go | 14 ++--- pkg/vm/engine/tae/tables/jobs/flushobj.go | 2 +- pkg/vm/engine/test/disttae_engine_test.go | 32 +++++----- 32 files changed, 254 insertions(+), 212 deletions(-) diff --git a/pkg/objectio/block.go b/pkg/objectio/block.go index 6aa0a316e440..8d2334a80e54 100644 --- a/pkg/objectio/block.go +++ b/pkg/objectio/block.go @@ -148,10 +148,13 @@ func (bm BlockObject) GenerateBlockInfo(objName ObjectName, sorted bool) BlockIn &sid, location.Name().Num(), location.ID()), - //non-appendable block - Appendable: false, - Sorted: sorted, } blkInfo.SetMetaLocation(location) + + blkInfo.StateFlag |= CNCreatedFlag + if sorted { + blkInfo.StateFlag |= SortedFlag + } + return blkInfo } diff --git a/pkg/objectio/block_info.go b/pkg/objectio/block_info.go index 7053051b3a46..975098f6808c 100644 --- a/pkg/objectio/block_info.go +++ b/pkg/objectio/block_info.go @@ -55,19 +55,48 @@ const ( ) type BlockInfo struct { - BlockID types.Blockid - //It's used to indicate whether the block is appendable block or non-appendable blk for reader. - // for appendable block, the data visibility in the block is determined by the commit ts and abort ts. - Appendable bool - Sorted bool - MetaLoc ObjectLocation + BlockID types.Blockid + MetaLoc ObjectLocation + StateFlag int8 //TODO:: remove it. PartitionNum int16 } +func (b *BlockInfo) SetFlagByObjStats(stats ObjectStats) { + b.StateFlag = stats.GetFlag() +} + +func (b *BlockInfo) IsAppendable() bool { + return b.StateFlag&AppendableFlag != 0 +} + +func (b *BlockInfo) IsSorted() bool { + return b.StateFlag&SortedFlag != 0 +} + +func (b *BlockInfo) IsCNCreated() bool { + return b.StateFlag&CNCreatedFlag != 0 +} + func (b *BlockInfo) String() string { - return fmt.Sprintf("[A-%v]blk-%s", b.Appendable, b.BlockID.ShortStringEx()) + flag := "" + if b.IsAppendable() { + flag = flag + "A" + } + if b.IsSorted() { + flag = flag + "S" + } + if b.IsCNCreated() { + flag = flag + "C" + } + + return fmt.Sprintf( + "[%s]ID-%s, MetaLoc: %s, PartitionNum: %d", + flag, + b.BlockID.ShortStringEx(), + b.MetaLocation().String(), + b.PartitionNum) } func (b *BlockInfo) MarshalWithBuf(w *bytes.Buffer) (uint32, error) { @@ -77,21 +106,16 @@ func (b *BlockInfo) MarshalWithBuf(w *bytes.Buffer) (uint32, error) { } space += uint32(types.BlockidSize) - if _, err := w.Write(types.EncodeBool(&b.Appendable)); err != nil { + if _, err := w.Write(types.EncodeSlice(b.MetaLoc[:])); err != nil { return 0, err } - space++ + space += uint32(LocationLen) - if _, err := w.Write(types.EncodeBool(&b.Sorted)); err != nil { + if _, err := w.Write(types.EncodeInt8(&b.StateFlag)); err != nil { return 0, err } space++ - if _, err := w.Write(types.EncodeSlice(b.MetaLoc[:])); err != nil { - return 0, err - } - space += uint32(LocationLen) - if _, err := w.Write(types.EncodeInt16(&b.PartitionNum)); err != nil { return 0, err } @@ -103,14 +127,13 @@ func (b *BlockInfo) MarshalWithBuf(w *bytes.Buffer) (uint32, error) { func (b *BlockInfo) Unmarshal(buf []byte) error { b.BlockID = types.DecodeFixed[types.Blockid](buf[:types.BlockidSize]) buf = buf[types.BlockidSize:] - b.Appendable = types.DecodeFixed[bool](buf) - buf = buf[1:] - b.Sorted = types.DecodeFixed[bool](buf) - buf = buf[1:] copy(b.MetaLoc[:], buf[:LocationLen]) buf = buf[LocationLen:] + b.StateFlag = types.DecodeInt8(buf[:1]) + buf = buf[1:] + b.PartitionNum = types.DecodeFixed[int16](buf[:2]) return nil } diff --git a/pkg/objectio/block_info_test.go b/pkg/objectio/block_info_test.go index 0a0a39318f1c..051f01a38783 100644 --- a/pkg/objectio/block_info_test.go +++ b/pkg/objectio/block_info_test.go @@ -82,18 +82,21 @@ func TestBlockInfoSliceTraverse(t *testing.T) { for i := 0; i < s.Len(); i++ { blkInfo := s.Get(i) require.Equal(t, intToBlockid(int32(i)), blkInfo.BlockID) - require.Equal(t, false, blkInfo.Appendable) - blkInfo.Appendable = true + require.Equal(t, false, blkInfo.IsAppendable()) + blkInfo.StateFlag |= AppendableFlag } for i := 0; i < s.Len(); i++ { - require.Equal(t, true, s.Get(i).Appendable) + require.Equal(t, true, s.Get(i).IsAppendable()) } - s.AppendBlockInfo(BlockInfo{BlockID: intToBlockid(1000), Appendable: true}) + blk := BlockInfo{BlockID: intToBlockid(1000)} + blk.StateFlag |= AppendableFlag + + s.AppendBlockInfo(blk) for i := 0; i < s.Len(); i++ { - require.Equal(t, true, s.Get(i).Appendable) + require.Equal(t, true, s.Get(i).IsAppendable()) } } @@ -109,14 +112,16 @@ func TestBytesToBlockInfoSlice(t *testing.T) { for i := 0; i < s.Len(); i++ { blkInfo := s.Get(i) require.Equal(t, intToBlockid(int32(i)), blkInfo.BlockID) - require.Equal(t, false, blkInfo.Appendable) - blkInfo.Appendable = true + require.Equal(t, false, blkInfo.IsAppendable()) + blkInfo.StateFlag |= AppendableFlag } - s.AppendBlockInfo(BlockInfo{BlockID: intToBlockid(1000), Appendable: true}) + blk := BlockInfo{BlockID: intToBlockid(1000)} + blk.StateFlag |= AppendableFlag + s.AppendBlockInfo(blk) for i := 0; i < s.Len(); i++ { - require.Equal(t, true, s.Get(i).Appendable) + require.Equal(t, true, s.Get(i).IsAppendable()) } require.Equal(t, 1000*BlockInfoSize, len(bs)) @@ -125,10 +130,10 @@ func TestBytesToBlockInfoSlice(t *testing.T) { require.Equal(t, 1001*BlockInfoSize, len(bs)) require.Equal(t, s.GetAllBytes(), bs) - s.Get(999).Appendable = false - require.Equal(t, false, s.Get(999).Appendable) + s.Get(999).StateFlag &= ^AppendableFlag + require.Equal(t, false, s.Get(999).IsAppendable()) blkInfo := DecodeBlockInfo(bs[999*BlockInfoSize:]) - require.Equal(t, false, blkInfo.Appendable) + require.Equal(t, false, blkInfo.IsAppendable()) } func TestBlockInfoSlice_Slice(t *testing.T) { diff --git a/pkg/objectio/object_stats.go b/pkg/objectio/object_stats.go index a19d159255eb..5a22d07a761f 100644 --- a/pkg/objectio/object_stats.go +++ b/pkg/objectio/object_stats.go @@ -23,7 +23,7 @@ import ( ) type ObjectDescriber interface { - DescribeObject() ([]ObjectStats, error) + DescribeObject() (ObjectStats, error) } const ( @@ -44,6 +44,12 @@ const ( ObjectStatsLen = reservedOffset + reservedLen ) +const ( + AppendableFlag = 0x1 + SortedFlag = 0x2 + CNCreatedFlag = 0x4 +) + var ZeroObjectStats ObjectStats // ObjectStats has format: @@ -52,6 +58,26 @@ var ZeroObjectStats ObjectStats // +------------------------------------------------------------------------------------------------+--------+ type ObjectStats [ObjectStatsLen]byte +type ObjectStatsOptions func(*ObjectStats) + +func WithCNCreated() ObjectStatsOptions { + return func(o *ObjectStats) { + o[reservedOffset] |= CNCreatedFlag + } +} + +func WithSorted() ObjectStatsOptions { + return func(o *ObjectStats) { + o[reservedOffset] |= SortedFlag + } +} + +func WithAppendable() ObjectStatsOptions { + return func(o *ObjectStats) { + o[reservedOffset] |= AppendableFlag + } +} + func NewObjectStats() *ObjectStats { return new(ObjectStats) } @@ -60,14 +86,17 @@ func NewObjectStatsWithObjectID(id *ObjectId, appendable, sorted, cnCreated bool stats := new(ObjectStats) SetObjectStatsObjectName(stats, BuildObjectNameWithObjectID(id)) if appendable { - stats.setAppendable() + stats[reservedOffset] = stats[reservedOffset] | AppendableFlag } + if sorted { - stats.SetSorted() + stats[reservedOffset] = stats[reservedOffset] | SortedFlag } + if cnCreated { - stats.SetCNCreated() + stats[reservedOffset] = stats[reservedOffset] | CNCreatedFlag } + return stats } func (des *ObjectStats) Marshal() []byte { @@ -78,29 +107,27 @@ func (des *ObjectStats) UnMarshal(data []byte) { copy(des[:], data) } +func (des *ObjectStats) GetFlag() int8 { + return int8(des[reservedOffset]) +} + // Clone deep copies the stats and returns its pointer func (des *ObjectStats) Clone() *ObjectStats { copied := NewObjectStats() copy(copied[:], des[:]) return copied } -func (des *ObjectStats) setAppendable() { - des[reservedOffset] = des[reservedOffset] | 0x1 -} + func (des *ObjectStats) GetAppendable() bool { - return des[reservedOffset]&0x1 != 0 -} -func (des *ObjectStats) SetSorted() { - des[reservedOffset] = des[reservedOffset] | 0x2 + return des[reservedOffset]&AppendableFlag != 0 } + func (des *ObjectStats) GetSorted() bool { - return des[reservedOffset]&0x2 != 0 -} -func (des *ObjectStats) SetCNCreated() { - des[reservedOffset] = des[reservedOffset] | 0x4 + return des[reservedOffset]&SortedFlag != 0 } + func (des *ObjectStats) GetCNCreated() bool { - return des[reservedOffset]&0x4 != 0 + return des[reservedOffset]&CNCreatedFlag != 0 } func (des *ObjectStats) IsZero() bool { return bytes.Equal(des[:], ZeroObjectStats[:]) diff --git a/pkg/objectio/object_stats_test.go b/pkg/objectio/object_stats_test.go index 5a6a5b8daa12..99130d73d088 100644 --- a/pkg/objectio/object_stats_test.go +++ b/pkg/objectio/object_stats_test.go @@ -96,3 +96,20 @@ func TestObjectStats_Marshal_UnMarshal(t *testing.T) { require.True(t, bytes.Equal(stats.Marshal(), rawBytes)) fmt.Println(stats.String()) } + +func TestObjectStatsOptions(t *testing.T) { + stats := NewObjectStats() + require.True(t, stats.IsZero()) + require.False(t, stats.GetAppendable()) + require.False(t, stats.GetCNCreated()) + require.False(t, stats.GetSorted()) + + WithCNCreated()(stats) + require.True(t, stats.GetCNCreated()) + + WithSorted()(stats) + require.True(t, stats.GetSorted()) + + WithAppendable()(stats) + require.True(t, stats.GetAppendable()) +} diff --git a/pkg/objectio/writer.go b/pkg/objectio/writer.go index b1e7864dedd5..dae18ee34b98 100644 --- a/pkg/objectio/writer.go +++ b/pkg/objectio/writer.go @@ -48,7 +48,7 @@ type objectWriterV1 struct { name ObjectName compressBuf []byte bloomFilter []byte - objStats []ObjectStats + objStats ObjectStats sortKeySeqnum uint16 appendable bool originSize uint32 @@ -132,8 +132,14 @@ func newObjectWriterV1(name ObjectName, fs fileservice.FileService, schemaVersio return writer, nil } -func (w *objectWriterV1) GetObjectStats() []ObjectStats { - return w.objStats +func (w *objectWriterV1) GetObjectStats(opts ...ObjectStatsOptions) (copied ObjectStats) { + copied = w.objStats + + for _, opt := range opts { + opt(&copied) + } + + return copied } func describeObjectHelper(w *objectWriterV1, colmeta []ColumnMeta, idx DataMetaType) ObjectStats { @@ -163,14 +169,18 @@ func describeObjectHelper(w *objectWriterV1, colmeta []ColumnMeta, idx DataMetaT // if an object only has deletes, only the tombstone object stats valid. // // if an object has both inserts and deletes, both stats are valid. -func (w *objectWriterV1) DescribeObject() ([]ObjectStats, error) { - stats := make([]ObjectStats, 2) +func (w *objectWriterV1) DescribeObject() (ObjectStats, error) { + var stats ObjectStats + if len(w.blocks[SchemaData]) != 0 { - stats[SchemaData] = describeObjectHelper(w, w.colmeta, SchemaData) + stats = describeObjectHelper(w, w.colmeta, SchemaData) } if len(w.blocks[SchemaTombstone]) != 0 { - stats[SchemaTombstone] = describeObjectHelper(w, w.tombstonesColmeta, SchemaTombstone) + if !stats.IsZero() { + panic("schema data and schema tombstone should not all be valid at the same time") + } + stats = describeObjectHelper(w, w.tombstonesColmeta, SchemaTombstone) } return stats, nil @@ -611,7 +621,7 @@ func (w *objectWriterV1) Sync(ctx context.Context, items ...WriteOptions) error } func (w *objectWriterV1) GetDataStats() ObjectStats { - return w.objStats[SchemaData] + return w.objStats } func (w *objectWriterV1) WriteWithCompress(offset uint32, buf []byte) (data []byte, extent Extent, err error) { diff --git a/pkg/objectio/writer_test.go b/pkg/objectio/writer_test.go index fc8247331c95..a3e5232287e4 100644 --- a/pkg/objectio/writer_test.go +++ b/pkg/objectio/writer_test.go @@ -105,7 +105,7 @@ func TestNewObjectWriter(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 3, len(blocks)) assert.Nil(t, objectWriter.buffer) - require.Equal(t, objectWriter.objStats[0].Size(), blocks[0].GetExtent().End()+FooterSize) + require.Equal(t, objectWriter.objStats.Size(), blocks[0].GetExtent().End()+FooterSize) objectReader, _ := NewObjectReaderWithStr(name, service) extents := make([]Extent, 3) @@ -130,7 +130,7 @@ func TestNewObjectWriter(t *testing.T) { oSize += meta.BlockHeader().ZoneMapArea().OriginSize() // 24 is the size of empty bf and zm oSize += HeaderSize + FooterSize + 24 + extents[0].OriginSize() - require.Equal(t, objectWriter.objStats[0].OriginSize(), oSize) + require.Equal(t, objectWriter.objStats.OriginSize(), oSize) assert.Equal(t, uint32(3), meta.BlockCount()) assert.True(t, meta.BlockHeader().Appendable()) assert.Equal(t, uint16(math.MaxUint16), meta.BlockHeader().SortKey()) diff --git a/pkg/sql/colexec/mergeblock/mergeblock_test.go b/pkg/sql/colexec/mergeblock/mergeblock_test.go index 92867a2d2c68..49052d07648f 100644 --- a/pkg/sql/colexec/mergeblock/mergeblock_test.go +++ b/pkg/sql/colexec/mergeblock/mergeblock_test.go @@ -63,7 +63,7 @@ func TestMergeBlock(t *testing.T) { loc1.Name().Num(), loc1.ID()), //non-appendable block - Appendable: false, + //Appendable: false, } blkInfo1.SetMetaLocation(loc1) @@ -74,7 +74,7 @@ func TestMergeBlock(t *testing.T) { loc2.Name().Num(), loc2.ID()), //non-appendable block - Appendable: false, + //Appendable: false, } blkInfo2.SetMetaLocation(loc2) @@ -85,7 +85,7 @@ func TestMergeBlock(t *testing.T) { loc3.Name().Num(), loc3.ID()), //non-appendable block - Appendable: false, + //Appendable: false, } blkInfo3.SetMetaLocation(loc3) diff --git a/pkg/sql/colexec/s3util.go b/pkg/sql/colexec/s3util.go index e28958ce3c4e..0208434cadc8 100644 --- a/pkg/sql/colexec/s3util.go +++ b/pkg/sql/colexec/s3util.go @@ -547,18 +547,12 @@ func (w *S3Writer) sync(proc *process.Process) ([]objectio.BlockInfo, objectio.O ) } - stats := w.writer.GetObjectStats() - - var i = -1 - for i = range stats { - if !stats[i].IsZero() { - stats[i].SetCNCreated() - if w.sortIndex != -1 { - stats[i].SetSorted() - } - break - } + var stats objectio.ObjectStats + if w.sortIndex != -1 { + stats = w.writer.GetObjectStats(objectio.WithCNCreated(), objectio.WithSorted()) + } else { + stats = w.writer.GetObjectStats(objectio.WithCNCreated()) } - return blkInfos, stats[i], err + return blkInfos, stats, err } diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 2a875b0c0035..235bd6779ad9 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3856,7 +3856,7 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e ctx, blk.BlockID, fs, ); err2 != nil { return false, err2 - } else if blk.Appendable || hasTombstone { + } else if blk.IsAppendable() || hasTombstone { newRelData.AppendBlockInfo(blk) return true, nil } diff --git a/pkg/vm/engine/disttae/datasource.go b/pkg/vm/engine/disttae/datasource.go index 3a2976577f1d..f1a4d34935af 100644 --- a/pkg/vm/engine/disttae/datasource.go +++ b/pkg/vm/engine/disttae/datasource.go @@ -607,7 +607,8 @@ func (ls *LocalDataSource) filterInMemUnCommittedInserts( offsets := rowIdsToOffset(retainedRowIds, int64(0)).([]int64) b, _ := retainedRowIds[0].Decode() - sels, err := ls.ApplyTombstones(ls.ctx, b, offsets, engine.Policy_CheckUnCommittedOnly) + sels, err := ls.ApplyTombstones( + ls.ctx, b, offsets, engine.Policy_CheckUnCommittedOnly) if err != nil { return err } @@ -1153,9 +1154,8 @@ func (ls *LocalDataSource) batchApplyTombstoneObjects( bfIndex index.StaticFilter location objectio.Location - loaded *batch.Batch - persistedByCN bool - release func() + loaded *batch.Batch + release func() ) anyIf := func(check func(row types.Rowid) bool) bool { @@ -1207,7 +1207,7 @@ func (ls *LocalDataSource) batchApplyTombstoneObjects( location = obj.ObjectStats.BlockLocation(uint16(idx), objectio.BlockMaxRows) - if loaded, persistedByCN, release, err = blockio.ReadBlockDelete(ls.ctx, location, ls.fs); err != nil { + if loaded, release, err = blockio.ReadDeletes(ls.ctx, location, ls.fs, obj.GetCNCreated()); err != nil { return nil, err } @@ -1215,7 +1215,7 @@ func (ls *LocalDataSource) batchApplyTombstoneObjects( var commit []types.TS deletedRowIds = vector.MustFixedColWithTypeCheck[types.Rowid](loaded.Vecs[0]) - if !persistedByCN { + if !obj.GetCNCreated() { commit = vector.MustFixedColWithTypeCheck[types.TS](loaded.Vecs[1]) } diff --git a/pkg/vm/engine/disttae/datasource_test.go b/pkg/vm/engine/disttae/datasource_test.go index 190974fbdd9f..19ec13041c50 100644 --- a/pkg/vm/engine/disttae/datasource_test.go +++ b/pkg/vm/engine/disttae/datasource_test.go @@ -240,11 +240,10 @@ func TestRelationDataV2_MarshalAndUnMarshal(t *testing.T) { blkID := types.NewBlockidWithObjectID(&objID, uint16(blkNum)) blkInfo := objectio.BlockInfo{ BlockID: *blkID, - Appendable: true, - Sorted: false, MetaLoc: metaLoc, PartitionNum: int16(i), } + blkInfo.StateFlag |= objectio.AppendableFlag relData.AppendBlockInfo(blkInfo) } diff --git a/pkg/vm/engine/disttae/logtailreplay/change_handle.go b/pkg/vm/engine/disttae/logtailreplay/change_handle.go index f5e506b7e224..c7aa6a7fd312 100755 --- a/pkg/vm/engine/disttae/logtailreplay/change_handle.go +++ b/pkg/vm/engine/disttae/logtailreplay/change_handle.go @@ -402,7 +402,7 @@ func sortBatch(bat *batch.Batch, sortIdx int, mp *mpool.MPool) error { } func checkObjectEntry(entry *ObjectEntry, start, end types.TS) bool { - if entry.Appendable { + if entry.GetAppendable() { if entry.CreateTime.Greater(&end) { return false } diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state.go b/pkg/vm/engine/disttae/logtailreplay/partition_state.go index 34fc5e28342b..5b1ac26f95ef 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition_state.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition_state.go @@ -139,10 +139,8 @@ func (p *PartitionState) HandleDataObjectList( continue } - objEntry.Appendable = objEntry.ObjectStats.GetAppendable() objEntry.CreateTime = createTSCol[idx] objEntry.DeleteTime = deleteTSCol[idx] - objEntry.Sorted = objEntry.ObjectStats.GetSorted() old, exist := p.dataObjects.Get(objEntry) if exist { @@ -163,7 +161,7 @@ func (p *PartitionState) HandleDataObjectList( Time: createTSCol[idx], ShortObjName: *objEntry.ObjectShortName(), IsDelete: false, - IsAppendable: objEntry.Appendable, + IsAppendable: objEntry.GetAppendable(), } p.objectIndexByTS.Set(e) } @@ -176,12 +174,12 @@ func (p *PartitionState) HandleDataObjectList( Time: deleteTSCol[idx], IsDelete: true, ShortObjName: *objEntry.ObjectShortName(), - IsAppendable: objEntry.Appendable, + IsAppendable: objEntry.GetAppendable(), } p.objectIndexByTS.Set(e) } - if objEntry.Appendable && objEntry.DeleteTime.IsEmpty() { + if objEntry.GetAppendable() && objEntry.DeleteTime.IsEmpty() { panic("logic error") } @@ -211,7 +209,7 @@ func (p *PartitionState) HandleDataObjectList( // if the inserting block is appendable, need to delete the rows for it; // if the inserting block is non-appendable and has delta location, need to delete // the deletes for it. - if objEntry.Appendable { + if objEntry.GetAppendable() { if entry.Time.LessEq(&trunctPoint) { // delete the row p.rows.Delete(entry) @@ -297,10 +295,8 @@ func (p *PartitionState) HandleTombstoneObjectList( continue } - objEntry.Appendable = objEntry.ObjectStats.GetAppendable() objEntry.CreateTime = createTSCol[idx] objEntry.DeleteTime = deleteTSCol[idx] - objEntry.Sorted = objEntry.ObjectStats.GetSorted() old, exist := p.tombstoneObjects.Get(objEntry) if exist { @@ -320,12 +316,12 @@ func (p *PartitionState) HandleTombstoneObjectList( p.tombstoneObjects.Set(objEntry) - if objEntry.Appendable && objEntry.DeleteTime.IsEmpty() { + if objEntry.GetAppendable() && objEntry.DeleteTime.IsEmpty() { panic("logic error") } // for appendable object, gc rows when delete object - if !objEntry.Appendable { + if !objEntry.GetAppendable() { continue } diff --git a/pkg/vm/engine/disttae/logtailreplay/types.go b/pkg/vm/engine/disttae/logtailreplay/types.go index b1d1c952e058..31588f33b64a 100644 --- a/pkg/vm/engine/disttae/logtailreplay/types.go +++ b/pkg/vm/engine/disttae/logtailreplay/types.go @@ -26,9 +26,6 @@ import ( type ObjectInfo struct { objectio.ObjectStats - - Appendable bool - Sorted bool CreateTime types.TS DeleteTime types.TS } @@ -36,8 +33,11 @@ type ObjectInfo struct { func (o ObjectInfo) String() string { return fmt.Sprintf( "%s; appendable: %v; sorted: %v; createTS: %s; deleteTS: %s", - o.ObjectStats.String(), o.Appendable, o.Sorted, - o.CreateTime.ToString(), o.DeleteTime.ToString()) + o.ObjectStats.String(), + o.ObjectStats.GetAppendable(), + o.ObjectStats.GetSorted(), + o.CreateTime.ToString(), + o.DeleteTime.ToString()) } func (o ObjectInfo) Location() objectio.Location { diff --git a/pkg/vm/engine/disttae/merge.go b/pkg/vm/engine/disttae/merge.go index f21f520d0a40..2052dfc20f44 100644 --- a/pkg/vm/engine/disttae/merge.go +++ b/pkg/vm/engine/disttae/merge.go @@ -183,8 +183,7 @@ func (t *cnMergeTask) LoadNextBatch(ctx context.Context, objIdx uint32) (*batch. blk := iter.Entry() // update delta location obj := t.targets[objIdx] - blk.Sorted = obj.Sorted - blk.Appendable = obj.Appendable + blk.SetFlagByObjStats(obj.ObjectStats) return t.readblock(ctx, &blk) } return nil, nil, nil, mergesort.ErrNoMoreBlocks diff --git a/pkg/vm/engine/disttae/reader.go b/pkg/vm/engine/disttae/reader.go index 2a72fb39fcad..72806c9bbb6f 100644 --- a/pkg/vm/engine/disttae/reader.go +++ b/pkg/vm/engine/disttae/reader.go @@ -397,7 +397,7 @@ func (r *reader) Read( bat.SetAttributes(cols) - if blkInfo.Sorted && r.columns.indexOfFirstSortedColumn != -1 { + if blkInfo.IsSorted() && r.columns.indexOfFirstSortedColumn != -1 { bat.GetVector(int32(r.columns.indexOfFirstSortedColumn)).SetSorted(true) } diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 2d9adf884b4e..6c43ae57c8e2 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -815,8 +815,7 @@ func (tbl *txnTable) rangesOnePart( } } - blk.Sorted = obj.Sorted - blk.Appendable = obj.Appendable + blk.SetFlagByObjStats(obj.ObjectStats) outBlocks.AppendBlockInfo(blk) @@ -1890,8 +1889,8 @@ func (tbl *txnTable) PKPersistedBetween( } } - blk.Sorted = obj.Sorted - blk.Appendable = obj.Appendable + blk.SetFlagByObjStats(obj.ObjectStats) + blk.PartitionNum = -1 candidateBlks[blk.BlockID] = &blk return true @@ -1952,7 +1951,7 @@ func (tbl *txnTable) PKPersistedBetween( } defer release() - if !blk.Sorted { + if !blk.IsSorted() { if unsortedFilter == nil { unsortedFilter = buildUnsortedFilter() } @@ -2095,7 +2094,7 @@ func (tbl *txnTable) GetNonAppendableObjectStats(ctx context.Context) ([]objecti objStats := make([]objectio.ObjectStats, 0, tbl.ApproxObjectsNum(ctx)) err = ForeachVisibleDataObject(state, snapshot, func(obj logtailreplay.ObjectEntry) error { - if obj.Appendable { + if obj.GetAppendable() { return nil } if sortKeyPos != -1 { diff --git a/pkg/vm/engine/engine_util/exec_util.go b/pkg/vm/engine/engine_util/exec_util.go index 40e71c18471a..2dd8b8d194c8 100644 --- a/pkg/vm/engine/engine_util/exec_util.go +++ b/pkg/vm/engine/engine_util/exec_util.go @@ -163,8 +163,7 @@ func FilterObjects( MetaLoc: objectio.ObjectLocation(loc), } - blk.Sorted = objStats.GetSorted() - blk.Appendable = objStats.GetAppendable() + blk.SetFlagByObjStats(objStats) outBlocks.AppendBlockInfo(blk) } diff --git a/pkg/vm/engine/tae/blockio/read.go b/pkg/vm/engine/tae/blockio/read.go index d2d77717ff27..d3c628b1f93b 100644 --- a/pkg/vm/engine/tae/blockio/read.go +++ b/pkg/vm/engine/tae/blockio/read.go @@ -179,9 +179,9 @@ func BlockDataRead( ) var searchFunc objectio.ReadFilterSearchFuncType - if (filter.HasFakePK || !info.Sorted) && filter.UnSortedSearchFunc != nil { + if (filter.HasFakePK || !info.IsSorted()) && filter.UnSortedSearchFunc != nil { searchFunc = filter.UnSortedSearchFunc - } else if info.Sorted && filter.SortedSearchFunc != nil { + } else if info.IsSorted() && filter.SortedSearchFunc != nil { searchFunc = filter.SortedSearchFunc } @@ -536,7 +536,7 @@ func readBlockData( return } - if info.Appendable { + if info.IsAppendable() { bat, deleteMask, err = readABlkColumns(idxes) } else { bat, _, err = readColumns(idxes) @@ -545,20 +545,13 @@ func readBlockData( return } -func ReadBlockDelete( - ctx context.Context, deltaloc objectio.Location, fs fileservice.FileService, -) (bat *batch.Batch, isPersistedByCN bool, release func(), err error) { - isPersistedByCN, err = IsPersistedByCN(ctx, deltaloc, fs) - if err != nil { - return - } - bat, release, err = ReadBlockDeleteBySchema(ctx, deltaloc, fs, isPersistedByCN) - return -} - -func ReadBlockDeleteBySchema( - ctx context.Context, deltaloc objectio.Location, fs fileservice.FileService, isPersistedByCN bool, +func ReadDeletes( + ctx context.Context, + deltaLoc objectio.Location, + fs fileservice.FileService, + isPersistedByCN bool, ) (bat *batch.Batch, release func(), err error) { + var cols []uint16 var typs []types.Type @@ -569,26 +562,10 @@ func ReadBlockDeleteBySchema( cols = []uint16{0, objectio.SEQNUM_COMMITTS} typs = []types.Type{types.T_Rowid.ToType(), types.T_TS.ToType()} } - bat, release, err = LoadTombstoneColumns(ctx, cols, typs, fs, deltaloc, nil, fileservice.Policy(0)) + bat, release, err = LoadTombstoneColumns(ctx, cols, typs, fs, deltaLoc, nil, fileservice.Policy(0)) return } -func IsPersistedByCN( - ctx context.Context, deltaloc objectio.Location, fs fileservice.FileService, -) (bool, error) { - objectMeta, err := objectio.FastLoadObjectMeta(ctx, &deltaloc, false, fs) - if err != nil { - return false, err - } - meta, ok := objectMeta.TombstoneMeta() - if !ok { - meta = objectMeta.MustDataMeta() - } - blkmeta := meta.GetBlockMeta(uint32(deltaloc.ID())) - columnCount := blkmeta.GetColumnCount() - return columnCount == 2, nil -} - func EvalDeleteRowsByTimestamp( deletes *batch.Batch, ts types.TS, blockid *types.Blockid, ) (rows *nulls.Bitmap) { @@ -702,31 +679,24 @@ func FillBlockDeleteMask( blockId types.Blockid, location objectio.Location, fs fileservice.FileService, + createdByCN bool, ) (deleteMask *nulls.Nulls, err error) { var ( - rows *nulls.Nulls - //bisect time.Duration + rows *nulls.Nulls release func() - persistedByCN bool persistedDeletes *batch.Batch ) if !location.IsEmpty() { - //t1 := time.Now() - - if persistedDeletes, persistedByCN, release, err = ReadBlockDelete(ctx, location, fs); err != nil { + if persistedDeletes, release, err = ReadDeletes(ctx, location, fs, createdByCN); err != nil { return nil, err } defer release() - //readCost := time.Since(t1) - - if persistedByCN { + if createdByCN { rows = EvalDeleteRowsByTimestampForDeletesPersistedByCN(blockId, persistedDeletes) } else { - //t2 := time.Now() rows = EvalDeleteRowsByTimestamp(persistedDeletes, snapshotTS, &blockId) - //bisect = time.Since(t2) } if rows != nil { diff --git a/pkg/vm/engine/tae/blockio/utils.go b/pkg/vm/engine/tae/blockio/utils.go index 0d12c78749b8..78b106baa208 100644 --- a/pkg/vm/engine/tae/blockio/utils.go +++ b/pkg/vm/engine/tae/blockio/utils.go @@ -38,7 +38,7 @@ func GetTombstonesByBlockId( onBlockSelectedFn := func(tombstoneObject *objectio.ObjectStats, pos int) (bool, error) { location := tombstoneObject.BlockLocation(uint16(pos), objectio.BlockMaxRows) if mask, err := FillBlockDeleteMask( - ctx, ts, blockId, location, fs, + ctx, ts, blockId, location, fs, tombstoneObject.GetCNCreated(), ); err != nil { return false, err } else { diff --git a/pkg/vm/engine/tae/blockio/writer.go b/pkg/vm/engine/tae/blockio/writer.go index 3d02b1a4bdac..aecdc15774bf 100644 --- a/pkg/vm/engine/tae/blockio/writer.go +++ b/pkg/vm/engine/tae/blockio/writer.go @@ -38,7 +38,6 @@ type BlockWriter struct { sortKeyIdx uint16 nameStr string name objectio.ObjectName - objectStats []objectio.ObjectStats prefix []index.PrefixFn // schema data @@ -101,8 +100,8 @@ func (w *BlockWriter) SetAppendable() { w.writer.SetAppendable() } -func (w *BlockWriter) GetObjectStats() []objectio.ObjectStats { - return w.objectStats +func (w *BlockWriter) GetObjectStats(opts ...objectio.ObjectStatsOptions) objectio.ObjectStats { + return w.writer.GetObjectStats(opts...) } // WriteBatch write a batch whose schema is decribed by seqnum in NewBlockWriterNew @@ -204,8 +203,6 @@ func (w *BlockWriter) Sync(ctx context.Context) ([]objectio.BlockObject, objecti return blocks, objectio.Extent{}, err } - w.objectStats = w.writer.GetObjectStats() - logutil.Debug("[WriteEnd]", common.OperationField(w.String(blocks)), common.OperandField(w.writer.GetSeqnums()), diff --git a/pkg/vm/engine/tae/db/gc/v2/table.go b/pkg/vm/engine/tae/db/gc/v2/table.go index 1dba1562537d..81cb2130773a 100644 --- a/pkg/vm/engine/tae/db/gc/v2/table.go +++ b/pkg/vm/engine/tae/db/gc/v2/table.go @@ -293,7 +293,8 @@ func (t *GCTable) SaveFullTable(start, end types.TS, fs *objectio.ObjectFS, file objectCount := 0 tombstoneCount := 0 if len(blocks) > 0 && err == nil { - size = writer.GetObjectStats()[0].OriginSize() + ss := writer.GetObjectStats() + size = ss.OriginSize() } if bats != nil { objectCount = bats[ObjectList].Length() diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 42d9ac435191..3421b0eb1f57 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -4197,8 +4197,7 @@ func TestBlockRead(t *testing.T) { bid, _ := blkEntry.ID(), blkEntry.ID() info := &objectio.BlockInfo{ - BlockID: *objectio.NewBlockidWithObjectID(bid, 0), - Appendable: false, // TODO: jxm + BlockID: *objectio.NewBlockidWithObjectID(bid, 0), } metaloc := objectEntry.ObjectLocation() metaloc.SetRows(schema.BlockMaxRows) @@ -4275,7 +4274,7 @@ func TestBlockRead(t *testing.T) { assert.Equal(t, 16, b4.Vecs[0].Length()) // read rowid column only - info.Appendable = false + //info.Appendable = false b5 := buildBatch([]types.Type{types.T_Rowid.ToType()}) err = blockio.BlockDataReadInner( context.Background(), "", info, @@ -4997,7 +4996,8 @@ func TestMergeMemsize(t *testing.T) { assert.Nil(t, err) assert.Equal(t, batCnt, len(blocks)) statsVec := containers.MakeVector(types.T_varchar.ToType(), common.DefaultAllocator) - statsVec.Append(writer.GetObjectStats()[objectio.SchemaData][:], false) + ss := writer.GetObjectStats() + statsVec.Append(ss[:], false) { txn, _ := tae.StartTxn(nil) txn.SetDedupType(txnif.DedupPolicy_CheckIncremental) @@ -5067,7 +5067,8 @@ func TestCollectDeletesAfterCKP(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 1, len(blocks)) statsVec := containers.MakeVector(types.T_varchar.ToType(), common.DefaultAllocator) - statsVec.Append(writer.GetObjectStats()[objectio.SchemaData][:], false) + ss := writer.GetObjectStats() + statsVec.Append(ss[:], false) defer statsVec.Close() { txn, _ := tae.StartTxn(nil) @@ -5169,7 +5170,8 @@ func TestAlwaysUpdate(t *testing.T) { blocks, _, err := writer.Sync(context.Background()) assert.Nil(t, err) assert.Equal(t, 25, len(blocks)) - statsVec.Append(writer.GetObjectStats()[objectio.SchemaData][:], false) + ss := writer.GetObjectStats() + statsVec.Append(ss[:], false) } // var did, tid uint64 @@ -7572,7 +7574,8 @@ func TestCommitS3Blocks(t *testing.T) { assert.Equal(t, 50, len(blocks)) statsVec := containers.MakeVector(types.T_varchar.ToType(), common.DefaultAllocator) defer statsVec.Close() - statsVec.Append(writer.GetObjectStats()[objectio.SchemaData][:], false) + ss := writer.GetObjectStats() + statsVec.Append(ss[:], false) statsVecs = append(statsVecs, statsVec) } @@ -7646,7 +7649,8 @@ func TestDedupSnapshot2(t *testing.T) { assert.Equal(t, 1, len(blocks)) statsVec := containers.MakeVector(types.T_varchar.ToType(), common.DefaultAllocator) defer statsVec.Close() - statsVec.Append(writer.GetObjectStats()[objectio.SchemaData][:], false) + ss := writer.GetObjectStats() + statsVec.Append(ss[:], false) name2 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) writer, err = blockio.NewBlockWriterNew(tae.Runtime.Fs.Service, name2, 0, nil) @@ -7659,7 +7663,8 @@ func TestDedupSnapshot2(t *testing.T) { assert.Equal(t, 1, len(blocks)) statsVec2 := containers.MakeVector(types.T_varchar.ToType(), common.DefaultAllocator) defer statsVec2.Close() - statsVec2.Append(writer.GetObjectStats()[objectio.SchemaData][:], false) + ss = writer.GetObjectStats() + statsVec2.Append(ss[:], false) txn, rel := tae.GetRelation() err = rel.AddObjsWithMetaLoc(context.Background(), statsVec) @@ -7821,7 +7826,8 @@ func TestDeduplication(t *testing.T) { statsVec := containers.MakeVector(types.T_varchar.ToType(), common.DefaultAllocator) defer statsVec.Close() - statsVec.Append(writer.GetObjectStats()[objectio.SchemaData][:], false) + ss := writer.GetObjectStats() + statsVec.Append(ss[:], false) txn, rel := tae.GetRelation() err = rel.AddObjsWithMetaLoc(context.Background(), statsVec) diff --git a/pkg/vm/engine/tae/db/testutil/funcs.go b/pkg/vm/engine/tae/db/testutil/funcs.go index 39af7c1b051c..bf1bed69801d 100644 --- a/pkg/vm/engine/tae/db/testutil/funcs.go +++ b/pkg/vm/engine/tae/db/testutil/funcs.go @@ -418,8 +418,7 @@ func MockCNDeleteInS3( _, _, err = writer.Sync(context.Background()) //location = blockio.EncodeLocation(name, blks[0].GetExtent(), uint32(bat.Length()), blks[0].GetID()) - stats = writer.GetObjectStats()[0] - stats.SetCNCreated() + stats = writer.GetObjectStats(objectio.WithCNCreated()) return } diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index af47e3288169..513b7173f19e 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -550,7 +550,8 @@ func ReWriteCheckpointAndBlockFromKey( } files = append(files, name.String()) blockLocation := objectio.BuildLocation(name, extent, blocks[0].GetRows(), blocks[0].GetID()) - objectData.stats = &writer.GetObjectStats()[objectio.SchemaData] + ss := writer.GetObjectStats() + objectData.stats = &ss objectio.SetObjectStatsLocation(objectData.stats, blockLocation) insertObjBatch[objectData.tid] = append(insertObjBatch[objectData.tid], objectData) } @@ -655,7 +656,7 @@ func ReWriteCheckpointAndBlockFromKey( } else { appendValToBatch(oldMeta, newMeta, i) row := newMeta.Length() - 1 - insertObjData[i].stats.SetSorted() + objectio.WithSorted()(insertObjData[i].stats) newMeta.GetVectorByName(ObjectAttr_ObjectStats).Update(row, insertObjData[i].stats[:], false) newMeta.GetVectorByName(EntryNode_DeleteAt).Update(row, types.TS{}, false) } diff --git a/pkg/vm/engine/tae/logtail/snapshot.go b/pkg/vm/engine/tae/logtail/snapshot.go index b53ba495e925..b5925a415434 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -227,7 +227,7 @@ func (d *DeltaLocDataSource) getAndApplyTombstones( return nil, nil } logutil.Infof("deltaLoc: %v, id is %d", deltaLoc.String(), bid.Sequence()) - deletes, _, release, err := blockio.ReadBlockDelete(ctx, deltaLoc, d.fs) + deletes, release, err := blockio.ReadDeletes(ctx, deltaLoc, d.fs, false) if err != nil { return nil, err } @@ -597,7 +597,8 @@ func (sm *SnapshotMeta) SaveMeta(name string, fs fileservice.FileService) (uint3 if err != nil { return 0, err } - size := writer.GetObjectStats()[0].OriginSize() + ss := writer.GetObjectStats() + size := ss.OriginSize() return size, err } @@ -665,7 +666,8 @@ func (sm *SnapshotMeta) SaveTableInfo(name string, fs fileservice.FileService) ( if err != nil { return 0, err } - size := writer.GetObjectStats()[0].OriginSize() + ss := writer.GetObjectStats() + size := ss.OriginSize() return size, err } diff --git a/pkg/vm/engine/tae/mergesort/merger.go b/pkg/vm/engine/tae/mergesort/merger.go index e742dbe6d7fc..6d866a042628 100644 --- a/pkg/vm/engine/tae/mergesort/merger.go +++ b/pkg/vm/engine/tae/mergesort/merger.go @@ -296,11 +296,9 @@ func (m *merger[T]) syncObject(ctx context.Context) error { if _, _, err := m.writer.Sync(ctx); err != nil { return err } - cobjstats := m.writer.GetObjectStats()[:objectio.SchemaTombstone] + cobjstats := m.writer.GetObjectStats() commitEntry := m.host.GetCommitEntry() - for _, cobj := range cobjstats { - commitEntry.CreatedObjs = append(commitEntry.CreatedObjs, cobj.Clone().Marshal()) - } + commitEntry.CreatedObjs = append(commitEntry.CreatedObjs, cobjstats.Clone().Marshal()) m.writer = nil return nil } diff --git a/pkg/vm/engine/tae/mergesort/reshaper.go b/pkg/vm/engine/tae/mergesort/reshaper.go index 89a51ef97901..dbebca1d5be5 100644 --- a/pkg/vm/engine/tae/mergesort/reshaper.go +++ b/pkg/vm/engine/tae/mergesort/reshaper.go @@ -19,7 +19,6 @@ import ( "errors" "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" ) @@ -146,9 +145,7 @@ func syncObject(ctx context.Context, writer *blockio.BlockWriter, commitEntry *a if _, _, err := writer.Sync(ctx); err != nil { return err } - cobjstats := writer.GetObjectStats()[:objectio.SchemaTombstone] - for _, cobj := range cobjstats { - commitEntry.CreatedObjs = append(commitEntry.CreatedObjs, cobj.Clone().Marshal()) - } + cobjstats := writer.GetObjectStats() + commitEntry.CreatedObjs = append(commitEntry.CreatedObjs, cobjstats.Clone().Marshal()) return nil } diff --git a/pkg/vm/engine/tae/rpc/rpc_test.go b/pkg/vm/engine/tae/rpc/rpc_test.go index f324b4d60184..fd8f49c9c139 100644 --- a/pkg/vm/engine/tae/rpc/rpc_test.go +++ b/pkg/vm/engine/tae/rpc/rpc_test.go @@ -112,7 +112,7 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) { blk.GetID()) assert.Nil(t, err) blkMetas = append(blkMetas, metaLoc.String()) - stats = append(stats, writer.GetObjectStats()[objectio.SchemaData]) + stats = append(stats, writer.GetObjectStats(objectio.WithCNCreated())) } } @@ -182,7 +182,7 @@ func TestHandle_HandleCommitPerformanceForS3Load(t *testing.T) { metaLocBat := containers.BuildBatch(attrs, vecTypes, vecOpts) for i := 0; i < 50; i++ { metaLocBat.Vecs[0].Append([]byte(blkMetas[offset+i]), false) - stats[offset+i].SetCNCreated() + //stats[offset+i].SetCNCreated() metaLocBat.Vecs[1].Append([]byte(stats[offset+i][:]), false) } offset += 50 @@ -278,7 +278,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) { blocks[1].GetID(), ).String() assert.Nil(t, err) - stats1 := writer.GetObjectStats()[objectio.SchemaData] + stats1 := writer.GetObjectStats(objectio.WithCNCreated()) //write taeBats[3] into file service objName2 := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) @@ -296,7 +296,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) { uint32(taeBats[3].Vecs[0].Length()), blocks[0].GetID(), ).String() - stats3 := writer.GetObjectStats()[objectio.SchemaData] + stats3 := writer.GetObjectStats(objectio.WithCNCreated()) assert.Nil(t, err) //create db; @@ -395,7 +395,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) { metaLocBat1 := containers.BuildBatch(attrs, vecTypes, vecOpts) metaLocBat1.Vecs[0].Append([]byte(metaLoc1), false) metaLocBat1.Vecs[0].Append([]byte(metaLoc2), false) - stats1.SetCNCreated() + //stats1.SetCNCreated() metaLocBat1.Vecs[1].Append([]byte(stats1[:]), false) metaLocBat1.Vecs[1].Append([]byte(stats1[:]), false) metaLocMoBat1 := containers.ToCNBatch(metaLocBat1) @@ -411,7 +411,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) { //add one non-appendable block from S3 into "tbtest" table metaLocBat2 := containers.BuildBatch(attrs, vecTypes, vecOpts) metaLocBat2.Vecs[0].Append([]byte(metaLoc3), false) - stats3.SetCNCreated() + //stats3.SetCNCreated() metaLocBat2.Vecs[1].Append([]byte(stats3[:]), false) metaLocMoBat2 := containers.ToCNBatch(metaLocBat2) addS3BlkEntry2, err := makePBEntry(INSERT, dbTestID, @@ -499,7 +499,7 @@ func TestHandle_HandlePreCommitWriteS3(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(physicals), len(blocks)) - stats := writer.GetObjectStats()[0] + stats := writer.GetObjectStats() require.False(t, stats.IsZero()) //prepare delete locations. diff --git a/pkg/vm/engine/tae/tables/jobs/flushobj.go b/pkg/vm/engine/tae/tables/jobs/flushobj.go index 369100701790..21b7d7dc4432 100644 --- a/pkg/vm/engine/tae/tables/jobs/flushobj.go +++ b/pkg/vm/engine/tae/tables/jobs/flushobj.go @@ -160,7 +160,7 @@ func (task *flushObjTask) Execute(ctx context.Context) (err error) { common.AnyField("delete-rows", drow), ) } - task.Stats = writer.GetObjectStats()[objectio.SchemaData] + task.Stats = writer.GetObjectStats() perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { counter.TAE.Block.Flush.Add(1) diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index bfbcd54d26d4..c31e63205699 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -886,12 +886,12 @@ func TestObjectStats1(t *testing.T) { obj := iter.Entry() objCount++ if bytes.Equal(obj.ObjectInfo.ObjectStats.ObjectName().ObjectId()[:], appendableObjectID[:]) { - assert.True(t, obj.Appendable) - assert.False(t, obj.Sorted) + assert.True(t, obj.GetAppendable()) + assert.False(t, obj.GetSorted()) assert.False(t, obj.ObjectStats.GetCNCreated()) } else { - assert.False(t, obj.Appendable) - assert.True(t, obj.Sorted) + assert.False(t, obj.GetAppendable()) + assert.True(t, obj.GetSorted()) assert.False(t, obj.ObjectStats.GetCNCreated()) } } @@ -904,10 +904,10 @@ func TestObjectStats1(t *testing.T) { for iter.Next() { obj := iter.Entry() objCount++ - if obj.Appendable { + if obj.GetAppendable() { appendableCount++ } - assert.True(t, obj.Sorted) + assert.True(t, obj.GetSorted()) assert.False(t, obj.ObjectStats.GetCNCreated()) } assert.Equal(t, appendableCount, 1) @@ -925,8 +925,8 @@ func TestObjectStats1(t *testing.T) { for iter.Next() { obj := iter.Entry() objCount++ - assert.False(t, obj.Appendable) - assert.True(t, obj.Sorted) + assert.False(t, obj.GetAppendable()) + assert.True(t, obj.GetSorted()) assert.False(t, obj.ObjectStats.GetCNCreated()) } assert.Equal(t, objCount, 1) @@ -973,12 +973,12 @@ func TestObjectStats2(t *testing.T) { obj := iter.Entry() objCount++ if bytes.Equal(obj.ObjectInfo.ObjectStats.ObjectName().ObjectId()[:], appendableObjectID[:]) { - assert.True(t, obj.Appendable) - assert.False(t, obj.Sorted) + assert.True(t, obj.GetAppendable()) + assert.False(t, obj.GetSorted()) assert.False(t, obj.ObjectStats.GetCNCreated()) } else { - assert.False(t, obj.Appendable) - assert.False(t, obj.Sorted) + assert.False(t, obj.GetAppendable()) + assert.False(t, obj.GetSorted()) assert.False(t, obj.ObjectStats.GetCNCreated()) } } @@ -991,10 +991,10 @@ func TestObjectStats2(t *testing.T) { for iter.Next() { obj := iter.Entry() objCount++ - if obj.Appendable { + if obj.GetAppendable() { appendableCount++ } - assert.True(t, obj.Sorted) + assert.True(t, obj.GetSorted()) assert.False(t, obj.ObjectStats.GetCNCreated()) } assert.Equal(t, appendableCount, 1) @@ -1012,8 +1012,8 @@ func TestObjectStats2(t *testing.T) { for iter.Next() { obj := iter.Entry() objCount++ - assert.False(t, obj.Appendable) - assert.False(t, obj.Sorted) + assert.False(t, obj.GetAppendable()) + assert.False(t, obj.GetSorted()) assert.False(t, obj.ObjectStats.GetCNCreated()) } iter.Close()