Skip to content

Commit

Permalink
Implement spans exporting for ClickHouse storage in Jaeger V2
Browse files Browse the repository at this point in the history
Signed-off-by: haanhvu <[email protected]>
  • Loading branch information
haanhvu committed Jun 23, 2024
1 parent ca8cdae commit 4554632
Show file tree
Hide file tree
Showing 16 changed files with 646 additions and 28 deletions.
18 changes: 15 additions & 3 deletions cmd/jaeger/internal/exporters/storageexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type storageExporter struct {
config *Config
logger *zap.Logger
traceWriter spanstore.Writer
clickhouse bool
// Separate traces exporting function for ClickHouse storage.
// This is temporary until we have v2 storage API.
chExportTraces func(ctx context.Context, td ptrace.Traces) error
}

func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter {
Expand All @@ -29,13 +33,17 @@ func newExporter(config *Config, otel component.TelemetrySettings) *storageExpor
}

func (exp *storageExporter) start(_ context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host)
clickhouse, f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}

if exp.traceWriter, err = f.CreateTraceWriter(); err != nil {
return fmt.Errorf("cannot create trace writer: %w", err)
if clickhouse == true {
exp.chExportTraces = f.ChExportSpans
} else {
if exp.traceWriter, err = f.CreateTraceWriter(); err != nil {
return fmt.Errorf("cannot create trace writer: %w", err)
}
}

return nil
Expand All @@ -47,5 +55,9 @@ func (*storageExporter) close(_ context.Context) error {
}

func (exp *storageExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
if exp.clickhouse {
return exp.chExportTraces(ctx, td)
}

return exp.traceWriter.WriteTraces(ctx, td)
}
3 changes: 2 additions & 1 deletion cmd/jaeger/internal/exporters/storageexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func createTracesExporter(ctx context.Context, set exporter.Settings, config com
// Disable Timeout/RetryOnFailure and SendingQueue
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}),
// Enable queue settings for Clickhouse only
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: ex.clickhouse}),
exporterhelper.WithStart(ex.start),
exporterhelper.WithShutdown(ex.close),
)
Expand Down
7 changes: 2 additions & 5 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc"
)

Expand All @@ -22,16 +23,12 @@ type Config struct {
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandra.Options `mapstructure:"cassandra"`
ClickHouse map[string]clickhouse.Config `mapstructure:"clickhouse"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
}

type MemoryStorage struct {
Name string `mapstructure:"name"`
memoryCfg.Configuration
}

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
if reflect.DeepEqual(*cfg, *emptyCfg) {
Expand Down
51 changes: 36 additions & 15 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
Expand Down Expand Up @@ -64,13 +65,19 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error
return f, nil
}

func GetStorageFactoryV2(name string, host component.Host) (spanstore.Factory, error) {
func GetStorageFactoryV2(name string, host component.Host) (bool, spanstore.Factory, error) {
f, err := GetStorageFactory(name, host)
if err != nil {
return nil, err
return false, nil, err
}

return factoryadapter.NewFactory(f), nil
var clickhouse bool
switch f.(type) {
case *ch.Factory:
clickhouse = true
}

return clickhouse, factoryadapter.NewFactory(f), nil
}

func newStorageExt(config *Config, otel component.TelemetrySettings) *storageExt {
Expand All @@ -82,24 +89,31 @@ func newStorageExt(config *Config, otel component.TelemetrySettings) *storageExt
}

type starter[Config any, Factory storage.Factory] struct {
ext *storageExt
storageKind string
cfg map[string]Config
builder func(Config, metrics.Factory, *zap.Logger) (Factory, error)
ext *storageExt
storageKind string
cfg map[string]Config
builder func(Config, metrics.Factory, *zap.Logger) (Factory, error)
clickhouseBuilder func(context.Context, Config, *zap.Logger) Factory
}

func (s *starter[Config, Factory]) build(_ context.Context, _ component.Host) error {
func (s *starter[Config, Factory]) build(ctx context.Context, _ component.Host) error {
for name, cfg := range s.cfg {
if _, ok := s.ext.factories[name]; ok {
return fmt.Errorf("duplicate %s storage name %s", s.storageKind, name)
}
factory, err := s.builder(
cfg,
metrics.NullFactory,
s.ext.logger.With(zap.String("storage_name", name)),
)
if err != nil {
return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err)
var factory Factory
if s.clickhouseBuilder != nil {
factory = s.clickhouseBuilder(ctx, cfg, s.ext.logger.With(zap.String("storage_name", name)))
} else {
var err error
factory, err = s.builder(
cfg,
metrics.NullFactory,
s.ext.logger.With(zap.String("storage_name", name)),
)
if err != nil {
return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err)
}
}
s.ext.factories[name] = factory
}
Expand Down Expand Up @@ -150,6 +164,12 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Cassandra,
builder: cassandra.NewFactoryWithConfig,
}
clickhouseStarter := &starter[ch.Config, *ch.Factory]{
ext: s,
storageKind: "clickhouse",
cfg: s.config.ClickHouse,
clickhouseBuilder: ch.NewFactory,
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
Expand All @@ -158,6 +178,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
esStarter.build,
osStarter.build,
cassandraStarter.build,
clickhouseStarter.build,
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerstorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestStorageFactoryBadShutdownError(t *testing.T) {

func TestStorageFactoryV2Error(t *testing.T) {
host := componenttest.NewNopHost()
_, err := GetStorageFactoryV2("something", host)
_, _, err := GetStorageFactoryV2("something", host)
require.ErrorContains(t, err, "cannot find extension")
}

Expand All @@ -127,7 +127,7 @@ func TestStorageExtension(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, f)

f2, err := GetStorageFactoryV2(name, host)
_, f2, err := GetStorageFactoryV2(name, host)
require.NoError(t, err)
require.NotNil(t, f2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"io"

"go.opentelemetry.io/collector/pdata/ptrace"

storage_v1 "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)
Expand All @@ -26,6 +28,10 @@ func (*Factory) Initialize(_ context.Context) error {
panic("not implemented")
}

func (*Factory) ChExportSpans(_ context.Context, _ ptrace.Traces) error {
panic("not implemented")
}

// Close implements spanstore.Factory.
func (f *Factory) Close(_ context.Context) error {
if closer, ok := f.ss.(io.Closer); ok {
Expand Down
53 changes: 53 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
# health_check:
# pprof:
# endpoint: 0.0.0.0:1777
# zpages:
# endpoint: 0.0.0.0:55679

jaeger_query:
trace_storage: ch_store
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000
clickhouse:
ch_store:
endpoint: tcp://127.0.0.1:9000?dial_timeout=10s&compress=lz4
spans_table_name: jaeger_spans

receivers:
otlp:
protocols:
grpc:
endpoint: 127.0.0.1:4317
http:
endpoint: 127.0.0.1:4318

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

zipkin:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: ch_store
10 changes: 9 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,25 @@ require (
)

require (
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.15.0
github.com/IBM/sarama v1.43.2 // indirect
github.com/andybalholm/brotli v1.0.6 // indirect
github.com/aws/aws-sdk-go v1.53.11 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
Expand Down Expand Up @@ -156,6 +161,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.103.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.3 // indirect
github.com/paulmach/orb v0.10.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
Expand All @@ -170,8 +176,10 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v4 v4.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
Expand Down
Loading

0 comments on commit 4554632

Please sign in to comment.