Skip to content

Commit

Permalink
Merge branch 'main' into sinkers
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Sep 20, 2024
2 parents df9c455 + 949c826 commit a2656cf
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 212 deletions.
27 changes: 19 additions & 8 deletions pkg/gossip/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,34 @@ package gossip

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/lni/goutils/leaktest"
"github.com/stretchr/testify/assert"
)

func getRandomPort() int {
rand.New(rand.NewSource(time.Now().UnixNano()))
return rand.Intn(65535-1024) + 1024
}

func TestNodeGossip(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
svcPort1 := getRandomPort()
cachePort1 := getRandomPort()
n1, err := NewNode(ctx, "n1",
WithListenAddrFn(func() string {
return "127.0.0.1:38001"
return fmt.Sprintf("127.0.0.1:%d", svcPort1)
}),
WithServiceAddrFn(func() string {
return "127.0.0.1:38001"
return fmt.Sprintf("127.0.0.1:%d", svcPort1)
}),
WithCacheServerAddrFn(func() string {
return "127.0.0.1:38101"
return fmt.Sprintf("127.0.0.1:%d", cachePort1)
}))
assert.NoError(t, err)
assert.NotNil(t, n1)
Expand All @@ -46,17 +55,19 @@ func TestNodeGossip(t *testing.T) {
err = n1.Join(nil)
assert.Equal(t, 1, n1.NumMembers())
assert.NotNil(t, n1.DistKeyCacheGetter()())

assert.NoError(t, err)

svcPort2 := getRandomPort()
cachePort2 := getRandomPort()
n2, err := NewNode(ctx, "n2",
WithListenAddrFn(func() string {
return "127.0.0.1:38002"
return fmt.Sprintf("127.0.0.1:%d", svcPort2)
}),
WithServiceAddrFn(func() string {
return "127.0.0.1:38002"
return fmt.Sprintf("127.0.0.1:%d", svcPort2)
}),
WithCacheServerAddrFn(func() string {
return "127.0.0.1:38201"
return fmt.Sprintf("127.0.0.1:%d", cachePort2)
}))
assert.NoError(t, err)
assert.NotNil(t, n2)
Expand All @@ -65,7 +76,7 @@ func TestNodeGossip(t *testing.T) {
defer func() {
assert.NoError(t, n2.Leave(time.Millisecond*300))
}()
err = n2.Join([]string{"127.0.0.1:38001"})
err = n2.Join([]string{fmt.Sprintf("127.0.0.1:%d", svcPort1)})
assert.NoError(t, err)
assert.Equal(t, 2, n2.NumMembers())
assert.NotNil(t, n2.DistKeyCacheGetter()())
Expand Down
262 changes: 113 additions & 149 deletions pkg/pb/shard/shard.pb.go

Large diffs are not rendered by default.

12 changes: 0 additions & 12 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,16 +631,6 @@ func (e *Engine) Hints() (h engine.Hints) {
return
}

func determineScanType(relData engine.RelData, readerNum int) (scanType int) {
scanType = NORMAL
if relData.DataCnt() < readerNum*SMALLSCAN_THRESHOLD || readerNum == 1 {
scanType = SMALL
} else if (readerNum * LARGESCAN_THRESHOLD) <= relData.DataCnt() {
scanType = LARGE
}
return
}

func (e *Engine) BuildBlockReaders(
ctx context.Context,
p any,
Expand All @@ -667,7 +657,6 @@ func (e *Engine) BuildBlockReaders(
return nil, err
}

scanType := determineScanType(relData, newNum)
mod := blkCnt % newNum
divide := blkCnt / newNum
for i := 0; i < newNum; i++ {
Expand All @@ -694,7 +683,6 @@ func (e *Engine) BuildBlockReaders(
if err != nil {
return nil, err
}
rd.scanType = scanType
rds = append(rds, rd)
}
return rds, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/vm/engine/disttae/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ func (r *reader) Read(
}

var policy fileservice.Policy
if r.scanType == LARGE || r.scanType == NORMAL {
if r.readBlockCnt == 0 {
r.smallScanThreshHold = GetSmallScanThreshHold()
}
if r.readBlockCnt > r.smallScanThreshHold {
policy = fileservice.SkipMemoryCacheWrites
}

Expand All @@ -381,6 +384,7 @@ func (r *reader) Read(
if err != nil {
return false, err
}
r.readBlockCnt++

if filter.Valid {
// we collect mem cache hit related statistics info for blk read here
Expand Down
3 changes: 0 additions & 3 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1747,7 +1747,6 @@ func (tbl *txnTable) BuildReaders(
}
}

scanType := determineScanType(relData, newNum)
def := tbl.GetTableDef(ctx)
mod := blkCnt % newNum
divide := blkCnt / newNum
Expand Down Expand Up @@ -1777,8 +1776,6 @@ func (tbl *txnTable) BuildReaders(
if err != nil {
return nil, err
}

rd.scanType = scanType
rds = append(rds, rd)
}
return rds, nil
Expand Down
5 changes: 0 additions & 5 deletions pkg/vm/engine/disttae/txn_table_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,6 @@ type shardingLocalReader struct {
//relation data to distribute to remote CN which holds shard's partition state.
remoteRelData engine.RelData
remoteTombApplyPolicy engine.TombstoneApplyPolicy
remoteScanType int
}

// TODO::
Expand Down Expand Up @@ -478,7 +477,6 @@ func (r *shardingLocalReader) Read(
func(param *shard.ReadParam) {
param.ReaderBuildParam.RelData = relData
param.ReaderBuildParam.Expr = expr
param.ReaderBuildParam.ScanType = int32(r.remoteScanType)
param.ReaderBuildParam.TombstoneApplyPolicy = int32(r.remoteTombApplyPolicy)
},
func(resp []byte) {
Expand Down Expand Up @@ -626,7 +624,6 @@ func (tbl *txnTableDelegate) BuildShardingReaders(
}
}

scanType := determineScanType(relData, newNum)
mod := blkCnt % newNum
divide := blkCnt / newNum
current := 0
Expand All @@ -647,7 +644,6 @@ func (tbl *txnTableDelegate) BuildShardingReaders(
tblDelegate: tbl,
//remoteRelData: remoteRelData,
remoteTombApplyPolicy: engine.Policy_SkipUncommitedInMemory | engine.Policy_SkipUncommitedS3,
remoteScanType: scanType,
}

if localRelData.DataCnt() > 0 {
Expand All @@ -672,7 +668,6 @@ func (tbl *txnTableDelegate) BuildShardingReaders(
if err != nil {
return nil, err
}
lrd.scanType = scanType
srd.lrd = lrd
}

Expand Down
27 changes: 12 additions & 15 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

const (
PREFETCH_THRESHOLD = 256
PREFETCH_ROUNDS = 24
SMALLSCAN_THRESHOLD = 100
LARGESCAN_THRESHOLD = 1500
)
func GetSmallScanThreshHold() uint64 {
if ncpu > 32 {
return 100
}
if ncpu > 16 {
return 200
}
return 400
}

const (
INSERT = iota
Expand Down Expand Up @@ -117,12 +120,6 @@ func noteSplitAlter(note string) (bool, int, uint64, string) {
panic("bad format of alter note")
}

const (
SMALL = iota
NORMAL
LARGE
)

const (
MO_DATABASE_ID_NAME_IDX = 1
MO_DATABASE_ID_ACCOUNT_IDX = 2
Expand Down Expand Up @@ -921,9 +918,9 @@ type reader struct {
isTombstone bool
source engine.DataSource

memFilter MemPKFilter

scanType int
memFilter MemPKFilter
readBlockCnt uint64
smallScanThreshHold uint64
}

type mergeReader struct {
Expand Down
1 change: 0 additions & 1 deletion pkg/vm/engine/test/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,6 @@ func Test_ShardingRemoteReader(t *testing.T) {
data, err := relData.MarshalBinary()
require.NoError(t, err)
readerBuildParam.ReaderBuildParam.RelData = data
readerBuildParam.ReaderBuildParam.ScanType = disttae.SMALL
readerBuildParam.ReaderBuildParam.TombstoneApplyPolicy =
int32(engine.Policy_SkipUncommitedInMemory | engine.Policy_SkipUncommitedS3)
res, err := disttae.HandleShardingReadBuildReader(
Expand Down
3 changes: 1 addition & 2 deletions proto/shard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,7 @@ message KeyParam {
message ReaderBuildParam {
bytes relData = 1;
plan.Expr expr = 2;
int32 scanType = 3;
int32 tombstoneApplyPolicy = 4;
int32 tombstoneApplyPolicy = 3;
}

message ReaderBuildResult {
Expand Down
23 changes: 10 additions & 13 deletions test/distributed/cases/join/leftjoin.result
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,26 @@ mo_ctl(dn, flush, d1.t2)
select mo_ctl('dn', 'flush', 'd1.t3');
mo_ctl(dn, flush, d1.t3)
{\n "method": "Flush",\n "result": [\n {\n "returnStr": "OK"\n }\n ]\n}\n
select Sleep(5);
sleep(5)
0
explain select t2.c1 from t2 left join t1 on t1.c1 =t2.c1;
QUERY PLAN
TP QURERY PLAN
Project
-> Table Scan on d1.t2
explain select t1.c1,t2.c1 from t2 left join t1 on t1.c1 =t2.c1;
QUERY PLAN
TP QURERY PLAN
Project
-> Join
Join Type: LEFT hashOnPK
Join Cond: (t2.c1 = t1.c1)
-> Table Scan on d1.t2
-> Table Scan on d1.t1
explain select t3.c1 from t3 left join t1 on t1.c1 =t3.c1 and t1.c1 > t3.c2 where t3.c1<10;
QUERY PLAN
TP QURERY PLAN
Project
-> Table Scan on d1.t3
Filter Cond: (t3.c1 < 10)
Block Filter Cond: (t3.c1 < 10)
explain select t1.c1,t3.c1 from t3 left join t1 on t1.c1 =t3.c1 where t3.c1<10;
QUERY PLAN
TP QURERY PLAN
Project
-> Join
Join Type: LEFT hashOnPK
Expand All @@ -56,23 +53,23 @@ Project
Filter Cond: (t1.c1 < 10)
Block Filter Cond: (t1.c1 < 10)
explain select t2.c1 from t2 left join t1 on t1.c1 =t2.c2;
QUERY PLAN
TP QURERY PLAN
Project
-> Table Scan on d1.t2
explain select t3.c1 from t3 left join t2 on t3.c2=t2.c2;
QUERY PLAN
TP QURERY PLAN
Project
-> Join
Join Type: LEFT
Join Cond: (t3.c2 = t2.c2)
-> Table Scan on d1.t3
-> Table Scan on d1.t2
explain select t2.c1 from t2 left join (select t3.c1,t3.c2 from t1 join t3 on t1.c1=t3.c1) v1 on t2.c1 =v1.c1 and t2.c2=v1.c2;
QUERY PLAN
TP QURERY PLAN
Project
-> Table Scan on d1.t2
explain select t2.c1 from t2 left join (select t3.c1,t3.c2 from t1 join t3 on t1.c1=t3.c1 limit 5000) v1 on t2.c1 =v1.c1;
QUERY PLAN
TP QURERY PLAN
Project
-> Join
Join Type: LEFT
Expand Down Expand Up @@ -150,9 +147,9 @@ insert into routines values('specific_name01','routine_catalog01','specific_name
insert into routines values('specific_name02','routine_catalog02','specific_name01','routine_name01','rrr01','data',10,10,10,20,20,'character_set_name','collation','dtd_identifier01','111','routine_definition01','1','external_language01','aaa','bbb','sql_data','sql_path01','111','2020-10-10 12:12:12.000','2021-10-10 12:12:12.000','sql_mode01','routine_comment01','definer01','character_set_client','collation_connection','database_collation');
insert into routines values('specific_name01','routine_catalog01','specific_name02','routine_name02','rrr02','data',10,10,10,20,20,'character_set_name','collation','dtd_identifier01','111','routine_definition01','1','external_language01','aaa','bbb','sql_data','sql_path01','111','2020-10-10 12:12:12.000','2021-10-10 12:12:12.000','sql_mode01','routine_comment01','definer01','character_set_client','collation_connection','database_collation');
SELECT DISTINCT ROUTINE_SCHEMA, ROUTINE_NAME, PARAMS.PARAMETER FROM ROUTINES LEFT JOIN ( SELECT SPECIFIC_SCHEMA, SPECIFIC_NAME, GROUP_CONCAT(CONCAT(DATA_TYPE, ' ', PARAMETER_NAME) ORDER BY ORDINAL_POSITION SEPARATOR ', ') PARAMETER, ROUTINE_TYPE FROM PARAMETERS GROUP BY SPECIFIC_SCHEMA, SPECIFIC_NAME, ROUTINE_TYPE ) PARAMS ON ROUTINES.ROUTINE_SCHEMA = PARAMS.SPECIFIC_SCHEMA AND ROUTINES.ROUTINE_NAME = PARAMS.SPECIFIC_NAME AND ROUTINES.ROUTINE_TYPE = PARAMS.ROUTINE_TYPE WHERE ROUTINE_SCHEMA = 'specific_name01' ORDER BY ROUTINE_SCHEMA;
routine_schema routine_name parameter
ROUTINE_SCHEMA ROUTINE_NAME PARAMETER
specific_name01 routine_name02 null
specific_name01 routine_name01 data_type01 parametername01
drop table routines;
drop table parameters;
drop database if exists d1;
drop database if exists d1;
3 changes: 0 additions & 3 deletions test/distributed/cases/join/leftjoin.test
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ select mo_ctl('dn', 'flush', 'd1.t1');
select mo_ctl('dn', 'flush', 'd1.t2');
-- @separator:table
select mo_ctl('dn', 'flush', 'd1.t3');
select Sleep(5);
-- @separator:table
explain select t2.c1 from t2 left join t1 on t1.c1 =t2.c1;
-- @separator:table
Expand All @@ -33,9 +32,7 @@ explain select t2.c1 from t2 left join t1 on t1.c1 =t2.c2;
-- @separator:table
explain select t3.c1 from t3 left join t2 on t3.c2=t2.c2;
-- @separator:table
-- @bvt:issue#15201
explain select t2.c1 from t2 left join (select t3.c1,t3.c2 from t1 join t3 on t1.c1=t3.c1) v1 on t2.c1 =v1.c1 and t2.c2=v1.c2;
-- @bvt:issue
-- @separator:table
explain select t2.c1 from t2 left join (select t3.c1,t3.c2 from t1 join t3 on t1.c1=t3.c1 limit 5000) v1 on t2.c1 =v1.c1;
drop table if exists parameters;
Expand Down

0 comments on commit a2656cf

Please sign in to comment.