Skip to content

Commit

Permalink
Merge remote-tracking branch 'me/decop-reader' into decop-reader
Browse files Browse the repository at this point in the history
  • Loading branch information
XuPeng-SH committed Sep 19, 2024
2 parents 2a31778 + 8a2b7fc commit 6c40bbe
Show file tree
Hide file tree
Showing 28 changed files with 788 additions and 494 deletions.
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/catalog/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ func MockDBEntryWithAccInfo(accId uint64, dbId uint64) *DBEntry {

entry.DBNode = &DBNode{}
entry.DBNode.acInfo.TenantID = uint32(accId)
entry.BaseEntryImpl = NewBaseEntry(func() *EmptyMVCCNode { return &EmptyMVCCNode{} })

return entry
}
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/catalog/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,15 +529,15 @@ func (entry *ObjectEntry) PrintPrepareCompactDebugLog() {
logutil.Info(s)
}

func MockObjEntryWithTbl(tbl *TableEntry, size uint64) *ObjectEntry {
func MockObjEntryWithTbl(tbl *TableEntry, size uint64, isTombstone bool) *ObjectEntry {
stats := objectio.NewObjectStats()
objectio.SetObjectStatsSize(stats, uint32(size))
// set a non-zero row count so the stats pass the empty check
objectio.SetObjectStatsRowCnt(stats, uint32(1))
ts := types.BuildTS(time.Now().UnixNano(), 0)
e := &ObjectEntry{
table: tbl,
ObjectNode: ObjectNode{},
ObjectNode: ObjectNode{IsTombstone: isTombstone},
EntryMVCCNode: EntryMVCCNode{
CreatedAt: ts,
},
Expand Down
19 changes: 13 additions & 6 deletions pkg/vm/engine/tae/catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"

pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog"
Expand Down Expand Up @@ -354,12 +353,19 @@ func (entry *TableEntry) ObjectCnt(isTombstone bool) int {
return entry.dataObjects.tree.Load().Len()
}

func (entry *TableEntry) ObjectStats(level common.PPLevel, start, end int) (stat TableStat, w bytes.Buffer) {
func (entry *TableEntry) ObjectStats(level common.PPLevel, start, end int, isTombstone bool) (stat TableStat, w bytes.Buffer) {
var it btree.IterG[*ObjectEntry]
if isTombstone {
w.WriteString("TOMBSTONES\n")
it = entry.MakeTombstoneObjectIt()
} else {
w.WriteString("DATA\n")
it = entry.MakeDataObjectIt()
}

it := entry.MakeDataObjectIt()
defer it.Release()
zonemapKind := common.ZonemapPrintKindNormal
if schema := entry.GetLastestSchemaLocked(false); schema.HasSortKey() && strings.HasPrefix(schema.GetSingleSortKey().Name, "__") {
if schema := entry.GetLastestSchemaLocked(isTombstone); schema.HasSortKey() && schema.GetSingleSortKey().Name == "__mo_cpkey_col" {
zonemapKind = common.ZonemapPrintKindCompose
}

Expand Down Expand Up @@ -411,8 +417,8 @@ func (entry *TableEntry) ObjectStats(level common.PPLevel, start, end int) (stat
return
}

func (entry *TableEntry) ObjectStatsString(level common.PPLevel, start, end int) string {
stat, detail := entry.ObjectStats(level, start, end)
func (entry *TableEntry) ObjectStatsString(level common.PPLevel, start, end int, isTombstone bool) string {
stat, detail := entry.ObjectStats(level, start, end, isTombstone)

var avgCsize, avgRow, avgOsize int
if stat.Loaded > 0 {
Expand Down Expand Up @@ -712,6 +718,7 @@ func MockTableEntryWithDB(dbEntry *DBEntry, tblId uint64) *TableEntry {
entry := NewReplayTableEntry()
entry.TableNode = &TableNode{}
entry.TableNode.schema.Store(NewEmptySchema("test"))
entry.TableNode.tombstoneSchema.Store(NewEmptySchema("tombstone"))
entry.ID = tblId
entry.db = dbEntry
return entry
Expand Down
38 changes: 38 additions & 0 deletions pkg/vm/engine/tae/catalog/table_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package catalog

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
)

func TestTableObjectStats(t *testing.T) {
db := MockDBEntryWithAccInfo(0, 0)
tbl := MockTableEntryWithDB(db, 1)
_, detail := tbl.ObjectStats(common.PPL4, 0, 1, false)
require.Equal(t, "DATA\n", detail.String())

tbl.dataObjects.Set(MockObjEntryWithTbl(tbl, 10, false), true)
_, detail = tbl.ObjectStats(common.PPL4, 0, 1, false)
require.Equal(t, "DATA\n\n00000000-0000-0000-0000-000000000000_0\n loaded:true, oSize:0B, cSzie:10B rows:1, zm: ZM(ANY)0[<nil>,<nil>]--\n", detail.String())

tbl.tombstoneObjects.Set(MockObjEntryWithTbl(tbl, 20, true), true)
_, detail = tbl.ObjectStats(common.PPL4, 0, 1, true)
require.Equal(t, "TOMBSTONES\n\n00000000-0000-0000-0000-000000000000_0\n loaded:true, oSize:0B, cSzie:20B rows:1, zm: ZM(ANY)0[<nil>,<nil>]--\n", detail.String())
}
5 changes: 5 additions & 0 deletions pkg/vm/engine/tae/db/merge/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"slices"
"strings"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
Expand All @@ -33,6 +34,7 @@ var (
MaxOsizeMergedObj: common.DefaultMaxOsizeObjMB * common.Const1MBytes,
ObjectMinOsize: common.DefaultMinOsizeQualifiedMB * common.Const1MBytes,
MinCNMergeSize: common.DefaultMinCNMergeSize * common.Const1MBytes,
TombstoneLifetime: 30 * time.Minute,
}
)

Expand All @@ -46,6 +48,8 @@ type BasicPolicyConfig struct {
MinCNMergeSize uint64
FromUser bool
MergeHints []api.MergeHint

TombstoneLifetime time.Duration
}

func (c *BasicPolicyConfig) String() string {
Expand Down Expand Up @@ -102,6 +106,7 @@ func (o *customConfigProvider) getConfig(tbl *catalog.TableEntry) *BasicPolicyCo
MinCNMergeSize: cnSize,
FromUser: true,
MergeHints: extra.Hints,
TombstoneLifetime: 30 * time.Minute,
}
o.configs[tbl.ID] = p
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/merge/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (e *executor) executeFor(entry *catalog.TableEntry, mobjs []*catalog.Object
}
}

if len(objs) > 1 {
if len(objs) > 0 {
e.scheduleMergeObjects(objScopes, objs, objectBlkCnt, entry, false)
}
if len(tombstones) > 1 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/merge/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ const (

type policy interface {
onObject(*catalog.ObjectEntry, *BasicPolicyConfig) bool
revise(cpu, mem int64, config *BasicPolicyConfig) ([]*catalog.ObjectEntry, TaskHostKind)
revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult
resetForTable(*catalog.TableEntry)
}

Expand Down
57 changes: 36 additions & 21 deletions pkg/vm/engine/tae/db/merge/policyBasic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package merge

import (
"cmp"
"context"
"slices"
"time"

"github.com/matrixorigin/matrixone/pkg/logutil"
Expand Down Expand Up @@ -54,9 +56,11 @@ func (g *policyGroup) onObject(obj *catalog.ObjectEntry) {
func (g *policyGroup) revise(cpu, mem int64) []reviseResult {
results := make([]reviseResult, 0, len(g.policies))
for _, p := range g.policies {
objs, kind := p.revise(cpu, mem, g.config)
if len(objs) > 1 {
results = append(results, reviseResult{objs, kind})
pResult := p.revise(cpu, mem, g.config)
for _, r := range pResult {
if len(r.objs) > 0 {
results = append(results, r)
}
}
}
return results
Expand Down Expand Up @@ -108,16 +112,16 @@ func (g *policyGroup) getConfig(tbl *catalog.TableEntry) *BasicPolicyConfig {
type basic struct {
schema *catalog.Schema
hist *common.MergeHistory
objHeap *heapBuilder[*catalog.ObjectEntry]
accBuf []int
objects []*catalog.ObjectEntry

objectsSize int
accBuf []int
}

func newBasicPolicy() policy {
return &basic{
objHeap: &heapBuilder[*catalog.ObjectEntry]{
items: make(itemSet[*catalog.ObjectEntry], 0, 32),
},
accBuf: make([]int, 1, 32),
objects: make([]*catalog.ObjectEntry, 0, 16),
accBuf: make([]int, 1, 32),
}
}

Expand All @@ -130,7 +134,14 @@ func (o *basic) onObject(obj *catalog.ObjectEntry, config *BasicPolicyConfig) bo
osize := int(obj.OriginSize())

isCandidate := func() bool {
if len(o.objects) >= config.MergeMaxOneRun {
return false
}
if osize < int(config.ObjectMinOsize) {
if o.objectsSize > 2*common.DefaultMaxOsizeObjMB*common.Const1MBytes {
return false
}
o.objectsSize += osize
return true
}
// skip big objects as a safeguard
Expand All @@ -142,17 +153,17 @@ func (o *basic) onObject(obj *catalog.ObjectEntry, config *BasicPolicyConfig) bo
}

if isCandidate() {
o.objHeap.pushWithCap(&mItem[*catalog.ObjectEntry]{
row: int(obj.Rows()),
entry: obj,
}, config.MergeMaxOneRun)
o.objects = append(o.objects, obj)
return true
}
return false
}

func (o *basic) revise(cpu, mem int64, config *BasicPolicyConfig) ([]*catalog.ObjectEntry, TaskHostKind) {
objs := o.objHeap.finish()
func (o *basic) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
slices.SortFunc(o.objects, func(a, b *catalog.ObjectEntry) int {
return cmp.Compare(a.Rows(), b.Rows())
})
objs := o.objects

isStandalone := common.IsStandaloneBoost.Load()
mergeOnDNIfStandalone := !common.ShouldStandaloneCNTakeOver.Load()
Expand All @@ -162,20 +173,23 @@ func (o *basic) revise(cpu, mem int64, config *BasicPolicyConfig) ([]*catalog.Ob

dnosize, _ := estimateMergeConsume(dnobjs)

schedDN := func() ([]*catalog.ObjectEntry, TaskHostKind) {
schedDN := func() []reviseResult {
if cpu > 85 {
if dnosize > 25*common.Const1MBytes {
logutil.Infof("mergeblocks skip big merge for high level cpu usage, %d", cpu)
return nil, TaskHostDN
return nil
}
}
return dnobjs, TaskHostDN
if len(dnobjs) > 1 {
return []reviseResult{{dnobjs, TaskHostDN}}
}
return nil
}

schedCN := func() ([]*catalog.ObjectEntry, TaskHostKind) {
schedCN := func() []reviseResult {
cnobjs := controlMem(objs, int64(common.RuntimeCNMergeMemControl.Load()))
cnobjs = o.optimize(cnobjs, config)
return cnobjs, TaskHostCN
return []reviseResult{{cnobjs, TaskHostCN}}
}

if isStandalone && mergeOnDNIfStandalone {
Expand Down Expand Up @@ -246,5 +260,6 @@ func controlMem(objs []*catalog.ObjectEntry, mem int64) []*catalog.ObjectEntry {
func (o *basic) resetForTable(entry *catalog.TableEntry) {
o.schema = entry.GetLastestSchemaLocked(false)
o.hist = entry.Stats.GetLastMerge()
o.objHeap.reset()
o.objects = o.objects[:0]
o.objectsSize = 0
}
Loading

0 comments on commit 6c40bbe

Please sign in to comment.