diff --git a/internal/trigger/reader/reader.go b/internal/trigger/reader/reader.go index 06e2a36e5..a23a15e53 100644 --- a/internal/trigger/reader/reader.go +++ b/internal/trigger/reader/reader.go @@ -222,6 +222,7 @@ func (elReader *eventlogReader) loop(ctx context.Context, lr api.BusReader) erro if err = elReader.putEvent(ctx, eo); err != nil { span.RecordError(err, trace.WithTimestamp(time.Now())) span.SetStatus(otelcode.Error, "put event to el reader failed") + span.End() return err } elReader.offset = offset diff --git a/observability/tracing/exporter/event_trace_exporter.go b/observability/tracing/exporter/event_trace_exporter.go index f427d293e..0f5e01f74 100644 --- a/observability/tracing/exporter/event_trace_exporter.go +++ b/observability/tracing/exporter/event_trace_exporter.go @@ -30,11 +30,13 @@ type Option func(*Options) type Options struct { Endpoints string + Eventbus string } func defaultOptions() *Options { return &Options{ Endpoints: "127.0.0.1:8080", + Eventbus: "event-tracing", } } @@ -44,6 +46,12 @@ func WithEndpoint(endpoint string) Option { } } +func WithEventbus(eventbus string) Option { + return func(options *Options) { + options.Eventbus = eventbus + } +} + // Exporter exports trace data in the OTLP wire format. type Exporter struct { endpoints string @@ -69,7 +77,7 @@ func New(ctx context.Context, opts ...Option) (*Exporter, error) { panic("failed to connect to Vanus cluster, error: " + err.Error()) } - ebOpt := vanussdk.WithEventbus("default", "tracing") + ebOpt := vanussdk.WithEventbus("default", defaultOpts.Eventbus) exporter := &Exporter{ endpoints: defaultOpts.Endpoints, client: c, diff --git a/observability/tracing/tracing.go b/observability/tracing/tracing.go index 34e9da65f..317ada8e2 100644 --- a/observability/tracing/tracing.go +++ b/observability/tracing/tracing.go @@ -47,6 +47,7 @@ type Config struct { OtelCollector string `yaml:"otel_collector"` EventTracingEnable bool `yaml:"event_tracing_enable"` EventCollector string `yaml:"event_collector"` + Eventbus string `yaml:"eventbus"` } var tp *tracerProvider @@ -188,7 +189,9 @@ func newTracerProvider(serviceName string, cfg Config) (*trace.TracerProvider, e if cfg.EventTracingEnable && cfg.EventCollector != "" { // Set up a event exporter - eventExporter, err := exporter.New(ctx, exporter.WithEndpoint(cfg.EventCollector)) + endpoint := exporter.WithEndpoint(cfg.EventCollector) + eventbus := exporter.WithEventbus(cfg.Eventbus) + eventExporter, err := exporter.New(ctx, endpoint, eventbus) if err != nil { return nil, fmt.Errorf("failed to create event exporter: %w", err) }