diff --git a/pkg/bootstrap/versions/upgrade_tenant_task.go b/pkg/bootstrap/versions/upgrade_tenant_task.go index 15cef72d05be..cfb497e935a8 100644 --- a/pkg/bootstrap/versions/upgrade_tenant_task.go +++ b/pkg/bootstrap/versions/upgrade_tenant_task.go @@ -111,7 +111,7 @@ func GetUpgradeTenantTasks( res.ReadRows(func(rows int, cols []*vector.Vector) bool { for i := 0; i < rows; i++ { tenants = append(tenants, vector.GetFixedAtWithTypeCheck[int32](cols[0], i)) - versions = append(versions, cols[1].UnsafeGetStringAt(i)) + versions = append(versions, cols[1].GetStringAt(i)) } return true }) @@ -131,7 +131,7 @@ func GetTenantCreateVersionForUpdate( defer res.Close() version := "" res.ReadRows(func(rows int, cols []*vector.Vector) bool { - version = cols[0].UnsafeGetStringAt(0) + version = cols[0].GetStringAt(0) return true }) if version == "" { @@ -174,7 +174,7 @@ func GetTenantVersion( defer res.Close() version := "" res.ReadRows(func(rows int, cols []*vector.Vector) bool { - version = cols[0].UnsafeGetStringAt(0) + version = cols[0].GetStringAt(0) return true }) if version == "" { diff --git a/pkg/bootstrap/versions/upgrade_tenant_task_test.go b/pkg/bootstrap/versions/upgrade_tenant_task_test.go new file mode 100644 index 000000000000..2e2e6e8943a8 --- /dev/null +++ b/pkg/bootstrap/versions/upgrade_tenant_task_test.go @@ -0,0 +1,93 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package versions + +import ( + "fmt" + "strings" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/matrixorigin/matrixone/pkg/catalog" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/common/runtime" + "github.com/matrixorigin/matrixone/pkg/container/types" + mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test" + "github.com/matrixorigin/matrixone/pkg/pb/txn" + "github.com/matrixorigin/matrixone/pkg/util/executor" +) + +func TestGetTenantCreateVersionForUpdate(t *testing.T) { + prefixMatchSql := fmt.Sprintf("select create_version from mo_account where account_id = %d for update", catalog.System_Account) + + sid := "" + runtime.RunTest( + sid, + func(rt runtime.Runtime) { + txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t)) + txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes() + + executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) { + if strings.EqualFold(sql, prefixMatchSql) { + typs := []types.Type{ + types.New(types.T_varchar, 50, 0), + } + + memRes := executor.NewMemResult(typs, mpool.MustNewZero()) + memRes.NewBatchWithRowCount(1) + + executor.AppendStringRows(memRes, 0, []string{"1.2.3"}) + result := memRes.GetResult() + return result, nil + } + return executor.Result{}, nil + }, txnOperator) + _, err := GetTenantCreateVersionForUpdate(int32(catalog.System_Account), executor) + require.NoError(t, err) + }, + ) +} + +func TestGetTenantVersion(t *testing.T) { + prefixMatchSql := fmt.Sprintf("select create_version from mo_account where account_id = %d", catalog.System_Account) + sid := "" + runtime.RunTest( + sid, + func(rt runtime.Runtime) { + txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t)) + txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes() + + executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) { + if strings.EqualFold(sql, prefixMatchSql) { + typs := []types.Type{ + types.New(types.T_varchar, 50, 0), + } + + memRes := executor.NewMemResult(typs, mpool.MustNewZero()) + memRes.NewBatchWithRowCount(1) + + executor.AppendStringRows(memRes, 0, []string{"1.2.3"}) + result := memRes.GetResult() + return result, nil + } + return executor.Result{}, nil + }, txnOperator) + _, err := GetTenantVersion(int32(catalog.System_Account), executor) + require.NoError(t, err) + }, + ) +} diff --git a/pkg/bootstrap/versions/version_upgrade.go b/pkg/bootstrap/versions/version_upgrade.go index db7a076ab3e9..35b6b1114c93 100644 --- a/pkg/bootstrap/versions/version_upgrade.go +++ b/pkg/bootstrap/versions/version_upgrade.go @@ -241,9 +241,9 @@ func getVersionUpgradesBySQL( for i := 0; i < rows; i++ { value := VersionUpgrade{} value.ID = vector.GetFixedAtWithTypeCheck[uint64](cols[0], i) - value.FromVersion = cols[1].UnsafeGetStringAt(i) - value.ToVersion = cols[2].UnsafeGetStringAt(i) - value.FinalVersion = cols[3].UnsafeGetStringAt(i) + value.FromVersion = cols[1].GetStringAt(i) + value.ToVersion = cols[2].GetStringAt(i) + value.FinalVersion = cols[3].GetStringAt(i) value.FinalVersionOffset = vector.GetFixedAtWithTypeCheck[uint32](cols[4], i) value.State = vector.GetFixedAtWithTypeCheck[int32](cols[5], i) value.UpgradeOrder = vector.GetFixedAtWithTypeCheck[int32](cols[6], i) diff --git a/pkg/bootstrap/versions/version_upgrade_test.go b/pkg/bootstrap/versions/version_upgrade_test.go new file mode 100644 index 000000000000..f0843c627e83 --- /dev/null +++ b/pkg/bootstrap/versions/version_upgrade_test.go @@ -0,0 +1,101 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package versions + +import ( + "fmt" + "strings" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/matrixorigin/matrixone/pkg/catalog" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/common/runtime" + "github.com/matrixorigin/matrixone/pkg/container/types" + mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test" + "github.com/matrixorigin/matrixone/pkg/pb/txn" + "github.com/matrixorigin/matrixone/pkg/util/executor" +) + +func Test_getVersionUpgradesBySQL(t *testing.T) { + preSql := fmt.Sprintf(`select + id, + from_version, + to_version, + final_version, + final_version_offset, + state, + upgrade_order, + upgrade_cluster, + upgrade_tenant, + total_tenant, + ready_tenant + from %s + where state = %d + order by upgrade_order asc`, + catalog.MOUpgradeTable, + StateUpgradingTenant) + + sid := "" + runtime.RunTest( + sid, + func(rt runtime.Runtime) { + txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t)) + txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes() + + executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) { + if strings.EqualFold(sql, preSql) { + typs := []types.Type{ + types.New(types.T_uint64, 64, 0), + types.New(types.T_varchar, 50, 0), + types.New(types.T_varchar, 50, 0), + types.New(types.T_varchar, 50, 0), + types.New(types.T_uint32, 32, 0), + types.New(types.T_int32, 64, 0), + types.New(types.T_int32, 64, 0), + types.New(types.T_int32, 64, 0), + types.New(types.T_int32, 64, 0), + types.New(types.T_int32, 32, 0), + types.New(types.T_int32, 32, 0), + } + + memRes := executor.NewMemResult(typs, mpool.MustNewZero()) + memRes.NewBatchWithRowCount(2) + + executor.AppendFixedRows(memRes, 0, []uint64{10001, 10002}) + executor.AppendStringRows(memRes, 1, []string{"1.2.1", "1.2.2"}) + executor.AppendStringRows(memRes, 2, []string{"1.2.2", "1.2.3"}) + executor.AppendStringRows(memRes, 3, []string{"1.2.3", "1.2.3"}) + executor.AppendFixedRows(memRes, 4, []uint32{2, 2}) + executor.AppendFixedRows(memRes, 5, []int32{2, 2}) + executor.AppendFixedRows(memRes, 6, []int32{0, 1}) + executor.AppendFixedRows(memRes, 7, []int32{0, 1}) + executor.AppendFixedRows(memRes, 8, []int32{0, 1}) + executor.AppendFixedRows(memRes, 9, []int32{0, 238}) + executor.AppendFixedRows(memRes, 10, []int32{0, 0}) + + result := memRes.GetResult() + return result, nil + } + return executor.Result{}, nil + }, txnOperator) + + _, err := getVersionUpgradesBySQL(preSql, executor) + require.NoError(t, err) + }, + ) +} diff --git a/pkg/util/executor/mem_executor.go b/pkg/util/executor/mem_executor.go index a779dd63e388..3d98cbfbd67d 100644 --- a/pkg/util/executor/mem_executor.go +++ b/pkg/util/executor/mem_executor.go @@ -98,6 +98,10 @@ func (m *MemResult) NewBatch() { m.res.Batches = append(m.res.Batches, newBatch(m.cols)) } +func (m *MemResult) NewBatchWithRowCount(rowcount int) { + m.res.Batches = append(m.res.Batches, newBatchWithRowCount(m.cols, rowcount)) +} + func (m *MemResult) GetResult() Result { return m.res } @@ -123,6 +127,12 @@ func newBatch(cols int) *batch.Batch { return bat } +func newBatchWithRowCount(cols int, rowcout int) *batch.Batch { + bat := batch.NewWithSize(cols) + bat.SetRowCount(rowcout) + return bat +} + func appendCols[T any]( bat *batch.Batch, colIndex int,