Skip to content

Commit

Permalink
mo-service: fix default cache configs
Browse files Browse the repository at this point in the history
fileservice: do not set default memory and disk config in setDefaults
  • Loading branch information
reusee committed Sep 13, 2024
1 parent 28d798f commit 8f7bb7d
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 108 deletions.
44 changes: 38 additions & 6 deletions cmd/mo-service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ func dumpCommonConfig(cfg Config) (map[string]*logservicepb.ConfigItem, error) {
}

func (c *Config) setFileserviceDefaultValues() {

for i := 0; i < len(c.FileServices); i++ {
config := &c.FileServices[i]

Expand All @@ -607,12 +608,6 @@ func (c *Config) setFileserviceDefaultValues() {
}
}

// set default disk cache dir
if config.Cache.DiskPath == nil {
path := config.DataDir + "-cache"
config.Cache.DiskPath = &path
}

}

// default LOCAL fs
Expand Down Expand Up @@ -666,4 +661,41 @@ func (c *Config) setFileserviceDefaultValues() {
})
}

for i := 0; i < len(c.FileServices); i++ {
config := &c.FileServices[i]

// cache configs
switch config.Name {

case defines.LocalFileServiceName:
// memory
if config.Cache.MemoryCapacity == nil {
capacity := tomlutil.ByteSize(512 * (1 << 20))
config.Cache.MemoryCapacity = &capacity
}
// no disk

case defines.SharedFileServiceName:
// memory
if config.Cache.MemoryCapacity == nil {
capacity := tomlutil.ByteSize(512 * (1 << 20))
config.Cache.MemoryCapacity = &capacity
}
// disk
if config.Cache.DiskPath == nil {
path := config.DataDir + "-cache"
config.Cache.DiskPath = &path
}
if config.Cache.DiskCapacity == nil {
capacity := tomlutil.ByteSize(8 * (1 << 30))
config.Cache.DiskCapacity = &capacity
}

case defines.ETLFileServiceName:
// no caches

}

}

}
17 changes: 0 additions & 17 deletions pkg/fileservice/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,6 @@ type CacheCallbacks struct {
type CacheCallbackFunc = func(fscache.CacheKey, fscache.Data)

func (c *CacheConfig) setDefaults() {
if c.MemoryCapacity == nil {
size := toml.ByteSize(512 << 20)
c.MemoryCapacity = &size
}
if c.DiskCapacity == nil {
size := toml.ByteSize(8 << 30)
c.DiskCapacity = &size
}
if c.DiskMinEvictInterval == nil {
c.DiskMinEvictInterval = &toml.Duration{
Duration: time.Minute * 7,
}
}
if c.DiskEvictTarget == nil {
target := 0.8
c.DiskEvictTarget = &target
}
c.RPC.Adjust()
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/fileservice/disk_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func (d *diskObjectStorage) Delete(ctx context.Context, keys ...string) (err err
if err := ctx.Err(); err != nil {
return err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Delete.Add(1)
}, d.perfCounterSets...)

for _, key := range keys {
path := filepath.Join(d.path, key)
_ = os.Remove(path)
Expand Down Expand Up @@ -102,6 +107,10 @@ func (d *diskObjectStorage) List(ctx context.Context, prefix string, fn func(isP
return err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.List.Add(1)
}, d.perfCounterSets...)

dir, prefix := path.Split(prefix)

f, err := os.Open(filepath.Join(d.path, dir))
Expand Down Expand Up @@ -144,6 +153,10 @@ func (d *diskObjectStorage) Read(ctx context.Context, key string, min *int64, ma
return nil, err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Get.Add(1)
}, d.perfCounterSets...)

path := filepath.Join(d.path, key)
f, err := os.Open(path)
if err != nil {
Expand Down Expand Up @@ -188,6 +201,10 @@ func (d *diskObjectStorage) Stat(ctx context.Context, key string) (size int64, e
return 0, err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Head.Add(1)
}, d.perfCounterSets...)

path := filepath.Join(d.path, key)
stat, err := os.Stat(path)
if err != nil {
Expand All @@ -207,6 +224,10 @@ func (d *diskObjectStorage) Write(ctx context.Context, key string, r io.Reader,
return err
}

perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
counter.FileService.S3.Put.Add(1)
}, d.perfCounterSets...)

tempFile, err := os.CreateTemp(d.path, "*.mofstemp")
if err != nil {
return err
Expand Down
11 changes: 11 additions & 0 deletions pkg/fileservice/disk_object_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ import (
func TestDiskObjectStorage(t *testing.T) {
ctx := context.Background()

testObjectStorage(t, func(t *testing.T) *diskObjectStorage {
storage, err := newDiskObjectStorage(context.Background(), ObjectStorageArguments{
Endpoint: "disk",
Bucket: t.TempDir(),
}, nil)
if err != nil {
t.Fatal(err)
}
return storage
})

testFileService(t, 0, func(name string) FileService {
dir := t.TempDir()
fs, err := NewS3FS(
Expand Down
43 changes: 24 additions & 19 deletions pkg/fileservice/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (l *LocalFS) AllocateCacheData(size int) fscache.Data {
func (l *LocalFS) initCaches(ctx context.Context, config CacheConfig) error {
config.setDefaults()

// remote
if config.RemoteCacheEnabled {
if config.QueryClient == nil {
return moerr.NewInternalError(ctx, "query client is nil")
Expand All @@ -131,7 +132,9 @@ func (l *LocalFS) initCaches(ctx context.Context, config CacheConfig) error {
)
}

if *config.MemoryCapacity > DisableCacheCapacity { // 1 means disable
// memory
if config.MemoryCapacity != nil &&
*config.MemoryCapacity > DisableCacheCapacity { // 1 means disable
l.memCache = NewMemCache(
fscache.ConstCapacity(int64(*config.MemoryCapacity)),
&config.CacheCallbacks,
Expand All @@ -143,25 +146,27 @@ func (l *LocalFS) initCaches(ctx context.Context, config CacheConfig) error {
)
}

if config.enableDiskCacheForLocalFS {
if *config.DiskCapacity > DisableCacheCapacity && config.DiskPath != nil {
var err error
l.diskCache, err = NewDiskCache(
ctx,
*config.DiskPath,
fscache.ConstCapacity(int64(*config.DiskCapacity)),
l.perfCounterSets,
true,
l,
)
if err != nil {
return err
}
logutil.Info("fileservice: disk cache initialized",
zap.Any("fs-name", l.name),
zap.Any("config", config),
)
// disk
if config.enableDiskCacheForLocalFS &&
config.DiskCapacity != nil &&
*config.DiskCapacity > DisableCacheCapacity &&
config.DiskPath != nil {
var err error
l.diskCache, err = NewDiskCache(
ctx,
*config.DiskPath,
fscache.ConstCapacity(int64(*config.DiskCapacity)),
l.perfCounterSets,
true,
l,
)
if err != nil {
return err
}
logutil.Info("fileservice: disk cache initialized",
zap.Any("fs-name", l.name),
zap.Any("config", config),
)
}

return nil
Expand Down
82 changes: 82 additions & 0 deletions pkg/fileservice/object_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileservice

import (
"bytes"
"context"
"io"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func testObjectStorage[T ObjectStorage](
t *testing.T,
newStorage func(t *testing.T) T,
) {

t.Run("basic", func(t *testing.T) {
storage := newStorage(t)
ctx := context.Background()

prefix := time.Now().Format("2006-01-02-15-04-05.000000")
name := path.Join(prefix, "foo")

// write
err := storage.Write(ctx, name, bytes.NewReader([]byte("foo")), 3, nil)
assert.Nil(t, err)

// list
n := 0
err = storage.List(ctx, prefix+"/", func(isPrefix bool, key string, size int64) (bool, error) {
n++
assert.Equal(t, false, isPrefix)
assert.Equal(t, name, key)
assert.Equal(t, int64(3), size)
return true, nil
})
assert.Nil(t, err)
assert.Equal(t, 1, n)

// stat
size, err := storage.Stat(ctx, name)
assert.Nil(t, err)
assert.Equal(t, int64(3), size)

// exists
exists, err := storage.Exists(ctx, name)
assert.Nil(t, err)
assert.True(t, exists)
exists, err = storage.Exists(ctx, "bar")
assert.Nil(t, err)
assert.False(t, exists)

// read
r, err := storage.Read(ctx, name, nil, nil)
assert.Nil(t, err)
content, err := io.ReadAll(r)
assert.Nil(t, err)
assert.Equal(t, []byte("foo"), content)
assert.Nil(t, r.Close())

// delete
err = storage.Delete(ctx, name)
assert.Nil(t, err)
})

}
4 changes: 3 additions & 1 deletion pkg/fileservice/remote_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/queryservice"
"github.com/matrixorigin/matrixone/pkg/queryservice/client"
"github.com/matrixorigin/matrixone/pkg/util/toml"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -125,7 +126,8 @@ func runTestWithTwoFileServices(t *testing.T, fn func(sf1 *cacheFs, sf2 *cacheFs
KeyRouterFactory: func() client.KeyRouter[query.CacheKey] {
return keyRouter
},
QueryClient: qt,
QueryClient: qt,
MemoryCapacity: ptrTo[toml.ByteSize](1 << 30),
}
cacheCfg.setDefaults()
cacheCfg.SetRemoteCacheCallback()
Expand Down
7 changes: 5 additions & 2 deletions pkg/fileservice/s3_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func (s *S3FS) initCaches(ctx context.Context, config CacheConfig) error {
}

// memory cache
if *config.MemoryCapacity > DisableCacheCapacity {
if config.MemoryCapacity != nil &&
*config.MemoryCapacity > DisableCacheCapacity {
s.memCache = NewMemCache(
fscache.ConstCapacity(int64(*config.MemoryCapacity)),
&config.CacheCallbacks,
Expand All @@ -170,7 +171,9 @@ func (s *S3FS) initCaches(ctx context.Context, config CacheConfig) error {
}

// disk cache
if *config.DiskCapacity > DisableCacheCapacity && config.DiskPath != nil {
if config.DiskCapacity != nil &&
*config.DiskCapacity > DisableCacheCapacity &&
config.DiskPath != nil {
var err error
s.diskCache, err = NewDiskCache(
ctx,
Expand Down
Loading

0 comments on commit 8f7bb7d

Please sign in to comment.