diff --git a/pkg/vm/engine/tae/common/generator.go b/pkg/objectio/const.go similarity index 61% rename from pkg/vm/engine/tae/common/generator.go rename to pkg/objectio/const.go index d06ca02c8584..64c40c05bf2d 100644 --- a/pkg/vm/engine/tae/common/generator.go +++ b/pkg/objectio/const.go @@ -12,9 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -package common +package objectio -type RowGen interface { - HasNext() bool - Next() uint32 -} +import ( + "math" + + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" +) + +const ( + BlockMaxRows = 8192 + + SEQNUM_UPPER = math.MaxUint16 - 5 // reserved 5 column for special committs、committs etc. + SEQNUM_ROWID = math.MaxUint16 + SEQNUM_ABORT = math.MaxUint16 - 1 + SEQNUM_COMMITTS = math.MaxUint16 - 2 +) + +const ZoneMapSize = index.ZMSize diff --git a/pkg/objectio/types.go b/pkg/objectio/types.go index 4cecec91015c..cce2456fb39a 100644 --- a/pkg/objectio/types.go +++ b/pkg/objectio/types.go @@ -16,7 +16,6 @@ package objectio import ( "context" - "math" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" @@ -24,21 +23,12 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" ) -const ( - SEQNUM_UPPER = math.MaxUint16 - 5 // reserved 5 column for special committs、committs etc. - SEQNUM_ROWID = math.MaxUint16 - SEQNUM_ABORT = math.MaxUint16 - 1 - SEQNUM_COMMITTS = math.MaxUint16 - 2 -) - type WriteType int8 const ( WriteTS WriteType = iota ) -const ZoneMapSize = index.ZMSize - type ZoneMap = index.ZM type StaticFilter = index.StaticFilter diff --git a/pkg/vm/engine/disttae/datasource.go b/pkg/vm/engine/disttae/datasource.go index e0b4fbeab23d..888b05b32623 100644 --- a/pkg/vm/engine/disttae/datasource.go +++ b/pkg/vm/engine/disttae/datasource.go @@ -36,7 +36,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -586,7 +585,7 @@ func (ls *LocalDataSource) filterInMemUnCommittedInserts( rows := 0 writes := ls.table.getTxn().writes - maxRows := int(options.DefaultBlockMaxRows) + maxRows := objectio.BlockMaxRows if len(writes) == 0 { return nil } @@ -655,7 +654,7 @@ func (ls *LocalDataSource) filterInMemCommittedInserts( // ls.rc.SkipPStateDeletes = false //}() - if bat.RowCount() >= int(options.DefaultBlockMaxRows) { + if bat.RowCount() >= objectio.BlockMaxRows { return nil } @@ -698,9 +697,9 @@ func (ls *LocalDataSource) filterInMemCommittedInserts( applyOffset := 0 goon := true - for goon && bat.Vecs[0].Length() < int(options.DefaultBlockMaxRows) { + for goon && bat.Vecs[0].Length() < int(objectio.BlockMaxRows) { //minTS = types.MaxTs() - for bat.Vecs[0].Length() < int(options.DefaultBlockMaxRows) { + for bat.Vecs[0].Length() < int(objectio.BlockMaxRows) { if goon = ls.pStateRows.insIter.Next(); !goon { break } @@ -1211,7 +1210,7 @@ func (ls *LocalDataSource) batchApplyTombstoneObjects( return nil, err } - location = obj.ObjectStats.BlockLocation(uint16(idx), options.DefaultBlockMaxRows) + location = obj.ObjectStats.BlockLocation(uint16(idx), objectio.BlockMaxRows) if loaded, persistedByCN, release, err = blockio.ReadBlockDelete(ls.ctx, location, ls.fs); err != nil { return nil, err diff --git a/pkg/vm/engine/disttae/filter.go b/pkg/vm/engine/disttae/filter.go index 240ee062ec28..3b761bc233cb 100644 --- a/pkg/vm/engine/disttae/filter.go +++ b/pkg/vm/engine/disttae/filter.go @@ -31,7 +31,6 @@ import ( v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -1144,9 +1143,9 @@ func ExecuteBlockFilter( var rows uint32 if objRows := objStats.Rows(); objRows != 0 { if pos < blockCnt-1 { - rows = options.DefaultBlockMaxRows + rows = objectio.BlockMaxRows } else { - rows = objRows - options.DefaultBlockMaxRows*uint32(pos) + rows = objRows - objectio.BlockMaxRows*uint32(pos) } } else { if blkMeta == nil { diff --git a/pkg/vm/engine/disttae/merge.go b/pkg/vm/engine/disttae/merge.go index 633a235eeabf..a2cedd3b2d55 100644 --- a/pkg/vm/engine/disttae/merge.go +++ b/pkg/vm/engine/disttae/merge.go @@ -159,7 +159,7 @@ func (t *cnMergeTask) GetAccBlkCnts() []int { } func (t *cnMergeTask) GetBlockMaxRows() uint32 { - return options.DefaultBlockMaxRows + return objectio.BlockMaxRows } func (t *cnMergeTask) GetObjectMaxBlocks() uint16 { diff --git a/pkg/vm/engine/disttae/tombstones.go b/pkg/vm/engine/disttae/tombstones.go index 330fc0cae159..fabec378a527 100644 --- a/pkg/vm/engine/disttae/tombstones.go +++ b/pkg/vm/engine/disttae/tombstones.go @@ -21,7 +21,6 @@ import ( "sort" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -226,7 +225,7 @@ func (tomb *tombstoneData) PrefetchTombstones( for i, end := 0, tomb.files.Len(); i < end; i++ { stats := tomb.files.Get(i) for j := 0; j < int(stats.BlkCnt()); j++ { - loc := stats.BlockLocation(uint16(j), options.DefaultBlockMaxRows) + loc := stats.BlockLocation(uint16(j), objectio.BlockMaxRows) if err := blockio.Prefetch( srvId, []uint16{0, 1, 2}, diff --git a/pkg/vm/engine/disttae/util.go b/pkg/vm/engine/disttae/util.go index 430eabc7cd52..b7f9300b2fdf 100644 --- a/pkg/vm/engine/disttae/util.go +++ b/pkg/vm/engine/disttae/util.go @@ -45,7 +45,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/process" "go.uber.org/zap" ) @@ -1435,7 +1434,7 @@ func NewStatsBlkIter(stats *objectio.ObjectStats, meta objectio.ObjectDataMeta) cur: -1, accRows: 0, totalRows: stats.Rows(), - curBlkRows: options.DefaultBlockMaxRows, + curBlkRows: objectio.BlockMaxRows, meta: meta, } } @@ -1453,7 +1452,7 @@ func (i *StatsBlkIter) Entry() objectio.BlockInfo { i.cur = 0 } - // assume that all blks have DefaultBlockMaxRows, except the last one + // assume that all blks have BlockMaxRows, except the last one if i.meta.IsEmpty() { if i.cur == int(i.blkCnt-1) { i.curBlkRows = i.totalRows - i.accRows diff --git a/pkg/vm/engine/disttae/util_test.go b/pkg/vm/engine/disttae/util_test.go index 5925f4b28cdb..f22f14375df9 100644 --- a/pkg/vm/engine/disttae/util_test.go +++ b/pkg/vm/engine/disttae/util_test.go @@ -30,7 +30,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/plan/function" "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/stretchr/testify/require" ) @@ -305,7 +304,7 @@ func mockStatsList(t *testing.T, statsCnt int) (statsList []objectio.ObjectStats stats := objectio.NewObjectStats() blkCnt := rand.Uint32()%100 + 1 require.Nil(t, objectio.SetObjectStatsBlkCnt(stats, blkCnt)) - require.Nil(t, objectio.SetObjectStatsRowCnt(stats, options.DefaultBlockMaxRows*(blkCnt-1)+options.DefaultBlockMaxRows*6/10)) + require.Nil(t, objectio.SetObjectStatsRowCnt(stats, objectio.BlockMaxRows*(blkCnt-1)+objectio.BlockMaxRows*6/10)) require.Nil(t, objectio.SetObjectStatsObjectName(stats, objectio.BuildObjectName(objectio.NewSegmentid(), uint16(blkCnt)))) require.Nil(t, objectio.SetObjectStatsExtent(stats, objectio.NewExtent(0, 0, 0, 0))) require.Nil(t, objectio.SetObjectStatsSortKeyZoneMap(stats, index.NewZM(types.T_bool, 1))) diff --git a/pkg/vm/engine/tae/blockio/utils.go b/pkg/vm/engine/tae/blockio/utils.go index 632b746cde29..e935a32f0070 100644 --- a/pkg/vm/engine/tae/blockio/utils.go +++ b/pkg/vm/engine/tae/blockio/utils.go @@ -23,7 +23,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" ) func GetTombstonesByBlockId( @@ -36,7 +35,7 @@ func GetTombstonesByBlockId( ) (err error) { loadedBlkCnt := 0 onBlockSelectedFn := func(tombstoneObject *objectio.ObjectStats, pos int) (bool, error) { - location := tombstoneObject.BlockLocation(uint16(pos), options.DefaultBlockMaxRows) + location := tombstoneObject.BlockLocation(uint16(pos), objectio.BlockMaxRows) if mask, err := FillBlockDeleteMask( ctx, ts, blockId, location, fs, ); err != nil { diff --git a/pkg/vm/engine/tae/catalog/base.go b/pkg/vm/engine/tae/catalog/base.go index 9a986c6fb9ce..60a7196538e5 100644 --- a/pkg/vm/engine/tae/catalog/base.go +++ b/pkg/vm/engine/tae/catalog/base.go @@ -96,22 +96,6 @@ func (be *BaseEntryImpl[T]) CreateWithTxnLocked(txn txnif.AsyncTxn, baseNode T) be.InsertLocked(node) } -func (be *BaseEntryImpl[T]) TryGetTerminatedTS(waitIfcommitting bool) (terminated bool, TS types.TS) { - be.RLock() - defer be.RUnlock() - return be.TryGetTerminatedTSLocked(waitIfcommitting) -} - -func (be *BaseEntryImpl[T]) TryGetTerminatedTSLocked(waitIfcommitting bool) (terminated bool, TS types.TS) { - node := be.GetLatestCommittedNodeLocked() - if node == nil { - return - } - if node.HasDropCommitted() { - return true, node.DeletedAt - } - return -} func (be *BaseEntryImpl[T]) PrepareAdd(txn txnif.TxnReader) (err error) { if err = be.ConflictCheck(txn); err != nil { return @@ -203,14 +187,6 @@ func (be *BaseEntryImpl[T]) HasDropCommittedLocked() bool { return un.HasDropCommitted() } -func (be *BaseEntryImpl[T]) HasDropIntentLocked() bool { - un := be.GetLatestNodeLocked() - if un == nil { - return false - } - return un.HasDropIntent() -} - func (be *BaseEntryImpl[T]) ensureVisibleAndNotDroppedLocked(txn txnif.TxnReader) bool { visible, dropped := be.GetVisibilityLocked(txn) if !visible { diff --git a/pkg/vm/engine/tae/catalog/basemvccnode.go b/pkg/vm/engine/tae/catalog/basemvccnode.go index a7c821d3c1b1..812b44134bba 100644 --- a/pkg/vm/engine/tae/catalog/basemvccnode.go +++ b/pkg/vm/engine/tae/catalog/basemvccnode.go @@ -192,7 +192,6 @@ type BaseNode[T any] interface { CloneData() T String() string Update(vun T) - IdempotentUpdate(vun T) WriteTo(w io.Writer) (n int64, err error) ReadFromWithVersion(r io.Reader, ver uint16) (n int64, err error) } @@ -254,12 +253,6 @@ func (e *MVCCNode[T]) Update(un *MVCCNode[T]) { e.BaseNode.Update(un.BaseNode) } -func (e *MVCCNode[T]) IdempotentUpdate(un *MVCCNode[T]) { - e.CreatedAt = un.CreatedAt - e.DeletedAt = un.DeletedAt - e.BaseNode.Update(un.BaseNode) -} - func (e *MVCCNode[T]) ApplyCommit(id string) (err error) { var commitTS types.TS commitTS, err = e.TxnMVCCNode.ApplyCommit(id) diff --git a/pkg/vm/engine/tae/catalog/dbmvccnode.go b/pkg/vm/engine/tae/catalog/dbmvccnode.go index 6993d28fb4c0..188187d5a084 100644 --- a/pkg/vm/engine/tae/catalog/dbmvccnode.go +++ b/pkg/vm/engine/tae/catalog/dbmvccnode.go @@ -40,8 +40,7 @@ func (e *EmptyMVCCNode) String() string { } // for create drop in one txn -func (e *EmptyMVCCNode) Update(vun *EmptyMVCCNode) {} -func (e *EmptyMVCCNode) IdempotentUpdate(vun *EmptyMVCCNode) {} +func (e *EmptyMVCCNode) Update(vun *EmptyMVCCNode) {} func (e *EmptyMVCCNode) WriteTo(w io.Writer) (n int64, err error) { return } diff --git a/pkg/vm/engine/tae/catalog/metamvccnode.go b/pkg/vm/engine/tae/catalog/metamvccnode.go index 9f7f7220afc6..0e1a6393737f 100644 --- a/pkg/vm/engine/tae/catalog/metamvccnode.go +++ b/pkg/vm/engine/tae/catalog/metamvccnode.go @@ -67,14 +67,6 @@ func (e *MetadataMVCCNode) Update(un *MetadataMVCCNode) { e.DeltaLoc = un.DeltaLoc } } -func (e *MetadataMVCCNode) IdempotentUpdate(un *MetadataMVCCNode) { - if !un.MetaLoc.IsEmpty() { - e.MetaLoc = un.MetaLoc - } - if !un.DeltaLoc.IsEmpty() { - e.DeltaLoc = un.DeltaLoc - } -} func (e *MetadataMVCCNode) WriteTo(w io.Writer) (n int64, err error) { var sn int64 if sn, err = objectio.WriteBytes(e.MetaLoc, w); err != nil { @@ -154,16 +146,6 @@ func (e *ObjectMVCCNode) String() string { func (e *ObjectMVCCNode) Update(vun *ObjectMVCCNode) { e.ObjectStats = *vun.ObjectStats.Clone() } -func (e *ObjectMVCCNode) IdempotentUpdate(vun *ObjectMVCCNode) { - if e.ObjectStats.IsZero() { - e.ObjectStats = *vun.ObjectStats.Clone() - } else { - if e.IsEmpty() && !vun.IsEmpty() { - e.ObjectStats = *vun.ObjectStats.Clone() - - } - } -} func (e *ObjectMVCCNode) WriteTo(w io.Writer) (n int64, err error) { var sn int if sn, err = w.Write(e.ObjectStats[:]); err != nil { diff --git a/pkg/vm/engine/tae/catalog/object.go b/pkg/vm/engine/tae/catalog/object.go index e24043848d4b..b798b4f78240 100644 --- a/pkg/vm/engine/tae/catalog/object.go +++ b/pkg/vm/engine/tae/catalog/object.go @@ -44,7 +44,6 @@ type ObjectEntry struct { table *TableEntry ObjectNode objData data.Object - deleteCount atomic.Uint32 ObjectState uint8 HasPrintedPrepareComapct atomic.Bool @@ -64,13 +63,6 @@ func (entry *ObjectEntry) GetLoaded() bool { return stats.Rows() != 0 } -func (entry *ObjectEntry) AddDeleteCount(count uint32) { - entry.deleteCount.Add(count) -} -func (entry *ObjectEntry) GetDeleteCount() uint32 { - return entry.deleteCount.Load() -} - func (entry *ObjectEntry) GetSortKeyZonemap() index.ZM { stats := entry.GetObjectStats() return stats.SortKeyZoneMap() @@ -398,7 +390,6 @@ func (entry *ObjectEntry) getBlockCntFromStats() (blkCnt uint32) { func (entry *ObjectEntry) IsAppendable() bool { return entry.ObjectStats.GetAppendable() - // return entry.state == ES_Appendable } func (entry *ObjectEntry) IsSorted() bool { @@ -433,7 +424,6 @@ func (entry *ObjectEntry) GetLatestCommittedNode() *txnbase.TxnMVCCNode { } return nil } -func (entry *ObjectEntry) GetCatalog() *Catalog { return entry.table.db.catalog } func (entry *ObjectEntry) PrepareRollback() (err error) { lastNode := entry.table.getObjectList(entry.IsTombstone).GetLastestNode(entry.SortHint) @@ -488,23 +478,6 @@ func (entry *ObjectEntry) TreeMaxDropCommitEntry() (BaseEntry, *ObjectEntry) { return nil, nil } -// GetTerminationTS is coarse API: no consistency check -func (entry *ObjectEntry) GetTerminationTS() (ts types.TS, terminated bool) { - tableEntry := entry.GetTable() - dbEntry := tableEntry.GetDB() - - dbEntry.RLock() - terminated, ts = dbEntry.TryGetTerminatedTSLocked(true) - if terminated { - dbEntry.RUnlock() - return - } - dbEntry.RUnlock() - - terminated, ts = tableEntry.TryGetTerminatedTS(true) - return -} - func (entry *ObjectEntry) GetSchema() *Schema { return entry.table.GetLastestSchema(entry.IsTombstone) } diff --git a/pkg/vm/engine/tae/catalog/schema.go b/pkg/vm/engine/tae/catalog/schema.go index 5cee1c31773f..870ba5b839b7 100644 --- a/pkg/vm/engine/tae/catalog/schema.go +++ b/pkg/vm/engine/tae/catalog/schema.go @@ -158,7 +158,7 @@ func NewEmptySchema(name string) *Schema { SeqnumMap: make(map[uint16]int), Extra: &apipb.SchemaExtra{}, } - schema.BlockMaxRows = options.DefaultBlockMaxRows + schema.BlockMaxRows = objectio.BlockMaxRows schema.ObjectMaxBlocks = options.DefaultBlocksPerObject return schema } diff --git a/pkg/vm/engine/tae/catalog/table.go b/pkg/vm/engine/tae/catalog/table.go index bb4a3eeafbda..c4abb51759ed 100644 --- a/pkg/vm/engine/tae/catalog/table.go +++ b/pkg/vm/engine/tae/catalog/table.go @@ -596,15 +596,6 @@ func (entry *TableEntry) IsActive() bool { return !entry.HasDropCommitted() } -// GetTerminationTS is coarse API: no consistency check -func (entry *TableEntry) GetTerminationTS() (ts types.TS, terminated bool) { - dbEntry := entry.GetDB() - - terminated, ts = dbEntry.TryGetTerminatedTS(true) - - return -} - func (entry *TableEntry) AlterTable(ctx context.Context, txn txnif.TxnReader, req *apipb.AlterTableReq) (isNewNode bool, newSchema *Schema, err error) { entry.Lock() defer entry.Unlock() diff --git a/pkg/vm/engine/tae/catalog/tablemvccnode.go b/pkg/vm/engine/tae/catalog/tablemvccnode.go index 5dcdc6f32e89..33778c3d41e2 100644 --- a/pkg/vm/engine/tae/catalog/tablemvccnode.go +++ b/pkg/vm/engine/tae/catalog/tablemvccnode.go @@ -57,9 +57,6 @@ func (e *TableMVCCNode) Update(un *TableMVCCNode) { e.Schema = un.Schema } -func (e *TableMVCCNode) IdempotentUpdate(un *TableMVCCNode) { - e.Schema = un.Schema -} func (e *TableMVCCNode) WriteTo(w io.Writer) (n int64, err error) { var schemaBuf []byte if schemaBuf, err = e.Schema.Marshal(); err != nil { diff --git a/pkg/vm/engine/tae/common/bitmap.go b/pkg/vm/engine/tae/common/bitmap.go index 794d7a94bb76..9869d869bc92 100644 --- a/pkg/vm/engine/tae/common/bitmap.go +++ b/pkg/vm/engine/tae/common/bitmap.go @@ -47,13 +47,3 @@ func RoaringToMOBitmap(bm *roaring.Bitmap) *nulls.Bitmap { } return nbm } - -func MOOrRoaringBitmap(bm *nulls.Bitmap, rbm *roaring.Bitmap) { - if bm == nil || rbm == nil { - return - } - iterator := rbm.Iterator() - for iterator.HasNext() { - bm.Add(uint64(iterator.Next())) - } -} diff --git a/pkg/vm/engine/tae/common/id.go b/pkg/vm/engine/tae/common/id.go index d5a9f515b347..210a761c5055 100644 --- a/pkg/vm/engine/tae/common/id.go +++ b/pkg/vm/engine/tae/common/id.go @@ -98,12 +98,3 @@ func IDArraryString(ids []ID) string { str = fmt.Sprintf("%s]", str) return str } - -func BlockIDArraryString(ids []ID) string { - str := "[" - for _, id := range ids { - str = fmt.Sprintf("%s%s,", str, id.BlockID.String()) - } - str = fmt.Sprintf("%s]", str) - return str -} diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 30439d2a1edc..99ba0106cf0c 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -85,7 +85,7 @@ func TestAppend1(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, nil) defer tae.Close() schema := catalog.MockSchemaAll(14, 3) - schema.BlockMaxRows = options.DefaultBlockMaxRows + schema.BlockMaxRows = objectio.BlockMaxRows schema.ObjectMaxBlocks = options.DefaultBlocksPerObject tae.BindSchema(schema) data := catalog.MockBatch(schema, int(schema.BlockMaxRows*2)) diff --git a/pkg/vm/engine/tae/iface/data/object.go b/pkg/vm/engine/tae/iface/data/object.go index 9ab630eaba09..9b9070a8fff9 100644 --- a/pkg/vm/engine/tae/iface/data/object.go +++ b/pkg/vm/engine/tae/iface/data/object.go @@ -72,7 +72,6 @@ type Object interface { MakeAppender() (ObjectAppender, error) - GetTotalChanges() int TryUpgrade() error // check if all rows are committed before ts diff --git a/pkg/vm/engine/tae/iface/handle/object.go b/pkg/vm/engine/tae/iface/handle/object.go index e10c73042e8c..48b02747b9ab 100644 --- a/pkg/vm/engine/tae/iface/handle/object.go +++ b/pkg/vm/engine/tae/iface/handle/object.go @@ -82,7 +82,6 @@ type ObjectReader interface { ) error GetRelation() Relation - BatchDedup(pks containers.Vector) error Prefetch(idxes []int) error BlkCnt() int } @@ -90,10 +89,6 @@ type ObjectReader interface { type ObjectWriter interface { io.Closer String() string - - PushDeleteOp(filter Filter) error - PushUpdateOp(filter Filter, attr string, val any) error - UpdateStats(objectio.ObjectStats) error } diff --git a/pkg/vm/engine/tae/iface/handle/relation.go b/pkg/vm/engine/tae/iface/handle/relation.go index 23612dd82651..6ee127627a97 100644 --- a/pkg/vm/engine/tae/iface/handle/relation.go +++ b/pkg/vm/engine/tae/iface/handle/relation.go @@ -32,7 +32,6 @@ type Relation interface { ID() uint64 String() string SimplePPString(common.PPLevel) string - GetCardinality(attr string) int64 Schema(bool) any AlterTable(ctx context.Context, req *apipb.AlterTableReq) error MakeObjectIt(bool) ObjectIt @@ -42,9 +41,7 @@ type Relation interface { GetValueByPhyAddrKey(key any, col int) (any, bool, error) DeleteByPhyAddrKeys(keys containers.Vector, pkVec containers.Vector) error RangeDelete(id *common.ID, start, end uint32, dt DeleteType) error - //TryDeleteByDeltaloc(id *common.ID, deltaloc objectio.Location) (ok bool, err error) TryDeleteByStats(id *common.ID, stats objectio.ObjectStats) (ok bool, err error) - Update(id *common.ID, row uint32, col uint16, v any, isNull bool) error GetByFilter(ctx context.Context, filter *Filter) (id *common.ID, offset uint32, err error) GetValue(id *common.ID, row uint32, col uint16, skipCheckDelete bool) (any, bool, error) GetValueByFilter(ctx context.Context, filter *Filter, col int) (any, bool, error) diff --git a/pkg/vm/engine/tae/options/options.go b/pkg/vm/engine/tae/options/options.go index 5c254bbed6e9..ae65a04c9909 100644 --- a/pkg/vm/engine/tae/options/options.go +++ b/pkg/vm/engine/tae/options/options.go @@ -19,6 +19,7 @@ import ( "runtime" "time" + "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/txn/clock" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" ) @@ -136,7 +137,7 @@ func (o *Options) FillDefaults(dirname string) *Options { if o.StorageCfg == nil { o.StorageCfg = &StorageCfg{ - BlockMaxRows: DefaultBlockMaxRows, + BlockMaxRows: objectio.BlockMaxRows, ObjectMaxBlocks: DefaultBlocksPerObject, } } diff --git a/pkg/vm/engine/tae/options/types.go b/pkg/vm/engine/tae/options/types.go index 0f801784035c..0b14f531900a 100644 --- a/pkg/vm/engine/tae/options/types.go +++ b/pkg/vm/engine/tae/options/types.go @@ -18,6 +18,7 @@ import ( "context" "time" + "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/taskservice" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -31,11 +32,9 @@ import ( const ( DefaultIndexCacheSize = 256 * mpool.MB - DefaultBlockMaxRows = uint32(8192) + DefaultBlockMaxRows = objectio.BlockMaxRows DefaultBlocksPerObject = uint16(256) - DefaultObjectPerSegment = uint16(512) - DefaultScannerInterval = time.Second * 5 DefaultCheckpointFlushInterval = time.Minute DefaultCheckpointTransferInterval = time.Second * 5 diff --git a/pkg/vm/engine/tae/rpc/adaptors.go b/pkg/vm/engine/tae/rpc/adaptors.go index 0f931b75cbb4..0098c8a44d18 100644 --- a/pkg/vm/engine/tae/rpc/adaptors.go +++ b/pkg/vm/engine/tae/rpc/adaptors.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" @@ -43,7 +44,7 @@ func CreateRelation( if err != nil { return } - schema.BlockMaxRows = options.DefaultBlockMaxRows + schema.BlockMaxRows = objectio.BlockMaxRows schema.ObjectMaxBlocks = options.DefaultBlocksPerObject if len(schema.Comment) > 0 { if r := MaxRows.FindStringSubmatch(schema.Comment); len(r) > 0 { diff --git a/pkg/vm/engine/tae/rpc/handle.go b/pkg/vm/engine/tae/rpc/handle.go index d2fdb14233ef..2843c69fbd90 100644 --- a/pkg/vm/engine/tae/rpc/handle.go +++ b/pkg/vm/engine/tae/rpc/handle.go @@ -785,7 +785,7 @@ func (h *Handle) HandleWrite( stats.String()) for i := range stats.BlkCnt() { - loc = stats.BlockLocation(uint16(i), options.DefaultBlockMaxRows) + loc = stats.BlockLocation(uint16(i), objectio.BlockMaxRows) vectors, closeFunc, err = blockio.LoadColumns2( ctx, []uint16{uint16(rowidIdx), uint16(pkIdx)}, diff --git a/pkg/vm/engine/tae/rpc/inspect.go b/pkg/vm/engine/tae/rpc/inspect.go index 9332ca9b3766..3bab6ca24820 100644 --- a/pkg/vm/engine/tae/rpc/inspect.go +++ b/pkg/vm/engine/tae/rpc/inspect.go @@ -725,13 +725,7 @@ func (c *infoArg) Run() error { b.WriteRune('\n') // b.WriteString(fmt.Sprintf("persisted_ts: %v\n", c.obj.GetObjectData().GetDeltaPersistedTS().ToString())) r, reason := c.obj.GetObjectData().PrepareCompactInfo() - rows, err := c.obj.GetObjectData().Rows() - if err != nil { - logutil.Warnf("get object rows failed, obj: %v, err %v", c.obj.ID().String(), err) - } - dels := c.obj.GetObjectData().GetTotalChanges() b.WriteString(fmt.Sprintf("prepareCompact: %v, %q\n", r, reason)) - b.WriteString(fmt.Sprintf("left rows: %v\n", rows-dels)) b.WriteString(fmt.Sprintf("ppstring: %v\n", c.obj.GetObjectData().PPString(c.verbose, 0, "", c.blkn))) schema := c.obj.GetSchema() diff --git a/pkg/vm/engine/tae/rpc/utils.go b/pkg/vm/engine/tae/rpc/utils.go index b5019eb731b7..1a03a7c7e698 100644 --- a/pkg/vm/engine/tae/rpc/utils.go +++ b/pkg/vm/engine/tae/rpc/utils.go @@ -26,7 +26,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -78,7 +77,7 @@ func (h *Handle) prefetchDeleteRowID(_ context.Context, req *db.WriteReq) error //start loading jobs asynchronously,should create a new root context. for _, stats := range req.TombstoneStats { - loc := stats.BlockLocation(uint16(0), options.DefaultBlockMaxRows) + loc := stats.BlockLocation(uint16(0), objectio.BlockMaxRows) pref, err := blockio.BuildPrefetchParams(h.db.Runtime.Fs.Service, loc) if err != nil { return err diff --git a/pkg/vm/engine/tae/tables/base.go b/pkg/vm/engine/tae/tables/base.go index ed53dee9c3bb..51716f61738c 100644 --- a/pkg/vm/engine/tae/tables/base.go +++ b/pkg/vm/engine/tae/tables/base.go @@ -413,10 +413,6 @@ func (obj *baseObject) MakeAppender() (appender data.ObjectAppender, err error) panic("not supported") } -func (obj *baseObject) GetTotalChanges() int { - return int(obj.meta.Load().GetDeleteCount()) -} - func (obj *baseObject) IsAppendable() bool { return false } func (obj *baseObject) PPString(level common.PPLevel, depth int, prefix string, blkid int) string { diff --git a/pkg/vm/engine/tae/txn/txnbase/handle.go b/pkg/vm/engine/tae/txn/txnbase/handle.go index 0bab64485e1a..5671ab3507f9 100644 --- a/pkg/vm/engine/tae/txn/txnbase/handle.go +++ b/pkg/vm/engine/tae/txn/txnbase/handle.go @@ -67,7 +67,6 @@ func (rel *TxnRelation) Close() error func (rel *TxnRelation) ID() uint64 { return 0 } func (rel *TxnRelation) Rows() int64 { return 0 } func (rel *TxnRelation) Size(attr string) int64 { return 0 } -func (rel *TxnRelation) GetCardinality(attr string) int64 { return 0 } func (rel *TxnRelation) Schema(bool) any { return nil } func (rel *TxnRelation) MakeObjectIt(bool) handle.ObjectIt { return nil } func (rel *TxnRelation) MakeObjectItOnSnap(bool) handle.ObjectIt { return nil } @@ -92,7 +91,6 @@ func (rel *TxnRelation) GetValue(*common.ID, uint32, uint16, bool) (v any, isNul func (rel *TxnRelation) GetValueByPhyAddrKey(any, int) (v any, isNull bool, err error) { return } -func (rel *TxnRelation) Update(*common.ID, uint32, uint16, any, bool) (err error) { return } func (rel *TxnRelation) DeleteByPhyAddrKey(any) (err error) { return } func (rel *TxnRelation) DeleteByPhyAddrKeys(containers.Vector, containers.Vector) (err error) { return } func (rel *TxnRelation) RangeDelete(*common.ID, uint32, uint32, handle.DeleteType) (err error) { @@ -139,18 +137,12 @@ func (obj *TxnObject) GetRelation() (rel handle.Relation) func (obj *TxnObject) Update(uint64, uint32, uint16, any) (err error) { return } func (obj *TxnObject) RangeDelete(uint64, uint32, uint32, handle.DeleteType) (err error) { return } -func (obj *TxnObject) PushDeleteOp(handle.Filter) (err error) { return } -func (obj *TxnObject) PushUpdateOp(handle.Filter, string, any) (err error) { return } -func (obj *TxnObject) SoftDeleteBlock(id types.Blockid) (err error) { return } -func (obj *TxnObject) BatchDedup(containers.Vector) (err error) { return } - // func (blk *TxnBlock) IsAppendable() bool { return true } func (blk *TxnBlock) Reset() { blk.Txn = nil blk.Seg = nil } -func (blk *TxnBlock) GetTotalChanges() int { return 0 } func (blk *TxnBlock) IsAppendableBlock() bool { return true } func (blk *TxnBlock) Fingerprint() *common.ID { return &common.ID{} } func (blk *TxnBlock) Rows() int { return 0 } @@ -165,5 +157,3 @@ func (blk *TxnBlock) GetObject() (obj handle.Object) { return } func (blk *TxnBlock) Append(*containers.Batch, uint32) (n uint32, err error) { return } func (blk *TxnBlock) Update(uint32, uint16, any) (err error) { return } func (blk *TxnBlock) RangeDelete(uint32, uint32, handle.DeleteType) (err error) { return } -func (blk *TxnBlock) PushDeleteOp(handle.Filter) (err error) { return } -func (blk *TxnBlock) PushUpdateOp(handle.Filter, string, any) (err error) { return } diff --git a/pkg/vm/engine/tae/txn/txnimpl/object.go b/pkg/vm/engine/tae/txn/txnimpl/object.go index 726bcb6d3e47..78c9ee833020 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/object.go +++ b/pkg/vm/engine/tae/txn/txnimpl/object.go @@ -178,9 +178,6 @@ func (obj *txnObject) Close() (err error) { // putObjectCnt.Add(1) return } -func (obj *txnObject) GetTotalChanges() int { - return obj.entry.GetObjectData().GetTotalChanges() -} func (obj *txnObject) GetMeta() any { return obj.entry } func (obj *txnObject) String() string { return obj.entry.String() } func (obj *txnObject) GetID() *types.Objectid { return obj.entry.ID() } diff --git a/pkg/vm/engine/tae/txn/txnimpl/relation.go b/pkg/vm/engine/tae/txn/txnimpl/relation.go index 452e762586a7..4fc7f806ba10 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/relation.go +++ b/pkg/vm/engine/tae/txn/txnimpl/relation.go @@ -153,8 +153,6 @@ func (h *txnRelation) GetMeta() any { return h.table.entry } // Schema return schema in txnTable, not the lastest schema in TableEntry func (h *txnRelation) Schema(isTombstone bool) any { return h.table.GetLocalSchema(isTombstone) } -func (h *txnRelation) GetCardinality(attr string) int64 { return 0 } - func (h *txnRelation) BatchDedup(col containers.Vector) error { return h.Txn.GetStore().BatchDedup(h.table.entry.GetDB().ID, h.table.entry.GetID(), col) } diff --git a/pkg/vm/engine/test/change_handle_test.go b/pkg/vm/engine/test/change_handle_test.go index b760f00828e7..3bc40f1cfe6b 100644 --- a/pkg/vm/engine/test/change_handle_test.go +++ b/pkg/vm/engine/test/change_handle_test.go @@ -25,6 +25,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/txn/client" "github.com/matrixorigin/matrixone/pkg/vm/engine" catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" @@ -32,7 +33,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" testutil2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/testutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/handle" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -378,7 +378,7 @@ func TestChangesHandleForCNWrite(t *testing.T) { require.NoError(t, err) blockCnt := 10 - rowsCount := int(options.DefaultBlockMaxRows) * blockCnt + rowsCount := objectio.BlockMaxRows * blockCnt bat := catalog2.MockBatch(schema, rowsCount) bats := bat.Split(blockCnt) diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index b39d6d107fc2..abcdf53eaf3b 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -29,6 +29,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/plan" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/txn/client" @@ -37,7 +38,6 @@ import ( catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/handle" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil" ) @@ -82,7 +82,7 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) { require.NoError(t, err) blockCnt := 10 - rowsCount := int(options.DefaultBlockMaxRows) * blockCnt + rowsCount := int(objectio.BlockMaxRows) * blockCnt bats := catalog2.MockBatch(schema, rowsCount).Split(blockCnt) // write table