diff --git a/client/client.go b/client/client.go index a1ce9a880..61425d525 100644 --- a/client/client.go +++ b/client/client.go @@ -28,26 +28,20 @@ import ( "google.golang.org/grpc/credentials/insecure" // this project. - eb "github.com/vanus-labs/vanus/client/internal/vanus/eventbus" + eb "github.com/vanus-labs/vanus/client/internal/eventbus" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventbus" ) -type Client interface { - Eventbus(ctx context.Context, opts ...api.EventbusOption) api.Eventbus - Disconnect(ctx context.Context) -} - type client struct { - // Endpoints is a list of URLs. - Endpoints []string - eventbusCache sync.Map + endpoints []string + cache sync.Map mu sync.RWMutex tracer *tracing.Tracer } -func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.Eventbus { +func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) (api.Eventbus, error) { _, span := c.tracer.Start(ctx, "EventbusService") defer span.End() @@ -56,19 +50,18 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.E apply(defaultOpts) } - err := GetEventbusIDIfNotSet(ctx, c.Endpoints, defaultOpts) + err := GetEventbusIDIfNotSet(ctx, c.endpoints, defaultOpts) if err != nil { log.Error(ctx).Err(err). Str("eventbus_name", defaultOpts.Name). Uint64("eventbus_id", defaultOpts.ID). Msg("get eventbus id failed") - return nil + return nil, err } bus := func() api.Eventbus { - c.mu.RLock() - defer c.mu.RUnlock() - if value, ok := c.eventbusCache.Load(defaultOpts.ID); ok { + if value, ok := c.cache.Load(defaultOpts.ID); ok { + value.(*eventbus.Eventbus).Acquire() return value.(api.Eventbus) } else { return nil @@ -78,37 +71,39 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.E if bus == nil { c.mu.Lock() defer c.mu.Unlock() - if value, ok := c.eventbusCache.Load(defaultOpts.ID); ok { // double check - return value.(api.Eventbus) + if value, ok := c.cache.Load(defaultOpts.ID); ok { // double check + return value.(api.Eventbus), nil } else { cfg := &eb.Config{ - Endpoints: c.Endpoints, + Endpoints: c.endpoints, ID: defaultOpts.ID, } - newEventbus := eventbus.NewEventbus(cfg) - c.eventbusCache.Store(defaultOpts.ID, newEventbus) - return newEventbus + newEventbus := eventbus.NewEventbus(cfg, c.close) + newEventbus.Acquire() + c.cache.Store(defaultOpts.ID, newEventbus) + return newEventbus, nil } } - return bus + return bus, nil } func (c *client) Disconnect(ctx context.Context) { - c.mu.Lock() - defer c.mu.Unlock() - c.eventbusCache.Range(func(key, value interface{}) bool { + c.cache.Range(func(key, value interface{}) bool { value.(api.Eventbus).Close(ctx) - c.eventbusCache.Delete(key) return true }) } -func Connect(endpoints []string) Client { +func (c *client) close(id uint64) { + c.cache.Delete(id) +} + +func Connect(endpoints []string) api.Client { if len(endpoints) == 0 { return nil } return &client{ - Endpoints: endpoints, + endpoints: endpoints, } } diff --git a/client/examples/eventbus/append/main.go b/client/examples/eventbus/append/main.go index a76c9bfe1..1ec39f0d4 100644 --- a/client/examples/eventbus/append/main.go +++ b/client/examples/eventbus/append/main.go @@ -43,7 +43,10 @@ func main() { if err != nil { panic("invalid id") } - bus := c.Eventbus(ctx, api.WithName("quick-start"), api.WithID(eventbusID.Uint64())) + bus, err := c.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + panic(err.Error()) + } w := bus.Writer() // Create an Event. event := ce.NewEvent() diff --git a/client/examples/eventbus/read/main.go b/client/examples/eventbus/read/main.go index e318a34cb..24fca5f7c 100644 --- a/client/examples/eventbus/read/main.go +++ b/client/examples/eventbus/read/main.go @@ -37,7 +37,10 @@ func main() { if err != nil { panic("invalid id") } - eb := c.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + eb, err := c.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + panic(err.Error()) + } ls, err := eb.ListLog(ctx) if err != nil { log.Print(err.Error()) diff --git a/client/internal/vanus/eventbus/config.go b/client/internal/eventbus/config.go similarity index 100% rename from client/internal/vanus/eventbus/config.go rename to client/internal/eventbus/config.go diff --git a/client/internal/vanus/eventbus/name_service.go b/client/internal/eventbus/name_service.go similarity index 100% rename from client/internal/vanus/eventbus/name_service.go rename to client/internal/eventbus/name_service.go diff --git a/client/internal/vanus/eventlog/config.go b/client/internal/eventlog/config.go similarity index 100% rename from client/internal/vanus/eventlog/config.go rename to client/internal/eventlog/config.go diff --git a/client/internal/vanus/eventlog/name_service.go b/client/internal/eventlog/name_service.go similarity index 100% rename from client/internal/vanus/eventlog/name_service.go rename to client/internal/eventlog/name_service.go diff --git a/client/internal/vanus/net/connection.go b/client/internal/net/connection.go similarity index 89% rename from client/internal/vanus/net/connection.go rename to client/internal/net/connection.go index 008bec852..f88b6c469 100644 --- a/client/internal/vanus/net/connection.go +++ b/client/internal/net/connection.go @@ -14,6 +14,6 @@ package net -import "github.com/vanus-labs/vanus/client/internal/vanus/net/connection" +import "github.com/vanus-labs/vanus/client/internal/net/connection" var Connect = connection.Connect diff --git a/client/internal/vanus/net/connection/connect.go b/client/internal/net/connection/connect.go similarity index 100% rename from client/internal/vanus/net/connection/connect.go rename to client/internal/net/connection/connect.go diff --git a/client/internal/vanus/net/connection/pool.go b/client/internal/net/connection/pool.go similarity index 100% rename from client/internal/vanus/net/connection/pool.go rename to client/internal/net/connection/pool.go diff --git a/client/internal/vanus/net/rpc/bare/client.go b/client/internal/net/rpc/bare/client.go similarity index 95% rename from client/internal/vanus/net/rpc/bare/client.go rename to client/internal/net/rpc/bare/client.go index 7b6f59442..45ce17443 100644 --- a/client/internal/vanus/net/rpc/bare/client.go +++ b/client/internal/net/rpc/bare/client.go @@ -31,8 +31,8 @@ import ( "github.com/vanus-labs/vanus/pkg/errors" // this project. - "github.com/vanus-labs/vanus/client/internal/vanus/net/connection" - "github.com/vanus-labs/vanus/client/internal/vanus/net/rpc" + "github.com/vanus-labs/vanus/client/internal/net/connection" + "github.com/vanus-labs/vanus/client/internal/net/rpc" ) const ( diff --git a/client/internal/vanus/net/rpc/client.go b/client/internal/net/rpc/client.go similarity index 100% rename from client/internal/vanus/net/rpc/client.go rename to client/internal/net/rpc/client.go diff --git a/client/internal/vanus/net/rpc/creator.go b/client/internal/net/rpc/creator.go similarity index 100% rename from client/internal/vanus/net/rpc/creator.go rename to client/internal/net/rpc/creator.go diff --git a/client/internal/vanus/store/alloc.go b/client/internal/store/alloc.go similarity index 100% rename from client/internal/vanus/store/alloc.go rename to client/internal/store/alloc.go diff --git a/client/internal/vanus/store/allocator.go b/client/internal/store/allocator.go similarity index 100% rename from client/internal/vanus/store/allocator.go rename to client/internal/store/allocator.go diff --git a/client/internal/vanus/store/block_store.go b/client/internal/store/block_store.go similarity index 96% rename from client/internal/vanus/store/block_store.go rename to client/internal/store/block_store.go index 5da4acc90..e9a97df53 100644 --- a/client/internal/vanus/store/block_store.go +++ b/client/internal/store/block_store.go @@ -29,8 +29,8 @@ import ( segpb "github.com/vanus-labs/vanus/proto/pkg/segment" // this project. - "github.com/vanus-labs/vanus/client/internal/vanus/net/rpc" - "github.com/vanus-labs/vanus/client/internal/vanus/net/rpc/bare" + "github.com/vanus-labs/vanus/client/internal/net/rpc" + "github.com/vanus-labs/vanus/client/internal/net/rpc/bare" "github.com/vanus-labs/vanus/client/pkg/primitive" ) diff --git a/client/pkg/api/client.go b/client/pkg/api/client.go index 721655c24..25d9cbb21 100644 --- a/client/pkg/api/client.go +++ b/client/pkg/api/client.go @@ -23,6 +23,13 @@ import ( "github.com/vanus-labs/vanus/proto/pkg/codec" ) +type CloseFunc func(id uint64) + +type Client interface { + Eventbus(ctx context.Context, opts ...EventbusOption) (Eventbus, error) + Disconnect(ctx context.Context) +} + type Eventbus interface { Writer(opts ...WriteOption) BusWriter Reader(opts ...ReadOption) BusReader diff --git a/client/pkg/api/mock_client.go b/client/pkg/api/mock_client.go index 6c61a5192..ac7b6594d 100644 --- a/client/pkg/api/mock_client.go +++ b/client/pkg/api/mock_client.go @@ -12,6 +12,61 @@ import ( cloudevents "github.com/vanus-labs/vanus/proto/pkg/cloudevents" ) +// MockClient is a mock of Client interface. +type MockClient struct { + ctrl *gomock.Controller + recorder *MockClientMockRecorder +} + +// MockClientMockRecorder is the mock recorder for MockClient. +type MockClientMockRecorder struct { + mock *MockClient +} + +// NewMockClient creates a new mock instance. +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder +} + +// Disconnect mocks base method. +func (m *MockClient) Disconnect(ctx context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Disconnect", ctx) +} + +// Disconnect indicates an expected call of Disconnect. +func (mr *MockClientMockRecorder) Disconnect(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnect", reflect.TypeOf((*MockClient)(nil).Disconnect), ctx) +} + +// Eventbus mocks base method. +func (m *MockClient) Eventbus(ctx context.Context, opts ...EventbusOption) (Eventbus, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Eventbus", varargs...) + ret0, _ := ret[0].(Eventbus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Eventbus indicates an expected call of Eventbus. +func (mr *MockClientMockRecorder) Eventbus(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Eventbus", reflect.TypeOf((*MockClient)(nil).Eventbus), varargs...) +} + // MockEventbus is a mock of Eventbus interface. type MockEventbus struct { ctrl *gomock.Controller @@ -252,7 +307,7 @@ func (mr *MockEventlogMockRecorder) EarliestOffset(ctx interface{}) *gomock.Call // ID mocks base method. func (m *MockEventlog) ID() uint64 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "VolumeID") + ret := m.ctrl.Call(m, "ID") ret0, _ := ret[0].(uint64) return ret0 } @@ -260,7 +315,7 @@ func (m *MockEventlog) ID() uint64 { // ID indicates an expected call of ID. func (mr *MockEventlogMockRecorder) ID() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "VolumeID", reflect.TypeOf((*MockEventlog)(nil).ID)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockEventlog)(nil).ID)) } // LatestOffset mocks base method. diff --git a/client/pkg/eventbus/eventbus.go b/client/pkg/eventbus/eventbus.go index 126e2e049..f4ab0c115 100644 --- a/client/pkg/eventbus/eventbus.go +++ b/client/pkg/eventbus/eventbus.go @@ -19,7 +19,6 @@ import ( "context" "encoding/base64" "encoding/binary" - stderrors "errors" "io" "sync" @@ -34,16 +33,18 @@ import ( "github.com/vanus-labs/vanus/proto/pkg/cloudevents" // this project. - eb "github.com/vanus-labs/vanus/client/internal/vanus/eventbus" - el "github.com/vanus-labs/vanus/client/internal/vanus/eventlog" + eb "github.com/vanus-labs/vanus/client/internal/eventbus" + el "github.com/vanus-labs/vanus/client/internal/eventlog" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventlog" "github.com/vanus-labs/vanus/client/pkg/policy" + "github.com/vanus-labs/vanus/client/pkg/primitive" ) -func NewEventbus(cfg *eb.Config) *eventbus { - bus := &eventbus{ +func NewEventbus(cfg *eb.Config, close api.CloseFunc) *Eventbus { + bus := &Eventbus{ cfg: cfg, + close: close, nameService: eb.NewNameService(cfg.Endpoints), writableLogSet: u64set.New(), readableLogSet: u64set.New(), @@ -53,6 +54,7 @@ func NewEventbus(cfg *eb.Config) *eventbus { readableMu: sync.RWMutex{}, writableState: nil, readableState: nil, + RefCount: primitive.RefCount{}, tracer: tracing.NewTracer("pkg.eventbus.impl", trace.SpanKindClient), } @@ -102,9 +104,11 @@ func NewEventbus(cfg *eb.Config) *eventbus { return bus } -type eventbus struct { +type Eventbus struct { cfg *eb.Config + close api.CloseFunc nameService *eb.NameService + closeOnce sync.Once writableWatcher *WritableLogsWatcher writableLogSet *u64set.Set @@ -118,20 +122,21 @@ type eventbus struct { readableMu sync.RWMutex readableState error + primitive.RefCount tracer *tracing.Tracer } // make sure eventbus implements api.Eventbus. -var _ api.Eventbus = (*eventbus)(nil) +var _ api.Eventbus = (*Eventbus)(nil) -func (b *eventbus) defaultWriteOptions() *api.WriteOptions { +func (b *Eventbus) defaultWriteOptions() *api.WriteOptions { return &api.WriteOptions{ Oneway: false, Policy: policy.NewRoundRobinWritePolicy(b), } } -func (b *eventbus) defaultReadOptions() *api.ReadOptions { +func (b *Eventbus) defaultReadOptions() *api.ReadOptions { return &api.ReadOptions{ BatchSize: 1, PollingTimeout: api.DefaultPollingTimeout, @@ -139,7 +144,7 @@ func (b *eventbus) defaultReadOptions() *api.ReadOptions { } } -func (b *eventbus) Writer(opts ...api.WriteOption) api.BusWriter { +func (b *Eventbus) Writer(opts ...api.WriteOption) api.BusWriter { writeOpts := b.defaultWriteOptions() for _, opt := range opts { opt(writeOpts) @@ -153,7 +158,7 @@ func (b *eventbus) Writer(opts ...api.WriteOption) api.BusWriter { return w } -func (b *eventbus) Reader(opts ...api.ReadOption) api.BusReader { +func (b *Eventbus) Reader(opts ...api.ReadOption) api.BusReader { readOpts := b.defaultReadOptions() for _, opt := range opts { opt(readOpts) @@ -167,7 +172,7 @@ func (b *eventbus) Reader(opts ...api.ReadOption) api.BusReader { return r } -func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOption) (api.Eventlog, error) { +func (b *Eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOption) (api.Eventlog, error) { _, span := b.tracer.Start(ctx, "pkg.eventbus.getlog") defer span.End() op := &api.LogOptions{ @@ -198,7 +203,7 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti } } -func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Eventlog, error) { +func (b *Eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Eventlog, error) { _, span := b.tracer.Start(ctx, "pkg.eventbus.listlog") defer span.End() op := &api.LogOptions{ @@ -231,35 +236,43 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev } } -func (b *eventbus) ID() uint64 { +func (b *Eventbus) ID() uint64 { return b.cfg.ID } -func (b *eventbus) Close(ctx context.Context) { - b.writableWatcher.Close() - b.readableWatcher.Close() - - for _, w := range b.writableLogs { - w.Close(ctx) - } - for _, r := range b.readableLogs { - r.Close(ctx) +func (b *Eventbus) Close(ctx context.Context) { + if b.Release() { + func() { + if b.UseCount() == 0 { // double check + b.closeOnce.Do(func() { + b.writableWatcher.Close() + b.readableWatcher.Close() + for _, w := range b.writableLogs { + w.Close(ctx) + } + for _, r := range b.readableLogs { + r.Close(ctx) + } + b.close(b.cfg.ID) + }) + } + }() } } -func (b *eventbus) getWritableState() error { +func (b *Eventbus) getWritableState() error { b.writableMu.RLock() defer b.writableMu.RUnlock() return b.writableState } -func (b *eventbus) setWritableState(err error) { +func (b *Eventbus) setWritableState(err error) { b.writableMu.Lock() defer b.writableMu.Unlock() b.writableState = err } -func (b *eventbus) isNeedUpdateWritableLogs(err error) bool { +func (b *Eventbus) isNeedUpdateWritableLogs(err error) bool { if err == nil { b.setWritableState(nil) return true @@ -271,7 +284,7 @@ func (b *eventbus) isNeedUpdateWritableLogs(err error) bool { return false } -func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResult) { +func (b *Eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResult) { _, span := b.tracer.Start(ctx, "updateWritableLogs") defer span.End() @@ -310,17 +323,21 @@ func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResul b.setWritableLogs(s, lws) } -func (b *eventbus) setWritableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventlog) { +func (b *Eventbus) setWritableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventlog) { b.writableMu.Lock() defer b.writableMu.Unlock() b.writableLogSet = s b.writableLogs = lws } -func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) eventlog.Eventlog { +func (b *Eventbus) getWritableLog(ctx context.Context, logID uint64) (eventlog.Eventlog, error) { b.writableMu.RLock() defer b.writableMu.RUnlock() + if errors.Is(b.writableState, errors.ErrResourceNotFound) { + return nil, errors.ErrResourceNotFound.WithMessage("eventbus not found") + } + if len(b.writableLogs) == 0 { func() { b.writableMu.RUnlock() @@ -329,29 +346,29 @@ func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) eventlog.Ev }() } - return b.writableLogs[logID] + return b.writableLogs[logID], nil } -func (b *eventbus) refreshWritableLogs(ctx context.Context) { +func (b *Eventbus) refreshWritableLogs(ctx context.Context) { _ctx, span := b.tracer.Start(ctx, "refreshWritableLogs") defer span.End() _ = b.writableWatcher.Refresh(_ctx) } -func (b *eventbus) getReadableState() error { +func (b *Eventbus) getReadableState() error { b.readableMu.RLock() defer b.readableMu.RUnlock() return b.readableState } -func (b *eventbus) setReadableState(err error) { +func (b *Eventbus) setReadableState(err error) { b.readableMu.Lock() defer b.readableMu.Unlock() b.readableState = err } -func (b *eventbus) isNeedUpdateReadableLogs(err error) bool { +func (b *Eventbus) isNeedUpdateReadableLogs(err error) bool { if err == nil { b.setReadableState(nil) return true @@ -363,7 +380,7 @@ func (b *eventbus) isNeedUpdateReadableLogs(err error) bool { return false } -func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResult) { +func (b *Eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResult) { _, span := b.tracer.Start(ctx, "updateReadableLogs") defer span.End() @@ -402,17 +419,21 @@ func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResul b.setReadableLogs(s, lws) } -func (b *eventbus) setReadableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventlog) { +func (b *Eventbus) setReadableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventlog) { b.readableMu.Lock() defer b.readableMu.Unlock() b.readableLogSet = s b.readableLogs = lws } -func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) eventlog.Eventlog { +func (b *Eventbus) getReadableLog(ctx context.Context, logID uint64) (eventlog.Eventlog, error) { b.readableMu.RLock() defer b.readableMu.RUnlock() + if errors.Is(b.readableState, errors.ErrResourceNotFound) { + return nil, errors.ErrResourceNotFound.WithMessage("eventbus not found") + } + if len(b.readableLogs) == 0 { func() { b.readableMu.RUnlock() @@ -421,10 +442,10 @@ func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) eventlog.Ev }() } - return b.readableLogs[logID] + return b.readableLogs[logID], nil } -func (b *eventbus) refreshReadableLogs(ctx context.Context) { +func (b *Eventbus) refreshReadableLogs(ctx context.Context) { _ctx, span := b.tracer.Start(ctx, "refreshReadableLogs") defer span.End() @@ -432,7 +453,7 @@ func (b *eventbus) refreshReadableLogs(ctx context.Context) { } type busWriter struct { - ebus *eventbus + ebus *Eventbus opts *api.WriteOptions tracer *tracing.Tracer } @@ -487,9 +508,12 @@ func (w *busWriter) pickWritableLog(ctx context.Context, opts *api.WriteOptions) return nil, err } - lw := w.ebus.getWritableLog(_ctx, l.ID()) + lw, err := w.ebus.getWritableLog(_ctx, l.ID()) + if err != nil { + return nil, err + } if lw == nil { - return nil, stderrors.New("can not pick writable log") + return nil, errors.ErrResourceCanNotOp.WithMessage("can not pick writable log") } return lw.Writer(), nil @@ -504,7 +528,7 @@ func genEventID(logID uint64, off int64) string { } type busReader struct { - ebus *eventbus + ebus *Eventbus opts *api.ReadOptions tracer *tracing.Tracer } @@ -561,9 +585,12 @@ func (r *busReader) pickReadableLog(ctx context.Context, opts *api.ReadOptions) if err != nil { return nil, err } - lr := r.ebus.getReadableLog(_ctx, l.ID()) + lr, err := r.ebus.getReadableLog(_ctx, l.ID()) + if err != nil { + return nil, err + } if lr == nil { - return nil, stderrors.New("can not pick readable log") + return nil, errors.ErrResourceCanNotOp.WithMessage("can not pick readable log") } return lr.Reader(eventlog.ReaderConfig{PollingTimeout: opts.PollingTimeout}), nil diff --git a/client/pkg/eventbus/lookup.go b/client/pkg/eventbus/lookup.go index 51835e30c..551112016 100644 --- a/client/pkg/eventbus/lookup.go +++ b/client/pkg/eventbus/lookup.go @@ -45,7 +45,7 @@ func (w *WritableLogsWatcher) Start() { go w.Watcher.Run() } -func WatchWritableLogs(bus *eventbus) *WritableLogsWatcher { +func WatchWritableLogs(bus *Eventbus) *WritableLogsWatcher { ch := make(chan *WritableLogsResult, 1) w := primitive.NewWatcher(30*time.Second, func() { rs, err := bus.nameService.LookupWritableLogs(context.Background(), bus.cfg.ID) @@ -86,7 +86,7 @@ func (w *ReadableLogsWatcher) Start() { go w.Watcher.Run() } -func WatchReadableLogs(bus *eventbus) *ReadableLogsWatcher { +func WatchReadableLogs(bus *Eventbus) *ReadableLogsWatcher { ch := make(chan *ReadableLogsResult, 1) w := primitive.NewWatcher(30*time.Second, func() { rs, err := bus.nameService.LookupReadableLogs(context.Background(), bus.cfg.ID) diff --git a/client/pkg/eventlog/eventlog_impl.go b/client/pkg/eventlog/eventlog_impl.go index 9b2e213fa..f7774fb36 100644 --- a/client/pkg/eventlog/eventlog_impl.go +++ b/client/pkg/eventlog/eventlog_impl.go @@ -32,7 +32,7 @@ import ( "github.com/vanus-labs/vanus/proto/pkg/cloudevents" // this project. - el "github.com/vanus-labs/vanus/client/internal/vanus/eventlog" + el "github.com/vanus-labs/vanus/client/internal/eventlog" "github.com/vanus-labs/vanus/client/pkg/record" ) diff --git a/client/pkg/eventlog/segment_block.go b/client/pkg/eventlog/segment_block.go index 55b495418..722eaa4fd 100644 --- a/client/pkg/eventlog/segment_block.go +++ b/client/pkg/eventlog/segment_block.go @@ -24,7 +24,7 @@ import ( "github.com/vanus-labs/vanus/proto/pkg/cloudevents" // this project. - "github.com/vanus-labs/vanus/client/internal/vanus/store" + "github.com/vanus-labs/vanus/client/internal/store" "github.com/vanus-labs/vanus/client/pkg/record" ) diff --git a/internal/controller/trigger/controller.go b/internal/controller/trigger/controller.go index 73fd0458a..a0d41a78a 100644 --- a/internal/controller/trigger/controller.go +++ b/internal/controller/trigger/controller.go @@ -25,6 +25,8 @@ import ( "time" eb "github.com/vanus-labs/vanus/client" + "github.com/vanus-labs/vanus/client/pkg/api" + "github.com/vanus-labs/vanus/internal/controller/member" "github.com/vanus-labs/vanus/internal/controller/trigger/metadata" "github.com/vanus-labs/vanus/internal/controller/trigger/secret" @@ -81,7 +83,7 @@ type controller struct { stopFunc context.CancelFunc state primitive.ServerState cl cluster.Cluster - ebClient eb.Client + ebClient api.Client } func (ctrl *controller) SetDeadLetterEventOffset( diff --git a/internal/controller/trigger/subscription/offset.go b/internal/controller/trigger/subscription/offset.go index a2a0946f6..6d65f7de0 100644 --- a/internal/controller/trigger/subscription/offset.go +++ b/internal/controller/trigger/subscription/offset.go @@ -160,7 +160,11 @@ func (m *manager) SaveDeadLetterOffset(ctx context.Context, id vanus.ID, offset func (m *manager) getOffsetFromCli(ctx context.Context, eventbusID vanus.ID, config primitive.SubscriptionConfig, ) (info.ListOffsetInfo, error) { - logs, err := m.ebCli.Eventbus(ctx, api.WithID(eventbusID.Uint64())).ListLog(ctx) + eb, err := m.ebCli.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + return nil, err + } + logs, err := eb.ListLog(ctx) if err != nil { return nil, err } diff --git a/internal/controller/trigger/subscription/offset_test.go b/internal/controller/trigger/subscription/offset_test.go index 03455e385..e5443476c 100644 --- a/internal/controller/trigger/subscription/offset_test.go +++ b/internal/controller/trigger/subscription/offset_test.go @@ -22,7 +22,6 @@ import ( "github.com/golang/mock/gomock" . "github.com/smartystreets/goconvey/convey" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/internal/controller/trigger/metadata" "github.com/vanus-labs/vanus/internal/controller/trigger/secret" @@ -42,7 +41,7 @@ func TestSaveOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -83,7 +82,7 @@ func TestGetOrSaveOffsetOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -129,7 +128,7 @@ func TestGetOrSaveOffsetOffset(t *testing.T) { offsetManager.EXPECT().Offset(gomock.Any(), gomock.Any(), gomock.Any(), true).AnyTimes().Return(nil) mockEventbus := api.NewMockEventbus(ctrl) mockEventlog := api.NewMockEventlog(ctrl) - ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().ListLog(gomock.Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockEventlog.EXPECT().ID().AnyTimes().Return(logID.Uint64()) mockEventlog.EXPECT().LatestOffset(gomock.Any()).AnyTimes().Return(int64(offsetV), nil) @@ -173,7 +172,7 @@ func TestGetOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -224,7 +223,7 @@ func TestResetOffsetByTimestamp(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -245,7 +244,7 @@ func TestResetOffsetByTimestamp(t *testing.T) { offsetManager.EXPECT().Offset(ctx, id, gomock.Any(), true).Return(nil) mockEventbus := api.NewMockEventbus(ctrl) mockEventlog := api.NewMockEventlog(ctrl) - ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().ListLog(gomock.Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockEventlog.EXPECT().ID().AnyTimes().Return(logID.Uint64()) time := uint64(time.Now().Unix()) @@ -262,7 +261,7 @@ func TestGetDeadLetterOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -316,7 +315,7 @@ func TestGetDeadLetterOffset(t *testing.T) { offsetManager.EXPECT().Offset(gomock.Any(), gomock.Any(), gomock.Any(), true).AnyTimes().Return(nil) mockEventbus := api.NewMockEventbus(ctrl) mockEventlog := api.NewMockEventlog(ctrl) - ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().ListLog(gomock.Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockEventlog.EXPECT().ID().AnyTimes().Return(logID.Uint64()) @@ -338,7 +337,7 @@ func TestSaveDeadLetterOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -370,7 +369,7 @@ func TestSaveDeadLetterOffset(t *testing.T) { Convey("dead letter eventlogID hasn't init", func() { mockEventbus := api.NewMockEventbus(ctrl) mockEventlog := api.NewMockEventlog(ctrl) - ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().ListLog(gomock.Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockEventlog.EXPECT().ID().AnyTimes().Return(logID.Uint64()) err := m.SaveDeadLetterOffset(ctx, id, offsetV) diff --git a/internal/controller/trigger/subscription/subscription.go b/internal/controller/trigger/subscription/subscription.go index 7b82588fc..702dbe236 100644 --- a/internal/controller/trigger/subscription/subscription.go +++ b/internal/controller/trigger/subscription/subscription.go @@ -22,7 +22,7 @@ import ( "sync" "time" - eb "github.com/vanus-labs/vanus/client" + "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/internal/controller/trigger/metadata" "github.com/vanus-labs/vanus/internal/controller/trigger/secret" "github.com/vanus-labs/vanus/internal/controller/trigger/storage" @@ -64,7 +64,7 @@ const ( type manager struct { cl cluster.Cluster - ebCli eb.Client + ebCli api.Client secretStorage secret.Storage storage storage.Storage offsetManager offset.Manager @@ -80,7 +80,7 @@ type manager struct { } func NewSubscriptionManager(storage storage.Storage, secretStorage secret.Storage, - ebCli eb.Client, cl cluster.Cluster) Manager { + ebCli api.Client, cl cluster.Cluster) Manager { m := &manager{ cl: cl, ebCli: ebCli, diff --git a/internal/controller/trigger/subscription/subscription_test.go b/internal/controller/trigger/subscription/subscription_test.go index c7e357aea..a75e70c29 100644 --- a/internal/controller/trigger/subscription/subscription_test.go +++ b/internal/controller/trigger/subscription/subscription_test.go @@ -22,7 +22,7 @@ import ( "github.com/golang/mock/gomock" . "github.com/smartystreets/goconvey/convey" - "github.com/vanus-labs/vanus/client" + "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/internal/controller/trigger/metadata" "github.com/vanus-labs/vanus/internal/controller/trigger/secret" "github.com/vanus-labs/vanus/internal/controller/trigger/storage" @@ -39,7 +39,7 @@ func TestSubscriptionInit(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl) @@ -68,7 +68,7 @@ func TestGetListSubscription(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl) id := vanus.NewTestID() diff --git a/internal/gateway/gateway_test.go b/internal/gateway/gateway_test.go index 3b51c8cfc..cab28a0c9 100644 --- a/internal/gateway/gateway_test.go +++ b/internal/gateway/gateway_test.go @@ -28,7 +28,6 @@ import ( . "github.com/prashantv/gostub" . "github.com/smartystreets/goconvey/convey" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/internal/primitive/vanus" "github.com/vanus-labs/vanus/pkg/cluster" @@ -189,10 +188,10 @@ func TestGateway_EventID(t *testing.T) { port = 8087 ) - mockClient := client.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) mockEventbus := api.NewMockEventbus(ctrl) mockBusWriter := api.NewMockBusWriter(ctrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{"AABBCC"}, nil) diff --git a/internal/gateway/proxy/deadletter.go b/internal/gateway/proxy/deadletter.go index 77f312ef0..5c609ed33 100644 --- a/internal/gateway/proxy/deadletter.go +++ b/internal/gateway/proxy/deadletter.go @@ -76,7 +76,11 @@ func (cp *ControllerProxy) GetDeadLetterEvent( if err != nil { return nil, err } - ls, err := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())).ListLog(ctx) + eb, err := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())) + if err != nil { + return nil, err + } + ls, err := eb.ListLog(ctx) if err != nil { return nil, err } @@ -93,7 +97,7 @@ func (cp *ControllerProxy) GetDeadLetterEvent( } readPolicy := policy.NewManuallyReadPolicy(ls[0], int64(offset)) - busReader := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())).Reader( + busReader := eb.Reader( option.WithDisablePolling(), option.WithReadPolicy(readPolicy), option.WithBatchSize(int(num)), @@ -182,7 +186,11 @@ func (cp *ControllerProxy) ResendDeadLetterEvent( //nolint:funlen // ok if err != nil { return nil, err } - ls, err := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())).ListLog(ctx) + eb, err := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())) + if err != nil { + return nil, err + } + ls, err := eb.ListLog(ctx) if err != nil { return nil, err } @@ -198,7 +206,7 @@ func (cp *ControllerProxy) ResendDeadLetterEvent( //nolint:funlen // ok fmt.Sprintf("end_offset is invalid, param is %d it but start is %d", offset, req.GetEndOffset())) } readPolicy := policy.NewManuallyReadPolicy(ls[0], int64(offset)) - busReader := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())).Reader( + busReader := eb.Reader( option.WithDisablePolling(), option.WithReadPolicy(readPolicy), option.WithBatchSize(readSize), diff --git a/internal/gateway/proxy/proxy.go b/internal/gateway/proxy/proxy.go index 57b37c501..69e8dd21c 100644 --- a/internal/gateway/proxy/proxy.go +++ b/internal/gateway/proxy/proxy.go @@ -140,7 +140,7 @@ func (s *subscribeCache) stream() proxypb.StoreProxy_SubscribeServer { type ControllerProxy struct { cfg Config tracer *tracing.Tracer - client eb.Client + client api.Client eventbusCtrl ctrlpb.EventbusControllerClient eventlogCtrl ctrlpb.EventlogControllerClient triggerCtrl ctrlpb.TriggerControllerClient @@ -237,8 +237,11 @@ func (cp *ControllerProxy) writeEvents( ) error { val, exist := cp.writerMap.Load(eventbusID) if !exist { - val, _ = cp.writerMap.LoadOrStore(eventbusID, - cp.client.Eventbus(ctx, api.WithID(eventbusID.Uint64())).Writer()) + eb, err := cp.client.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + return v2.NewHTTPResult(http.StatusInternalServerError, err.Error()) + } + val, _ = cp.writerMap.LoadOrStore(eventbusID, eb.Writer()) } w, _ := val.(api.BusWriter) _, err := w.Append(ctx, events) @@ -554,7 +557,7 @@ func NewControllerProxy(cfg Config) *ControllerProxy { } // SetClient just for test. -func (cp *ControllerProxy) SetClient(client eb.Client) { +func (cp *ControllerProxy) SetClient(client api.Client) { cp.client = client } @@ -708,16 +711,20 @@ func authLookupOffset(_ context.Context, req interface{}) (authorization.Resourc func (cp *ControllerProxy) LookupOffset( ctx context.Context, req *proxypb.LookupOffsetRequest, ) (*proxypb.LookupOffsetResponse, error) { + eb, err := cp.client.Eventbus(ctx, api.WithID(req.EventbusId)) + if err != nil { + return nil, err + } elList := make([]api.Eventlog, 0) if req.EventlogId > 0 { id := vanus.NewIDFromUint64(req.EventlogId) - l, err := cp.client.Eventbus(ctx, api.WithID(req.EventbusId)).GetLog(ctx, id.Uint64()) + l, err := eb.GetLog(ctx, id.Uint64()) if err != nil { return nil, err } elList = append(elList, l) } else { - ls, err := cp.client.Eventbus(ctx, api.WithID(req.EventbusId)).ListLog(ctx) + ls, err := eb.ListLog(ctx) if err != nil { return nil, err } @@ -770,11 +777,15 @@ func (cp *ControllerProxy) GetEvent( num = maximumNumberPerGetRequest } - ls, err := cp.client.Eventbus(ctx, api.WithID(vid.Uint64())).ListLog(ctx) + eb, err := cp.client.Eventbus(ctx, api.WithID(vid.Uint64())) + if err != nil { + return nil, err + } + ls, err := eb.ListLog(ctx) if err != nil { return nil, err } - reader := cp.client.Eventbus(ctx, api.WithID(vid.Uint64())).Reader( + reader := eb.Reader( option.WithDisablePolling(), option.WithReadPolicy(policy.NewManuallyReadPolicy(ls[0], offset)), option.WithBatchSize(int(num)), @@ -856,12 +867,17 @@ func (cp *ControllerProxy) getByEventID( return nil, err } - l, err := cp.client.Eventbus(ctx, api.WithID(req.GetEventbusId())).GetLog(ctx, logID) + eb, err := cp.client.Eventbus(ctx, api.WithID(req.GetEventbusId())) + if err != nil { + return nil, err + } + + l, err := eb.GetLog(ctx, logID) if err != nil { return nil, err } - reader := cp.client.Eventbus(ctx, api.WithID(req.GetEventbusId())).Reader( + reader := eb.Reader( option.WithReadPolicy(policy.NewManuallyReadPolicy(l, off)), option.WithDisablePolling(), ) diff --git a/internal/gateway/proxy/proxy_test.go b/internal/gateway/proxy/proxy_test.go index 645af768d..24ebe987a 100644 --- a/internal/gateway/proxy/proxy_test.go +++ b/internal/gateway/proxy/proxy_test.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/emptypb" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/policy" "github.com/vanus-labs/vanus/internal/convert" @@ -58,10 +57,10 @@ func TestControllerProxy_GetEvent(t *testing.T) { Credentials: insecure.NewCredentials(), }) ctrl := gomock.NewController(t) - mockClient := client.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) cp.client = mockClient utEB1 := api.NewMockEventbus(ctrl) - mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(utEB1) + mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(utEB1, nil) Convey("test invalid params", func() { res, err := cp.GetEvent(stdCtx.Background(), &proxypb.GetEventRequest{ @@ -360,7 +359,7 @@ func TestControllerProxy_ValidateSubscription(t *testing.T) { }) ctrl := gomock.NewController(t) - cli := client.NewMockClient(ctrl) + cli := api.NewMockClient(ctrl) cp.client = cli eb := api.NewMockEventbus(ctrl) mockTriggerCtrl := ctrlpb.NewMockTriggerControllerClient(ctrl) @@ -369,7 +368,7 @@ func TestControllerProxy_ValidateSubscription(t *testing.T) { ctx := stdCtx.Background() // mock eventbus - cli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).Times(2).Return(eb) + cli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).Times(1).Return(eb, nil) eb.EXPECT().ListLog(gomock.Any()).Times(1).Return([]api.Eventlog{nil}, nil) rd := api.NewMockBusReader(ctrl) eb.EXPECT().Reader(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(rd) diff --git a/internal/timer/timingwheel/bucket.go b/internal/timer/timingwheel/bucket.go index 92940f6a5..055ba2671 100644 --- a/internal/timer/timingwheel/bucket.go +++ b/internal/timer/timingwheel/bucket.go @@ -24,7 +24,8 @@ import ( "time" ce "github.com/cloudevents/sdk-go/v2" - "github.com/vanus-labs/vanus/client" + "k8s.io/apimachinery/pkg/util/wait" + "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/option" "github.com/vanus-labs/vanus/client/pkg/policy" @@ -33,7 +34,6 @@ import ( "github.com/vanus-labs/vanus/internal/timer/metadata" "github.com/vanus-labs/vanus/observability/log" "github.com/vanus-labs/vanus/pkg/errors" - "k8s.io/apimachinery/pkg/util/wait" ) const ( @@ -97,7 +97,7 @@ type bucket struct { wg sync.WaitGroup exitC chan struct{} kvStore kv.Client - client client.Client + client api.Client eventbusWriter api.BusWriter eventbusReader api.BusReader @@ -139,8 +139,9 @@ func (b *bucket) start(ctx context.Context) error { if err = b.createEventbus(ctx); err != nil { return err } - - b.connectEventbus(ctx) + if err = b.connectEventbus(ctx); err != nil { + return err + } b.run(ctx) return nil } @@ -340,9 +341,14 @@ func (b *bucket) createEventbus(ctx context.Context) error { return nil } -func (b *bucket) connectEventbus(ctx context.Context) { - b.eventbusWriter = b.client.Eventbus(ctx, api.WithName(b.eventbus)).Writer() - b.eventbusReader = b.client.Eventbus(ctx, api.WithName(b.eventbus)).Reader() +func (b *bucket) connectEventbus(ctx context.Context) error { + eb, err := b.client.Eventbus(ctx, api.WithName(b.eventbus)) + if err != nil { + return err + } + b.eventbusWriter = eb.Writer() + b.eventbusReader = eb.Reader() + return nil } func (b *bucket) putEvent(ctx context.Context, tm *timingMsg) (err error) { @@ -386,7 +392,13 @@ func (b *bucket) getEvent(ctx context.Context, number int16) (events []*ce.Event time.Sleep(time.Second) return []*ce.Event{}, errors.ErrOffsetOnEnd } - ls, err := b.client.Eventbus(ctx, api.WithName(b.eventbus)).ListLog(ctx) + + eb, err := b.client.Eventbus(ctx, api.WithName(b.eventbus)) + if err != nil { + return []*ce.Event{}, err + } + + ls, err := eb.ListLog(ctx) if err != nil { return []*ce.Event{}, err } diff --git a/internal/timer/timingwheel/bucket_test.go b/internal/timer/timingwheel/bucket_test.go index 94ade65f4..1b22bdcf6 100644 --- a/internal/timer/timingwheel/bucket_test.go +++ b/internal/timer/timingwheel/bucket_test.go @@ -83,11 +83,11 @@ func TestBucket_start(t *testing.T) { bucket := newBucket(tw, nil, 1, "", 1, 0) bucket.timingwheel.leader = true mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) bucket.client = mockClient @@ -125,12 +125,12 @@ func TestBucket_run(t *testing.T) { bucket := newBucket(tw, nil, time.Second, "", 1, 0) bucket.timingwheel = tw mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) mockStoreCli := kv.NewMockClient(mockCtrl) @@ -273,11 +273,11 @@ func TestBucket_connectEventbus(t *testing.T) { tw := newtimingwheel(cfg()) bucket := newBucket(tw, nil, 1, "", 1, 0) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) bucket.client = mockClient @@ -321,12 +321,12 @@ func TestBucket_getEvent(t *testing.T) { bucket := newBucket(tw, nil, 1, "", 1, 0) bucket.timingwheel = tw mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) bucket.eventbusReader = mockBusReader diff --git a/internal/timer/timingwheel/timingwheel.go b/internal/timer/timingwheel/timingwheel.go index e27f9f0bb..e82b5b75d 100644 --- a/internal/timer/timingwheel/timingwheel.go +++ b/internal/timer/timingwheel/timingwheel.go @@ -78,7 +78,7 @@ type timingWheel struct { config *Config kvStore kv.Client ctrlCli ctrlpb.EventbusControllerClient - client client.Client + client api.Client cache sync.Map twList *list.List // element: *timingWheelElement @@ -349,7 +349,9 @@ func (tw *timingWheel) startReceivingStation(ctx context.Context) error { return err } - tw.getReceivingStation().connectEventbus(ctx) + if err = tw.getReceivingStation().connectEventbus(ctx); err != nil { + return err + } tw.runReceivingStation(ctx) return nil } @@ -459,7 +461,9 @@ func (tw *timingWheel) startDistributionStation(ctx context.Context) error { return err } - tw.getDistributionStation().connectEventbus(ctx) + if err = tw.getDistributionStation().connectEventbus(ctx); err != nil { + return err + } tw.runDistributionStation(ctx) return nil } @@ -580,7 +584,11 @@ func (tw *timingWheel) deliver(ctx context.Context, e *ce.Event) error { } v, exist := tw.cache.Load(eventbusID) if !exist { - v, _ = tw.cache.LoadOrStore(eventbusID, tw.client.Eventbus(ctx, api.WithID(eventbusID.Uint64())).Writer()) + eb, err := tw.client.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + return err + } + v, _ = tw.cache.LoadOrStore(eventbusID, eb.Writer()) } writer, _ := v.(api.BusWriter) _, err = api.AppendOne(ctx, writer, e) diff --git a/internal/timer/timingwheel/timingwheel_test.go b/internal/timer/timingwheel/timingwheel_test.go index 4d4d6f872..e05d9b4ca 100644 --- a/internal/timer/timingwheel/timingwheel_test.go +++ b/internal/timer/timingwheel/timingwheel_test.go @@ -56,12 +56,12 @@ func TestTimingWheel_Start(t *testing.T) { tw := newtimingwheel(cfg()) tw.SetLeader(true) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) @@ -295,7 +295,7 @@ func TestTimingWheel_runReceivingStation(t *testing.T) { tw := newtimingwheel(cfg()) tw.SetLeader(true) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) @@ -309,7 +309,7 @@ func TestTimingWheel_runReceivingStation(t *testing.T) { tw.receivingStation.kvStore = mockStoreCli tw.receivingStation.timingwheel = tw tw.distributionStation.timingwheel = tw - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) @@ -358,12 +358,12 @@ func TestTimingWheel_startDistributionStation(t *testing.T) { tw := newtimingwheel(cfg()) tw.SetLeader(true) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) // mockEventlog := eventlog.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) mockEventbusCtrlCli := ctrlpb.NewMockEventbusControllerClient(mockCtrl) @@ -400,13 +400,13 @@ func TestTimingWheel_runDistributionStation(t *testing.T) { tw := newtimingwheel(cfg()) tw.SetLeader(true) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) mockStoreCli := kv.NewMockClient(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tw.client = mockClient @@ -461,11 +461,11 @@ func TestTimingWheel_deliver(t *testing.T) { e := event(2000) tw := newtimingwheel(cfg()) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tw.client = mockClient @@ -545,13 +545,13 @@ func TestTimingWheelElement_pushBack(t *testing.T) { ctx := context.Background() tw := newtimingwheel(cfg()) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockStoreCli := kv.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) // mockEventlog := eventlog.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) diff --git a/internal/trigger/reader/reader.go b/internal/trigger/reader/reader.go index 542d7bdc8..995017838 100644 --- a/internal/trigger/reader/reader.go +++ b/internal/trigger/reader/reader.go @@ -23,7 +23,6 @@ import ( "time" ce "github.com/cloudevents/sdk-go/v2" - eb "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventlog" "github.com/vanus-labs/vanus/client/pkg/option" @@ -47,7 +46,7 @@ const ( type Config struct { EventbusID vanus.ID - Client eb.Client + Client api.Client SubscriptionID vanus.ID SubscriptionIDStr string Offset EventlogOffset @@ -91,7 +90,11 @@ func (r *reader) Start() error { r.stop = cancel timeoutCtx, cancel := context.WithTimeout(ctx, lookupReadableLogsTimeout) defer cancel() - logs, err := r.config.Client.Eventbus(timeoutCtx, api.WithID(r.config.EventbusID.Uint64())).ListLog(timeoutCtx) + eb, err := r.config.Client.Eventbus(timeoutCtx, api.WithID(r.config.EventbusID.Uint64())) + if err != nil { + return err + } + logs, err := eb.ListLog(timeoutCtx) if err != nil { log.Warn(ctx).Err(err). Stringer(log.KeyEventbusID, r.config.EventbusID). @@ -151,7 +154,11 @@ type eventlogReader struct { } func (elReader *eventlogReader) run(ctx context.Context) { - r := elReader.config.Client.Eventbus(ctx, api.WithID(elReader.config.EventbusID.Uint64())).Reader( + eb, err := elReader.config.Client.Eventbus(ctx, api.WithID(elReader.config.EventbusID.Uint64())) + if err != nil { + return + } + r := eb.Reader( option.WithReadPolicy(elReader.policy), option.WithBatchSize(elReader.config.BatchSize)) log.Info(ctx). Stringer(log.KeyEventbusID, elReader.config.EventbusID). diff --git a/internal/trigger/reader/reader_test.go b/internal/trigger/reader/reader_test.go index ac1a9b744..ca5fcb48b 100644 --- a/internal/trigger/reader/reader_test.go +++ b/internal/trigger/reader/reader_test.go @@ -26,7 +26,6 @@ import ( "github.com/google/uuid" . "github.com/smartystreets/goconvey/convey" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventlog" "github.com/vanus-labs/vanus/internal/primitive/vanus" @@ -39,12 +38,12 @@ import ( func TestReaderStart(t *testing.T) { mockCtrl := NewController(t) defer mockCtrl.Finish() - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader(Any(), Any()).AnyTimes().Return(mockBusReader) mockEventbus.EXPECT().GetLog(Any(), Any()).AnyTimes().Return(mockEventlog, nil) diff --git a/internal/trigger/trigger/trigger.go b/internal/trigger/trigger/trigger.go index 6fb00026d..c5459eecf 100644 --- a/internal/trigger/trigger/trigger.go +++ b/internal/trigger/trigger/trigger.go @@ -73,7 +73,7 @@ type trigger struct { sendCh chan *toSendEvent batchSendCh chan []*toSendEvent eventCli client.EventClient - client eb.Client + client api.Client filter filter.Filter transformer *transform.Transformer rateLimiter ratelimit.Limiter @@ -550,9 +550,17 @@ func (t *trigger) Init(ctx context.Context) error { t.eventCli = newEventClient(t.subscription.Sink, t.subscription.Protocol, t.subscription.SinkCredential) t.client = eb.Connect(t.config.Controllers) - t.timerEventWriter = t.client.Eventbus(ctx, api.WithID(t.subscription.TimerEventbusID.Uint64())).Writer() + eb, err := t.client.Eventbus(ctx, api.WithID(t.subscription.TimerEventbusID.Uint64())) + if err != nil { + return err + } + t.timerEventWriter = eb.Writer() if !t.config.DisableDeadLetter { - t.dlEventWriter = t.client.Eventbus(ctx, api.WithID(t.subscription.DeadLetterEventbusID.Uint64())).Writer() + eb, err = t.client.Eventbus(ctx, api.WithID(t.subscription.DeadLetterEventbusID.Uint64())) + if err != nil { + return err + } + t.dlEventWriter = eb.Writer() } t.eventCh = make(chan info.EventRecord, t.config.BufferSize) t.sendCh = make(chan *toSendEvent, t.config.BufferSize) diff --git a/internal/trigger/trigger/trigger_test.go b/internal/trigger/trigger/trigger_test.go index 13de9155d..9ac190cd6 100644 --- a/internal/trigger/trigger/trigger_test.go +++ b/internal/trigger/trigger/trigger_test.go @@ -29,7 +29,6 @@ import ( "github.com/google/uuid" . "github.com/smartystreets/goconvey/convey" - eb "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/proto/pkg/cloudevents" @@ -76,11 +75,11 @@ func TestTriggerStartStop(t *testing.T) { r := reader.NewMockReader(ctrl) r2 := reader.NewMockReader(ctrl) ctx := context.Background() - mockClient := eb.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) mockEventbus := api.NewMockEventbus(ctrl) mockBusWriter := api.NewMockBusWriter(ctrl) mockBusReader := api.NewMockBusReader(ctrl) - mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tg.client = mockClient @@ -108,11 +107,11 @@ func TestTriggerWriteFailEvent(t *testing.T) { ctx := context.Background() id := vanus.NewTestID() tg := NewTrigger(makeSubscription(id), WithControllers([]string{"test"})).(*trigger) - mockClient := eb.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) mockEventbus := api.NewMockEventbus(ctrl) mockBusWriter := api.NewMockBusWriter(ctrl) mockBusReader := api.NewMockBusReader(ctrl) - mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tg.client = mockClient @@ -163,11 +162,11 @@ func TestTriggerRunEventSend(t *testing.T) { ctx := context.Background() id := vanus.NewTestID() tg := NewTrigger(makeSubscription(id), WithControllers([]string{"test"})).(*trigger) - mockClient := eb.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) mockEventbus := api.NewMockEventbus(ctrl) mockBusWriter := api.NewMockBusWriter(ctrl) mockBusReader := api.NewMockBusReader(ctrl) - mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tg.client = mockClient