Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sinker framework for gc, checkpoint, transfer etc. #18891

Closed
wants to merge 15 commits into from
17 changes: 13 additions & 4 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/panjf2000/ants/v2"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
Expand All @@ -55,12 +56,20 @@ func TestBackupData(t *testing.T) {
defer opts.Fs.Close()

schema := catalog.MockSchemaAll(13, 3)
schema.BlockMaxRows = 10
schema.ObjectMaxBlocks = 10
schema.Extra.BlockMaxRows = 10
schema.Extra.ObjectMaxBlocks = 10
db.BindSchema(schema)
testutil.CreateRelation(t, db.DB, "db", schema, true)
{
txn, err := db.DB.StartTxn(nil)
require.NoError(t, err)
dbH, err := testutil.CreateDatabase2(ctx, txn, "db")
require.NoError(t, err)
_, err = testutil.CreateRelation2(ctx, txn, dbH, schema)
require.NoError(t, err)
require.NoError(t, txn.Commit(ctx))
}

totalRows := uint64(schema.BlockMaxRows * 30)
totalRows := uint64(schema.Extra.BlockMaxRows * 30)
bat := catalog.MockBatch(schema, int(totalRows))
defer bat.Close()
bats := bat.Split(100)
Expand Down
37 changes: 13 additions & 24 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,12 @@ func GetDefines(sid string) *Defines {
func NewDefines() *Defines {
d := &Defines{}

d.MoDatabaseTableDefs = make([]engine.TableDef, len(MoDatabaseSchema))
for i, name := range MoDatabaseSchema {
d.MoDatabaseTableDefs[i] = newAttributeDef(name, MoDatabaseTypes[i], i == 0)
}
d.MoTablesTableDefs = make([]engine.TableDef, len(MoTablesSchema))
for i, name := range MoTablesSchema {
d.MoTablesTableDefs[i] = newAttributeDef(name, MoTablesTypes[i], i == 0)
}
d.MoColumnsTableDefs = make([]engine.TableDef, len(MoColumnsSchema))
for i, name := range MoColumnsSchema {
d.MoColumnsTableDefs[i] = newAttributeDef(name, MoColumnsTypes[i], i == 0)
}
d.MoTableMetaDefs = make([]engine.TableDef, len(MoTableMetaSchema))

{
// mo_database
dbCpkPos := len(MoDatabaseSchema) - 1 // cpk is the last column
dbCpkPos := MO_DATABASE_CPKEY_IDX
d.MoDatabaseTableDefs = make([]engine.TableDef, len(MoDatabaseSchema))
for i, name := range MoDatabaseSchema {
d.MoDatabaseTableDefs[i] = newAttributeDef(name, MoDatabaseTypes[i], i == dbCpkPos)
d.MoDatabaseTableDefs[i] = newAttributeDef(name, MoDatabaseTypes[i], i == dbCpkPos, i == dbCpkPos)
}
def := &engine.ConstraintDef{
Cts: []engine.Constraint{
Expand Down Expand Up @@ -105,10 +91,10 @@ func NewDefines() *Defines {

{
// mo_tables
tblCpkPos := len(MoTablesSchema) - 1
tblCpkPos := MO_TABLES_CPKEY_IDX
d.MoTablesTableDefs = make([]engine.TableDef, len(MoTablesSchema))
for i, name := range MoTablesSchema {
d.MoTablesTableDefs[i] = newAttributeDef(name, MoTablesTypes[i], i == tblCpkPos)
d.MoTablesTableDefs[i] = newAttributeDef(name, MoTablesTypes[i], i == tblCpkPos, i == tblCpkPos || i == MO_TABLES_EXTRA_INFO_IDX)
}
def := &engine.ConstraintDef{
Cts: []engine.Constraint{
Expand Down Expand Up @@ -136,10 +122,10 @@ func NewDefines() *Defines {

{
// mo_columns
colCpkPos := len(MoColumnsSchema) - 1
colCpkPos := MO_COLUMNS_ATT_CPKEY_IDX
d.MoColumnsTableDefs = make([]engine.TableDef, len(MoColumnsSchema))
for i, name := range MoColumnsSchema {
d.MoColumnsTableDefs[i] = newAttributeDef(name, MoColumnsTypes[i], i == colCpkPos)
d.MoColumnsTableDefs[i] = newAttributeDef(name, MoColumnsTypes[i], i == colCpkPos, i == colCpkPos)
}
def := &engine.ConstraintDef{
Cts: []engine.Constraint{
Expand All @@ -164,11 +150,14 @@ func NewDefines() *Defines {
d.MoColumnConstraint, _ = def.MarshalBinary()
d.MoColumnsTableDefs = append(d.MoColumnsTableDefs, def)
}

d.MoTableMetaDefs = make([]engine.TableDef, len(MoTableMetaSchema))
for i, name := range MoTableMetaSchema {
d.MoTableMetaDefs[i] = newAttributeDef(name, MoTableMetaTypes[i], i == 0)
{
// block meta
d.MoTableMetaDefs = make([]engine.TableDef, len(MoTableMetaSchema))
for i, name := range MoTableMetaSchema {
d.MoTableMetaDefs[i] = newAttributeDef(name, MoTableMetaTypes[i], i == 0, i == 0)
}
}

return d
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/catalog/tuplesGen.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Table struct {
Viewdef string
Constraint []byte
Version uint32
ExtraInfo []byte
}

// genColumnsFromDefs generates column struct from TableDef.
Expand Down Expand Up @@ -453,6 +454,11 @@ func GenCreateTableTuple(tbl Table, m *mpool.MPool, packer *types.Packer) (*batc
if err = vector.AppendBytes(bat.Vecs[idx], packer.Bytes(), false, m); err != nil {
return nil, err
}
idx = MO_TABLES_EXTRA_INFO_IDX
bat.Vecs[idx] = vector.NewVec(MoTablesTypes[idx]) // extra_info
if err = vector.AppendBytes(bat.Vecs[idx], tbl.ExtraInfo, false, m); err != nil {
return nil, err
}
}
return bat, nil
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/catalog/tuplesParse.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine"
)

func newAttributeDef(name string, typ types.Type, isPrimary bool) engine.TableDef {
func newAttributeDef(name string, typ types.Type, isPrimary, isHidden bool) engine.TableDef {
return &engine.AttributeDef{
Attr: engine.Attribute{
Type: typ,
Name: name,
Primary: isPrimary,
Alg: compress.Lz4,
Default: &plan.Default{NullAbility: true},
IsHidden: name == CPrimaryKeyColName,
IsHidden: isHidden,
},
}
}
Expand Down Expand Up @@ -235,6 +235,7 @@ func genCreateTables(rows [][]any) []CreateTable {
cmds[i].Viewdef = string(row[MO_TABLES_VIEWDEF_IDX].([]byte))
cmds[i].Constraint = row[MO_TABLES_CONSTRAINT_IDX].([]byte)
cmds[i].RelKind = string(row[MO_TABLES_RELKIND_IDX].([]byte))
cmds[i].ExtraInfo = row[MO_TABLES_EXTRA_INFO_IDX].([]byte)
}

for i := range cmds {
Expand Down Expand Up @@ -271,6 +272,10 @@ func genCreateTables(rows [][]any) []CreateTable {
Key: SystemRelAttr_CreateSQL,
Value: cmds[i].CreateSql,
})
pro.Properties = append(pro.Properties, engine.Property{
Key: PropSchemaExtra,
Value: string(cmds[i].ExtraInfo),
})
cmds[i].Defs = append(cmds[i].Defs, pro)
}
return cmds
Expand Down Expand Up @@ -327,6 +332,7 @@ func genTableDefs(row []any) (engine.TableDef, error) {
attr.Primary = string(row[MO_COLUMNS_ATT_CONSTRAINT_TYPE_IDX].([]byte)) == "p"
attr.ClusterBy = row[MO_COLUMNS_ATT_IS_CLUSTERBY].(int8) == 1
attr.EnumVlaues = string(row[MO_COLUMNS_ATT_ENUM_IDX].([]byte))
attr.Seqnum = row[MO_COLUMNS_ATT_SEQNUM_IDX].(uint16)
return &engine.AttributeDef{Attr: attr}, nil
}

Expand Down
Loading
Loading