Skip to content

Commit

Permalink
Update upgrade code main (#18733)
Browse files Browse the repository at this point in the history
Method for reading query results during upgrade modification
Change from Unsafe to call Get String At ()

Approved by: @daviszhen, @zhangxu19830126
  • Loading branch information
qingxinhome committed Sep 13, 2024
1 parent 6a33c32 commit 5cb18b8
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 6 deletions.
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/upgrade_tenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func GetUpgradeTenantTasks(
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
for i := 0; i < rows; i++ {
tenants = append(tenants, vector.GetFixedAtWithTypeCheck[int32](cols[0], i))
versions = append(versions, cols[1].UnsafeGetStringAt(i))
versions = append(versions, cols[1].GetStringAt(i))
}
return true
})
Expand All @@ -131,7 +131,7 @@ func GetTenantCreateVersionForUpdate(
defer res.Close()
version := ""
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
version = cols[0].UnsafeGetStringAt(0)
version = cols[0].GetStringAt(0)
return true
})
if version == "" {
Expand Down Expand Up @@ -174,7 +174,7 @@ func GetTenantVersion(
defer res.Close()
version := ""
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
version = cols[0].UnsafeGetStringAt(0)
version = cols[0].GetStringAt(0)
return true
})
if version == "" {
Expand Down
93 changes: 93 additions & 0 deletions pkg/bootstrap/versions/upgrade_tenant_task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package versions

import (
"fmt"
"strings"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/types"
mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

func TestGetTenantCreateVersionForUpdate(t *testing.T) {
prefixMatchSql := fmt.Sprintf("select create_version from mo_account where account_id = %d for update", catalog.System_Account)

sid := ""
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()

executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
if strings.EqualFold(sql, prefixMatchSql) {
typs := []types.Type{
types.New(types.T_varchar, 50, 0),
}

memRes := executor.NewMemResult(typs, mpool.MustNewZero())
memRes.NewBatchWithRowCount(1)

executor.AppendStringRows(memRes, 0, []string{"1.2.3"})
result := memRes.GetResult()
return result, nil
}
return executor.Result{}, nil
}, txnOperator)
_, err := GetTenantCreateVersionForUpdate(int32(catalog.System_Account), executor)
require.NoError(t, err)
},
)
}

func TestGetTenantVersion(t *testing.T) {
prefixMatchSql := fmt.Sprintf("select create_version from mo_account where account_id = %d", catalog.System_Account)
sid := ""
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()

executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
if strings.EqualFold(sql, prefixMatchSql) {
typs := []types.Type{
types.New(types.T_varchar, 50, 0),
}

memRes := executor.NewMemResult(typs, mpool.MustNewZero())
memRes.NewBatchWithRowCount(1)

executor.AppendStringRows(memRes, 0, []string{"1.2.3"})
result := memRes.GetResult()
return result, nil
}
return executor.Result{}, nil
}, txnOperator)
_, err := GetTenantVersion(int32(catalog.System_Account), executor)
require.NoError(t, err)
},
)
}
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/version_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ func getVersionUpgradesBySQL(
for i := 0; i < rows; i++ {
value := VersionUpgrade{}
value.ID = vector.GetFixedAtWithTypeCheck[uint64](cols[0], i)
value.FromVersion = cols[1].UnsafeGetStringAt(i)
value.ToVersion = cols[2].UnsafeGetStringAt(i)
value.FinalVersion = cols[3].UnsafeGetStringAt(i)
value.FromVersion = cols[1].GetStringAt(i)
value.ToVersion = cols[2].GetStringAt(i)
value.FinalVersion = cols[3].GetStringAt(i)
value.FinalVersionOffset = vector.GetFixedAtWithTypeCheck[uint32](cols[4], i)
value.State = vector.GetFixedAtWithTypeCheck[int32](cols[5], i)
value.UpgradeOrder = vector.GetFixedAtWithTypeCheck[int32](cols[6], i)
Expand Down
101 changes: 101 additions & 0 deletions pkg/bootstrap/versions/version_upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package versions

import (
"fmt"
"strings"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/types"
mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

func Test_getVersionUpgradesBySQL(t *testing.T) {
preSql := fmt.Sprintf(`select
id,
from_version,
to_version,
final_version,
final_version_offset,
state,
upgrade_order,
upgrade_cluster,
upgrade_tenant,
total_tenant,
ready_tenant
from %s
where state = %d
order by upgrade_order asc`,
catalog.MOUpgradeTable,
StateUpgradingTenant)

sid := ""
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()

executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
if strings.EqualFold(sql, preSql) {
typs := []types.Type{
types.New(types.T_uint64, 64, 0),
types.New(types.T_varchar, 50, 0),
types.New(types.T_varchar, 50, 0),
types.New(types.T_varchar, 50, 0),
types.New(types.T_uint32, 32, 0),
types.New(types.T_int32, 64, 0),
types.New(types.T_int32, 64, 0),
types.New(types.T_int32, 64, 0),
types.New(types.T_int32, 64, 0),
types.New(types.T_int32, 32, 0),
types.New(types.T_int32, 32, 0),
}

memRes := executor.NewMemResult(typs, mpool.MustNewZero())
memRes.NewBatchWithRowCount(2)

executor.AppendFixedRows(memRes, 0, []uint64{10001, 10002})
executor.AppendStringRows(memRes, 1, []string{"1.2.1", "1.2.2"})
executor.AppendStringRows(memRes, 2, []string{"1.2.2", "1.2.3"})
executor.AppendStringRows(memRes, 3, []string{"1.2.3", "1.2.3"})
executor.AppendFixedRows(memRes, 4, []uint32{2, 2})
executor.AppendFixedRows(memRes, 5, []int32{2, 2})
executor.AppendFixedRows(memRes, 6, []int32{0, 1})
executor.AppendFixedRows(memRes, 7, []int32{0, 1})
executor.AppendFixedRows(memRes, 8, []int32{0, 1})
executor.AppendFixedRows(memRes, 9, []int32{0, 238})
executor.AppendFixedRows(memRes, 10, []int32{0, 0})

result := memRes.GetResult()
return result, nil
}
return executor.Result{}, nil
}, txnOperator)

_, err := getVersionUpgradesBySQL(preSql, executor)
require.NoError(t, err)
},
)
}
10 changes: 10 additions & 0 deletions pkg/util/executor/mem_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func (m *MemResult) NewBatch() {
m.res.Batches = append(m.res.Batches, newBatch(m.cols))
}

func (m *MemResult) NewBatchWithRowCount(rowcount int) {
m.res.Batches = append(m.res.Batches, newBatchWithRowCount(m.cols, rowcount))
}

func (m *MemResult) GetResult() Result {
return m.res
}
Expand All @@ -123,6 +127,12 @@ func newBatch(cols int) *batch.Batch {
return bat
}

func newBatchWithRowCount(cols int, rowcout int) *batch.Batch {
bat := batch.NewWithSize(cols)
bat.SetRowCount(rowcout)
return bat
}

func appendCols[T any](
bat *batch.Batch,
colIndex int,
Expand Down

0 comments on commit 5cb18b8

Please sign in to comment.