From 5601dfb28d905d0802f2e450cad6a2cc80029b6b Mon Sep 17 00:00:00 2001 From: Moh Osman <59479562+moh-osman3@users.noreply.github.com> Date: Thu, 19 Sep 2024 01:20:11 -0700 Subject: [PATCH] (otelarrowexporter): Create exporter per unique value of configured metadataKeys (#34827) **Description:** This PR forks https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/34235 and adds unit tests. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34178 --------- Co-authored-by: kristina.pathak Co-authored-by: Joshua MacDonald --- .chloggen/arrow_exporter_by_metadata.yaml | 27 +++ exporter/otelarrowexporter/README.md | 11 + exporter/otelarrowexporter/config.go | 36 ++++ exporter/otelarrowexporter/factory.go | 39 ++-- exporter/otelarrowexporter/factory_test.go | 3 +- exporter/otelarrowexporter/metadata.go | 203 ++++++++++++++++++ exporter/otelarrowexporter/metadata_test.go | 209 +++++++++++++++++++ exporter/otelarrowexporter/otelarrow.go | 48 +++-- exporter/otelarrowexporter/otelarrow_test.go | 14 +- 9 files changed, 547 insertions(+), 43 deletions(-) create mode 100644 .chloggen/arrow_exporter_by_metadata.yaml create mode 100644 exporter/otelarrowexporter/metadata.go create mode 100644 exporter/otelarrowexporter/metadata_test.go diff --git a/.chloggen/arrow_exporter_by_metadata.yaml b/.chloggen/arrow_exporter_by_metadata.yaml new file mode 100644 index 000000000000..1411e5a13ab6 --- /dev/null +++ b/.chloggen/arrow_exporter_by_metadata.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow separate arrow exporter per unique value of configured metadataKeys. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34178] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/exporter/otelarrowexporter/README.md b/exporter/otelarrowexporter/README.md index 4b3bcb0e4fd7..3538bbc4fe73 100644 --- a/exporter/otelarrowexporter/README.md +++ b/exporter/otelarrowexporter/README.md @@ -115,6 +115,17 @@ streams. - `prioritizer` (default: "leastloaded"): policy for distributing load across multiple streams. +### Matching Metadata Per Stream + +The following configuration values allow for separate streams per unique +metadata combinations: +- `metadata_keys` (default = empty): When set, this exporter will create one + arrow exporter instance per distinct combination of values in the + client.Metadata. +- `metadata_cardinality_limit` (default = 1000): When metadata_keys is not empty, + this setting limits the number of unique combinations of metadata key values + that will be processed over the lifetime of the exporter. + ### Network Configuration This component uses `round_robin` by default as the gRPC load diff --git a/exporter/otelarrowexporter/config.go b/exporter/otelarrowexporter/config.go index c512ee30119d..9786e203a3c5 100644 --- a/exporter/otelarrowexporter/config.go +++ b/exporter/otelarrowexporter/config.go @@ -5,6 +5,7 @@ package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-col import ( "fmt" + "strings" "time" "github.com/open-telemetry/otel-arrow/pkg/config" @@ -45,6 +46,23 @@ type Config struct { // exporter is built and configured via code instead of yaml. // Uses include custom dialer, custom user-agent, etc. UserDialOptions []grpc.DialOption `mapstructure:"-"` + + // MetadataKeys is a list of client.Metadata keys that will be + // used to form distinct exporters. If this setting is empty, + // a single exporter instance will be used. When this setting + // is not empty, one exporter will be used per distinct + // combination of values for the listed metadata keys. + // + // Empty value and unset metadata are treated as distinct cases. + // + // Entries are case-insensitive. Duplicated entries will + // trigger a validation error. + MetadataKeys []string `mapstructure:"metadata_keys"` + + // MetadataCardinalityLimit indicates the maximum number of + // exporter instances that will be created through a distinct + // combination of MetadataKeys. + MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` } // ArrowConfig includes whether Arrow is enabled and the number of @@ -90,6 +108,24 @@ var _ component.Config = (*Config)(nil) var _ component.ConfigValidator = (*ArrowConfig)(nil) +func (cfg *Config) Validate() error { + err := cfg.Arrow.Validate() + if err != nil { + return err + } + + uniq := map[string]bool{} + for _, k := range cfg.MetadataKeys { + l := strings.ToLower(k) + if _, has := uniq[l]; has { + return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l) + } + uniq[l] = true + } + + return nil +} + // Validate returns an error when the number of streams is less than 1. func (cfg *ArrowConfig) Validate() error { if cfg.NumStreams < 1 { diff --git a/exporter/otelarrowexporter/factory.go b/exporter/otelarrowexporter/factory.go index 0d3a078388e3..a868fa286d65 100644 --- a/exporter/otelarrowexporter/factory.go +++ b/exporter/otelarrowexporter/factory.go @@ -72,15 +72,16 @@ func createDefaultConfig() component.Config { } } -func (exp *baseExporter) helperOptions() []exporterhelper.Option { +func helperOptions(e exp) []exporterhelper.Option { + cfg := e.getConfig().(*Config) return []exporterhelper.Option{ exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), - exporterhelper.WithTimeout(exp.config.TimeoutSettings), - exporterhelper.WithRetry(exp.config.RetryConfig), - exporterhelper.WithQueue(exp.config.QueueSettings), - exporterhelper.WithStart(exp.start), - exporterhelper.WithBatcher(exp.config.BatcherConfig), - exporterhelper.WithShutdown(exp.shutdown), + exporterhelper.WithTimeout(cfg.TimeoutSettings), + exporterhelper.WithRetry(cfg.RetryConfig), + exporterhelper.WithQueue(cfg.QueueSettings), + exporterhelper.WithStart(e.start), + exporterhelper.WithBatcher(cfg.BatcherConfig), + exporterhelper.WithShutdown(e.shutdown), } } @@ -103,13 +104,13 @@ func createTracesExporter( set exporter.Settings, cfg component.Config, ) (exporter.Traces, error) { - exp, err := newExporter(cfg, set, createArrowTracesStream) + e, err := newMetadataExporter(cfg, set, createArrowTracesStream) if err != nil { return nil, err } - return exporterhelper.NewTracesExporter(ctx, exp.settings, exp.config, - exp.pushTraces, - exp.helperOptions()..., + return exporterhelper.NewTracesExporter(ctx, e.getSettings(), e.getConfig(), + e.pushTraces, + helperOptions(e)..., ) } @@ -122,13 +123,13 @@ func createMetricsExporter( set exporter.Settings, cfg component.Config, ) (exporter.Metrics, error) { - exp, err := newExporter(cfg, set, createArrowMetricsStream) + e, err := newMetadataExporter(cfg, set, createArrowMetricsStream) if err != nil { return nil, err } - return exporterhelper.NewMetricsExporter(ctx, exp.settings, exp.config, - exp.pushMetrics, - exp.helperOptions()..., + return exporterhelper.NewMetricsExporter(ctx, e.getSettings(), e.getConfig(), + e.pushMetrics, + helperOptions(e)..., ) } @@ -141,12 +142,12 @@ func createLogsExporter( set exporter.Settings, cfg component.Config, ) (exporter.Logs, error) { - exp, err := newExporter(cfg, set, createArrowLogsStream) + e, err := newMetadataExporter(cfg, set, createArrowLogsStream) if err != nil { return nil, err } - return exporterhelper.NewLogsExporter(ctx, exp.settings, exp.config, - exp.pushLogs, - exp.helperOptions()..., + return exporterhelper.NewLogsExporter(ctx, e.getSettings(), e.getConfig(), + e.pushLogs, + helperOptions(e)..., ) } diff --git a/exporter/otelarrowexporter/factory_test.go b/exporter/otelarrowexporter/factory_test.go index 0ca198b1ddbe..7498a0403dd8 100644 --- a/exporter/otelarrowexporter/factory_test.go +++ b/exporter/otelarrowexporter/factory_test.go @@ -31,8 +31,7 @@ func TestCreateDefaultConfig(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) - ocfg, ok := factory.CreateDefaultConfig().(*Config) - assert.True(t, ok) + ocfg := factory.CreateDefaultConfig().(*Config) assert.Equal(t, ocfg.RetryConfig, configretry.NewDefaultBackOffConfig()) assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueConfig()) assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutConfig()) diff --git a/exporter/otelarrowexporter/metadata.go b/exporter/otelarrowexporter/metadata.go new file mode 100644 index 000000000000..1d24bfc6cb81 --- /dev/null +++ b/exporter/otelarrowexporter/metadata.go @@ -0,0 +1,203 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter" + +import ( + "context" + "errors" + "fmt" + "runtime" + "sort" + "strings" + "sync" + + arrowPkg "github.com/apache/arrow/go/v16/arrow" + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/multierr" + "google.golang.org/grpc/metadata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" +) + +var ( + // errTooManyExporters is returned when the MetadataCardinalityLimit has been reached. + errTooManyExporters = consumererror.NewPermanent(errors.New("too many exporter metadata-value combinations")) +) + +type metadataExporter struct { + config *Config + settings exporter.Settings + scf streamClientFactory + host component.Host + + metadataKeys []string + exporters sync.Map + netReporter *netstats.NetworkReporter + + userAgent string + + // Guards the size and the storing logic to ensure no more than limit items are stored. + // If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic. + lock sync.Mutex + size int +} + +var _ exp = (*metadataExporter)(nil) + +func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) { + oCfg := cfg.(*Config) + netReporter, err := netstats.NewExporterNetworkReporter(set) + if err != nil { + return nil, err + } + userAgent := fmt.Sprintf("%s/%s (%s/%s)", + set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) + + if !oCfg.Arrow.Disabled { + // Ignoring an error because Validate() was called. + _ = zstd.SetEncoderConfig(oCfg.Arrow.Zstd) + + userAgent += fmt.Sprintf(" ApacheArrow/%s (NumStreams/%d)", arrowPkg.PkgVersion, oCfg.Arrow.NumStreams) + } + // use lower-case, to be consistent with http/2 headers. + mks := make([]string, len(oCfg.MetadataKeys)) + for i, k := range oCfg.MetadataKeys { + mks[i] = strings.ToLower(k) + } + sort.Strings(mks) + if len(mks) == 0 { + return newExporter(cfg, set, streamClientFactory, userAgent, netReporter) + } + return &metadataExporter{ + config: oCfg, + settings: set, + scf: streamClientFactory, + metadataKeys: mks, + userAgent: userAgent, + netReporter: netReporter, + }, nil +} + +func (e *metadataExporter) getSettings() exporter.Settings { + return e.settings +} + +func (e *metadataExporter) getConfig() component.Config { + return e.config +} + +func (e *metadataExporter) start(_ context.Context, host component.Host) (err error) { + e.host = host + return nil +} + +func (e *metadataExporter) shutdown(ctx context.Context) error { + var err error + e.exporters.Range(func(_ any, value any) bool { + be := value.(exp) + err = multierr.Append(err, be.shutdown(ctx)) + return true + }) + return err +} + +func (e *metadataExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { + s, mdata := e.getAttrSet(ctx, e.metadataKeys) + + be, err := e.getOrCreateExporter(ctx, s, mdata) + if err != nil { + return err + } + return be.pushTraces(ctx, td) +} + +func (e *metadataExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { + s, mdata := e.getAttrSet(ctx, e.metadataKeys) + + be, err := e.getOrCreateExporter(ctx, s, mdata) + if err != nil { + return err + } + + return be.pushMetrics(ctx, md) +} + +func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error { + s, mdata := e.getAttrSet(ctx, e.metadataKeys) + + be, err := e.getOrCreateExporter(ctx, s, mdata) + if err != nil { + return err + } + + return be.pushLogs(ctx, ld) +} + +func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set, md metadata.MD) (exp, error) { + e.lock.Lock() + defer e.lock.Unlock() + + if e.config.MetadataCardinalityLimit != 0 && e.size >= int(e.config.MetadataCardinalityLimit) { + return nil, errTooManyExporters + } + + v, ok := e.exporters.Load(s) + if ok { + return v.(exp), nil + } + + newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent, e.netReporter) + if err != nil { + return nil, fmt.Errorf("failed to create exporter: %w", err) + } + + var loaded bool + v, loaded = e.exporters.LoadOrStore(s, newExp) + if !loaded { + // set metadata keys for base exporter to add them to the outgoing context. + newExp.(*baseExporter).setMetadata(md) + + // Start the goroutine only if we added the object to the map, otherwise is already started. + err = newExp.start(ctx, e.host) + if err != nil { + e.exporters.Delete(s) + return nil, fmt.Errorf("failed to start exporter: %w", err) + } + + e.size++ + } + + return v.(exp), nil +} + +// getAttrSet is code taken from the core collector's batchprocessor multibatch logic. +// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.107.0/processor/batchprocessor/batch_processor.go#L298 +func (e *metadataExporter) getAttrSet(ctx context.Context, keys []string) (attribute.Set, metadata.MD) { + // Get each metadata key value, form the corresponding + // attribute set for use as a map lookup key. + info := client.FromContext(ctx) + md := map[string][]string{} + var attrs []attribute.KeyValue + for _, k := range keys { + // Lookup the value in the incoming metadata, copy it + // into the outgoing metadata, and create a unique + // value for the attributeSet. + vs := info.Metadata.Get(k) + md[k] = vs + if len(vs) == 1 { + attrs = append(attrs, attribute.String(k, vs[0])) + } else { + attrs = append(attrs, attribute.StringSlice(k, vs)) + } + } + return attribute.NewSet(attrs...), metadata.MD(md) +} diff --git a/exporter/otelarrowexporter/metadata_test.go b/exporter/otelarrowexporter/metadata_test.go new file mode 100644 index 000000000000..ce18b5f1dee2 --- /dev/null +++ b/exporter/otelarrowexporter/metadata_test.go @@ -0,0 +1,209 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package otelarrowexporter + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exportertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" +) + +func TestSendTracesWithMetadata(t *testing.T) { + // Start an OTel-Arrow receiver. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + rcv, err := otelArrowTracesReceiverOnGRPCServer(ln, false) + rcv.hasMetadata = true + rcv.spanCountByMetadata = make(map[string]int) + + rcv.start() + require.NoError(t, err, "Failed to start mock OTLP receiver") + // Also closes the connection. + defer rcv.srv.GracefulStop() + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := createDefaultConfig().(*Config) + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second + + cfg.MetadataCardinalityLimit = 10 + cfg.MetadataKeys = []string{"key1", "key2"} + set := exportertest.NewNopSettings() + set.BuildInfo.Description = "Collector" + set.BuildInfo.Version = "1.2.3test" + bg := context.Background() + exp, err := factory.CreateTracesExporter(bg, set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + + assert.NoError(t, exp.Start(context.Background(), host)) + + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + callCtxs := []context.Context{ + client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"first"}, + "key2": {"second"}, + }), + }), + client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"third"}, + "key2": {"fourth"}, + }), + }), + } + + expectByContext := make([]int, len(callCtxs)) + + requestCount := 3 + spansPerRequest := 33 + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(fmt.Sprintf("%d-%d", requestNum, spanIndex)) + } + + num := requestNum % len(callCtxs) + expectByContext[num] += spansPerRequest + go func(n int) { + assert.NoError(t, exp.ConsumeTraces(callCtxs[n], td)) + }(num) + } + + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() == int32(requestCount) + }, 1*time.Second, 5*time.Millisecond) + assert.Eventually(t, func() bool { + return rcv.totalItems.Load() == int32(requestCount*spansPerRequest) + }, 1*time.Second, 5*time.Millisecond) + assert.Eventually(t, func() bool { + rcv.mux.Lock() + defer rcv.mux.Unlock() + return len(callCtxs) == len(rcv.spanCountByMetadata) + }, 1*time.Second, 5*time.Millisecond) + + for idx, ctx := range callCtxs { + md := client.FromContext(ctx).Metadata + key := fmt.Sprintf("%s|%s", md.Get("key1"), md.Get("key2")) + require.Equal(t, expectByContext[idx], rcv.spanCountByMetadata[key]) + } +} + +func TestDuplicateMetadataKeys(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.MetadataKeys = []string{"myTOKEN", "mytoken"} + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "duplicate") + require.Contains(t, err.Error(), "mytoken") +} + +func TestMetadataExporterCardinalityLimit(t *testing.T) { + const cardLimit = 10 + // Start an OTel-Arrow receiver. + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + rcv, err := otelArrowTracesReceiverOnGRPCServer(ln, false) + rcv.hasMetadata = true + rcv.spanCountByMetadata = make(map[string]int) + + rcv.start() + require.NoError(t, err, "Failed to start mock OTLP receiver") + // Also closes the connection. + defer rcv.srv.GracefulStop() + + // Start an OTLP exporter and point to the receiver. + factory := NewFactory() + cfg := createDefaultConfig().(*Config) + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + cfg.Arrow.MaxStreamLifetime = 100 * time.Second + + // disable queue settings to allow for error backpropagation. + cfg.QueueSettings.Enabled = false + + cfg.MetadataCardinalityLimit = cardLimit + cfg.MetadataKeys = []string{"key1", "key2"} + set := exportertest.NewNopSettings() + bg := context.Background() + exp, err := factory.CreateTracesExporter(bg, set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + host := componenttest.NewNopHost() + + assert.NoError(t, exp.Start(context.Background(), host)) + + // Ensure that initially there is no data in the receiver. + assert.EqualValues(t, 0, rcv.requestCount.Load()) + + for requestNum := 0; requestNum < cardLimit; requestNum++ { + td := testdata.GenerateTraces(1) + ctx := client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {fmt.Sprint(requestNum)}, + "key2": {fmt.Sprint(requestNum)}, + }), + }) + + assert.NoError(t, exp.ConsumeTraces(ctx, td)) + } + + td := testdata.GenerateTraces(1) + ctx := client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"limit_exceeded"}, + "key2": {"limit_exceeded"}, + }), + }) + + // above the metadata cardinality limit. + err = exp.ConsumeTraces(ctx, td) + require.Error(t, err) + assert.True(t, consumererror.IsPermanent(err)) + assert.Contains(t, err.Error(), "too many") + + assert.Eventually(t, func() bool { + return rcv.requestCount.Load() == int32(cardLimit) + }, 1*time.Second, 5*time.Millisecond) + assert.Eventually(t, func() bool { + return rcv.totalItems.Load() == int32(cardLimit) + }, 1*time.Second, 5*time.Millisecond) + + require.Len(t, rcv.spanCountByMetadata, cardLimit) +} diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index a4a94496717e..07ccb25ea710 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -6,11 +6,8 @@ package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-col import ( "context" "errors" - "fmt" - "runtime" "time" - arrowPkg "github.com/apache/arrow/go/v16/arrow" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" @@ -37,6 +34,18 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" ) +type exp interface { + getSettings() exporter.Settings + getConfig() component.Config + + start(context.Context, component.Host) error + shutdown(context.Context) error + + pushTraces(context.Context, ptrace.Traces) error + pushMetrics(context.Context, pmetric.Metrics) error + pushLogs(context.Context, plog.Logs) error +} + type baseExporter struct { // Input configuration. config *Config @@ -60,31 +69,19 @@ type baseExporter struct { streamClientFactory streamClientFactory } +var _ exp = (*baseExporter)(nil) + type streamClientFactory func(conn *grpc.ClientConn) arrow.StreamClientFunc // Crete new exporter and start it. The exporter will begin connecting but // this function may return before the connection is established. -func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (*baseExporter, error) { +func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory, userAgent string, netReporter *netstats.NetworkReporter) (exp, error) { oCfg := cfg.(*Config) if oCfg.Endpoint == "" { return nil, errors.New("OTLP exporter config requires an Endpoint") } - netReporter, err := netstats.NewExporterNetworkReporter(set) - if err != nil { - return nil, err - } - userAgent := fmt.Sprintf("%s/%s (%s/%s)", - set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) - - if !oCfg.Arrow.Disabled { - // Ignoring an error because Validate() was called. - _ = zstd.SetEncoderConfig(oCfg.Arrow.Zstd) - - userAgent += fmt.Sprintf(" ApacheArrow/%s (NumStreams/%d)", arrowPkg.PkgVersion, oCfg.Arrow.NumStreams) - } - return &baseExporter{ config: oCfg, settings: set, @@ -94,6 +91,18 @@ func newExporter(cfg component.Config, set exporter.Settings, streamClientFactor }, nil } +func (e *baseExporter) getSettings() exporter.Settings { + return e.settings +} + +func (e *baseExporter) getConfig() component.Config { + return e.config +} + +func (e *baseExporter) setMetadata(md metadata.MD) { + e.metadata = metadata.Join(e.metadata, md) +} + // start actually creates the gRPC connection. The client construction is deferred till this point as this // is the only place we get hold of Extensions which are required to construct auth round tripper. func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) { @@ -114,7 +123,8 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro for k, v := range e.config.ClientConfig.Headers { headers[k] = string(v) } - e.metadata = metadata.New(headers) + headerMetadata := metadata.New(headers) + e.metadata = metadata.Join(e.metadata, headerMetadata) e.callOptions = []grpc.CallOption{ grpc.WaitForReady(e.config.ClientConfig.WaitForReady), } diff --git a/exporter/otelarrowexporter/otelarrow_test.go b/exporter/otelarrowexporter/otelarrow_test.go index dfa73f7417cc..4c73c153f342 100644 --- a/exporter/otelarrowexporter/otelarrow_test.go +++ b/exporter/otelarrowexporter/otelarrow_test.go @@ -78,8 +78,10 @@ func (r *mockReceiver) setExportError(err error) { type mockTracesReceiver struct { ptraceotlp.UnimplementedGRPCServer mockReceiver - exportResponse func() ptraceotlp.ExportResponse - lastRequest ptrace.Traces + exportResponse func() ptraceotlp.ExportResponse + lastRequest ptrace.Traces + hasMetadata bool + spanCountByMetadata map[string]int } func (r *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { @@ -88,8 +90,14 @@ func (r *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.ExportRe r.totalItems.Add(int32(td.SpanCount())) r.mux.Lock() defer r.mux.Unlock() - r.lastRequest = td r.metadata, _ = metadata.FromIncomingContext(ctx) + if r.hasMetadata { + v1 := r.metadata.Get("key1") + v2 := r.metadata.Get("key2") + hashKey := fmt.Sprintf("%s|%s", v1, v2) + r.spanCountByMetadata[hashKey] += (td.SpanCount()) + } + r.lastRequest = td return r.exportResponse(), r.exportError }