Skip to content

Commit

Permalink
Merge branch 'main' into fix_internal_executor_for_partition_table
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Sep 19, 2024
2 parents 479dc11 + 717cb94 commit cd6f8eb
Show file tree
Hide file tree
Showing 3 changed files with 429 additions and 93 deletions.
108 changes: 108 additions & 0 deletions pkg/sql/colexec/merge_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
package colexec

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/sort"
)

Expand Down Expand Up @@ -187,3 +193,105 @@ func (x *heapSlice[T]) Less(i, j int) bool {
}
// Swap exchanges the elements at indices i and j, satisfying sort/heap interfaces.
func (x *heapSlice[T]) Swap(i, j int) { x.s[i], x.s[j] = x.s[j], x.s[i] }
// Len reports the number of elements currently held by the slice.
func (x *heapSlice[T]) Len() int { return len(x.s) }

// SinkerT consumes one full output batch produced by MergeSortBatches.
// It is invoked every time the merge buffer reaches objectio.BlockMaxRows
// (and once more for the final partial batch). The buffer is reused and
// cleaned after each call, so the sinker must not retain it.
type SinkerT func(*batch.Batch) error

// MergeSortBatches performs a k-way merge of the already-sorted input
// batches on the column at sortKeyIdx, streaming merged rows through
// sinker in blocks of at most objectio.BlockMaxRows.
//
// buffer is caller-supplied scratch space whose vector layout matches the
// inputs; its data is cleared before use and after every sink call. Each
// input batch is cleaned eagerly (memory returned to mp) as soon as its
// last row has been consumed.
//
// Returns nil on an empty input slice. Panics if the sort key's type has
// no comparator registered in the switch below.
func MergeSortBatches(
	batches []*batch.Batch,
	sortKeyIdx int,
	buffer *batch.Batch,
	sinker SinkerT,
	mp *mpool.MPool,
) error {
	// Guard the batches[0] accesses below: merging nothing is a no-op,
	// not a panic.
	if len(batches) == 0 {
		return nil
	}

	// Named nullVecs (not "nulls") so the imported nulls package is not
	// shadowed by this local.
	nullVecs := make([]*nulls.Nulls, len(batches))
	for i, b := range batches {
		nullVecs[i] = b.Vecs[sortKeyIdx].GetNulls()
	}

	// Select the comparator for the sort key's concrete type.
	var merge MergeInterface
	switch batches[0].Vecs[sortKeyIdx].GetType().Oid {
	case types.T_bool:
		merge = newMerge(sort.BoolLess, getFixedCols[bool](batches, sortKeyIdx), nullVecs)
	case types.T_bit:
		merge = newMerge(sort.GenericLess[uint64], getFixedCols[uint64](batches, sortKeyIdx), nullVecs)
	case types.T_int8:
		merge = newMerge(sort.GenericLess[int8], getFixedCols[int8](batches, sortKeyIdx), nullVecs)
	case types.T_int16:
		merge = newMerge(sort.GenericLess[int16], getFixedCols[int16](batches, sortKeyIdx), nullVecs)
	case types.T_int32:
		merge = newMerge(sort.GenericLess[int32], getFixedCols[int32](batches, sortKeyIdx), nullVecs)
	case types.T_int64:
		merge = newMerge(sort.GenericLess[int64], getFixedCols[int64](batches, sortKeyIdx), nullVecs)
	case types.T_uint8:
		merge = newMerge(sort.GenericLess[uint8], getFixedCols[uint8](batches, sortKeyIdx), nullVecs)
	case types.T_uint16:
		merge = newMerge(sort.GenericLess[uint16], getFixedCols[uint16](batches, sortKeyIdx), nullVecs)
	case types.T_uint32:
		merge = newMerge(sort.GenericLess[uint32], getFixedCols[uint32](batches, sortKeyIdx), nullVecs)
	case types.T_uint64:
		merge = newMerge(sort.GenericLess[uint64], getFixedCols[uint64](batches, sortKeyIdx), nullVecs)
	case types.T_float32:
		merge = newMerge(sort.GenericLess[float32], getFixedCols[float32](batches, sortKeyIdx), nullVecs)
	case types.T_float64:
		merge = newMerge(sort.GenericLess[float64], getFixedCols[float64](batches, sortKeyIdx), nullVecs)
	case types.T_date:
		merge = newMerge(sort.GenericLess[types.Date], getFixedCols[types.Date](batches, sortKeyIdx), nullVecs)
	case types.T_datetime:
		merge = newMerge(sort.GenericLess[types.Datetime], getFixedCols[types.Datetime](batches, sortKeyIdx), nullVecs)
	case types.T_time:
		merge = newMerge(sort.GenericLess[types.Time], getFixedCols[types.Time](batches, sortKeyIdx), nullVecs)
	case types.T_timestamp:
		merge = newMerge(sort.GenericLess[types.Timestamp], getFixedCols[types.Timestamp](batches, sortKeyIdx), nullVecs)
	case types.T_enum:
		merge = newMerge(sort.GenericLess[types.Enum], getFixedCols[types.Enum](batches, sortKeyIdx), nullVecs)
	case types.T_decimal64:
		merge = newMerge(sort.Decimal64Less, getFixedCols[types.Decimal64](batches, sortKeyIdx), nullVecs)
	case types.T_decimal128:
		merge = newMerge(sort.Decimal128Less, getFixedCols[types.Decimal128](batches, sortKeyIdx), nullVecs)
	case types.T_uuid:
		merge = newMerge(sort.UuidLess, getFixedCols[types.Uuid](batches, sortKeyIdx), nullVecs)
	case types.T_char, types.T_varchar, types.T_blob, types.T_text, types.T_datalink:
		merge = newMerge(sort.GenericLess[string], getStrCols(batches, sortKeyIdx), nullVecs)
	case types.T_Rowid:
		merge = newMerge(sort.RowidLess, getFixedCols[types.Rowid](batches, sortKeyIdx), nullVecs)
	default:
		panic(fmt.Sprintf("invalid type: %s", batches[0].Vecs[sortKeyIdx].GetType()))
	}

	var (
		batchIndex int
		rowIndex   int
		lens       int // rows accumulated in buffer since the last sink
	)
	size := len(batches)
	buffer.CleanOnlyData()
	for size > 0 {
		batchIndex, rowIndex, size = merge.getNextPos()
		// Copy the next-smallest row (all columns) into the buffer.
		for i := range buffer.Vecs {
			err := buffer.Vecs[i].UnionOne(batches[batchIndex].Vecs[i], int64(rowIndex), mp)
			if err != nil {
				return err
			}
		}
		// All data in batches[batchIndex] are used. Clean it.
		if rowIndex+1 == batches[batchIndex].RowCount() {
			batches[batchIndex].Clean(mp)
		}
		lens++
		if lens == objectio.BlockMaxRows {
			lens = 0
			buffer.SetRowCount(objectio.BlockMaxRows)
			if err := sinker(buffer); err != nil {
				return err
			}
			// force clean
			buffer.CleanOnlyData()
		}
	}
	// Flush the trailing partial block, if any.
	if lens > 0 {
		buffer.SetRowCount(lens)
		if err := sinker(buffer); err != nil {
			return err
		}
		buffer.CleanOnlyData()
	}
	return nil
}
104 changes: 12 additions & 92 deletions pkg/sql/colexec/s3util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package colexec

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
Expand Down Expand Up @@ -343,100 +341,22 @@ func (w *S3Writer) SortAndSync(proc *process.Process) ([]objectio.BlockInfo, obj

w.initBuffers(proc, w.batches[0])

var merge MergeInterface
nulls := make([]*nulls.Nulls, 0, len(w.batches))
for i := 0; i < len(w.batches); i++ {
nulls = append(nulls, w.batches[i].Vecs[w.sortIndex].GetNulls())
}
pos := w.sortIndex
switch w.batches[0].Vecs[w.sortIndex].GetType().Oid {
case types.T_bool:
merge = newMerge(sort.BoolLess, getFixedCols[bool](w.batches, pos), nulls)
case types.T_bit:
merge = newMerge(sort.GenericLess[uint64], getFixedCols[uint64](w.batches, pos), nulls)
case types.T_int8:
merge = newMerge(sort.GenericLess[int8], getFixedCols[int8](w.batches, pos), nulls)
case types.T_int16:
merge = newMerge(sort.GenericLess[int16], getFixedCols[int16](w.batches, pos), nulls)
case types.T_int32:
merge = newMerge(sort.GenericLess[int32], getFixedCols[int32](w.batches, pos), nulls)
case types.T_int64:
merge = newMerge(sort.GenericLess[int64], getFixedCols[int64](w.batches, pos), nulls)
case types.T_uint8:
merge = newMerge(sort.GenericLess[uint8], getFixedCols[uint8](w.batches, pos), nulls)
case types.T_uint16:
merge = newMerge(sort.GenericLess[uint16], getFixedCols[uint16](w.batches, pos), nulls)
case types.T_uint32:
merge = newMerge(sort.GenericLess[uint32], getFixedCols[uint32](w.batches, pos), nulls)
case types.T_uint64:
merge = newMerge(sort.GenericLess[uint64], getFixedCols[uint64](w.batches, pos), nulls)
case types.T_float32:
merge = newMerge(sort.GenericLess[float32], getFixedCols[float32](w.batches, pos), nulls)
case types.T_float64:
merge = newMerge(sort.GenericLess[float64], getFixedCols[float64](w.batches, pos), nulls)
case types.T_date:
merge = newMerge(sort.GenericLess[types.Date], getFixedCols[types.Date](w.batches, pos), nulls)
case types.T_datetime:
merge = newMerge(sort.GenericLess[types.Datetime], getFixedCols[types.Datetime](w.batches, pos), nulls)
case types.T_time:
merge = newMerge(sort.GenericLess[types.Time], getFixedCols[types.Time](w.batches, pos), nulls)
case types.T_timestamp:
merge = newMerge(sort.GenericLess[types.Timestamp], getFixedCols[types.Timestamp](w.batches, pos), nulls)
case types.T_enum:
merge = newMerge(sort.GenericLess[types.Enum], getFixedCols[types.Enum](w.batches, pos), nulls)
case types.T_decimal64:
merge = newMerge(sort.Decimal64Less, getFixedCols[types.Decimal64](w.batches, pos), nulls)
case types.T_decimal128:
merge = newMerge(sort.Decimal128Less, getFixedCols[types.Decimal128](w.batches, pos), nulls)
case types.T_uuid:
merge = newMerge(sort.UuidLess, getFixedCols[types.Uuid](w.batches, pos), nulls)
case types.T_char, types.T_varchar, types.T_blob, types.T_text, types.T_datalink:
merge = newMerge(sort.GenericLess[string], getStrCols(w.batches, pos), nulls)
case types.T_Rowid:
merge = newMerge(sort.RowidLess, getFixedCols[types.Rowid](w.batches, pos), nulls)
//TODO: check if we need T_array here? T_json is missing here.
// Update Oct 20 2023: I don't think it is necessary to add T_array here. Keeping this comment,
// in case anything fails in vector S3 flush in future.
default:
panic(fmt.Sprintf("invalid type: %s", w.batches[0].Vecs[w.sortIndex].GetType().Oid))
}
if _, err := w.generateWriter(proc); err != nil {
return nil, objectio.ObjectStats{}, err
}
lens := 0
size := len(w.batches)
w.buffer.CleanOnlyData()
var batchIndex int
var rowIndex int
for size > 0 {
batchIndex, rowIndex, size = merge.getNextPos()
for i := range w.buffer.Vecs {
err := w.buffer.Vecs[i].UnionOne(w.batches[batchIndex].Vecs[i], int64(rowIndex), proc.GetMPool())
if err != nil {
return nil, objectio.ObjectStats{}, err
}
}
// all data in w.batches[batchIndex] are used. Clean it.
if rowIndex+1 == w.batches[batchIndex].RowCount() {
w.batches[batchIndex].Clean(proc.GetMPool())
}
lens++
if lens == options.DefaultBlockMaxRows {
lens = 0
w.buffer.SetRowCount(options.DefaultBlockMaxRows)
if _, err := w.writer.WriteBatch(w.buffer); err != nil {
return nil, objectio.ObjectStats{}, err
}
// force clean
w.buffer.CleanOnlyData()
}

sinker := func(bat *batch.Batch) error {
_, err := w.writer.WriteBatch(bat)
return err
}
if lens > 0 {
w.buffer.SetRowCount(lens)
if _, err := w.writer.WriteBatch(w.buffer); err != nil {
return nil, objectio.ObjectStats{}, err
}
w.buffer.CleanOnlyData()
if err := MergeSortBatches(
w.batches,
w.sortIndex,
w.buffer,
sinker,
proc.GetMPool(),
); err != nil {
return nil, objectio.ObjectStats{}, err
}
return w.sync(proc)
}
Expand Down
Loading

0 comments on commit cd6f8eb

Please sign in to comment.