Skip to content

Commit

Permalink
adding flag to block info and object info (#18527)
Browse files Browse the repository at this point in the history
flag:
1. dependable
2. sorted
3. by cn created.

Approved by: @daviszhen, @ouyuanning, @aunjgr, @m-schen, @XuPeng-SH, @LeftHandCold, @triump2020
  • Loading branch information
gouhongshen committed Sep 8, 2024
1 parent 1633f95 commit d1d05f1
Show file tree
Hide file tree
Showing 32 changed files with 254 additions and 212 deletions.
9 changes: 6 additions & 3 deletions pkg/objectio/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,13 @@ func (bm BlockObject) GenerateBlockInfo(objName ObjectName, sorted bool) BlockIn
&sid,
location.Name().Num(),
location.ID()),
//non-appendable block
Appendable: false,
Sorted: sorted,
}
blkInfo.SetMetaLocation(location)

blkInfo.StateFlag |= CNCreatedFlag
if sorted {
blkInfo.StateFlag |= SortedFlag
}

return blkInfo
}
61 changes: 42 additions & 19 deletions pkg/objectio/block_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,48 @@ const (
)

type BlockInfo struct {
BlockID types.Blockid
//It's used to indicate whether the block is appendable block or non-appendable blk for reader.
// for appendable block, the data visibility in the block is determined by the commit ts and abort ts.
Appendable bool
Sorted bool
MetaLoc ObjectLocation
BlockID types.Blockid
MetaLoc ObjectLocation
StateFlag int8

//TODO:: remove it.
PartitionNum int16
}

func (b *BlockInfo) SetFlagByObjStats(stats ObjectStats) {
b.StateFlag = stats.GetFlag()
}

func (b *BlockInfo) IsAppendable() bool {
return b.StateFlag&AppendableFlag != 0
}

func (b *BlockInfo) IsSorted() bool {
return b.StateFlag&SortedFlag != 0
}

func (b *BlockInfo) IsCNCreated() bool {
return b.StateFlag&CNCreatedFlag != 0
}

func (b *BlockInfo) String() string {
return fmt.Sprintf("[A-%v]blk-%s", b.Appendable, b.BlockID.ShortStringEx())
flag := ""
if b.IsAppendable() {
flag = flag + "A"
}
if b.IsSorted() {
flag = flag + "S"
}
if b.IsCNCreated() {
flag = flag + "C"
}

return fmt.Sprintf(
"[%s]ID-%s, MetaLoc: %s, PartitionNum: %d",
flag,
b.BlockID.ShortStringEx(),
b.MetaLocation().String(),
b.PartitionNum)
}

