Skip to content

Commit

Permalink
Merge branch 'main' into fix-4068
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Sep 13, 2024
2 parents 6eadfcb + 5cb18b8 commit eeafbd8
Show file tree
Hide file tree
Showing 32 changed files with 853 additions and 225 deletions.
44 changes: 38 additions & 6 deletions cmd/mo-service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ func dumpCommonConfig(cfg Config) (map[string]*logservicepb.ConfigItem, error) {
}

func (c *Config) setFileserviceDefaultValues() {

for i := 0; i < len(c.FileServices); i++ {
config := &c.FileServices[i]

Expand All @@ -607,12 +608,6 @@ func (c *Config) setFileserviceDefaultValues() {
}
}

// set default disk cache dir
if config.Cache.DiskPath == nil {
path := config.DataDir + "-cache"
config.Cache.DiskPath = &path
}

}

// default LOCAL fs
Expand Down Expand Up @@ -666,4 +661,41 @@ func (c *Config) setFileserviceDefaultValues() {
})
}

for i := 0; i < len(c.FileServices); i++ {
config := &c.FileServices[i]

// cache configs
switch config.Name {

case defines.LocalFileServiceName:
// memory
if config.Cache.MemoryCapacity == nil {
capacity := tomlutil.ByteSize(512 * (1 << 20))
config.Cache.MemoryCapacity = &capacity
}
// no disk

case defines.SharedFileServiceName:
// memory
if config.Cache.MemoryCapacity == nil {
capacity := tomlutil.ByteSize(512 * (1 << 20))
config.Cache.MemoryCapacity = &capacity
}
// disk
if config.Cache.DiskPath == nil {
path := config.DataDir + "-cache"
config.Cache.DiskPath = &path
}
if config.Cache.DiskCapacity == nil {
capacity := tomlutil.ByteSize(8 * (1 << 30))
config.Cache.DiskCapacity = &capacity
}

case defines.ETLFileServiceName:
// no caches

}

}

}
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/upgrade_tenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func GetUpgradeTenantTasks(
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
for i := 0; i < rows; i++ {
tenants = append(tenants, vector.GetFixedAtWithTypeCheck[int32](cols[0], i))
versions = append(versions, cols[1].UnsafeGetStringAt(i))
versions = append(versions, cols[1].GetStringAt(i))
}
return true
})
Expand All @@ -131,7 +131,7 @@ func GetTenantCreateVersionForUpdate(
defer res.Close()
version := ""
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
version = cols[0].UnsafeGetStringAt(0)
version = cols[0].GetStringAt(0)
return true
})
if version == "" {
Expand Down Expand Up @@ -174,7 +174,7 @@ func GetTenantVersion(
defer res.Close()
version := ""
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
version = cols[0].UnsafeGetStringAt(0)
version = cols[0].GetStringAt(0)
return true
})
if version == "" {
Expand Down
93 changes: 93 additions & 0 deletions pkg/bootstrap/versions/upgrade_tenant_task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package versions

import (
"fmt"
"strings"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/types"
mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

func TestGetTenantCreateVersionForUpdate(t *testing.T) {
prefixMatchSql := fmt.Sprintf("select create_version from mo_account where account_id = %d for update", catalog.System_Account)

sid := ""
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()

executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
if strings.EqualFold(sql, prefixMatchSql) {
typs := []types.Type{
types.New(types.T_varchar, 50, 0),
}

memRes := executor.NewMemResult(typs, mpool.MustNewZero())
memRes.NewBatchWithRowCount(1)

executor.AppendStringRows(memRes, 0, []string{"1.2.3"})
result := memRes.GetResult()
return result, nil
}
return executor.Result{}, nil
}, txnOperator)
_, err := GetTenantCreateVersionForUpdate(int32(catalog.System_Account), executor)
require.NoError(t, err)
},
)
}

