Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka exporter and receiver configuration #5687

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
service:
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
exporters: [kafka]

receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

zipkin:

processors:
batch:

exporters:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto
32 changes: 32 additions & 0 deletions cmd/jaeger/ingester-remote-backend-storage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
service:
extensions: [jaeger_storage]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter]
telemetry:
metrics:
address: 0.0.0.0:8889

extensions:
jaeger_storage:
grpc:
external-storage:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use name that doesn't look like it's part of the config struct

Suggested change
external-storage:
some-external-storage:

server: localhost:17271
connection-timeout: 5s

receivers:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto
initial_offset: earliest

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: memstore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
trace_storage: memstore
trace_storage: some-external-storage


32 changes: 32 additions & 0 deletions cmd/jaeger/ingester.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
service:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this file? Ingester with memory storage doesn't make a lot of sense, since nothing else can access that storage. And you also have jaeger_query enabled, which doesn't fit the meaning of ingester.

extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter]
telemetry:
metrics:
address: 0.0.0.0:8889

extensions:
jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000

receivers:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto
initial_offset: earliest

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: memstore
57 changes: 57 additions & 0 deletions cmd/jaeger/internal/integration/receivers/datareceiver/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package datareceivers

import (
"context"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
)

type kafkaDataReceiver struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this for?

testbed.DataReceiverBase
receiver receiver.Traces
}

func NewKafkaDataReceiver(port int) testbed.DataReceiver {
return &kafkaDataReceiver{DataReceiverBase: testbed.DataReceiverBase{Port: port}}
}

func (dr *kafkaDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error {
factory := kafkareceiver.NewFactory()
cfg := factory.CreateDefaultConfig().(*kafkareceiver.Config)
cfg.Brokers = []string{fmt.Sprintf("localhost:%d", dr.Port)}
cfg.GroupID = "testbed_collector"

var err error
set := receivertest.NewNopSettings()
dr.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc)
if err != nil {
return err
}

return dr.receiver.Start(context.Background(), componenttest.NewNopHost())
}

func (dr *kafkaDataReceiver) Stop() error {
return dr.receiver.Shutdown(context.Background())
}

func (dr *kafkaDataReceiver) GenConfigYAMLStr() string {
return fmt.Sprintf(`
kafka:
brokers:
- localhost:%d
encoding: otlp_proto`, dr.Port)
}

func (*kafkaDataReceiver) ProtocolName() string {
return "kafka"
}
27 changes: 26 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,30 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is gomod changing?

github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/expr-lang/expr v1.16.9 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/leodido/go-syslog/v4 v4.1.0 // indirect
github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.103.0 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/valyala/fastjson v1.6.4 // indirect
)

require (
github.com/IBM/sarama v1.43.2 // indirect
github.com/aws/aws-sdk-go v1.53.11 // indirect
Expand Down Expand Up @@ -154,6 +178,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.103.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.103.0
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.3 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
Expand All @@ -166,7 +191,7 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/relvacode/iso8601 v1.4.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/rs/cors v1.10.1 // indirect
github.com/rs/cors v1.11.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
Expand Down
Loading
Loading