Merge branch 'main' into dispatch
mergify[bot] committed Sep 18, 2024
2 parents aa44818 + 5e3b78f commit 3ce18d8
Showing 16 changed files with 674 additions and 532 deletions.
3 changes: 2 additions & 1 deletion CODEOWNERS
@@ -140,7 +140,8 @@
 # pkg/sql
 /pkg/sql @aunjgr
 /pkg/sql/colexec @m-schen
-/pkg/sql/compile @m-schen @ouyuanning @aunjgr @badboynt1
+/pkg/sql/compile @m-schen @ouyuanning @aunjgr @badboynt1 @qingxinhome
+/pkg/sql/models @qingxinhome
 /pkg/sql/parsers @iamlinjunhong
 /pkg/sql/plan @ouyuanning @aunjgr @badboynt1
 /pkg/sql/plan/function @m-schen
986 changes: 493 additions & 493 deletions pkg/pb/plan/plan.pb.go

Large diffs are not rendered by default. (This file is generated protobuf code, presumably regenerated after the AnalyzeInfo field rename from S3IOByte to ScanBytes seen in the hand-written files below.)

9 changes: 1 addition & 8 deletions pkg/sql/colexec/aggexec/concat.go
@@ -24,10 +24,6 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/container/vector"
 )
 
-const (
-	groupConcatMaxLen = 1024
-)
-
 // group_concat is a special string aggregation function.
 type groupConcatExec struct {
 	multiAggInfo
@@ -117,9 +113,6 @@ func (exec *groupConcatExec) Fill(groupIndex int, row int, vectors []*vector.Vec
 	exec.ret.groupToSet = groupIndex
 	exec.ret.setGroupNotEmpty(groupIndex)
 	r := exec.ret.aggGet()
-	if len(r) > groupConcatMaxLen {
-		return nil
-	}
 	if len(r) > 0 {
 		r = append(r, exec.separator...)
 	}
@@ -173,7 +166,7 @@ func (exec *groupConcatExec) merge(other *groupConcatExec, idx1, idx2 int) error
 
 	v1 := exec.ret.aggGet()
 	v2 := other.ret.aggGet()
-	if len(v2) == 0 || len(v1) > groupConcatMaxLen {
+	if len(v2) == 0 {
 		return nil
 	}
 	if len(v1) > 0 && len(v2) > 0 {
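
Net effect of the three hunks above: group_concat no longer caps a group's accumulated result at 1024 bytes, neither while filling rows nor while merging partial results. A minimal sketch of the post-change fill logic, using simplified stand-in types rather than the real aggexec executor:

package main

import "fmt"

// groupConcat is a simplified stand-in for the aggexec internals.
type groupConcat struct {
	separator []byte
	groups    [][]byte // one accumulated result per group
}

// fill appends one value to a group's result. Since the commit removed the
// groupConcatMaxLen check, there is no longer an early return once the
// buffer passes 1024 bytes.
func (g *groupConcat) fill(groupIndex int, value []byte) {
	r := g.groups[groupIndex]
	if len(r) > 0 {
		r = append(r, g.separator...)
	}
	g.groups[groupIndex] = append(r, value...)
}

func main() {
	g := &groupConcat{separator: []byte(","), groups: make([][]byte, 1)}
	for _, v := range []string{"a", "b", "c"} {
		g.fill(0, []byte(v))
	}
	fmt.Println(string(g.groups[0])) // a,b,c
}
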
5 changes: 5 additions & 0 deletions pkg/sql/colexec/aggexec/exec_test.go
@@ -344,6 +344,11 @@ func TestGroupConcatExec(t *testing.T) {
 		require.NoError(t, executor.Fill(0, 1, inputs))
 		require.NoError(t, executor.Fill(0, 2, inputs))
 		require.NoError(t, executor.Fill(0, 3, inputs))
+		// data merge
+		executor2 := MakeAgg(mg, info.aggID, info.distinct, info.argTypes...)
+		require.NoError(t, executor2.GroupGrow(1))
+		require.NoError(t, executor.Merge(executor2, 0, 0))
+		executor2.Free()
 	}
 	{
 		// result check.
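
The added lines exercise Merge against a second, freshly grown executor whose only group is still empty, i.e. the len(v2) == 0 early return introduced in concat.go above. A sketch of that guard in isolation, with a simplified signature rather than the real API:

package sketch

// mergeConcat folds a partial result v2 into v1, inserting sep between
// non-empty halves. Merging an empty partial result is a no-op, which is
// the path the new test drives via executor.Merge(executor2, 0, 0).
func mergeConcat(v1, v2, sep []byte) []byte {
	if len(v2) == 0 {
		return v1
	}
	if len(v1) > 0 {
		v1 = append(v1, sep...)
	}
	return append(v1, v2...)
}
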
2 changes: 1 addition & 1 deletion pkg/sql/colexec/table_scan/table_scan.go
@@ -138,7 +138,7 @@ func (tableScan *TableScan) Call(proc *process.Process) (vm.CallResult, error) {
 				tableScan.ctr.buf)
 
 			analyzer.InputBlock()
-			analyzer.S3IOByte(tableScan.ctr.buf)
+			analyzer.ScanBytes(tableScan.ctr.buf)
 			batSize := tableScan.ctr.buf.Size()
 			tableScan.ctr.maxAllocSize = max(tableScan.ctr.maxAllocSize, batSize)
 			break
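
This is the call site of the renamed analyzer hook: for each batch the scan produces, the operator reports the batch's byte size. A sketch of that per-batch reporting pattern, with hypothetical Batch and analyzer types standing in for batch.Batch and the operatorAnalyzer:

package sketch

// Batch stands in for the real batch.Batch.
type Batch struct{ data []byte }

// Size returns the batch's byte size, as batch.Batch.Size() does.
func (b *Batch) Size() int { return len(b.data) }

// scanAnalyzer mirrors the two per-batch analyzer calls made by table_scan.
type scanAnalyzer struct {
	inputBlocks    int64
	totalScanBytes int64
}

func (a *scanAnalyzer) InputBlock() { a.inputBlocks++ }

// ScanBytes accumulates the size of every scanned batch, the metric that
// was previously (and misleadingly) called S3IOByte.
func (a *scanAnalyzer) ScanBytes(bat *Batch) {
	if bat != nil {
		a.totalScanBytes += int64(bat.Size())
	}
}
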
46 changes: 40 additions & 6 deletions pkg/sql/compile/analyze_module.go
@@ -121,7 +121,7 @@ func updateScopesLastFlag(updateScopes []*Scope) {
 
 // applyOpStatsToNode Recursive traversal of PhyOperator tree,
 // and add OpStats statistics to the corresponding NodeAnalyze Info
-func applyOpStatsToNode(op *models.PhyOperator, nodes []*plan.Node) {
+func applyOpStatsToNode(op *models.PhyOperator, nodes []*plan.Node, scopeParalleInfo *ParallelScopeInfo) {
 	if op == nil {
 		return
 	}
@@ -139,17 +139,30 @@ func applyOpStatsToNode(op *models.PhyOperator, nodes []*plan.Node) {
 		node.AnalyzeInfo.TimeConsumed += op.OpStats.TotalTimeConsumed
 		node.AnalyzeInfo.MemorySize += op.OpStats.TotalMemorySize
 		node.AnalyzeInfo.WaitTimeConsumed += op.OpStats.TotalWaitTimeConsumed
-		node.AnalyzeInfo.S3IOByte += op.OpStats.TotalS3IOByte
+		node.AnalyzeInfo.ScanBytes += op.OpStats.TotalScanBytes
 		node.AnalyzeInfo.NetworkIO += op.OpStats.TotalNetworkIO
 		node.AnalyzeInfo.InputBlocks += op.OpStats.TotalInputBlocks
 
 		node.AnalyzeInfo.ScanTime += op.OpStats.GetMetricByKey(process.OpScanTime)
 		node.AnalyzeInfo.InsertTime += op.OpStats.GetMetricByKey(process.OpInsertTime)
 
+		if _, isMinorOp := vm.MinorOpMap[op.OpName]; isMinorOp {
+			isMinor := true
+			if op.OpName == vm.OperatorToStrMap[vm.Filter] {
+				if op.OpName != vm.OperatorToStrMap[vm.TableScan] && op.OpName != vm.OperatorToStrMap[vm.External] {
+					isMinor = false // restrict operator is minor only for scan
+				}
+			}
+			if isMinor {
+				scopeParalleInfo.NodeIdxTimeConsumeMinor[op.NodeIdx] += op.OpStats.TotalTimeConsumed
+			}
+		} else if _, isMajorOp := vm.MajorOpMap[op.OpName]; isMajorOp {
+			scopeParalleInfo.NodeIdxTimeConsumeMajor[op.NodeIdx] += op.OpStats.TotalTimeConsumed
+		}
 	}
 
 	// Recursive processing of sub operators
 	for _, childOp := range op.Children {
-		applyOpStatsToNode(childOp, nodes)
+		applyOpStatsToNode(childOp, nodes, scopeParalleInfo)
 	}
 }

@@ -161,7 +174,16 @@ func processPhyScope(scope *models.PhyScope, nodes []*plan.Node) {
 
 	// handle current Scope operator pipeline
 	if scope.RootOperator != nil {
-		applyOpStatsToNode(scope.RootOperator, nodes)
+		scopeParallInfo := NewParallelScopeInfo()
+		applyOpStatsToNode(scope.RootOperator, nodes, scopeParallInfo)
+
+		for nodeIdx, timeConsumeMajor := range scopeParallInfo.NodeIdxTimeConsumeMajor {
+			nodes[nodeIdx].AnalyzeInfo.TimeConsumedArrayMajor = append(nodes[nodeIdx].AnalyzeInfo.TimeConsumedArrayMajor, timeConsumeMajor)
+		}
+
+		for nodeIdx, timeConsumeMinor := range scopeParallInfo.NodeIdxTimeConsumeMinor {
+			nodes[nodeIdx].AnalyzeInfo.TimeConsumedArrayMinor = append(nodes[nodeIdx].AnalyzeInfo.TimeConsumedArrayMinor, timeConsumeMinor)
+		}
 	}
 
 	// handle preScopes recursively
@@ -374,7 +396,6 @@ func (c *Compile) GenPhyPlan(runC *Compile) {
 		}
 	}
 
-	//-------------------------------------------------------------------------------------------
 	// record the number of s3 requests
 	c.anal.phyPlan.S3IOInputCount += runC.counterSet.FileService.S3.Put.Load()
 	c.anal.phyPlan.S3IOInputCount += runC.counterSet.FileService.S3.List.Load()
@@ -392,6 +413,19 @@ func (c *Compile) GenPhyPlan(runC *Compile) {
 	}
 }
 
+type ParallelScopeInfo struct {
+	NodeIdxTimeConsumeMajor map[int]int64
+	NodeIdxTimeConsumeMinor map[int]int64
+	//scopeId int32
+}
+
+func NewParallelScopeInfo() *ParallelScopeInfo {
+	return &ParallelScopeInfo{
+		NodeIdxTimeConsumeMajor: make(map[int]int64),
+		NodeIdxTimeConsumeMinor: make(map[int]int64),
+	}
+}
+
 //---------------------------------------------------------------------------------------------------
 
 type ExplainOption struct {
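
Taken together, these hunks add a per-scope accumulation pass: while one scope's operator tree is walked, time consumed by major and minor operators is summed per plan-node index, and the scope then appends its totals to each node's TimeConsumedArrayMajor/TimeConsumedArrayMinor, leaving one array entry per parallel scope. A rough self-contained sketch of that two-phase pattern (stand-in types; the real code walks models.PhyOperator trees and classifies operators via vm.MajorOpMap and vm.MinorOpMap):

package sketch

// nodeInfo stands in for plan.Node.AnalyzeInfo.
type nodeInfo struct {
	TimeConsumedArrayMajor []int64
	TimeConsumedArrayMinor []int64
}

// op stands in for models.PhyOperator: a plan-node index, time consumed,
// a classification, and child operators. (Real operators may be neither
// major nor minor; that case is omitted here.)
type op struct {
	nodeIdx  int
	timeNs   int64
	major    bool
	children []*op
}

// collect walks one scope's operator tree, summing time per node index,
// as applyOpStatsToNode now does into ParallelScopeInfo's maps.
func collect(o *op, major, minor map[int]int64) {
	if o == nil {
		return
	}
	if o.major {
		major[o.nodeIdx] += o.timeNs
	} else {
		minor[o.nodeIdx] += o.timeNs
	}
	for _, c := range o.children {
		collect(c, major, minor)
	}
}

// finishScope appends one total per node, so after every scope is processed
// a node's array holds one entry per parallel scope that executed it.
func finishScope(nodes []*nodeInfo, major, minor map[int]int64) {
	for idx, t := range major {
		nodes[idx].TimeConsumedArrayMajor = append(nodes[idx].TimeConsumedArrayMajor, t)
	}
	for idx, t := range minor {
		nodes[idx].TimeConsumedArrayMinor = append(nodes[idx].TimeConsumedArrayMinor, t)
	}
}
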
2 changes: 1 addition & 1 deletion pkg/sql/compile/analyze_module_test.go
@@ -85,7 +85,7 @@ func Test_processPhyScope(t *testing.T) {
 		TotalInputSize:   2048,
 		TotalInputBlocks: 0,
 		TotalOutputSize:  1900,
-		TotalS3IOByte:    0,
+		TotalScanBytes:   0,
 		TotalNetworkIO:   600,
 		//TotalScanTime: 1500,
 		//TotalInsertTime: 0,
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/models/logic_plan.go
@@ -34,7 +34,7 @@ const InputSize = "Input Size"
 const OutputSize = "Output Size"
 const MemorySize = "Memory Size"
 const DiskIO = "Disk IO"
-const S3IOByte = "S3 IO Byte"
+const ScanBytes = "Scan Bytes"
 const S3IOInputCount = "S3 IO Input Count"
 const S3IOOutputCount = "S3 IO Output Count"
 const Network = "Network"
@@ -238,7 +238,7 @@ func (graphData *GraphData) StatisticsGlobalResource(ctx context.Context) error
 
 	//io
 	gDiskIO := NewStatisticValue(DiskIO, "byte")
-	gS3IOByte := NewStatisticValue(S3IOByte, "byte")
+	gS3IOByte := NewStatisticValue(ScanBytes, "byte")
 	gS3IOInputCount := NewStatisticValue(S3IOInputCount, "count")
 	gS3IOOutputCount := NewStatisticValue(S3IOOutputCount, "count")
 
@@ -286,7 +286,7 @@ func (graphData *GraphData) StatisticsGlobalResource(ctx context.Context) error
 		if ioValue.Name == DiskIO {
 			gDiskIO.Value += ioValue.Value
 		}
-		if ioValue.Name == S3IOByte {
+		if ioValue.Name == ScanBytes {
 			gS3IOByte.Value += ioValue.Value
 		}
 		if ioValue.Name == S3IOInputCount {
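
Only the constant and its display label change here; the accumulator variable keeps its old gS3IOByte name, and the global roll-up still matches per-node statistics by display name. A compact sketch of that name-keyed folding, assuming a simplified StatisticValue:

package sketch

// StatisticValue is a simplified stand-in for the models type.
type StatisticValue struct {
	Name  string
	Unit  string
	Value int64
}

// foldIO mirrors the chain of name comparisons in StatisticsGlobalResource:
// each node-level IO statistic is added into the global accumulator whose
// display name matches ("Scan Bytes", "Disk IO", "S3 IO Input Count", ...).
func foldIO(globals map[string]*StatisticValue, ioValues []*StatisticValue) {
	for _, v := range ioValues {
		if g, ok := globals[v.Name]; ok {
			g.Value += v.Value
		}
	}
}
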
2 changes: 1 addition & 1 deletion pkg/sql/models/phy_plan_test.go
@@ -41,7 +41,7 @@ func TestPhyPlanJSON(t *testing.T) {
 		TotalInputSize:   2048,
 		TotalInputBlocks: 0,
 		TotalOutputSize:  1900,
-		TotalS3IOByte:    0,
+		TotalScanBytes:   0,
 		TotalNetworkIO:   600,
 		//TotalScanTime: 1500,
 		//TotalInsertTime: 2500,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/models/show_phyplan_test.go
@@ -32,7 +32,7 @@ func TestExplainPhyPlan(t *testing.T) {
 		TotalInputSize:   2048,
 		TotalInputBlocks: 0,
 		TotalOutputSize:  1900,
-		TotalS3IOByte:    0,
+		TotalScanBytes:   0,
 		TotalNetworkIO:   600,
 		//TotalScanTime: 1500,
 		//TotalInsertTime: 0,
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/plan/deepcopy.go
@@ -17,8 +17,9 @@ package plan
 import (
 	"bytes"
 
-	"github.com/matrixorigin/matrixone/pkg/pb/plan"
 	"golang.org/x/exp/constraints"
+
+	"github.com/matrixorigin/matrixone/pkg/pb/plan"
 )
 
 func DeepCopyExprList(list []*Expr) []*Expr {
@@ -1090,7 +1091,7 @@ func DeepCopyAnalyzeInfo(analyzeinfo *plan.AnalyzeInfo) *plan.AnalyzeInfo {
 		MemorySize:       analyzeinfo.GetMemorySize(),
 		WaitTimeConsumed: analyzeinfo.GetWaitTimeConsumed(),
 		DiskIO:           analyzeinfo.GetDiskIO(),
-		S3IOByte:         analyzeinfo.GetS3IOByte(),
+		ScanBytes:        analyzeinfo.GetScanBytes(),
 		S3IOInputCount:   analyzeinfo.GetS3IOInputCount(),
 		S3IOOutputCount:  analyzeinfo.GetS3IOOutputCount(),
 		NetworkIO:        analyzeinfo.GetNetworkIO(),
6 changes: 3 additions & 3 deletions pkg/sql/plan/explain/marshal_query.go
@@ -701,7 +701,7 @@ const InputSize = "Input Size"
 const OutputSize = "Output Size"
 const MemorySize = "Memory Size"
 const DiskIO = "Disk IO"
-const S3IOByte = "S3 IO Byte"
+const ScanBytes = "Scan Bytes"
 const S3IOInputCount = "S3 IO Input Count"
 const S3IOOutputCount = "S3 IO Output Count"
 const Network = "Network"
@@ -791,8 +791,8 @@ func (m MarshalNodeImpl) GetStatistics(ctx context.Context, options *ExplainOpti
 			Unit:  Statistic_Unit_byte, //"byte",
 		},
 		{
-			Name:  S3IOByte,
-			Value: analyzeInfo.S3IOByte,
+			Name:  ScanBytes,
+			Value: analyzeInfo.ScanBytes,
 			Unit:  Statistic_Unit_byte, //"byte",
 		},
 		{
12 changes: 6 additions & 6 deletions pkg/vm/process/operator_analyzer.go
@@ -46,7 +46,7 @@ type Analyzer interface {
 	Reset()
 
 	InputBlock()
-	S3IOByte(*batch.Batch) // delete it, unused
+	ScanBytes(*batch.Batch)
 }
 
 // Operator Resource operatorAnalyzer
@@ -158,13 +158,13 @@ func (opAlyzr *operatorAnalyzer) ChildrenCallStop(start time.Time) {
 	opAlyzr.childrenCallDuration += time.Since(start)
 }
 
-func (opAlyzr *operatorAnalyzer) S3IOByte(bat *batch.Batch) {
+func (opAlyzr *operatorAnalyzer) ScanBytes(bat *batch.Batch) {
 	if opAlyzr.opStats == nil {
 		panic("operatorAnalyzer.S3IOByte: operatorAnalyzer.opStats is nil")
 	}
 
 	if bat != nil {
-		opAlyzr.opStats.TotalS3IOByte += int64(bat.Size())
+		opAlyzr.opStats.TotalScanBytes += int64(bat.Size())
 	}
 }

@@ -221,7 +221,7 @@ type OperatorStats struct {
 	TotalOutputSize  int64                `json:"TotalOutputSize,omitempty"`
 	TotalNetworkIO   int64                `json:"TotalNetworkIO,omitempty"`
 	TotalInputBlocks int64                `json:"-"`
-	TotalS3IOByte    int64                `json:"-"`
+	TotalScanBytes   int64                `json:"-"`
 	OperatorMetrics  map[MetricType]int64 `json:"OperatorMetrics,omitempty"`
 }

@@ -277,7 +277,7 @@ func (ps *OperatorStats) String() string {
 		"InBlock:%d "+
 		"OutSize:%dbytes "+
 		"MemSize:%dbytes "+
-		"S3IOByte:%dbytes "+
+		"ScanBytes:%dbytes "+
 		"NetworkIO:%dbytes"+
 		"%s",
 		ps.CallNum,
@@ -289,7 +289,7 @@
 		ps.TotalInputBlocks,
 		ps.TotalOutputSize,
 		ps.TotalMemorySize,
-		ps.TotalS3IOByte,
+		ps.TotalScanBytes,
 		ps.TotalNetworkIO,
 		metricsStr)
 }
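
Note the struct tags above: TotalScanBytes, like TotalInputBlocks, carries json:"-", so the renamed metric is tracked and printed by String() but never serialized with the rest of OperatorStats. A small sketch of that tag behavior with a trimmed-down stand-in struct:

package main

import (
	"encoding/json"
	"fmt"
)

// stats trims OperatorStats down to three fields to show the tag semantics.
type stats struct {
	CallNum        int   `json:"CallNum,omitempty"`
	TotalNetworkIO int64 `json:"TotalNetworkIO,omitempty"` // serialized when non-zero
	TotalScanBytes int64 `json:"-"`                        // never serialized
}

func main() {
	b, _ := json.Marshal(stats{CallNum: 3, TotalNetworkIO: 600, TotalScanBytes: 4096})
	fmt.Println(string(b)) // {"CallNum":3,"TotalNetworkIO":600}; TotalScanBytes is dropped
}
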