Skip to content

Commit

Permalink
feat: transport layer performance optimization
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai committed Dec 17, 2022
1 parent 6e9856b commit f285759
Show file tree
Hide file tree
Showing 12 changed files with 1,225 additions and 338 deletions.
309 changes: 225 additions & 84 deletions client/internal/vanus/store/block_store.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion client/pkg/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ type Eventbus interface {
type BusWriter interface {
AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error)
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (eid string, err error)
AppendOneStream(ctx context.Context, event *ce.Event, cb Callback, opts ...WriteOption)
SyncAppendOneStream(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error)
}

type BusReader interface {
Read(ctx context.Context, opts ...ReadOption) ([]*ce.Event, int64, uint64, error)
SyncReadStream(ctx context.Context, opts ...ReadOption) ([]*ce.Event, int64, uint64, error)
}

type Eventlog interface {
Expand Down
52 changes: 47 additions & 5 deletions client/pkg/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,8 @@ func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.
return encoded, nil
}

func (w *busWriter) AppendOneStream(ctx context.Context, event *ce.Event, cb api.Callback, opts ...api.WriteOption) {
_ctx, span := w.tracer.Start(ctx, "AppendOneStream")
func (w *busWriter) SyncAppendOneStream(ctx context.Context, event *ce.Event, opts ...api.WriteOption) (eid string, err error) {
_ctx, span := w.tracer.Start(ctx, "SyncAppendOneStream")
defer span.End()

var writeOpts *api.WriteOptions = w.opts
Expand All @@ -493,12 +493,22 @@ func (w *busWriter) AppendOneStream(ctx context.Context, event *ce.Event, cb api
// 1. pick a writer of eventlog
lw, err := w.pickWritableLog(_ctx, writeOpts)
if err != nil {
cb(err)
return
return "", err
}

// 2. append the event to the eventlog
lw.AppendStream(_ctx, event, cb)
off, err := lw.SyncAppendStream(_ctx, event)
if err != nil {
return "", err
}

// 3. generate event ID
var buf [16]byte
binary.BigEndian.PutUint64(buf[0:8], lw.Log().ID())
binary.BigEndian.PutUint64(buf[8:16], uint64(off))
encoded := base64.StdEncoding.EncodeToString(buf[:])

return encoded, nil
}

func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...api.WriteOption) (eid string, err error) {
Expand Down Expand Up @@ -567,6 +577,38 @@ func (r *busReader) Read(ctx context.Context, opts ...api.ReadOption) ([]*ce.Eve
return events, off, lr.Log().ID(), nil
}

func (r *busReader) SyncReadStream(ctx context.Context, opts ...api.ReadOption) ([]*ce.Event, int64, uint64, error) {
_ctx, span := r.tracer.Start(ctx, "Read")
defer span.End()

var readOpts *api.ReadOptions = r.opts
if len(opts) > 0 {
readOpts = r.opts.Copy()
for _, opt := range opts {
opt(readOpts)
}
}

// 1. pick a reader of eventlog
lr, err := r.pickReadableLog(_ctx, readOpts)
if err != nil {
return []*ce.Event{}, 0, 0, err
}

// TODO(jiangkai): refactor eventlog interface to avoid seek every time, by jiangkai, 2022.10.24
off, err := lr.Seek(_ctx, readOpts.Policy.Offset(), io.SeekStart)
if err != nil {
return []*ce.Event{}, 0, 0, err
}

// 2. read the event to the eventlog
events, err := lr.SyncReadStream(_ctx, int16(readOpts.BatchSize))
if err != nil {
return []*ce.Event{}, 0, 0, err
}
return events, off, lr.Log().ID(), nil
}

func (r *busReader) Bus() api.Eventbus {
return r.ebus
}
Expand Down
4 changes: 3 additions & 1 deletion client/pkg/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type LogWriter interface {

Append(ctx context.Context, event *ce.Event) (off int64, err error)

AppendStream(ctx context.Context, event *ce.Event, cb api.Callback)
SyncAppendStream(ctx context.Context, event *ce.Event) (int64, error)
}

type LogReader interface {
Expand All @@ -62,6 +62,8 @@ type LogReader interface {
// TODO: async
Read(ctx context.Context, size int16) (events []*ce.Event, err error)

SyncReadStream(ctx context.Context, size int16) (events []*ce.Event, err error)

// Seek sets the offset for the next Read to offset,
// interpreted according to whence.
//
Expand Down
71 changes: 66 additions & 5 deletions client/pkg/eventlog/eventlog_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

// this project.
el "github.com/linkall-labs/vanus/client/internal/vanus/eventlog"
"github.com/linkall-labs/vanus/client/pkg/api"
"github.com/linkall-labs/vanus/client/pkg/record"
vlog "github.com/linkall-labs/vanus/observability/log"
"github.com/linkall-labs/vanus/pkg/errors"
Expand Down Expand Up @@ -386,13 +385,43 @@ func (w *logWriter) doAppend(ctx context.Context, event *ce.Event) (int64, error
return offset, nil
}

func (w *logWriter) AppendStream(ctx context.Context, event *ce.Event, cb api.Callback) {
func (w *logWriter) SyncAppendStream(ctx context.Context, event *ce.Event) (int64, error) {
// TODO: async for throughput

retryTimes := defaultRetryTimes
for i := 1; i <= retryTimes; i++ {
offset, err := w.doSyncAppendStream(ctx, event)
if err == nil {
return offset, nil
}
vlog.Warning(ctx, "failed to Append", map[string]interface{}{
vlog.KeyError: err,
"offset": offset,
})
if errors.Is(err, errors.ErrSegmentFull) {
if i < retryTimes {
continue
}
}
return -1, err
}

return -1, errors.ErrUnknown
}

func (w *logWriter) doSyncAppendStream(ctx context.Context, event *ce.Event) (int64, error) {
segment, err := w.selectWritableSegment(ctx)
if err != nil {
cb(err)
return
return -1, err
}
segment.AppendStream(ctx, event, cb)
offset, err := segment.SyncAppendStream(ctx, event)
if err != nil {
if errors.Is(err, errors.ErrSegmentFull) {
segment.SetNotWritable()
}
return -1, err
}
return offset, nil
}

func (w *logWriter) selectWritableSegment(ctx context.Context) (*segment, error) {
Expand Down Expand Up @@ -470,6 +499,38 @@ func (r *logReader) Read(ctx context.Context, size int16) ([]*ce.Event, error) {
return events, nil
}

func (r *logReader) SyncReadStream(ctx context.Context, size int16) ([]*ce.Event, error) {
if r.cur == nil {
segment, err := r.elog.selectReadableSegment(ctx, r.pos)
if errors.Is(err, errors.ErrOffsetOnEnd) {
r.elog.refreshReadableSegments(ctx)
segment, err = r.elog.selectReadableSegment(ctx, r.pos)
}
if err != nil {
return nil, err
}
r.cur = segment
}

events, err := r.cur.SyncReadStream(ctx, r.pos, size, uint32(r.pollingTimeout(ctx)))
if err != nil {
if errors.Is(err, errors.ErrOffsetOverflow) {
r.elog.refreshReadableSegments(ctx)
if r.switchSegment(ctx) {
return nil, errors.ErrTryAgain
}
}
return nil, err
}

r.pos += int64(len(events))
if r.pos == r.cur.EndOffset() {
r.switchSegment(ctx)
}

return events, nil
}

func (r *logReader) pollingTimeout(ctx context.Context) int64 {
if r.cfg.PollingTimeout == 0 {
return 0
Expand Down
57 changes: 52 additions & 5 deletions client/pkg/eventlog/log_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"

// this project.
"github.com/linkall-labs/vanus/client/pkg/api"

"github.com/linkall-labs/vanus/client/pkg/record"
"github.com/linkall-labs/vanus/pkg/errors"
)
Expand Down Expand Up @@ -171,16 +171,19 @@ func (s *segment) Append(ctx context.Context, event *ce.Event) (int64, error) {
return off + s.startOffset, nil
}

func (s *segment) AppendStream(ctx context.Context, event *ce.Event, cb api.Callback) {
func (s *segment) SyncAppendStream(ctx context.Context, event *ce.Event) (int64, error) {
_ctx, span := s.tracer.Start(ctx, "AppendStream")
defer span.End()

b := s.preferSegmentBlock()
if b == nil {
cb(errors.ErrNotLeader)
return
return -1, errors.ErrNotLeader
}
off, err := b.SyncAppendStream(_ctx, event)
if err != nil {
return -1, err
}
b.AppendStream(_ctx, event, cb)
return off + s.startOffset, nil
}

func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) {
Expand Down Expand Up @@ -227,6 +230,50 @@ func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeo
return events, err
}

func (s *segment) SyncReadStream(ctx context.Context, from int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) {
if from < s.startOffset {
return nil, errors.ErrOffsetUnderflow
}
ctx, span := s.tracer.Start(ctx, "Read")
defer span.End()

if eo := s.endOffset.Load(); eo >= 0 {
if from > eo {
return nil, errors.ErrOffsetOverflow
}
if int64(size) > eo-from {
size = int16(eo - from)
}
}
// TODO: cached read
b := s.preferSegmentBlock()
if b == nil {
return nil, errors.ErrBlockNotFound
}
events, err := b.SyncReadStream(ctx, from-s.startOffset, size, pollingTimeout)
if err != nil {
return nil, err
}

for _, e := range events {
v, ok := e.Extensions()[segpb.XVanusBlockOffset]
if !ok {
continue
}
off, ok := v.(int32)
if !ok {
return events, errors.ErrCorruptedEvent
}
offset := s.startOffset + int64(off)
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(offset))
e.SetExtension(XVanusLogOffset, buf)
e.SetExtension(segpb.XVanusBlockOffset, nil)
}

return events, err
}

func (s *segment) preferSegmentBlock() *block {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down
19 changes: 16 additions & 3 deletions client/pkg/eventlog/segment_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

// this project
"github.com/linkall-labs/vanus/client/internal/vanus/store"
"github.com/linkall-labs/vanus/client/pkg/api"
"github.com/linkall-labs/vanus/client/pkg/record"
"github.com/linkall-labs/vanus/pkg/errors"
)
Expand Down Expand Up @@ -57,8 +56,8 @@ func (s *block) Append(ctx context.Context, event *ce.Event) (int64, error) {
return s.store.Append(ctx, s.id, event)
}

func (s *block) AppendStream(ctx context.Context, event *ce.Event, cb api.Callback) {
s.store.AppendStream(ctx, s.id, event, cb)
func (s *block) SyncAppendStream(ctx context.Context, event *ce.Event) (int64, error) {
return s.store.SyncAppendStream(ctx, s.id, event)
}

func (s *block) Read(ctx context.Context, offset int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) {
Expand All @@ -74,3 +73,17 @@ func (s *block) Read(ctx context.Context, offset int64, size int16, pollingTimeo
}
return s.store.Read(ctx, s.id, offset, size, pollingTimeout)
}

func (s *block) SyncReadStream(ctx context.Context, offset int64, size int16, pollingTimeout uint32) ([]*ce.Event, error) {
if offset < 0 {
return nil, errors.ErrOffsetUnderflow
}
if size > 0 {
// doRead
} else if size == 0 {
return make([]*ce.Event, 0), nil
} else if size < 0 {
return nil, errors.ErrInvalidArgument
}
return s.store.SyncReadStream(ctx, s.id, offset, size, pollingTimeout)
}
2 changes: 1 addition & 1 deletion internal/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (ga *ceGateway) receive(ctx context.Context, event v2.Event) (*v2.Event, pr
v, _ = ga.busWriter.LoadOrStore(ebName, ga.client.Eventbus(ctx, ebName).Writer())
}
writer, _ := v.(api.BusWriter)
eventID, err := writer.AppendOne(_ctx, &event)
eventID, err := writer.SyncAppendOneStream(_ctx, &event)
if err != nil {
log.Warning(_ctx, "append to failed", map[string]interface{}{
log.KeyError: err,
Expand Down
Loading

0 comments on commit f285759

Please sign in to comment.