diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go
index dac5cf37a133..eb9bdd4b6516 100644
--- a/pkg/vm/engine/disttae/txn.go
+++ b/pkg/vm/engine/disttae/txn.go
@@ -39,6 +39,7 @@ import (
 	v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache"
+	catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
 	"go.uber.org/zap"
 )
@@ -132,6 +133,11 @@ func (txn *Transaction) WriteBatch(
 		}
 	}
 
+	if typ == DELETE && tableId != catalog.MO_DATABASE_ID &&
+		tableId != catalog.MO_TABLES_ID && tableId != catalog.MO_COLUMNS_ID {
+		txn.approximateInMemDeleteCnt += bat.RowCount()
+	}
+
 	e := Entry{
 		typ:       typ,
 		accountId: accountId,
@@ -405,7 +411,8 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
 
 	//offset < 0 indicates commit.
 	if offset < 0 {
-		if txn.workspaceSize < txn.engine.workspaceThreshold && txn.insertCount < txn.engine.insertEntryMaxCount {
+		if txn.workspaceSize < txn.engine.workspaceThreshold && txn.insertCount < txn.engine.insertEntryMaxCount &&
+			txn.approximateInMemDeleteCnt < txn.engine.insertEntryMaxCount {
 			return nil
 		}
 	} else {
@@ -439,19 +446,47 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
 		size = 0
 	}
 	txn.hasS3Op.Store(true)
-	mp := make(map[tableKey][]*batch.Batch)
+	if err := txn.dumpInsertBatchLocked(offset, &size, &pkCount); err != nil {
+		return err
+	}
+
+	if dumpAll {
+		if txn.approximateInMemDeleteCnt >= txn.engine.insertEntryMaxCount {
+			if err := txn.dumpDeleteBatchLocked(offset); err != nil {
+				return err
+			}
+		}
+		txn.approximateInMemDeleteCnt = 0
+		txn.workspaceSize = 0
+		txn.pkCount -= pkCount
+		// modifies txn.writes.
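+		// compact txn.writes in place, keeping only the entries that still hold a batch.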
+		writes := txn.writes[:0]
+		for i, write := range txn.writes {
+			if write.bat != nil {
+				writes = append(writes, txn.writes[i])
+			}
+		}
+		txn.writes = writes
+	} else {
+		txn.workspaceSize -= size
+		txn.pkCount -= pkCount
+	}
+	return nil
+}
+
+func (txn *Transaction) dumpInsertBatchLocked(offset int, size *uint64, pkCount *int) error {
+	mp := make(map[tableKey][]*batch.Batch)
 	lastTxnWritesIndex := offset
+	write := txn.writes
 	for i := offset; i < len(txn.writes); i++ {
-		if txn.writes[i].tableId == catalog.MO_DATABASE_ID ||
-			txn.writes[i].tableId == catalog.MO_TABLES_ID ||
-			txn.writes[i].tableId == catalog.MO_COLUMNS_ID {
-			txn.writes[lastTxnWritesIndex] = txn.writes[i]
+		if txn.writes[i].isCatalog() {
+			write[lastTxnWritesIndex] = write[i]
 			lastTxnWritesIndex++
 			continue
 		}
 		if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
-			txn.writes[lastTxnWritesIndex] = txn.writes[i]
+			write[lastTxnWritesIndex] = write[i]
 			lastTxnWritesIndex++
 			continue
 		}
 
@@ -465,8 +500,8 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
 				name:       txn.writes[i].tableName,
 			}
 			bat := txn.writes[i].bat
-			size += uint64(bat.Size())
-			pkCount += bat.RowCount()
+			*size += uint64(bat.Size())
+			*pkCount += bat.RowCount()
 			// skip rowid
 			newBat := batch.NewWithSize(len(bat.Vecs) - 1)
 			newBat.SetAttributes(bat.Attrs[1:])
@@ -479,11 +514,12 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
 		}
 
 		if keepElement {
-			txn.writes[lastTxnWritesIndex] = txn.writes[i]
+			write[lastTxnWritesIndex] = write[i]
 			lastTxnWritesIndex++
 		}
 	}
-	txn.writes = txn.writes[:lastTxnWritesIndex]
+
+	txn.writes = write[:lastTxnWritesIndex]
 
 	for tbKey := range mp {
 		// scenario 2 for cn write s3, more info in the comment of S3Writer
@@ -527,9 +563,7 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
 		} else {
 			table = tbl.(*txnTable)
 		}
-		fileName := objectio.DecodeBlockInfo(
-			blockInfo.Vecs[0].GetBytesAt(0)).
-			MetaLocation().Name().String()
+		fileName := objectio.DecodeBlockInfo(blockInfo.Vecs[0].GetBytesAt(0)).MetaLocation().Name().String()
 		err = table.getTxn().WriteFileLocked(
 			INSERT,
 			table.accountId,
@@ -545,21 +579,110 @@ func (txn *Transaction) dumpBatchLocked(offset int) error {
 			return err
 		}
 	}
+	return nil
+}
 
-	if dumpAll {
-		txn.workspaceSize = 0
-		txn.pkCount -= pkCount
-		// modifies txn.writes.
-		writes := txn.writes[:0]
-		for i, write := range txn.writes {
-			if write.bat != nil {
-				writes = append(writes, txn.writes[i])
+func (txn *Transaction) dumpDeleteBatchLocked(offset int) error {
+	deleteCnt := 0
+	mp := make(map[tableKey][]*batch.Batch)
+	lastTxnWritesIndex := offset
+	write := txn.writes
+	for i := offset; i < len(txn.writes); i++ {
+		if txn.writes[i].isCatalog() {
+			write[lastTxnWritesIndex] = write[i]
+			lastTxnWritesIndex++
+			continue
+		}
+		if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
+			write[lastTxnWritesIndex] = write[i]
+			lastTxnWritesIndex++
+			continue
+		}
+
+		keepElement := true
+		if txn.writes[i].typ == DELETE && txn.writes[i].fileName == "" {
+			tbKey := tableKey{
+				accountId:  txn.writes[i].accountId,
+				databaseId: txn.writes[i].databaseId,
+				dbName:     txn.writes[i].databaseName,
+				name:       txn.writes[i].tableName,
 			}
+			bat := txn.writes[i].bat
+			deleteCnt += bat.RowCount()
+
+			newBat := batch.NewWithSize(len(bat.Vecs))
+			newBat.SetAttributes(bat.Attrs)
+			newBat.Vecs = bat.Vecs
+			newBat.SetRowCount(bat.Vecs[0].Length())
+
+			mp[tbKey] = append(mp[tbKey], newBat)
+			txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat)
+
+			keepElement = false
+		}
+
+		if keepElement {
+			write[lastTxnWritesIndex] = write[i]
+			lastTxnWritesIndex++
+		}
+	}
+
+	if deleteCnt < txn.engine.insertEntryMaxCount {
+		return nil
+	}
+
+	txn.writes = write[:lastTxnWritesIndex]
+
+	for tbKey := range mp {
+		// scenario 2 for cn write s3, more info in the comment of S3Writer
+		tbl, err := txn.getTable(tbKey.accountId, tbKey.dbName, tbKey.name)
+		if err != nil {
+			return err
+		}
+
+		s3Writer, err := colexec.NewS3TombstoneWriter()
+		if err != nil {
+			return err
+		}
+		defer s3Writer.Free(txn.proc)
+		for i := 0; i < len(mp[tbKey]); i++ {
+			s3Writer.StashBatch(txn.proc, mp[tbKey][i])
+		}
+		_, stats, err := s3Writer.SortAndSync(txn.proc)
+		if err != nil {
+			return err
+		}
+		bat := batch.NewWithSize(2)
+		bat.Attrs = []string{catalog2.ObjectAttr_ObjectStats, catalog2.AttrPKVal}
+		bat.SetVector(0, vector.NewVec(types.T_text.ToType()))
+		if err = vector.AppendBytes(
+			bat.GetVector(0), stats.Marshal(), false, txn.proc.GetMPool()); err != nil {
+			return err
+		}
+
+		bat.SetRowCount(bat.Vecs[0].Length())
+
+		var table *txnTable
+		if v, ok := tbl.(*txnTableDelegate); ok {
+			table = v.origin
+		} else {
+			table = tbl.(*txnTable)
+		}
+		fileName := stats.ObjectLocation().String()
+		err = table.getTxn().WriteFileLocked(
+			DELETE,
+			table.accountId,
+			table.db.databaseId,
+			table.tableId,
+			table.db.databaseName,
+			table.tableName,
+			fileName,
+			bat,
+			table.getTxn().tnStores[0],
+		)
+		if err != nil {
+			return err
 		}
-		txn.writes = writes
-	} else {
-		txn.workspaceSize -= size
-		txn.pkCount -= pkCount
 	}
 	return nil
 }
diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go
index 2eac352fd65a..fc5b2a0bad51 100644
--- a/pkg/vm/engine/disttae/types.go
+++ b/pkg/vm/engine/disttae/types.go
@@ -254,6 +254,8 @@ type Transaction struct {
 	workspaceSize uint64
 	// the total row count for insert entries when txn commits.
 	insertCount int
+	// the approximate total row count of delete entries when the txn commits.
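+	// it is updated in WriteBatch for DELETE entries on non-catalog tables,
+	// and reset to zero once the workspace is fully dumped.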
+	approximateInMemDeleteCnt int
 	// the last snapshot write offset
 	snapshotWriteOffset int
 
diff --git a/pkg/vm/engine/test/workspace_test.go b/pkg/vm/engine/test/workspace_test.go
new file mode 100644
index 000000000000..02cd7f2e9779
--- /dev/null
+++ b/pkg/vm/engine/test/workspace_test.go
@@ -0,0 +1,127 @@
+// Copyright 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+	"context"
+	"github.com/matrixorigin/matrixone/pkg/catalog"
+	"github.com/matrixorigin/matrixone/pkg/common/mpool"
+	"github.com/matrixorigin/matrixone/pkg/container/batch"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	"github.com/matrixorigin/matrixone/pkg/defines"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
+	catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil"
+	"github.com/stretchr/testify/require"
+	"testing"
+	"time"
+)
+
+func Test_BigDeleteWriteS3(t *testing.T) {
+	var (
+		//err error
+		mp           *mpool.MPool
+		accountId    = catalog.System_Account
+		tableName    = "test_reader_table"
+		databaseName = "test_reader_database"
+
+		primaryKeyIdx = 3
+
+		taeEngine     *testutil.TestTxnStorage
+		rpcAgent      *testutil.MockRPCAgent
+		disttaeEngine *testutil.TestDisttaeEngine
+	)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId)
+
+	// mock a schema with 4 columns and the 4th column as primary key.
+	// the first column is the 9th column in the predefined columns in
+	// the mock function.
+	// Here we expect the type of the primary key
+	// to be types.T_char or types.T_varchar.
+	schema := catalog2.MockSchemaEnhanced(4, primaryKeyIdx, 9)
+	schema.Name = tableName
+
+	{
+		opt, err := testutil.GetS3SharedFileServiceOption(ctx, testutil.GetDefaultTestPath("test", t))
+		require.NoError(t, err)
+
+		disttaeEngine, taeEngine, rpcAgent, mp = testutil.CreateEngines(
+			ctx,
+			testutil.TestOptions{TaeEngineOptions: opt},
+			t,
+			testutil.WithDisttaeEngineInsertEntryMaxCount(1),
+			testutil.WithDisttaeEngineWorkspaceThreshold(1),
+		)
+		defer func() {
+			disttaeEngine.Close(ctx)
+			taeEngine.Close(true)
+			rpcAgent.Close()
+		}()
+
+		_, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema)
+		require.NoError(t, err)
+	}
+
+	insertCnt := 150
+	deleteCnt := 100
+	var bat2 *batch.Batch
+
+	{
+		// insert 150 rows
+		_, table, txn, err := disttaeEngine.GetTable(ctx, databaseName, tableName)
+		require.NoError(t, err)
+
+		bat := catalog2.MockBatch(schema, insertCnt)
+		err = table.Write(ctx, containers.ToCNBatch(bat))
+		require.NoError(t, err)
+
+		txn.GetWorkspace().(*disttae.Transaction).ForEachTableWrites(
+			table.GetDBID(ctx), table.GetTableID(ctx), 1, func(entry disttae.Entry) {
+				waitedDeletes := vector.MustFixedColWithTypeCheck[types.Rowid](entry.Bat().GetVector(0))
+				waitedDeletes = waitedDeletes[:deleteCnt]
+				bat2 = batch.NewWithSize(1)
+				bat2.Vecs[0] = vector.NewVec(types.T_Rowid.ToType())
+				bat2.SetRowCount(len(waitedDeletes))
+				require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes, nil, mp))
+			})
+
+		// delete 100 rows
+		require.NoError(t, table.Delete(ctx, bat2, catalog.Row_ID))
+		require.NoError(t, txn.Commit(ctx))
+	}
+
+	{
+		_, _, reader, err := testutil.GetTableTxnReader(
+			ctx,
+			disttaeEngine,
+			databaseName,
+			tableName,
+			nil,
+			mp,
+			t,
+		)
+		require.NoError(t, err)
+
+		ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx)
+		_, err = reader.Read(ctx, ret.Attrs, nil, mp, ret)
+		require.NoError(t, err)
+		require.Equal(t, insertCnt-deleteCnt, ret.RowCount())
+	}
+}