Skip to content

Commit

Permalink
Revert "reduce the memory of agg's execution context. (#16215)" (#16226)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-schen committed May 17, 2024
1 parent 54801e5 commit 6c8e877
Show file tree
Hide file tree
Showing 38 changed files with 6,551 additions and 5,186 deletions.
236 changes: 135 additions & 101 deletions pkg/sql/colexec/aggexec/aggContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,107 +16,64 @@ package aggexec

import "github.com/matrixorigin/matrixone/pkg/container/types"

type AggContext struct {
hasCommonContext bool
hasGroupContext bool
commonContext AggCommonExecContext
groupContext []AggGroupExecContext
initGroup AggGroupContextInit
}

func newAggContextFromImpl(
ctxImpl aggContextImplementation,
result types.Type,
args ...types.Type) *AggContext {

ctx := &AggContext{
hasCommonContext: ctxImpl.hasCommonContext,
hasGroupContext: ctxImpl.hasGroupContext,
}
if ctx.hasGroupContext {
ctx.initGroup = ctxImpl.generateGroupContext
}
if ctxImpl.hasCommonContext {
ctx.setCommonContext(ctxImpl.generateCommonContext(result, args...))
}
return ctx
}

type AggGroupContextInit func(resultType types.Type, parameters ...types.Type) AggGroupExecContext
type AggCommonContextInit func(resultType types.Type, parameters ...types.Type) AggCommonExecContext

func (a *AggContext) setCommonContext(c AggCommonExecContext) {
a.commonContext = c
}

func (a *AggContext) preAllocate(more int) {
if !a.hasGroupContext {
return
}

n := len(a.groupContext) + more
if n <= cap(a.groupContext) {
return
}

oldLen := len(a.groupContext)
a.groupContext = append(a.groupContext, make([]AggGroupExecContext, n-cap(a.groupContext))...)
a.groupContext = a.groupContext[:oldLen]
}

func (a *AggContext) growsGroupContext(
more int,
resultType types.Type, parameters ...types.Type) {
if !a.hasGroupContext {
return
}
oldLen, newLen := len(a.groupContext), len(a.groupContext)+more
if newLen > cap(a.groupContext) {
a.groupContext = append(a.groupContext, make([]AggGroupExecContext, more)...)
for i := oldLen; i < newLen; i++ {
a.groupContext[i] = a.initGroup(resultType, parameters...)
}

} else {
a.groupContext = a.groupContext[:newLen]
for i := oldLen; i < newLen; i++ {
a.groupContext[i] = a.initGroup(resultType, parameters...)
}
}
}

func (a *AggContext) getCommonContext() AggCommonExecContext {
return a.commonContext
}

func (a *AggContext) getGroupContext(i int) AggGroupExecContext {
if a.hasGroupContext {
return a.groupContext[i]
}
return nil
}

func (a *AggContext) getGroupContextEncodings() [][]byte {
if !a.hasGroupContext {
return nil
}
encodings := make([][]byte, len(a.groupContext))
for i := range a.groupContext {
encodings[i] = a.groupContext[i].Marshal()
}
return encodings
}

func (a *AggContext) decodeGroupContexts(encodings [][]byte, resultType types.Type, parameters ...types.Type) {
if !a.hasGroupContext {
return
}
a.groupContext = make([]AggGroupExecContext, len(encodings))
for i := range encodings {
a.groupContext[i] = a.initGroup(resultType, parameters...)
a.groupContext[i].Unmarshal(encodings[i])
}
}
// todo: sca.
//type AggContext struct {
// hasCommonContext bool
// hasGroupContext bool
// commonContext AggCommonExecContext
// groupContext []AggGroupExecContext
//}
//
//func (a *AggContext) setCommonContext(c AggCommonExecContext) {
// if c == nil {
// return
// }
// a.hasCommonContext = true
// a.commonContext = c
//}
//
//func (a *AggContext) preExtend(n int) {
// if !a.hasGroupContext {
// return
// }
// if n <= cap(a.groupContext) {
// return
// }
//
// oldLen := len(a.groupContext)
// a.groupContext = append(a.groupContext, make([]AggGroupExecContext, n-cap(a.groupContext))...)
// a.groupContext = a.groupContext[:oldLen]
//}
//
//func (a *AggContext) growsGroupContext(initGroup func() AggGroupExecContext, n int) {
// if !a.hasGroupContext {
// return
// }
// oldLen, newLen := len(a.groupContext), len(a.groupContext)+n
// if newLen > cap(a.groupContext) {
// a.groupContext = append(a.groupContext, make([]AggGroupExecContext, n)...)
// for i := oldLen; i < newLen; i++ {
// a.groupContext[i] = initGroup()
// }
//
// } else {
// a.groupContext = a.groupContext[:newLen]
// for i := oldLen; i < newLen; i++ {
// a.groupContext[i] = initGroup()
// }
// }
//}
//
//func (a *AggContext) getCommonContext() AggCommonExecContext {
// return a.commonContext
//}
//
//func (a *AggContext) getGroupContext(i int) AggGroupExecContext {
// if a.hasGroupContext {
// return a.groupContext[i]
// }
// return nil
//}

// AggCommonExecContext stores the common context for all the groups.
// like the type scale, timezone and so on.
Expand All @@ -129,3 +86,80 @@ type AggCommonExecContext interface {
type AggGroupExecContext interface {
AggCanMarshal
}

/*
prepared context structures for agg.
EmptyContextOfSingleAggRetFixed and EmptyContextOfSingleAggRetBytes are used for aggregation
which does not need to store any context.
ContextWithEmptyFlagOfSingleAggRetFixed and ContextWithEmptyFlagOfSingleAggRetBytes are used for aggregation
which only needs to store a flag to indicate whether it is empty.
*/

type EmptyContextOfSingleAggRetFixed[T types.FixedSizeTExceptStrType] struct{}

func (a EmptyContextOfSingleAggRetFixed[T]) Marshal() []byte { return nil }
func (a EmptyContextOfSingleAggRetFixed[T]) Unmarshal([]byte) {}
func GenerateEmptyContextFromFixedToFixed[from, to types.FixedSizeTExceptStrType]() SingleAggFromFixedRetFixed[from, to] {
return EmptyContextOfSingleAggRetFixed[to]{}
}
func GenerateEmptyContextFromVarToFixed[to types.FixedSizeTExceptStrType]() SingleAggFromVarRetFixed[to] {
return EmptyContextOfSingleAggRetFixed[to]{}
}

type EmptyContextOfSingleAggRetBytes struct{}

func (a EmptyContextOfSingleAggRetBytes) Marshal() []byte { return nil }
func (a EmptyContextOfSingleAggRetBytes) Unmarshal([]byte) {}
func GenerateEmptyContextFromFixedToVar[from types.FixedSizeTExceptStrType]() SingleAggFromFixedRetVar[from] {
return EmptyContextOfSingleAggRetBytes{}
}
func GenerateEmptyContextFromVarToVar() SingleAggFromVarRetVar {
return EmptyContextOfSingleAggRetBytes{}
}

type ContextWithEmptyFlagOfSingleAggRetFixed[T types.FixedSizeTExceptStrType] struct {
IsEmpty bool
}

func (a *ContextWithEmptyFlagOfSingleAggRetFixed[T]) Marshal() []byte {
return types.EncodeBool(&a.IsEmpty)
}
func (a *ContextWithEmptyFlagOfSingleAggRetFixed[T]) Unmarshal(data []byte) {
a.IsEmpty = types.DecodeBool(data)
}
func GenerateFlagContextFromFixedToFixed[from, to types.FixedSizeTExceptStrType]() SingleAggFromFixedRetFixed[from, to] {
return &ContextWithEmptyFlagOfSingleAggRetFixed[to]{}
}
func InitFlagContextFromFixedToFixed[from, to types.FixedSizeTExceptStrType](exec SingleAggFromFixedRetFixed[from, to], setter AggSetter[to], arg, ret types.Type) error {
a := exec.(*ContextWithEmptyFlagOfSingleAggRetFixed[to])
a.IsEmpty = true
return nil
}

func GenerateFlagContextFromVarToFixed[to types.FixedSizeTExceptStrType]() SingleAggFromVarRetFixed[to] {
return &ContextWithEmptyFlagOfSingleAggRetFixed[to]{}
}

type ContextWithEmptyFlagOfSingleAggRetBytes struct {
IsEmpty bool
}

func (a *ContextWithEmptyFlagOfSingleAggRetBytes) Marshal() []byte {
return types.EncodeBool(&a.IsEmpty)
}
func (a *ContextWithEmptyFlagOfSingleAggRetBytes) Unmarshal(data []byte) {
a.IsEmpty = types.DecodeBool(data)
}
func GenerateFlagContextFromFixedToVar[from types.FixedSizeTExceptStrType]() SingleAggFromFixedRetVar[from] {
return &ContextWithEmptyFlagOfSingleAggRetBytes{}
}
func GenerateFlagContextFromVarToVar() SingleAggFromVarRetVar {
return &ContextWithEmptyFlagOfSingleAggRetBytes{}
}
func InitFlagContextFromVarToVar(exec SingleAggFromVarRetVar, setter AggBytesSetter, arg, ret types.Type) error {
a := exec.(*ContextWithEmptyFlagOfSingleAggRetBytes)
a.IsEmpty = true
return nil
}
117 changes: 0 additions & 117 deletions pkg/sql/colexec/aggexec/aggInformation.go

This file was deleted.

Loading

0 comments on commit 6c8e877

Please sign in to comment.