Skip to content

Commit

Permalink
fix review comments
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai committed Feb 6, 2023
1 parent 8b9f873 commit 8c06b70
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 59 deletions.
24 changes: 20 additions & 4 deletions client/pkg/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,11 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti
b.readableMu.RLock()
defer b.readableMu.RUnlock()
if len(b.readableLogs) == 0 {
b.refreshReadableLogs(ctx)
func() {
b.readableMu.RUnlock()
defer b.readableMu.RLock()
b.refreshReadableLogs(ctx)
}()
}
if l, ok := b.readableLogs[logID]; ok {
return l, nil
Expand All @@ -197,7 +201,11 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti
b.writableMu.RLock()
defer b.writableMu.RUnlock()
if len(b.writableLogs) == 0 {
b.refreshWritableLogs(ctx)
func() {
b.writableMu.RUnlock()
defer b.writableMu.RLock()
b.refreshWritableLogs(ctx)
}()
}
if l, ok := b.writableLogs[logID]; ok {
return l, nil
Expand All @@ -222,7 +230,11 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev
b.readableMu.RLock()
defer b.readableMu.RUnlock()
if len(b.readableLogs) == 0 {
b.refreshReadableLogs(ctx)
func() {
b.readableMu.RUnlock()
defer b.readableMu.RLock()
b.refreshReadableLogs(ctx)
}()
}
eventlogs := make([]api.Eventlog, 0)
for _, el := range b.readableLogs {
Expand All @@ -233,7 +245,11 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev
b.writableMu.RLock()
defer b.writableMu.RUnlock()
if len(b.writableLogs) == 0 {
b.refreshWritableLogs(ctx)
func() {
b.writableMu.RUnlock()
defer b.writableMu.RLock()
b.refreshWritableLogs(ctx)
}()
}
eventlogs := make([]api.Eventlog, 0)
for _, el := range b.writableLogs {
Expand Down
9 changes: 2 additions & 7 deletions client/pkg/eventlog/eventlog_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func (l *eventlog) ID() uint64 {
func (l *eventlog) Close(ctx context.Context) {
l.writableWatcher.Close()
l.readableWatcher.Close()
l.logWriter = nil
l.logReader = nil
l.logWriter.Close(ctx)
l.logReader.Close(ctx)

for _, segment := range l.writableSegments {
segment.Close(ctx)
Expand Down Expand Up @@ -222,11 +222,6 @@ func (l *eventlog) QueryOffsetByTime(ctx context.Context, timestamp int64) (int6
// LastEntryStime
// time.UnixMilli
tailSeg := fetchTailSegment(ctx, segs)
if tailSeg.firstEventBornAt.Before(t) {
// the target offset maybe in newer segment, refresh immediately
l.refreshReadableSegments(ctx)
segs = l.fetchReadableSegments(ctx)
}

for idx := range segs {
seg := segs[idx]
Expand Down
16 changes: 10 additions & 6 deletions internal/store/block/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,22 @@ type Raw interface {
Delete(context.Context) error
}

type State string

const (
StateWorking = State("working")
StateArchiving = State("archiving")
StateArchived = State("archived")
Working = 0
Archived = 1
Archiving = 2
)

var (
StateWorking = uint32(Working)
StateArchived = uint32(Archived)
StateArchiving = uint32(Archiving)
)

type Statistics struct {
ID vanus.ID
Capacity uint64
State State
State uint32
EntryNum uint32
EntrySize uint64
// FirstEntryStime is the millisecond timestamp when the first Entry will be written to Block.
Expand Down
25 changes: 15 additions & 10 deletions internal/store/segment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,14 +425,17 @@ func (s *server) runHeartbeat(_ context.Context) error {
break
}

s.replicas.Range(func(key, value interface{}) bool {
value, ok := s.replicas.Load(info.leader.Uint64())
if ok {
b, _ := value.(replica)
if b.appender.Status().Leader.Equals(vanus.ID(segment.LeaderBlockId)) {
b.appender.Archive(context.Background())
return false
if b.appender.Status().Term != info.term {
break
}
return true
})
if info.leader.Uint64() != segment.LeaderBlockId {
break
}
b.appender.Archive(context.Background())
}
}
}
}()
Expand Down Expand Up @@ -904,13 +907,15 @@ func (s *server) checkState() error {
return nil
}

func toSegmentState(state block.State) string {
func toSegmentState(state uint32) string {
switch state {
case block.StateArchiving:
return "freezing"
case block.StateWorking:
return "working"
case block.StateArchived:
return "frozen"
case block.StateArchiving:
return "freezing"
default:
return string(state)
return "working"
}
}
24 changes: 14 additions & 10 deletions internal/store/vsb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type meta struct {
entryLength int64
// entryNum is the number of persisted entries.
entryNum int64
// archived is the flag indicating Block is archived.
archived bool
// state is the flag indicating Block state.
state uint32
}

// vsBlock is Vanus block file.
Expand Down Expand Up @@ -97,7 +97,7 @@ func (b *vsBlock) Close(ctx context.Context) error {
}

// Flush metadata.
if b.fm.archived != m.archived || b.fm.entryLength != m.entryLength {
if b.fm.state != m.state || b.fm.entryLength != m.entryLength {
if err := b.persistHeader(ctx, m); err != nil {
return err
}
Expand All @@ -112,15 +112,11 @@ func (b *vsBlock) Delete(context.Context) error {
}

func (b *vsBlock) status() block.Statistics {
state := block.StateWorking
m, indexes := b.makeSnapshot()
if m.archived {
state = block.StateArchiving
}
return b.stat(m, indexes, state)
return b.stat(m, indexes, m.state)
}

func (b *vsBlock) stat(m meta, indexes []index.Index, state block.State) block.Statistics {
func (b *vsBlock) stat(m meta, indexes []index.Index, state uint32) block.Statistics {
s := block.Statistics{
ID: b.id,
Capacity: uint64(b.capacity),
Expand All @@ -137,6 +133,14 @@ func (b *vsBlock) stat(m meta, indexes []index.Index, state block.State) block.S
return s
}

func (b *vsBlock) prefull(state uint32) block.Statistics {
s := block.Statistics{
ID: b.id,
State: state,
}
return s
}

func (b *vsBlock) full() bool {
return atomic.LoadUint32(&b.actx.archived) != 0
return atomic.LoadUint32(&b.actx.state) != block.StateWorking
}
22 changes: 13 additions & 9 deletions internal/store/vsb/block_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ var (
)

type appendContext struct {
seq int64
offset int64
archived uint32
seq int64
offset int64
state uint32
}

// Make sure appendContext implements block.AppendContext.
Expand All @@ -56,8 +56,12 @@ func (c *appendContext) WriteOffset() int64 {
return c.offset
}

func (c *appendContext) Archiving() bool {
return c.state == block.StateArchiving
}

func (c *appendContext) Archived() bool {
return c.archived != 0
return c.state == block.StateArchived
}

// Make sure vsBlock implements block.TwoPCAppender.
Expand All @@ -70,9 +74,10 @@ func (b *vsBlock) NewAppendContext(last block.Fragment) block.AppendContext {
actx := &appendContext{
seq: seq + 1,
offset: last.EndOffset(),
state: block.StateWorking,
}
if ceschema.EntryType(entry) == ceschema.End {
actx.archived = 1
actx.state = block.StateArchived
}
return actx
}
Expand Down Expand Up @@ -110,8 +115,7 @@ func (b *vsBlock) PrepareAppend(

full := actx.size(b.dataOffset) >= b.capacity
if full && b.lis != nil {
m, indexes := makeSnapshot(b.actx, b.indexes)
b.lis.OnArchived(b.stat(m, indexes, block.StateArchiving))
b.lis.OnArchived(b.prefull(block.StateArchiving))
}

return seqs, frag, full, nil
Expand All @@ -129,7 +133,7 @@ func (b *vsBlock) PrepareArchive(ctx context.Context, appendCtx block.AppendCont

actx.offset += int64(frag.Size())
actx.seq++
actx.archived = 1
actx.state = block.StateArchiving

return frag, nil
}
Expand Down Expand Up @@ -190,7 +194,7 @@ func (b *vsBlock) CommitAppend(ctx context.Context, frag block.Fragment, cb bloc
return
}

atomic.StoreUint32(&b.actx.archived, 1)
atomic.StoreUint32(&b.actx.state, block.StateArchived)

b.wg.Add(1)
b.s.Append(bytes.NewReader(frag.Payload()), func(n int, err error) {
Expand Down
5 changes: 3 additions & 2 deletions internal/store/vsb/block_append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestVSBlock_Append(t *testing.T) {
So(actx.WriteOffset(), ShouldEqual, vsbtest.EndEntryOffset+vsbtest.EndEntrySize)
So(actx.Archived(), ShouldBeTrue)

b.actx.archived = 1
b.actx.state = block.StateArchived
actx = b.NewAppendContext(nil)
So(actx, ShouldNotBeNil)
So(actx.WriteOffset(), ShouldEqual, headerBlockSize)
Expand Down Expand Up @@ -102,6 +102,7 @@ func TestVSBlock_Append(t *testing.T) {
dataOffset: headerBlockSize,
actx: appendContext{
offset: headerBlockSize,
state: block.StateWorking,
},
enc: codec.NewEncoder(),
dec: dec,
Expand Down Expand Up @@ -288,7 +289,7 @@ func TestVSBlock_Append(t *testing.T) {
<-ch

stat = b.status()
So(stat.State, ShouldEqual, block.StateArchiving)
So(stat.State, ShouldEqual, block.StateArchived)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

Expand Down
16 changes: 13 additions & 3 deletions internal/store/vsb/block_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"hash/crc32"

// this project.
"github.com/linkall-labs/vanus/internal/store/block"
"github.com/linkall-labs/vanus/internal/store/block/raw"
)

Expand Down Expand Up @@ -52,8 +53,10 @@ func (b *vsBlock) persistHeader(ctx context.Context, m meta) error {
binary.LittleEndian.PutUint32(buf[flagsOffset:], 0) // flags
binary.LittleEndian.PutUint32(buf[breakFlagsOffset:], 0) // break flags
binary.LittleEndian.PutUint32(buf[dataOffsetOffset:], uint32(b.dataOffset)) // data offset
if m.archived { // state
if m.state == block.StateArchived { // state
buf[stateOffset] = 1
} else if m.state == block.StateArchiving {
buf[stateOffset] = 2
}
binary.LittleEndian.PutUint16(buf[indexSizeOffset:], b.indexSize) // index size
binary.LittleEndian.PutUint64(buf[capacityOffset:], uint64(b.capacity)) // capacity
Expand Down Expand Up @@ -94,8 +97,15 @@ func (b *vsBlock) loadHeader(ctx context.Context) error {
return errIncomplete
}

b.dataOffset = int64(binary.LittleEndian.Uint32(buf[dataOffsetOffset:])) // data offset
b.fm.archived = buf[stateOffset] != 0 // state
b.dataOffset = int64(binary.LittleEndian.Uint32(buf[dataOffsetOffset:])) // data offset
switch buf[stateOffset] { // state
case block.Working:
b.fm.state = block.StateWorking
case block.Archived:
b.fm.state = block.StateArchived
case block.Archiving:
b.fm.state = block.StateArchived
}
b.indexSize = binary.LittleEndian.Uint16(buf[indexSizeOffset:]) // index size
b.capacity = int64(binary.LittleEndian.Uint64(buf[capacityOffset:])) // capacity
b.fm.entryLength = int64(binary.LittleEndian.Uint64(buf[entryLengthOffset:])) // entry length
Expand Down
6 changes: 3 additions & 3 deletions internal/store/vsb/block_open.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,17 @@ func (b *vsBlock) init(ctx context.Context) error {
func (b *vsBlock) repairMeta() error {
off := b.dataOffset + b.fm.entryLength
seq := b.fm.entryNum
full := b.fm.archived

var entry block.Entry
var full bool
var err error
var n, en int

// Scan entries.
indexes := make([]index.Index, 0)
// Note: use math.MaxInt64-off to avoid overflow.
r := io.NewSectionReader(b.f, off, math.MaxInt64-off)
if full {
if b.fm.state == block.StateArchiving || b.fm.state == block.StateArchived {
n, entry, err = b.dec.UnmarshalReader(r)
if err != nil || ceschema.EntryType(entry) != ceschema.End {
return errCorrupted
Expand Down Expand Up @@ -161,7 +161,7 @@ SET_META:
b.actx.seq = seq
b.actx.offset = off
if full {
b.actx.archived = 1
b.actx.state = block.StateArchived
}

return nil
Expand Down
5 changes: 4 additions & 1 deletion internal/store/vsb/block_open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestVSBlock_Open(t *testing.T) {
stat := b.status()
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.State, ShouldEqual, block.StateArchiving)
So(stat.State, ShouldEqual, block.StateArchived)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

So(b.indexes, ShouldHaveLength, 2)
Expand Down Expand Up @@ -133,6 +133,9 @@ func TestVSBlock_Open(t *testing.T) {
So(err, ShouldBeNil)

b := &vsBlock{
actx: appendContext{
state: block.StateWorking,
},
path: f.Name(),
}

Expand Down
2 changes: 1 addition & 1 deletion internal/store/vsb/block_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestVSBlock_Read(t *testing.T) {
So(err, ShouldBeError, block.ErrExceeded)

Convey("after block is full", func() {
b.actx.archived = 1
b.actx.state = block.StateArchived

_, err = b.Read(context.Background(), 2, 1)
So(err, ShouldBeError, block.ErrExceeded)
Expand Down
Loading

0 comments on commit 8c06b70

Please sign in to comment.