func (b *BlockInfo) MarshalWithBuf(w *bytes.Buffer) (uint32, error) {
Expand All @@ -77,21 +106,16 @@ func (b *BlockInfo) MarshalWithBuf(w *bytes.Buffer) (uint32, error) {
}
space += uint32(types.BlockidSize)

if _, err := w.Write(types.EncodeBool(&b.Appendable)); err != nil {
if _, err := w.Write(types.EncodeSlice(b.MetaLoc[:])); err != nil {
return 0, err
}
space++
space += uint32(LocationLen)

if _, err := w.Write(types.EncodeBool(&b.Sorted)); err != nil {
if _, err := w.Write(types.EncodeInt8(&b.StateFlag)); err != nil {
return 0, err
}
space++

if _, err := w.Write(types.EncodeSlice(b.MetaLoc[:])); err != nil {
return 0, err
}
space += uint32(LocationLen)

if _, err := w.Write(types.EncodeInt16(&b.PartitionNum)); err != nil {
return 0, err
}
Expand All @@ -103,14 +127,13 @@ func (b *BlockInfo) MarshalWithBuf(w *bytes.Buffer) (uint32, error) {
func (b *BlockInfo) Unmarshal(buf []byte) error {
b.BlockID = types.DecodeFixed[types.Blockid](buf[:types.BlockidSize])
buf = buf[types.BlockidSize:]
b.Appendable = types.DecodeFixed[bool](buf)
buf = buf[1:]
b.Sorted = types.DecodeFixed[bool](buf)
buf = buf[1:]

copy(b.MetaLoc[:], buf[:LocationLen])
buf = buf[LocationLen:]

b.StateFlag = types.DecodeInt8(buf[:1])
buf = buf[1:]

b.PartitionNum = types.DecodeFixed[int16](buf[:2])
return nil
}
Expand Down
29 changes: 17 additions & 12 deletions pkg/objectio/block_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,21 @@ func TestBlockInfoSliceTraverse(t *testing.T) {
for i := 0; i < s.Len(); i++ {
blkInfo := s.Get(i)
require.Equal(t, intToBlockid(int32(i)), blkInfo.BlockID)
require.Equal(t, false, blkInfo.Appendable)
blkInfo.Appendable = true
require.Equal(t, false, blkInfo.IsAppendable())
blkInfo.StateFlag |= AppendableFlag
}

for i := 0; i < s.Len(); i++ {
require.Equal(t, true, s.Get(i).Appendable)
require.Equal(t, true, s.Get(i).IsAppendable())
}

s.AppendBlockInfo(BlockInfo{BlockID: intToBlockid(1000), Appendable: true})
blk := BlockInfo{BlockID: intToBlockid(1000)}
blk.StateFlag |= AppendableFlag

s.AppendBlockInfo(blk)

for i := 0; i < s.Len(); i++ {
require.Equal(t, true, s.Get(i).Appendable)
require.Equal(t, true, s.Get(i).IsAppendable())
}
}

Expand All @@ -109,14 +112,16 @@ func TestBytesToBlockInfoSlice(t *testing.T) {
for i := 0; i < s.Len(); i++ {
blkInfo := s.Get(i)
require.Equal(t, intToBlockid(int32(i)), blkInfo.BlockID)
require.Equal(t, false, blkInfo.Appendable)
blkInfo.Appendable = true
require.Equal(t, false, blkInfo.IsAppendable())
blkInfo.StateFlag |= AppendableFlag
}

s.AppendBlockInfo(BlockInfo{BlockID: intToBlockid(1000), Appendable: true})
blk := BlockInfo{BlockID: intToBlockid(1000)}
blk.StateFlag |= AppendableFlag
s.AppendBlockInfo(blk)

for i := 0; i < s.Len(); i++ {
require.Equal(t, true, s.Get(i).Appendable)
require.Equal(t, true, s.Get(i).IsAppendable())
}

require.Equal(t, 1000*BlockInfoSize, len(bs))
Expand All @@ -125,10 +130,10 @@ func TestBytesToBlockInfoSlice(t *testing.T) {
require.Equal(t, 1001*BlockInfoSize, len(bs))
require.Equal(t, s.GetAllBytes(), bs)

s.Get(999).Appendable = false
require.Equal(t, false, s.Get(999).Appendable)
s.Get(999).StateFlag &= ^AppendableFlag
require.Equal(t, false, s.Get(999).IsAppendable())
blkInfo := DecodeBlockInfo(bs[999*BlockInfoSize:])
require.Equal(t, false, blkInfo.Appendable)
require.Equal(t, false, blkInfo.IsAppendable())
}

func TestBlockInfoSlice_Slice(t *testing.T) {
Expand Down
59 changes: 43 additions & 16 deletions pkg/objectio/object_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

type ObjectDescriber interface {
DescribeObject() ([]ObjectStats, error)
DescribeObject() (ObjectStats, error)
}

const (
Expand All @@ -44,6 +44,12 @@ const (
ObjectStatsLen = reservedOffset + reservedLen
)

const (
AppendableFlag = 0x1
SortedFlag = 0x2
CNCreatedFlag = 0x4
)

var ZeroObjectStats ObjectStats

// ObjectStats has format:
Expand All @@ -52,6 +58,26 @@ var ZeroObjectStats ObjectStats
// +------------------------------------------------------------------------------------------------+--------+
type ObjectStats [ObjectStatsLen]byte

type ObjectStatsOptions func(*ObjectStats)

func WithCNCreated() ObjectStatsOptions {
return func(o *ObjectStats) {
o[reservedOffset] |= CNCreatedFlag
}
}

func WithSorted() ObjectStatsOptions {
return func(o *ObjectStats) {
o[reservedOffset] |= SortedFlag
}
}

func WithAppendable() ObjectStatsOptions {
return func(o *ObjectStats) {
o[reservedOffset] |= AppendableFlag
}
}

func NewObjectStats() *ObjectStats {
return new(ObjectStats)
}
Expand All @@ -60,14 +86,17 @@ func NewObjectStatsWithObjectID(id *ObjectId, appendable, sorted, cnCreated bool
stats := new(ObjectStats)
SetObjectStatsObjectName(stats, BuildObjectNameWithObjectID(id))
if appendable {
stats.setAppendable()
stats[reservedOffset] = stats[reservedOffset] | AppendableFlag
}

if sorted {
stats.SetSorted()
stats[reservedOffset] = stats[reservedOffset] | SortedFlag
}

if cnCreated {
stats.SetCNCreated()
stats[reservedOffset] = stats[reservedOffset] | CNCreatedFlag
}

return stats
}
func (des *ObjectStats) Marshal() []byte {
Expand All @@ -78,29 +107,27 @@ func (des *ObjectStats) UnMarshal(data []byte) {
copy(des[:], data)
}

func (des *ObjectStats) GetFlag() int8 {
return int8(des[reservedOffset])
}

// Clone deep copies the stats and returns its pointer
func (des *ObjectStats) Clone() *ObjectStats {
copied := NewObjectStats()
copy(copied[:], des[:])
return copied
}
func (des *ObjectStats) setAppendable() {
des[reservedOffset] = des[reservedOffset] | 0x1
}

func (des *ObjectStats) GetAppendable() bool {
return des[reservedOffset]&0x1 != 0
}
func (des *ObjectStats) SetSorted() {
des[reservedOffset] = des[reservedOffset] | 0x2
return des[reservedOffset]&AppendableFlag != 0
}

func (des *ObjectStats) GetSorted() bool {
return des[reservedOffset]&0x2 != 0
}
func (des *ObjectStats) SetCNCreated() {
des[reservedOffset] = des[reservedOffset] | 0x4
return des[reservedOffset]&SortedFlag != 0
}

func (des *ObjectStats) GetCNCreated() bool {
return des[reservedOffset]&0x4 != 0
return des[reservedOffset]&CNCreatedFlag != 0
}
func (des *ObjectStats) IsZero() bool {
return bytes.Equal(des[:], ZeroObjectStats[:])
Expand Down
17 changes: 17 additions & 0 deletions pkg/objectio/object_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,20 @@ func TestObjectStats_Marshal_UnMarshal(t *testing.T) {
require.True(t, bytes.Equal(stats.Marshal(), rawBytes))
fmt.Println(stats.String())
}

func TestObjectStatsOptions(t *testing.T) {
stats := NewObjectStats()
require.True(t, stats.IsZero())
require.False(t, stats.GetAppendable())
require.False(t, stats.GetCNCreated())
require.False(t, stats.GetSorted())

WithCNCreated()(stats)
require.True(t, stats.GetCNCreated())

WithSorted()(stats)
require.True(t, stats.GetSorted())

WithAppendable()(stats)
require.True(t, stats.GetAppendable())
}
26 changes: 18 additions & 8 deletions pkg/objectio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type objectWriterV1 struct {
name ObjectName
compressBuf []byte
bloomFilter []byte
objStats []ObjectStats
objStats ObjectStats
sortKeySeqnum uint16
appendable bool
originSize uint32
Expand Down Expand Up @@ -132,8 +132,14 @@ func newObjectWriterV1(name ObjectName, fs fileservice.FileService, schemaVersio
return writer, nil
}

func (w *objectWriterV1) GetObjectStats() []ObjectStats {
return w.objStats
func (w *objectWriterV1) GetObjectStats(opts ...ObjectStatsOptions) (copied ObjectStats) {
copied = w.objStats

for _, opt := range opts {
opt(&copied)
}

return copied
}

func describeObjectHelper(w *objectWriterV1, colmeta []ColumnMeta, idx DataMetaType) ObjectStats {
Expand Down Expand Up @@ -163,14 +169,18 @@ func describeObjectHelper(w *objectWriterV1, colmeta []ColumnMeta, idx DataMetaT
// if an object only has deletes, only the tombstone object stats valid.
//
// if an object has both inserts and deletes, both stats are valid.
func (w *objectWriterV1) DescribeObject() ([]ObjectStats, error) {
stats := make([]ObjectStats, 2)
func (w *objectWriterV1) DescribeObject() (ObjectStats, error) {
var stats ObjectStats

if len(w.blocks[SchemaData]) != 0 {
stats[SchemaData] = describeObjectHelper(w, w.colmeta, SchemaData)
stats = describeObjectHelper(w, w.colmeta, SchemaData)
}

if len(w.blocks[SchemaTombstone]) != 0 {
stats[SchemaTombstone] = describeObjectHelper(w, w.tombstonesColmeta, SchemaTombstone)
if !stats.IsZero() {
panic("schema data and schema tombstone should not all be valid at the same time")
}
stats = describeObjectHelper(w, w.tombstonesColmeta, SchemaTombstone)
}

return stats, nil
Expand Down Expand Up @@ -611,7 +621,7 @@ func (w *objectWriterV1) Sync(ctx context.Context, items ...WriteOptions) error
}

func (w *objectWriterV1) GetDataStats() ObjectStats {
return w.objStats[SchemaData]
return w.objStats
}

func (w *objectWriterV1) WriteWithCompress(offset uint32, buf []byte) (data []byte, extent Extent, err error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/objectio/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestNewObjectWriter(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 3, len(blocks))
assert.Nil(t, objectWriter.buffer)
require.Equal(t, objectWriter.objStats[0].Size(), blocks[0].GetExtent().End()+FooterSize)
require.Equal(t, objectWriter.objStats.Size(), blocks[0].GetExtent().End()+FooterSize)

objectReader, _ := NewObjectReaderWithStr(name, service)
extents := make([]Extent, 3)
Expand All @@ -130,7 +130,7 @@ func TestNewObjectWriter(t *testing.T) {
oSize += meta.BlockHeader().ZoneMapArea().OriginSize()
// 24 is the size of empty bf and zm
oSize += HeaderSize + FooterSize + 24 + extents[0].OriginSize()
require.Equal(t, objectWriter.objStats[0].OriginSize(), oSize)
require.Equal(t, objectWriter.objStats.OriginSize(), oSize)
assert.Equal(t, uint32(3), meta.BlockCount())
assert.True(t, meta.BlockHeader().Appendable())
assert.Equal(t, uint16(math.MaxUint16), meta.BlockHeader().SortKey())
Expand Down
Loading

0 comments on commit d1d05f1

Please sign in to comment.