From b777d52f49202e9e71988f4db94ecaaccb0174b4 Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Sun, 26 Mar 2023 14:09:06 +0800 Subject: [PATCH 1/3] refactor: add eventbus state check Signed-off-by: jyjiangkai --- client/client.go | 49 ++++++++------- client/examples/eventbus/append/main.go | 6 +- client/examples/eventbus/read/main.go | 5 +- .../internal/{vanus => }/eventbus/config.go | 0 .../{vanus => }/eventbus/name_service.go | 0 .../internal/{vanus => }/eventlog/config.go | 0 .../{vanus => }/eventlog/name_service.go | 0 client/internal/{vanus => }/net/connection.go | 2 +- .../{vanus => }/net/connection/connect.go | 0 .../{vanus => }/net/connection/pool.go | 0 .../{vanus => }/net/rpc/bare/client.go | 4 +- client/internal/{vanus => }/net/rpc/client.go | 0 .../internal/{vanus => }/net/rpc/creator.go | 0 client/internal/{vanus => }/store/alloc.go | 0 .../internal/{vanus => }/store/allocator.go | 0 .../internal/{vanus => }/store/block_store.go | 4 +- client/pkg/api/client.go | 7 +++ client/pkg/api/mock_client.go | 59 ++++++++++++++++++- client/pkg/eventbus/eventbus.go | 58 ++++++++++-------- client/pkg/eventlog/eventlog_impl.go | 2 +- client/pkg/eventlog/segment_block.go | 2 +- internal/controller/trigger/controller.go | 3 +- .../controller/trigger/subscription/offset.go | 6 +- .../trigger/subscription/offset_test.go | 21 ++++--- .../trigger/subscription/subscription.go | 6 +- .../trigger/subscription/subscription_test.go | 6 +- internal/gateway/gateway_test.go | 5 +- internal/gateway/proxy/deadletter.go | 16 +++-- internal/gateway/proxy/proxy.go | 36 +++++++---- internal/gateway/proxy/proxy_test.go | 9 ++- internal/timer/timingwheel/bucket.go | 27 ++++++--- internal/timer/timingwheel/bucket_test.go | 16 ++--- internal/timer/timingwheel/timingwheel.go | 16 +++-- .../timer/timingwheel/timingwheel_test.go | 24 ++++---- internal/trigger/reader/reader.go | 15 +++-- internal/trigger/reader/reader_test.go | 5 +- internal/trigger/trigger/trigger.go | 14 ++++- internal/trigger/trigger/trigger_test.go | 13 ++-- 38 files changed, 286 insertions(+), 150 deletions(-) rename client/internal/{vanus => }/eventbus/config.go (100%) rename client/internal/{vanus => }/eventbus/name_service.go (100%) rename client/internal/{vanus => }/eventlog/config.go (100%) rename client/internal/{vanus => }/eventlog/name_service.go (100%) rename client/internal/{vanus => }/net/connection.go (89%) rename client/internal/{vanus => }/net/connection/connect.go (100%) rename client/internal/{vanus => }/net/connection/pool.go (100%) rename client/internal/{vanus => }/net/rpc/bare/client.go (95%) rename client/internal/{vanus => }/net/rpc/client.go (100%) rename client/internal/{vanus => }/net/rpc/creator.go (100%) rename client/internal/{vanus => }/store/alloc.go (100%) rename client/internal/{vanus => }/store/allocator.go (100%) rename client/internal/{vanus => }/store/block_store.go (96%) diff --git a/client/client.go b/client/client.go index c18d3e747..fbe71f3b0 100644 --- a/client/client.go +++ b/client/client.go @@ -28,26 +28,20 @@ import ( "google.golang.org/grpc/credentials/insecure" // this project. - eb "github.com/vanus-labs/vanus/client/internal/vanus/eventbus" + eb "github.com/vanus-labs/vanus/client/internal/eventbus" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventbus" ) -type Client interface { - Eventbus(ctx context.Context, opts ...api.EventbusOption) api.Eventbus - Disconnect(ctx context.Context) -} - type client struct { - // Endpoints is a list of URLs. - Endpoints []string - eventbusCache sync.Map + endpoints []string + cache sync.Map mu sync.RWMutex tracer *tracing.Tracer } -func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.Eventbus { +func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) (api.Eventbus, error) { _, span := c.tracer.Start(ctx, "EventbusService") defer span.End() @@ -56,20 +50,20 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.E apply(defaultOpts) } - err := GetEventbusIDIfNotSet(ctx, c.Endpoints, defaultOpts) + err := GetEventbusIDIfNotSet(ctx, c.endpoints, defaultOpts) if err != nil { log.Error(ctx, "get eventbus id failed", map[string]interface{}{ log.KeyError: err, "eventbus_name": defaultOpts.Name, "eventbus_id": defaultOpts.ID, }) - return nil + return nil, err } bus := func() api.Eventbus { c.mu.RLock() defer c.mu.RUnlock() - if value, ok := c.eventbusCache.Load(defaultOpts.ID); ok { + if value, ok := c.cache.Load(defaultOpts.ID); ok { return value.(api.Eventbus) } else { return nil @@ -79,37 +73,43 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.E if bus == nil { c.mu.Lock() defer c.mu.Unlock() - if value, ok := c.eventbusCache.Load(defaultOpts.ID); ok { // double check - return value.(api.Eventbus) + if value, ok := c.cache.Load(defaultOpts.ID); ok { // double check + return value.(api.Eventbus), nil } else { cfg := &eb.Config{ - Endpoints: c.Endpoints, + Endpoints: c.endpoints, ID: defaultOpts.ID, } - newEventbus := eventbus.NewEventbus(cfg) - c.eventbusCache.Store(defaultOpts.ID, newEventbus) - return newEventbus + newEventbus := eventbus.NewEventbus(cfg, c.close) + c.cache.Store(defaultOpts.ID, newEventbus) + return newEventbus, nil } } - return bus + return bus, nil } func (c *client) Disconnect(ctx context.Context) { c.mu.Lock() defer c.mu.Unlock() - c.eventbusCache.Range(func(key, value interface{}) bool { + c.cache.Range(func(key, value interface{}) bool { value.(api.Eventbus).Close(ctx) - c.eventbusCache.Delete(key) + c.cache.Delete(key) return true }) } -func Connect(endpoints []string) Client { +func (c *client) close(id uint64) { + c.mu.Lock() + defer c.mu.Unlock() + c.cache.Delete(id) +} + +func Connect(endpoints []string) api.Client { if len(endpoints) == 0 { return nil } return &client{ - Endpoints: endpoints, + endpoints: endpoints, } } @@ -129,4 +129,3 @@ func GetEventbusIDIfNotSet(ctx context.Context, endpoints []string, opts *api.Ev } return nil } - diff --git a/client/examples/eventbus/append/main.go b/client/examples/eventbus/append/main.go index ea9753a12..bb250dda5 100644 --- a/client/examples/eventbus/append/main.go +++ b/client/examples/eventbus/append/main.go @@ -41,7 +41,10 @@ func main() { if err != nil { panic("invalid id") } - bus := c.Eventbus(ctx, api.WithName("quick-start"), api.WithID(eventbusID.Uint64())) + bus, err := c.Eventbus(ctx, api.WithName("quick-start"), api.WithID(eventbusID.Uint64())) + if err != nil { + panic(err.Error()) + } w := bus.Writer() // Create an Event. event := ce.NewEvent() @@ -64,4 +67,3 @@ func main() { log.Printf("success! eventID:%s\n", eventID) } } - diff --git a/client/examples/eventbus/read/main.go b/client/examples/eventbus/read/main.go index a837f6ac8..dcc6362e8 100644 --- a/client/examples/eventbus/read/main.go +++ b/client/examples/eventbus/read/main.go @@ -35,7 +35,10 @@ func main() { if err != nil { panic("invalid id") } - eb := c.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + eb, err := c.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + panic(err.Error()) + } ls, err := eb.ListLog(ctx) if err != nil { log.Print(err.Error()) diff --git a/client/internal/vanus/eventbus/config.go b/client/internal/eventbus/config.go similarity index 100% rename from client/internal/vanus/eventbus/config.go rename to client/internal/eventbus/config.go diff --git a/client/internal/vanus/eventbus/name_service.go b/client/internal/eventbus/name_service.go similarity index 100% rename from client/internal/vanus/eventbus/name_service.go rename to client/internal/eventbus/name_service.go diff --git a/client/internal/vanus/eventlog/config.go b/client/internal/eventlog/config.go similarity index 100% rename from client/internal/vanus/eventlog/config.go rename to client/internal/eventlog/config.go diff --git a/client/internal/vanus/eventlog/name_service.go b/client/internal/eventlog/name_service.go similarity index 100% rename from client/internal/vanus/eventlog/name_service.go rename to client/internal/eventlog/name_service.go diff --git a/client/internal/vanus/net/connection.go b/client/internal/net/connection.go similarity index 89% rename from client/internal/vanus/net/connection.go rename to client/internal/net/connection.go index 008bec852..f88b6c469 100644 --- a/client/internal/vanus/net/connection.go +++ b/client/internal/net/connection.go @@ -14,6 +14,6 @@ package net -import "github.com/vanus-labs/vanus/client/internal/vanus/net/connection" +import "github.com/vanus-labs/vanus/client/internal/net/connection" var Connect = connection.Connect diff --git a/client/internal/vanus/net/connection/connect.go b/client/internal/net/connection/connect.go similarity index 100% rename from client/internal/vanus/net/connection/connect.go rename to client/internal/net/connection/connect.go diff --git a/client/internal/vanus/net/connection/pool.go b/client/internal/net/connection/pool.go similarity index 100% rename from client/internal/vanus/net/connection/pool.go rename to client/internal/net/connection/pool.go diff --git a/client/internal/vanus/net/rpc/bare/client.go b/client/internal/net/rpc/bare/client.go similarity index 95% rename from client/internal/vanus/net/rpc/bare/client.go rename to client/internal/net/rpc/bare/client.go index 7b6f59442..45ce17443 100644 --- a/client/internal/vanus/net/rpc/bare/client.go +++ b/client/internal/net/rpc/bare/client.go @@ -31,8 +31,8 @@ import ( "github.com/vanus-labs/vanus/pkg/errors" // this project. - "github.com/vanus-labs/vanus/client/internal/vanus/net/connection" - "github.com/vanus-labs/vanus/client/internal/vanus/net/rpc" + "github.com/vanus-labs/vanus/client/internal/net/connection" + "github.com/vanus-labs/vanus/client/internal/net/rpc" ) const ( diff --git a/client/internal/vanus/net/rpc/client.go b/client/internal/net/rpc/client.go similarity index 100% rename from client/internal/vanus/net/rpc/client.go rename to client/internal/net/rpc/client.go diff --git a/client/internal/vanus/net/rpc/creator.go b/client/internal/net/rpc/creator.go similarity index 100% rename from client/internal/vanus/net/rpc/creator.go rename to client/internal/net/rpc/creator.go diff --git a/client/internal/vanus/store/alloc.go b/client/internal/store/alloc.go similarity index 100% rename from client/internal/vanus/store/alloc.go rename to client/internal/store/alloc.go diff --git a/client/internal/vanus/store/allocator.go b/client/internal/store/allocator.go similarity index 100% rename from client/internal/vanus/store/allocator.go rename to client/internal/store/allocator.go diff --git a/client/internal/vanus/store/block_store.go b/client/internal/store/block_store.go similarity index 96% rename from client/internal/vanus/store/block_store.go rename to client/internal/store/block_store.go index 5da4acc90..e9a97df53 100644 --- a/client/internal/vanus/store/block_store.go +++ b/client/internal/store/block_store.go @@ -29,8 +29,8 @@ import ( segpb "github.com/vanus-labs/vanus/proto/pkg/segment" // this project. - "github.com/vanus-labs/vanus/client/internal/vanus/net/rpc" - "github.com/vanus-labs/vanus/client/internal/vanus/net/rpc/bare" + "github.com/vanus-labs/vanus/client/internal/net/rpc" + "github.com/vanus-labs/vanus/client/internal/net/rpc/bare" "github.com/vanus-labs/vanus/client/pkg/primitive" ) diff --git a/client/pkg/api/client.go b/client/pkg/api/client.go index 721655c24..25d9cbb21 100644 --- a/client/pkg/api/client.go +++ b/client/pkg/api/client.go @@ -23,6 +23,13 @@ import ( "github.com/vanus-labs/vanus/proto/pkg/codec" ) +type CloseFunc func(id uint64) + +type Client interface { + Eventbus(ctx context.Context, opts ...EventbusOption) (Eventbus, error) + Disconnect(ctx context.Context) +} + type Eventbus interface { Writer(opts ...WriteOption) BusWriter Reader(opts ...ReadOption) BusReader diff --git a/client/pkg/api/mock_client.go b/client/pkg/api/mock_client.go index 6c61a5192..ac7b6594d 100644 --- a/client/pkg/api/mock_client.go +++ b/client/pkg/api/mock_client.go @@ -12,6 +12,61 @@ import ( cloudevents "github.com/vanus-labs/vanus/proto/pkg/cloudevents" ) +// MockClient is a mock of Client interface. +type MockClient struct { + ctrl *gomock.Controller + recorder *MockClientMockRecorder +} + +// MockClientMockRecorder is the mock recorder for MockClient. +type MockClientMockRecorder struct { + mock *MockClient +} + +// NewMockClient creates a new mock instance. +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder +} + +// Disconnect mocks base method. +func (m *MockClient) Disconnect(ctx context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Disconnect", ctx) +} + +// Disconnect indicates an expected call of Disconnect. +func (mr *MockClientMockRecorder) Disconnect(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disconnect", reflect.TypeOf((*MockClient)(nil).Disconnect), ctx) +} + +// Eventbus mocks base method. +func (m *MockClient) Eventbus(ctx context.Context, opts ...EventbusOption) (Eventbus, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Eventbus", varargs...) + ret0, _ := ret[0].(Eventbus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Eventbus indicates an expected call of Eventbus. +func (mr *MockClientMockRecorder) Eventbus(ctx interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Eventbus", reflect.TypeOf((*MockClient)(nil).Eventbus), varargs...) +} + // MockEventbus is a mock of Eventbus interface. type MockEventbus struct { ctrl *gomock.Controller @@ -252,7 +307,7 @@ func (mr *MockEventlogMockRecorder) EarliestOffset(ctx interface{}) *gomock.Call // ID mocks base method. func (m *MockEventlog) ID() uint64 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "VolumeID") + ret := m.ctrl.Call(m, "ID") ret0, _ := ret[0].(uint64) return ret0 } @@ -260,7 +315,7 @@ func (m *MockEventlog) ID() uint64 { // ID indicates an expected call of ID. func (mr *MockEventlogMockRecorder) ID() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "VolumeID", reflect.TypeOf((*MockEventlog)(nil).ID)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockEventlog)(nil).ID)) } // LatestOffset mocks base method. diff --git a/client/pkg/eventbus/eventbus.go b/client/pkg/eventbus/eventbus.go index f6454060d..52a23455c 100644 --- a/client/pkg/eventbus/eventbus.go +++ b/client/pkg/eventbus/eventbus.go @@ -19,7 +19,6 @@ import ( "context" "encoding/base64" "encoding/binary" - stderrors "errors" "io" "sync" @@ -34,16 +33,17 @@ import ( "github.com/vanus-labs/vanus/proto/pkg/cloudevents" // this project. - eb "github.com/vanus-labs/vanus/client/internal/vanus/eventbus" - el "github.com/vanus-labs/vanus/client/internal/vanus/eventlog" + eb "github.com/vanus-labs/vanus/client/internal/eventbus" + el "github.com/vanus-labs/vanus/client/internal/eventlog" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventlog" "github.com/vanus-labs/vanus/client/pkg/policy" ) -func NewEventbus(cfg *eb.Config) *eventbus { +func NewEventbus(cfg *eb.Config, close api.CloseFunc) *eventbus { bus := &eventbus{ cfg: cfg, + close: close, nameService: eb.NewNameService(cfg.Endpoints), writableLogSet: u64set.New(), readableLogSet: u64set.New(), @@ -108,7 +108,9 @@ func NewEventbus(cfg *eb.Config) *eventbus { type eventbus struct { cfg *eb.Config + close api.CloseFunc nameService *eb.NameService + closeOnce sync.Once writableWatcher *WritableLogsWatcher writableLogSet *u64set.Set @@ -240,15 +242,17 @@ func (b *eventbus) ID() uint64 { } func (b *eventbus) Close(ctx context.Context) { - b.writableWatcher.Close() - b.readableWatcher.Close() - - for _, w := range b.writableLogs { - w.Close(ctx) - } - for _, r := range b.readableLogs { - r.Close(ctx) - } + b.closeOnce.Do(func() { + b.writableWatcher.Close() + b.readableWatcher.Close() + for _, w := range b.writableLogs { + w.Close(ctx) + } + for _, r := range b.readableLogs { + r.Close(ctx) + } + b.close(b.cfg.ID) + }) } func (b *eventbus) getWritableState() error { @@ -321,10 +325,14 @@ func (b *eventbus) setWritableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventl b.writableLogs = lws } -func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) eventlog.Eventlog { +func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) (eventlog.Eventlog, error) { b.writableMu.RLock() defer b.writableMu.RUnlock() + if errors.Is(b.writableState, errors.ErrResourceNotFound) { + return nil, errors.ErrResourceNotFound.WithMessage("eventbus not found") + } + if len(b.writableLogs) == 0 { func() { b.writableMu.RUnlock() @@ -333,7 +341,7 @@ func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) eventlog.Ev }() } - return b.writableLogs[logID] + return b.writableLogs[logID], nil } func (b *eventbus) refreshWritableLogs(ctx context.Context) { @@ -413,10 +421,14 @@ func (b *eventbus) setReadableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventl b.readableLogs = lws } -func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) eventlog.Eventlog { +func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) (eventlog.Eventlog, error) { b.readableMu.RLock() defer b.readableMu.RUnlock() + if errors.Is(b.readableState, errors.ErrResourceNotFound) { + return nil, errors.ErrResourceNotFound.WithMessage("eventbus not found") + } + if len(b.readableLogs) == 0 { func() { b.readableMu.RUnlock() @@ -425,7 +437,7 @@ func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) eventlog.Ev }() } - return b.readableLogs[logID] + return b.readableLogs[logID], nil } func (b *eventbus) refreshReadableLogs(ctx context.Context) { @@ -496,9 +508,9 @@ func (w *busWriter) pickWritableLog(ctx context.Context, opts *api.WriteOptions) return nil, err } - lw := w.ebus.getWritableLog(_ctx, l.ID()) - if lw == nil { - return nil, stderrors.New("can not pick writable log") + lw, err := w.ebus.getWritableLog(_ctx, l.ID()) + if err != nil { + return nil, err } return lw.Writer(), nil @@ -572,9 +584,9 @@ func (r *busReader) pickReadableLog(ctx context.Context, opts *api.ReadOptions) if err != nil { return nil, err } - lr := r.ebus.getReadableLog(_ctx, l.ID()) - if lr == nil { - return nil, stderrors.New("can not pick readable log") + lr, err := r.ebus.getReadableLog(_ctx, l.ID()) + if err != nil { + return nil, err } return lr.Reader(eventlog.ReaderConfig{PollingTimeout: opts.PollingTimeout}), nil diff --git a/client/pkg/eventlog/eventlog_impl.go b/client/pkg/eventlog/eventlog_impl.go index c8fb2f7f1..0c63789a4 100644 --- a/client/pkg/eventlog/eventlog_impl.go +++ b/client/pkg/eventlog/eventlog_impl.go @@ -32,7 +32,7 @@ import ( "github.com/vanus-labs/vanus/proto/pkg/cloudevents" // this project. - el "github.com/vanus-labs/vanus/client/internal/vanus/eventlog" + el "github.com/vanus-labs/vanus/client/internal/eventlog" "github.com/vanus-labs/vanus/client/pkg/record" ) diff --git a/client/pkg/eventlog/segment_block.go b/client/pkg/eventlog/segment_block.go index 55b495418..722eaa4fd 100644 --- a/client/pkg/eventlog/segment_block.go +++ b/client/pkg/eventlog/segment_block.go @@ -24,7 +24,7 @@ import ( "github.com/vanus-labs/vanus/proto/pkg/cloudevents" // this project. - "github.com/vanus-labs/vanus/client/internal/vanus/store" + "github.com/vanus-labs/vanus/client/internal/store" "github.com/vanus-labs/vanus/client/pkg/record" ) diff --git a/internal/controller/trigger/controller.go b/internal/controller/trigger/controller.go index 215c4c486..73d472dc2 100644 --- a/internal/controller/trigger/controller.go +++ b/internal/controller/trigger/controller.go @@ -28,6 +28,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" eb "github.com/vanus-labs/vanus/client" + "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/observability/log" "github.com/vanus-labs/vanus/observability/metrics" "github.com/vanus-labs/vanus/pkg/cluster" @@ -83,7 +84,7 @@ type controller struct { stopFunc context.CancelFunc state primitive.ServerState cl cluster.Cluster - ebClient eb.Client + ebClient api.Client } func (ctrl *controller) SetDeadLetterEventOffset( diff --git a/internal/controller/trigger/subscription/offset.go b/internal/controller/trigger/subscription/offset.go index caa34826e..598c65399 100644 --- a/internal/controller/trigger/subscription/offset.go +++ b/internal/controller/trigger/subscription/offset.go @@ -163,7 +163,11 @@ func (m *manager) SaveDeadLetterOffset(ctx context.Context, id vanus.ID, offset func (m *manager) getOffsetFromCli(ctx context.Context, eventbusID vanus.ID, config primitive.SubscriptionConfig, ) (info.ListOffsetInfo, error) { - logs, err := m.ebCli.Eventbus(ctx, api.WithID(eventbusID.Uint64())).ListLog(ctx) + eb, err := m.ebCli.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + return nil, err + } + logs, err := eb.ListLog(ctx) if err != nil { return nil, err } diff --git a/internal/controller/trigger/subscription/offset_test.go b/internal/controller/trigger/subscription/offset_test.go index 03455e385..e5443476c 100644 --- a/internal/controller/trigger/subscription/offset_test.go +++ b/internal/controller/trigger/subscription/offset_test.go @@ -22,7 +22,6 @@ import ( "github.com/golang/mock/gomock" . "github.com/smartystreets/goconvey/convey" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/internal/controller/trigger/metadata" "github.com/vanus-labs/vanus/internal/controller/trigger/secret" @@ -42,7 +41,7 @@ func TestSaveOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -83,7 +82,7 @@ func TestGetOrSaveOffsetOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -129,7 +128,7 @@ func TestGetOrSaveOffsetOffset(t *testing.T) { offsetManager.EXPECT().Offset(gomock.Any(), gomock.Any(), gomock.Any(), true).AnyTimes().Return(nil) mockEventbus := api.NewMockEventbus(ctrl) mockEventlog := api.NewMockEventlog(ctrl) - ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().ListLog(gomock.Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockEventlog.EXPECT().ID().AnyTimes().Return(logID.Uint64()) mockEventlog.EXPECT().LatestOffset(gomock.Any()).AnyTimes().Return(int64(offsetV), nil) @@ -173,7 +172,7 @@ func TestGetOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -224,7 +223,7 @@ func TestResetOffsetByTimestamp(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -245,7 +244,7 @@ func TestResetOffsetByTimestamp(t *testing.T) { offsetManager.EXPECT().Offset(ctx, id, gomock.Any(), true).Return(nil) mockEventbus := api.NewMockEventbus(ctrl) mockEventlog := api.NewMockEventlog(ctrl) - ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().ListLog(gomock.Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockEventlog.EXPECT().ID().AnyTimes().Return(logID.Uint64()) time := uint64(time.Now().Unix()) @@ -262,7 +261,7 @@ func TestGetDeadLetterOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -316,7 +315,7 @@ func TestGetDeadLetterOffset(t *testing.T) { offsetManager.EXPECT().Offset(gomock.Any(), gomock.Any(), gomock.Any(), true).AnyTimes().Return(nil) mockEventbus := api.NewMockEventbus(ctrl) mockEventlog := api.NewMockEventlog(ctrl) - ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().ListLog(gomock.Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockEventlog.EXPECT().ID().AnyTimes().Return(logID.Uint64()) @@ -338,7 +337,7 @@ func TestSaveDeadLetterOffset(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl).(*manager) offsetManager := offset.NewMockManager(ctrl) @@ -370,7 +369,7 @@ func TestSaveDeadLetterOffset(t *testing.T) { Convey("dead letter eventlogID hasn't init", func() { mockEventbus := api.NewMockEventbus(ctrl) mockEventlog := api.NewMockEventlog(ctrl) - ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + ebCli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().ListLog(gomock.Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) mockEventlog.EXPECT().ID().AnyTimes().Return(logID.Uint64()) err := m.SaveDeadLetterOffset(ctx, id, offsetV) diff --git a/internal/controller/trigger/subscription/subscription.go b/internal/controller/trigger/subscription/subscription.go index 4f087ae93..6d91ba971 100644 --- a/internal/controller/trigger/subscription/subscription.go +++ b/internal/controller/trigger/subscription/subscription.go @@ -22,7 +22,7 @@ import ( "sync" "time" - eb "github.com/vanus-labs/vanus/client" + "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/internal/controller/trigger/metadata" "github.com/vanus-labs/vanus/internal/controller/trigger/secret" "github.com/vanus-labs/vanus/internal/controller/trigger/storage" @@ -64,7 +64,7 @@ const ( type manager struct { cl cluster.Cluster - ebCli eb.Client + ebCli api.Client secretStorage secret.Storage storage storage.Storage offsetManager offset.Manager @@ -80,7 +80,7 @@ type manager struct { } func NewSubscriptionManager(storage storage.Storage, secretStorage secret.Storage, - ebCli eb.Client, cl cluster.Cluster) Manager { + ebCli api.Client, cl cluster.Cluster) Manager { m := &manager{ cl: cl, ebCli: ebCli, diff --git a/internal/controller/trigger/subscription/subscription_test.go b/internal/controller/trigger/subscription/subscription_test.go index c7e357aea..a75e70c29 100644 --- a/internal/controller/trigger/subscription/subscription_test.go +++ b/internal/controller/trigger/subscription/subscription_test.go @@ -22,7 +22,7 @@ import ( "github.com/golang/mock/gomock" . "github.com/smartystreets/goconvey/convey" - "github.com/vanus-labs/vanus/client" + "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/internal/controller/trigger/metadata" "github.com/vanus-labs/vanus/internal/controller/trigger/secret" "github.com/vanus-labs/vanus/internal/controller/trigger/storage" @@ -39,7 +39,7 @@ func TestSubscriptionInit(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl) @@ -68,7 +68,7 @@ func TestGetListSubscription(t *testing.T) { defer ctrl.Finish() storage := storage.NewMockStorage(ctrl) secret := secret.NewMockStorage(ctrl) - ebCli := client.NewMockClient(ctrl) + ebCli := api.NewMockClient(ctrl) cl := cluster.NewMockCluster(ctrl) m := NewSubscriptionManager(storage, secret, ebCli, cl) id := vanus.NewTestID() diff --git a/internal/gateway/gateway_test.go b/internal/gateway/gateway_test.go index 3b51c8cfc..cab28a0c9 100644 --- a/internal/gateway/gateway_test.go +++ b/internal/gateway/gateway_test.go @@ -28,7 +28,6 @@ import ( . "github.com/prashantv/gostub" . "github.com/smartystreets/goconvey/convey" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/internal/primitive/vanus" "github.com/vanus-labs/vanus/pkg/cluster" @@ -189,10 +188,10 @@ func TestGateway_EventID(t *testing.T) { port = 8087 ) - mockClient := client.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) mockEventbus := api.NewMockEventbus(ctrl) mockBusWriter := api.NewMockBusWriter(ctrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockBusWriter.EXPECT().Append(Any(), Any()).AnyTimes().Return([]string{"AABBCC"}, nil) diff --git a/internal/gateway/proxy/deadletter.go b/internal/gateway/proxy/deadletter.go index b540670e8..3c9564509 100644 --- a/internal/gateway/proxy/deadletter.go +++ b/internal/gateway/proxy/deadletter.go @@ -69,7 +69,11 @@ func (cp *ControllerProxy) GetDeadLetterEvent( if err != nil { return nil, err } - ls, err := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())).ListLog(ctx) + eb, err := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())) + if err != nil { + return nil, err + } + ls, err := eb.ListLog(ctx) if err != nil { return nil, err } @@ -86,7 +90,7 @@ func (cp *ControllerProxy) GetDeadLetterEvent( } readPolicy := policy.NewManuallyReadPolicy(ls[0], int64(offset)) - busReader := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())).Reader( + busReader := eb.Reader( option.WithDisablePolling(), option.WithReadPolicy(readPolicy), option.WithBatchSize(int(num)), @@ -169,7 +173,11 @@ func (cp *ControllerProxy) ResendDeadLetterEvent( //nolint:funlen // ok if err != nil { return nil, err } - ls, err := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())).ListLog(ctx) + eb, err := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())) + if err != nil { + return nil, err + } + ls, err := eb.ListLog(ctx) if err != nil { return nil, err } @@ -185,7 +193,7 @@ func (cp *ControllerProxy) ResendDeadLetterEvent( //nolint:funlen // ok fmt.Sprintf("end_offset is invalid, param is %d it but start is %d", offset, req.GetEndOffset())) } readPolicy := policy.NewManuallyReadPolicy(ls[0], int64(offset)) - busReader := cp.client.Eventbus(ctx, api.WithID(deadLetterEventbusID.Uint64())).Reader( + busReader := eb.Reader( option.WithDisablePolling(), option.WithReadPolicy(readPolicy), option.WithBatchSize(readSize), diff --git a/internal/gateway/proxy/proxy.go b/internal/gateway/proxy/proxy.go index 7f5fbb561..7befef4ed 100644 --- a/internal/gateway/proxy/proxy.go +++ b/internal/gateway/proxy/proxy.go @@ -138,7 +138,7 @@ func (s *subscribeCache) stream() proxypb.StoreProxy_SubscribeServer { type ControllerProxy struct { cfg Config tracer *tracing.Tracer - client eb.Client + client api.Client eventbusCtrl ctrlpb.EventbusControllerClient eventlogCtrl ctrlpb.EventlogControllerClient triggerCtrl ctrlpb.TriggerControllerClient @@ -229,8 +229,11 @@ func (cp *ControllerProxy) writeEvents( ) error { val, exist := cp.writerMap.Load(eventbusID) if !exist { - val, _ = cp.writerMap.LoadOrStore(eventbusID, - cp.client.Eventbus(ctx, api.WithID(eventbusID.Uint64())).Writer()) + eb, err := cp.client.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + return v2.NewHTTPResult(http.StatusInternalServerError, err.Error()) + } + val, _ = cp.writerMap.LoadOrStore(eventbusID, eb.Writer()) } w, _ := val.(api.BusWriter) _, err := w.Append(ctx, events) @@ -551,7 +554,7 @@ func NewControllerProxy(cfg Config) *ControllerProxy { } // SetClient just for test. -func (cp *ControllerProxy) SetClient(client eb.Client) { +func (cp *ControllerProxy) SetClient(client api.Client) { cp.client = client } @@ -696,16 +699,20 @@ func (cp *ControllerProxy) ClusterInfo(_ context.Context, _ *emptypb.Empty) (*pr func (cp *ControllerProxy) LookupOffset( ctx context.Context, req *proxypb.LookupOffsetRequest, ) (*proxypb.LookupOffsetResponse, error) { + eb, err := cp.client.Eventbus(ctx, api.WithID(req.EventbusId)) + if err != nil { + return nil, err + } elList := make([]api.Eventlog, 0) if req.EventlogId > 0 { id := vanus.NewIDFromUint64(req.EventlogId) - l, err := cp.client.Eventbus(ctx, api.WithID(req.EventbusId)).GetLog(ctx, id.Uint64()) + l, err := eb.GetLog(ctx, id.Uint64()) if err != nil { return nil, err } elList = append(elList, l) } else { - ls, err := cp.client.Eventbus(ctx, api.WithID(req.EventbusId)).ListLog(ctx) + ls, err := eb.ListLog(ctx) if err != nil { return nil, err } @@ -753,11 +760,15 @@ func (cp *ControllerProxy) GetEvent( num = maximumNumberPerGetRequest } - ls, err := cp.client.Eventbus(ctx, api.WithID(vid.Uint64())).ListLog(ctx) + eb, err := cp.client.Eventbus(ctx, api.WithID(vid.Uint64())) + if err != nil { + return nil, err + } + ls, err := eb.ListLog(ctx) if err != nil { return nil, err } - reader := cp.client.Eventbus(ctx, api.WithID(vid.Uint64())).Reader( + reader := eb.Reader( option.WithDisablePolling(), option.WithReadPolicy(policy.NewManuallyReadPolicy(ls[0], offset)), option.WithBatchSize(int(num)), @@ -839,12 +850,17 @@ func (cp *ControllerProxy) getByEventID( return nil, err } - l, err := cp.client.Eventbus(ctx, api.WithID(req.GetEventbusId())).GetLog(ctx, logID) + eb, err := cp.client.Eventbus(ctx, api.WithID(req.GetEventbusId())) + if err != nil { + return nil, err + } + + l, err := eb.GetLog(ctx, logID) if err != nil { return nil, err } - reader := cp.client.Eventbus(ctx, api.WithID(req.GetEventbusId())).Reader( + reader := eb.Reader( option.WithReadPolicy(policy.NewManuallyReadPolicy(l, off)), option.WithDisablePolling(), ) diff --git a/internal/gateway/proxy/proxy_test.go b/internal/gateway/proxy/proxy_test.go index 0e84f268a..e804584a8 100644 --- a/internal/gateway/proxy/proxy_test.go +++ b/internal/gateway/proxy/proxy_test.go @@ -26,7 +26,6 @@ import ( "github.com/golang/mock/gomock" . "github.com/smartystreets/goconvey/convey" "github.com/tidwall/gjson" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/policy" "github.com/vanus-labs/vanus/internal/convert" @@ -55,10 +54,10 @@ func TestControllerProxy_GetEvent(t *testing.T) { Credentials: insecure.NewCredentials(), }) ctrl := gomock.NewController(t) - mockClient := client.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) cp.client = mockClient utEB1 := api.NewMockEventbus(ctrl) - mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(utEB1) + mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(utEB1, nil) Convey("test invalid params", func() { res, err := cp.GetEvent(stdCtx.Background(), &proxypb.GetEventRequest{ @@ -353,7 +352,7 @@ func TestControllerProxy_ValidateSubscription(t *testing.T) { }) ctrl := gomock.NewController(t) - cli := client.NewMockClient(ctrl) + cli := api.NewMockClient(ctrl) cp.client = cli eb := api.NewMockEventbus(ctrl) mockTriggerCtrl := ctrlpb.NewMockTriggerControllerClient(ctrl) @@ -362,7 +361,7 @@ func TestControllerProxy_ValidateSubscription(t *testing.T) { ctx := stdCtx.Background() // mock eventbus - cli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).Times(2).Return(eb) + cli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).Times(1).Return(eb, nil) eb.EXPECT().ListLog(gomock.Any()).Times(1).Return([]api.Eventlog{nil}, nil) rd := api.NewMockBusReader(ctrl) eb.EXPECT().Reader(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(rd) diff --git a/internal/timer/timingwheel/bucket.go b/internal/timer/timingwheel/bucket.go index ad3e24980..15fb4a477 100644 --- a/internal/timer/timingwheel/bucket.go +++ b/internal/timer/timingwheel/bucket.go @@ -26,7 +26,6 @@ import ( ce "github.com/cloudevents/sdk-go/v2" "k8s.io/apimachinery/pkg/util/wait" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/option" "github.com/vanus-labs/vanus/client/pkg/policy" @@ -99,7 +98,7 @@ type bucket struct { wg sync.WaitGroup exitC chan struct{} kvStore kv.Client - client client.Client + client api.Client eventbusWriter api.BusWriter eventbusReader api.BusReader @@ -141,8 +140,9 @@ func (b *bucket) start(ctx context.Context) error { if err = b.createEventbus(ctx); err != nil { return err } - - b.connectEventbus(ctx) + if err = b.connectEventbus(ctx); err != nil { + return err + } b.run(ctx) return nil } @@ -343,9 +343,14 @@ func (b *bucket) createEventbus(ctx context.Context) error { return nil } -func (b *bucket) connectEventbus(ctx context.Context) { - b.eventbusWriter = b.client.Eventbus(ctx, api.WithName(b.eventbus)).Writer() - b.eventbusReader = b.client.Eventbus(ctx, api.WithName(b.eventbus)).Reader() +func (b *bucket) connectEventbus(ctx context.Context) error { + eb, err := b.client.Eventbus(ctx, api.WithName(b.eventbus)) + if err != nil { + return err + } + b.eventbusWriter = eb.Writer() + b.eventbusReader = eb.Reader() + return nil } func (b *bucket) putEvent(ctx context.Context, tm *timingMsg) (err error) { @@ -392,7 +397,13 @@ func (b *bucket) getEvent(ctx context.Context, number int16) (events []*ce.Event time.Sleep(time.Second) return []*ce.Event{}, errors.ErrOffsetOnEnd } - ls, err := b.client.Eventbus(ctx, api.WithName(b.eventbus)).ListLog(ctx) + + eb, err := b.client.Eventbus(ctx, api.WithName(b.eventbus)) + if err != nil { + return []*ce.Event{}, err + } + + ls, err := eb.ListLog(ctx) if err != nil { return []*ce.Event{}, err } diff --git a/internal/timer/timingwheel/bucket_test.go b/internal/timer/timingwheel/bucket_test.go index 94ade65f4..1b22bdcf6 100644 --- a/internal/timer/timingwheel/bucket_test.go +++ b/internal/timer/timingwheel/bucket_test.go @@ -83,11 +83,11 @@ func TestBucket_start(t *testing.T) { bucket := newBucket(tw, nil, 1, "", 1, 0) bucket.timingwheel.leader = true mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) bucket.client = mockClient @@ -125,12 +125,12 @@ func TestBucket_run(t *testing.T) { bucket := newBucket(tw, nil, time.Second, "", 1, 0) bucket.timingwheel = tw mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) mockStoreCli := kv.NewMockClient(mockCtrl) @@ -273,11 +273,11 @@ func TestBucket_connectEventbus(t *testing.T) { tw := newtimingwheel(cfg()) bucket := newBucket(tw, nil, 1, "", 1, 0) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) bucket.client = mockClient @@ -321,12 +321,12 @@ func TestBucket_getEvent(t *testing.T) { bucket := newBucket(tw, nil, 1, "", 1, 0) bucket.timingwheel = tw mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) bucket.eventbusReader = mockBusReader diff --git a/internal/timer/timingwheel/timingwheel.go b/internal/timer/timingwheel/timingwheel.go index 4a06e5e91..6e3ba54ed 100644 --- a/internal/timer/timingwheel/timingwheel.go +++ b/internal/timer/timingwheel/timingwheel.go @@ -80,7 +80,7 @@ type timingWheel struct { config *Config kvStore kv.Client ctrlCli ctrlpb.EventbusControllerClient - client client.Client + client api.Client cache sync.Map twList *list.List // element: *timingWheelElement @@ -352,7 +352,9 @@ func (tw *timingWheel) startReceivingStation(ctx context.Context) error { return err } - tw.getReceivingStation().connectEventbus(ctx) + if err = tw.getReceivingStation().connectEventbus(ctx); err != nil { + return err + } tw.runReceivingStation(ctx) return nil } @@ -462,7 +464,9 @@ func (tw *timingWheel) startDistributionStation(ctx context.Context) error { return err } - tw.getDistributionStation().connectEventbus(ctx) + if err = tw.getDistributionStation().connectEventbus(ctx); err != nil { + return err + } tw.runDistributionStation(ctx) return nil } @@ -589,7 +593,11 @@ func (tw *timingWheel) deliver(ctx context.Context, e *ce.Event) error { } v, exist := tw.cache.Load(eventbusID) if !exist { - v, _ = tw.cache.LoadOrStore(eventbusID, tw.client.Eventbus(ctx, api.WithID(eventbusID.Uint64())).Writer()) + eb, err := tw.client.Eventbus(ctx, api.WithID(eventbusID.Uint64())) + if err != nil { + return err + } + v, _ = tw.cache.LoadOrStore(eventbusID, eb.Writer()) } writer, _ := v.(api.BusWriter) _, err = api.AppendOne(ctx, writer, e) diff --git a/internal/timer/timingwheel/timingwheel_test.go b/internal/timer/timingwheel/timingwheel_test.go index 4d4d6f872..e05d9b4ca 100644 --- a/internal/timer/timingwheel/timingwheel_test.go +++ b/internal/timer/timingwheel/timingwheel_test.go @@ -56,12 +56,12 @@ func TestTimingWheel_Start(t *testing.T) { tw := newtimingwheel(cfg()) tw.SetLeader(true) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil) @@ -295,7 +295,7 @@ func TestTimingWheel_runReceivingStation(t *testing.T) { tw := newtimingwheel(cfg()) tw.SetLeader(true) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) @@ -309,7 +309,7 @@ func TestTimingWheel_runReceivingStation(t *testing.T) { tw.receivingStation.kvStore = mockStoreCli tw.receivingStation.timingwheel = tw tw.distributionStation.timingwheel = tw - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) @@ -358,12 +358,12 @@ func TestTimingWheel_startDistributionStation(t *testing.T) { tw := newtimingwheel(cfg()) tw.SetLeader(true) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) // mockEventlog := eventlog.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) mockEventbusCtrlCli := ctrlpb.NewMockEventbusControllerClient(mockCtrl) @@ -400,13 +400,13 @@ func TestTimingWheel_runDistributionStation(t *testing.T) { tw := newtimingwheel(cfg()) tw.SetLeader(true) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) mockStoreCli := kv.NewMockClient(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tw.client = mockClient @@ -461,11 +461,11 @@ func TestTimingWheel_deliver(t *testing.T) { e := event(2000) tw := newtimingwheel(cfg()) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tw.client = mockClient @@ -545,13 +545,13 @@ func TestTimingWheelElement_pushBack(t *testing.T) { ctx := context.Background() tw := newtimingwheel(cfg()) mockCtrl := NewController(t) - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockStoreCli := kv.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) // mockEventlog := eventlog.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) diff --git a/internal/trigger/reader/reader.go b/internal/trigger/reader/reader.go index 8bf4b7e1d..41bf70ec1 100644 --- a/internal/trigger/reader/reader.go +++ b/internal/trigger/reader/reader.go @@ -26,7 +26,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - eb "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventlog" "github.com/vanus-labs/vanus/client/pkg/option" @@ -49,7 +48,7 @@ const ( type Config struct { EventbusID vanus.ID - Client eb.Client + Client api.Client SubscriptionID vanus.ID SubscriptionIDStr string Offset EventlogOffset @@ -95,7 +94,11 @@ func (r *reader) Start() error { r.stop = cancel timeoutCtx, cancel := context.WithTimeout(ctx, lookupReadableLogsTimeout) defer cancel() - logs, err := r.config.Client.Eventbus(timeoutCtx, api.WithID(r.config.EventbusID.Uint64())).ListLog(timeoutCtx) + eb, err := r.config.Client.Eventbus(timeoutCtx, api.WithID(r.config.EventbusID.Uint64())) + if err != nil { + return err + } + logs, err := eb.ListLog(timeoutCtx) if err != nil { log.Warning(ctx, "eventbus lookup Readable eventlog error", map[string]interface{}{ log.KeyEventbusID: r.config.EventbusID, @@ -156,7 +159,11 @@ type eventlogReader struct { } func (elReader *eventlogReader) run(ctx context.Context) { - r := elReader.config.Client.Eventbus(ctx, api.WithID(elReader.config.EventbusID.Uint64())).Reader( + eb, err := elReader.config.Client.Eventbus(ctx, api.WithID(elReader.config.EventbusID.Uint64())) + if err != nil { + return + } + r := eb.Reader( option.WithReadPolicy(elReader.policy), option.WithBatchSize(elReader.config.BatchSize)) log.Info(ctx, "eventlog reader init success", map[string]interface{}{ log.KeyEventbusID: elReader.config.EventbusID, diff --git a/internal/trigger/reader/reader_test.go b/internal/trigger/reader/reader_test.go index ac1a9b744..ca5fcb48b 100644 --- a/internal/trigger/reader/reader_test.go +++ b/internal/trigger/reader/reader_test.go @@ -26,7 +26,6 @@ import ( "github.com/google/uuid" . "github.com/smartystreets/goconvey/convey" - "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventlog" "github.com/vanus-labs/vanus/internal/primitive/vanus" @@ -39,12 +38,12 @@ import ( func TestReaderStart(t *testing.T) { mockCtrl := NewController(t) defer mockCtrl.Finish() - mockClient := client.NewMockClient(mockCtrl) + mockClient := api.NewMockClient(mockCtrl) mockEventbus := api.NewMockEventbus(mockCtrl) mockEventlog := api.NewMockEventlog(mockCtrl) mockBusWriter := api.NewMockBusWriter(mockCtrl) mockBusReader := api.NewMockBusReader(mockCtrl) - mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader(Any(), Any()).AnyTimes().Return(mockBusReader) mockEventbus.EXPECT().GetLog(Any(), Any()).AnyTimes().Return(mockEventlog, nil) diff --git a/internal/trigger/trigger/trigger.go b/internal/trigger/trigger/trigger.go index 0be3b5a19..c35ef2dd9 100644 --- a/internal/trigger/trigger/trigger.go +++ b/internal/trigger/trigger/trigger.go @@ -73,7 +73,7 @@ type trigger struct { sendCh chan *toSendEvent batchSendCh chan []*toSendEvent eventCli client.EventClient - client eb.Client + client api.Client filter filter.Filter transformer *transform.Transformer rateLimiter ratelimit.Limiter @@ -556,9 +556,17 @@ func (t *trigger) Init(ctx context.Context) error { t.eventCli = newEventClient(t.subscription.Sink, t.subscription.Protocol, t.subscription.SinkCredential) t.client = eb.Connect(t.config.Controllers) - t.timerEventWriter = t.client.Eventbus(ctx, api.WithID(t.subscription.TimerEventbusID.Uint64())).Writer() + eb, err := t.client.Eventbus(ctx, api.WithID(t.subscription.TimerEventbusID.Uint64())) + if err != nil { + return err + } + t.timerEventWriter = eb.Writer() if !t.config.DisableDeadLetter { - t.dlEventWriter = t.client.Eventbus(ctx, api.WithID(t.subscription.DeadLetterEventbusID.Uint64())).Writer() + eb, err = t.client.Eventbus(ctx, api.WithID(t.subscription.DeadLetterEventbusID.Uint64())) + if err != nil { + return err + } + t.dlEventWriter = eb.Writer() } t.eventCh = make(chan info.EventRecord, t.config.BufferSize) t.sendCh = make(chan *toSendEvent, t.config.BufferSize) diff --git a/internal/trigger/trigger/trigger_test.go b/internal/trigger/trigger/trigger_test.go index 13de9155d..9ac190cd6 100644 --- a/internal/trigger/trigger/trigger_test.go +++ b/internal/trigger/trigger/trigger_test.go @@ -29,7 +29,6 @@ import ( "github.com/google/uuid" . "github.com/smartystreets/goconvey/convey" - eb "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/proto/pkg/cloudevents" @@ -76,11 +75,11 @@ func TestTriggerStartStop(t *testing.T) { r := reader.NewMockReader(ctrl) r2 := reader.NewMockReader(ctrl) ctx := context.Background() - mockClient := eb.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) mockEventbus := api.NewMockEventbus(ctrl) mockBusWriter := api.NewMockBusWriter(ctrl) mockBusReader := api.NewMockBusReader(ctrl) - mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tg.client = mockClient @@ -108,11 +107,11 @@ func TestTriggerWriteFailEvent(t *testing.T) { ctx := context.Background() id := vanus.NewTestID() tg := NewTrigger(makeSubscription(id), WithControllers([]string{"test"})).(*trigger) - mockClient := eb.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) mockEventbus := api.NewMockEventbus(ctrl) mockBusWriter := api.NewMockBusWriter(ctrl) mockBusReader := api.NewMockBusReader(ctrl) - mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tg.client = mockClient @@ -163,11 +162,11 @@ func TestTriggerRunEventSend(t *testing.T) { ctx := context.Background() id := vanus.NewTestID() tg := NewTrigger(makeSubscription(id), WithControllers([]string{"test"})).(*trigger) - mockClient := eb.NewMockClient(ctrl) + mockClient := api.NewMockClient(ctrl) mockEventbus := api.NewMockEventbus(ctrl) mockBusWriter := api.NewMockBusWriter(ctrl) mockBusReader := api.NewMockBusReader(ctrl) - mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus) + mockClient.EXPECT().Eventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(mockEventbus, nil) mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter) mockEventbus.EXPECT().Reader().AnyTimes().Return(mockBusReader) tg.client = mockClient From 1ce9fc24e6f66690b74f267cb97cadfc8fb4e6ad Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Mon, 27 Mar 2023 15:27:19 +0800 Subject: [PATCH 2/3] fix review comments Signed-off-by: jyjiangkai --- client/client.go | 7 ------- client/examples/eventbus/append/main.go | 2 +- client/pkg/eventbus/eventbus.go | 6 ++++++ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index fbe71f3b0..b7dbe9f26 100644 --- a/client/client.go +++ b/client/client.go @@ -61,8 +61,6 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) (api. } bus := func() api.Eventbus { - c.mu.RLock() - defer c.mu.RUnlock() if value, ok := c.cache.Load(defaultOpts.ID); ok { return value.(api.Eventbus) } else { @@ -89,18 +87,13 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) (api. } func (c *client) Disconnect(ctx context.Context) { - c.mu.Lock() - defer c.mu.Unlock() c.cache.Range(func(key, value interface{}) bool { value.(api.Eventbus).Close(ctx) - c.cache.Delete(key) return true }) } func (c *client) close(id uint64) { - c.mu.Lock() - defer c.mu.Unlock() c.cache.Delete(id) } diff --git a/client/examples/eventbus/append/main.go b/client/examples/eventbus/append/main.go index bb250dda5..db4022039 100644 --- a/client/examples/eventbus/append/main.go +++ b/client/examples/eventbus/append/main.go @@ -41,7 +41,7 @@ func main() { if err != nil { panic("invalid id") } - bus, err := c.Eventbus(ctx, api.WithName("quick-start"), api.WithID(eventbusID.Uint64())) + bus, err := c.Eventbus(ctx, api.WithID(eventbusID.Uint64())) if err != nil { panic(err.Error()) } diff --git a/client/pkg/eventbus/eventbus.go b/client/pkg/eventbus/eventbus.go index 52a23455c..798a89b77 100644 --- a/client/pkg/eventbus/eventbus.go +++ b/client/pkg/eventbus/eventbus.go @@ -512,6 +512,9 @@ func (w *busWriter) pickWritableLog(ctx context.Context, opts *api.WriteOptions) if err != nil { return nil, err } + if lw == nil { + return nil, errors.ErrResourceCanNotOp.WithMessage("can not pick writable log") + } return lw.Writer(), nil } @@ -588,6 +591,9 @@ func (r *busReader) pickReadableLog(ctx context.Context, opts *api.ReadOptions) if err != nil { return nil, err } + if lr == nil { + return nil, errors.ErrResourceCanNotOp.WithMessage("can not pick readable log") + } return lr.Reader(eventlog.ReaderConfig{PollingTimeout: opts.PollingTimeout}), nil } From fe0913815e479609f78f773bf549afb1c66d642e Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Mon, 27 Mar 2023 17:43:09 +0800 Subject: [PATCH 3/3] add eventbus close ref account Signed-off-by: jyjiangkai --- client/client.go | 2 + client/pkg/eventbus/eventbus.go | 87 ++++++++++++++++++--------------- client/pkg/eventbus/lookup.go | 4 +- 3 files changed, 52 insertions(+), 41 deletions(-) diff --git a/client/client.go b/client/client.go index b7dbe9f26..877d0744d 100644 --- a/client/client.go +++ b/client/client.go @@ -62,6 +62,7 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) (api. bus := func() api.Eventbus { if value, ok := c.cache.Load(defaultOpts.ID); ok { + value.(*eventbus.Eventbus).Acquire() return value.(api.Eventbus) } else { return nil @@ -79,6 +80,7 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) (api. ID: defaultOpts.ID, } newEventbus := eventbus.NewEventbus(cfg, c.close) + newEventbus.Acquire() c.cache.Store(defaultOpts.ID, newEventbus) return newEventbus, nil } diff --git a/client/pkg/eventbus/eventbus.go b/client/pkg/eventbus/eventbus.go index 798a89b77..5ba1f1847 100644 --- a/client/pkg/eventbus/eventbus.go +++ b/client/pkg/eventbus/eventbus.go @@ -38,10 +38,11 @@ import ( "github.com/vanus-labs/vanus/client/pkg/api" "github.com/vanus-labs/vanus/client/pkg/eventlog" "github.com/vanus-labs/vanus/client/pkg/policy" + "github.com/vanus-labs/vanus/client/pkg/primitive" ) -func NewEventbus(cfg *eb.Config, close api.CloseFunc) *eventbus { - bus := &eventbus{ +func NewEventbus(cfg *eb.Config, close api.CloseFunc) *Eventbus { + bus := &Eventbus{ cfg: cfg, close: close, nameService: eb.NewNameService(cfg.Endpoints), @@ -53,6 +54,7 @@ func NewEventbus(cfg *eb.Config, close api.CloseFunc) *eventbus { readableMu: sync.RWMutex{}, writableState: nil, readableState: nil, + RefCount: primitive.RefCount{}, tracer: tracing.NewTracer("pkg.eventbus.impl", trace.SpanKindClient), } @@ -106,7 +108,7 @@ func NewEventbus(cfg *eb.Config, close api.CloseFunc) *eventbus { return bus } -type eventbus struct { +type Eventbus struct { cfg *eb.Config close api.CloseFunc nameService *eb.NameService @@ -124,20 +126,21 @@ type eventbus struct { readableMu sync.RWMutex readableState error + primitive.RefCount tracer *tracing.Tracer } // make sure eventbus implements api.Eventbus. -var _ api.Eventbus = (*eventbus)(nil) +var _ api.Eventbus = (*Eventbus)(nil) -func (b *eventbus) defaultWriteOptions() *api.WriteOptions { +func (b *Eventbus) defaultWriteOptions() *api.WriteOptions { return &api.WriteOptions{ Oneway: false, Policy: policy.NewRoundRobinWritePolicy(b), } } -func (b *eventbus) defaultReadOptions() *api.ReadOptions { +func (b *Eventbus) defaultReadOptions() *api.ReadOptions { return &api.ReadOptions{ BatchSize: 1, PollingTimeout: api.DefaultPollingTimeout, @@ -145,7 +148,7 @@ func (b *eventbus) defaultReadOptions() *api.ReadOptions { } } -func (b *eventbus) Writer(opts ...api.WriteOption) api.BusWriter { +func (b *Eventbus) Writer(opts ...api.WriteOption) api.BusWriter { writeOpts := b.defaultWriteOptions() for _, opt := range opts { opt(writeOpts) @@ -159,7 +162,7 @@ func (b *eventbus) Writer(opts ...api.WriteOption) api.BusWriter { return w } -func (b *eventbus) Reader(opts ...api.ReadOption) api.BusReader { +func (b *Eventbus) Reader(opts ...api.ReadOption) api.BusReader { readOpts := b.defaultReadOptions() for _, opt := range opts { opt(readOpts) @@ -173,7 +176,7 @@ func (b *eventbus) Reader(opts ...api.ReadOption) api.BusReader { return r } -func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOption) (api.Eventlog, error) { +func (b *Eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOption) (api.Eventlog, error) { _, span := b.tracer.Start(ctx, "pkg.eventbus.getlog") defer span.End() op := &api.LogOptions{ @@ -204,7 +207,7 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti } } -func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Eventlog, error) { +func (b *Eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Eventlog, error) { _, span := b.tracer.Start(ctx, "pkg.eventbus.listlog") defer span.End() op := &api.LogOptions{ @@ -237,37 +240,43 @@ func (b *eventbus) ListLog(ctx context.Context, opts ...api.LogOption) ([]api.Ev } } -func (b *eventbus) ID() uint64 { +func (b *Eventbus) ID() uint64 { return b.cfg.ID } -func (b *eventbus) Close(ctx context.Context) { - b.closeOnce.Do(func() { - b.writableWatcher.Close() - b.readableWatcher.Close() - for _, w := range b.writableLogs { - w.Close(ctx) - } - for _, r := range b.readableLogs { - r.Close(ctx) - } - b.close(b.cfg.ID) - }) +func (b *Eventbus) Close(ctx context.Context) { + if b.Release() { + func() { + if b.UseCount() == 0 { // double check + b.closeOnce.Do(func() { + b.writableWatcher.Close() + b.readableWatcher.Close() + for _, w := range b.writableLogs { + w.Close(ctx) + } + for _, r := range b.readableLogs { + r.Close(ctx) + } + b.close(b.cfg.ID) + }) + } + }() + } } -func (b *eventbus) getWritableState() error { +func (b *Eventbus) getWritableState() error { b.writableMu.RLock() defer b.writableMu.RUnlock() return b.writableState } -func (b *eventbus) setWritableState(err error) { +func (b *Eventbus) setWritableState(err error) { b.writableMu.Lock() defer b.writableMu.Unlock() b.writableState = err } -func (b *eventbus) isNeedUpdateWritableLogs(err error) bool { +func (b *Eventbus) isNeedUpdateWritableLogs(err error) bool { if err == nil { b.setWritableState(nil) return true @@ -279,7 +288,7 @@ func (b *eventbus) isNeedUpdateWritableLogs(err error) bool { return false } -func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResult) { +func (b *Eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResult) { _, span := b.tracer.Start(ctx, "updateWritableLogs") defer span.End() @@ -318,14 +327,14 @@ func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResul b.setWritableLogs(s, lws) } -func (b *eventbus) setWritableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventlog) { +func (b *Eventbus) setWritableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventlog) { b.writableMu.Lock() defer b.writableMu.Unlock() b.writableLogSet = s b.writableLogs = lws } -func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) (eventlog.Eventlog, error) { +func (b *Eventbus) getWritableLog(ctx context.Context, logID uint64) (eventlog.Eventlog, error) { b.writableMu.RLock() defer b.writableMu.RUnlock() @@ -344,26 +353,26 @@ func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) (eventlog.E return b.writableLogs[logID], nil } -func (b *eventbus) refreshWritableLogs(ctx context.Context) { +func (b *Eventbus) refreshWritableLogs(ctx context.Context) { _ctx, span := b.tracer.Start(ctx, "refreshWritableLogs") defer span.End() _ = b.writableWatcher.Refresh(_ctx) } -func (b *eventbus) getReadableState() error { +func (b *Eventbus) getReadableState() error { b.readableMu.RLock() defer b.readableMu.RUnlock() return b.readableState } -func (b *eventbus) setReadableState(err error) { +func (b *Eventbus) setReadableState(err error) { b.readableMu.Lock() defer b.readableMu.Unlock() b.readableState = err } -func (b *eventbus) isNeedUpdateReadableLogs(err error) bool { +func (b *Eventbus) isNeedUpdateReadableLogs(err error) bool { if err == nil { b.setReadableState(nil) return true @@ -375,7 +384,7 @@ func (b *eventbus) isNeedUpdateReadableLogs(err error) bool { return false } -func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResult) { +func (b *Eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResult) { _, span := b.tracer.Start(ctx, "updateReadableLogs") defer span.End() @@ -414,14 +423,14 @@ func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResul b.setReadableLogs(s, lws) } -func (b *eventbus) setReadableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventlog) { +func (b *Eventbus) setReadableLogs(s *u64set.Set, lws map[uint64]eventlog.Eventlog) { b.readableMu.Lock() defer b.readableMu.Unlock() b.readableLogSet = s b.readableLogs = lws } -func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) (eventlog.Eventlog, error) { +func (b *Eventbus) getReadableLog(ctx context.Context, logID uint64) (eventlog.Eventlog, error) { b.readableMu.RLock() defer b.readableMu.RUnlock() @@ -440,7 +449,7 @@ func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) (eventlog.E return b.readableLogs[logID], nil } -func (b *eventbus) refreshReadableLogs(ctx context.Context) { +func (b *Eventbus) refreshReadableLogs(ctx context.Context) { _ctx, span := b.tracer.Start(ctx, "refreshReadableLogs") defer span.End() @@ -448,7 +457,7 @@ func (b *eventbus) refreshReadableLogs(ctx context.Context) { } type busWriter struct { - ebus *eventbus + ebus *Eventbus opts *api.WriteOptions tracer *tracing.Tracer } @@ -528,7 +537,7 @@ func genEventID(logID uint64, off int64) string { } type busReader struct { - ebus *eventbus + ebus *Eventbus opts *api.ReadOptions tracer *tracing.Tracer } diff --git a/client/pkg/eventbus/lookup.go b/client/pkg/eventbus/lookup.go index f5ea0397a..38cc6725c 100644 --- a/client/pkg/eventbus/lookup.go +++ b/client/pkg/eventbus/lookup.go @@ -45,7 +45,7 @@ func (w *WritableLogsWatcher) Start() { go w.Watcher.Run() } -func WatchWritableLogs(bus *eventbus) *WritableLogsWatcher { +func WatchWritableLogs(bus *Eventbus) *WritableLogsWatcher { ch := make(chan *WritableLogsResult, 1) w := primitive.NewWatcher(30*time.Second, func() { rs, err := bus.nameService.LookupWritableLogs(context.Background(), bus.cfg.ID) @@ -87,7 +87,7 @@ func (w *ReadableLogsWatcher) Start() { go w.Watcher.Run() } -func WatchReadableLogs(bus *eventbus) *ReadableLogsWatcher { +func WatchReadableLogs(bus *Eventbus) *ReadableLogsWatcher { ch := make(chan *ReadableLogsResult, 1) w := primitive.NewWatcher(30*time.Second, func() { rs, err := bus.nameService.LookupReadableLogs(context.Background(), bus.cfg.ID)