func TestGetTenantVersion(t *testing.T) {
prefixMatchSql := fmt.Sprintf("select create_version from mo_account where account_id = %d", catalog.System_Account)
sid := ""
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()

executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
if strings.EqualFold(sql, prefixMatchSql) {
typs := []types.Type{
types.New(types.T_varchar, 50, 0),
}

memRes := executor.NewMemResult(typs, mpool.MustNewZero())
memRes.NewBatchWithRowCount(1)

executor.AppendStringRows(memRes, 0, []string{"1.2.3"})
result := memRes.GetResult()
return result, nil
}
return executor.Result{}, nil
}, txnOperator)
_, err := GetTenantVersion(int32(catalog.System_Account), executor)
require.NoError(t, err)
},
)
}
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/version_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ func getVersionUpgradesBySQL(
for i := 0; i < rows; i++ {
value := VersionUpgrade{}
value.ID = vector.GetFixedAtWithTypeCheck[uint64](cols[0], i)
value.FromVersion = cols[1].UnsafeGetStringAt(i)
value.ToVersion = cols[2].UnsafeGetStringAt(i)
value.FinalVersion = cols[3].UnsafeGetStringAt(i)
value.FromVersion = cols[1].GetStringAt(i)
value.ToVersion = cols[2].GetStringAt(i)
value.FinalVersion = cols[3].GetStringAt(i)
value.FinalVersionOffset = vector.GetFixedAtWithTypeCheck[uint32](cols[4], i)
value.State = vector.GetFixedAtWithTypeCheck[int32](cols[5], i)
value.UpgradeOrder = vector.GetFixedAtWithTypeCheck[int32](cols[6], i)
Expand Down
101 changes: 101 additions & 0 deletions pkg/bootstrap/versions/version_upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package versions

import (
"fmt"
"strings"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/types"
mock_frontend "github.com/matrixorigin/matrixone/pkg/frontend/test"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

func Test_getVersionUpgradesBySQL(t *testing.T) {
preSql := fmt.Sprintf(`select
id,
from_version,
to_version,
final_version,
final_version_offset,
state,
upgrade_order,
upgrade_cluster,
upgrade_tenant,
total_tenant,
ready_tenant
from %s
where state = %d
order by upgrade_order asc`,
catalog.MOUpgradeTable,
StateUpgradingTenant)

sid := ""
runtime.RunTest(
sid,
func(rt runtime.Runtime) {
txnOperator := mock_frontend.NewMockTxnOperator(gomock.NewController(t))
txnOperator.EXPECT().TxnOptions().Return(txn.TxnOptions{CN: sid}).AnyTimes()

executor := executor.NewMemTxnExecutor(func(sql string) (executor.Result, error) {
if strings.EqualFold(sql, preSql) {
typs := []types.Type{
types.New(types.T_uint64, 64, 0),
types.New(types.T_varchar, 50, 0),
types.New(types.T_varchar, 50, 0),
types.New(types.T_varchar, 50, 0),
types.New(types.T_uint32, 32, 0),
types.New(types.T_int32, 64, 0),
types.New(types.T_int32, 64, 0),
types.New(types.T_int32, 64, 0),
types.New(types.T_int32, 64, 0),
types.New(types.T_int32, 32, 0),
types.New(types.T_int32, 32, 0),
}

memRes := executor.NewMemResult(typs, mpool.MustNewZero())
memRes.NewBatchWithRowCount(2)

executor.AppendFixedRows(memRes, 0, []uint64{10001, 10002})
executor.AppendStringRows(memRes, 1, []string{"1.2.1", "1.2.2"})
executor.AppendStringRows(memRes, 2, []string{"1.2.2", "1.2.3"})
executor.AppendStringRows(memRes, 3, []string{"1.2.3", "1.2.3"})
executor.AppendFixedRows(memRes, 4, []uint32{2, 2})
executor.AppendFixedRows(memRes, 5, []int32{2, 2})
executor.AppendFixedRows(memRes, 6, []int32{0, 1})
executor.AppendFixedRows(memRes, 7, []int32{0, 1})
executor.AppendFixedRows(memRes, 8, []int32{0, 1})
executor.AppendFixedRows(memRes, 9, []int32{0, 238})
executor.AppendFixedRows(memRes, 10, []int32{0, 0})

result := memRes.GetResult()
return result, nil
}
return executor.Result{}, nil
}, txnOperator)

_, err := getVersionUpgradesBySQL(preSql, executor)
require.NoError(t, err)
},
)
}
17 changes: 0 additions & 17 deletions pkg/fileservice/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,6 @@ type CacheCallbacks struct {
type CacheCallbackFunc = func(fscache.CacheKey, fscache.Data)

func (c *CacheConfig) setDefaults() {
if c.MemoryCapacity == nil {
size := toml.ByteSize(512 << 20)
c.MemoryCapacity = &size
}
if c.DiskCapacity == nil {
size := toml.ByteSize(8 << 30)
c.DiskCapacity = &size
}
if c.DiskMinEvictInterval == nil {
c.DiskMinEvictInterval = &toml.Duration{
Duration: time.Minute * 7,
}
}
if c.DiskEvictTarget == nil {
target := 0.8
c.DiskEvictTarget = &target
}
c.RPC.Adjust()
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/fileservice/disk_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func (d *diskObjectStorage) Delete(ctx context.Context, keys ...string) (err err
if err := ctx.Err(); err != nil {
return err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Delete.Add(1)
}, d.perfCounterSets...)

for _, key := range keys {
path := filepath.Join(d.path, key)
_ = os.Remove(path)
Expand Down Expand Up @@ -102,6 +107,10 @@ func (d *diskObjectStorage) List(ctx context.Context, prefix string, fn func(isP
return err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.List.Add(1)
}, d.perfCounterSets...)

dir, prefix := path.Split(prefix)

f, err := os.Open(filepath.Join(d.path, dir))
Expand Down Expand Up @@ -144,6 +153,10 @@ func (d *diskObjectStorage) Read(ctx context.Context, key string, min *int64, ma
return nil, err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Get.Add(1)
}, d.perfCounterSets...)

path := filepath.Join(d.path, key)
f, err := os.Open(path)
if err != nil {
Expand Down Expand Up @@ -188,6 +201,10 @@ func (d *diskObjectStorage) Stat(ctx context.Context, key string) (size int64, e
return 0, err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Head.Add(1)
}, d.perfCounterSets...)

path := filepath.Join(d.path, key)
stat, err := os.Stat(path)
if err != nil {
Expand All @@ -207,6 +224,10 @@ func (d *diskObjectStorage) Write(ctx context.Context, key string, r io.Reader,
return err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Put.Add(1)
}, d.perfCounterSets...)

tempFile, err := os.CreateTemp(d.path, "*.mofstemp")
if err != nil {
return err
Expand Down
11 changes: 11 additions & 0 deletions pkg/fileservice/disk_object_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ import (
func TestDiskObjectStorage(t *testing.T) {
ctx := context.Background()

testObjectStorage(t, func(t *testing.T) *diskObjectStorage {
storage, err := newDiskObjectStorage(context.Background(), ObjectStorageArguments{
Endpoint: "disk",
Bucket: t.TempDir(),
}, nil)
if err != nil {
t.Fatal(err)
}
return storage
})

testFileService(t, 0, func(name string) FileService {
dir := t.TempDir()
fs, err := NewS3FS(
Expand Down
Loading

0 comments on commit eeafbd8

Please sign in to comment.