diff --git a/cmd/jaeger/internal/exporters/storageexporter/exporter.go b/cmd/jaeger/internal/exporters/storageexporter/exporter.go index 2e29be4cc39..33e70df2208 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/exporter.go +++ b/cmd/jaeger/internal/exporters/storageexporter/exporter.go @@ -19,6 +19,10 @@ type storageExporter struct { config *Config logger *zap.Logger traceWriter spanstore.Writer + clickhouse bool + // Separate traces exporting function for ClickHouse storage. + // This is temporary until we have v2 storage API. + chExportTraces func(ctx context.Context, td ptrace.Traces) error } func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter { @@ -29,13 +33,17 @@ func newExporter(config *Config, otel component.TelemetrySettings) *storageExpor } func (exp *storageExporter) start(_ context.Context, host component.Host) error { - f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host) + clickhouse, f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host) if err != nil { return fmt.Errorf("cannot find storage factory: %w", err) } - if exp.traceWriter, err = f.CreateTraceWriter(); err != nil { - return fmt.Errorf("cannot create trace writer: %w", err) + if clickhouse == true { + exp.chExportTraces = f.ChExportSpans + } else { + if exp.traceWriter, err = f.CreateTraceWriter(); err != nil { + return fmt.Errorf("cannot create trace writer: %w", err) + } } return nil @@ -47,5 +55,9 @@ func (*storageExporter) close(_ context.Context) error { } func (exp *storageExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { + if exp.clickhouse { + return exp.chExportTraces(ctx, td) + } + return exp.traceWriter.WriteTraces(ctx, td) } diff --git a/cmd/jaeger/internal/exporters/storageexporter/factory.go b/cmd/jaeger/internal/exporters/storageexporter/factory.go index b7663170eda..9e0a0d9496c 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/factory.go +++ b/cmd/jaeger/internal/exporters/storageexporter/factory.go @@ -41,7 +41,8 @@ func createTracesExporter(ctx context.Context, set exporter.Settings, config com // Disable Timeout/RetryOnFailure and SendingQueue exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}), - exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}), + // Enable queue settings for Clickhouse only + exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: ex.clickhouse}), exporterhelper.WithStart(ex.start), exporterhelper.WithShutdown(ex.close), ) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index 88b43f9f56a..402c1f85363 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -11,6 +11,7 @@ import ( memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" + "github.com/jaegertracing/jaeger/plugin/storage/clickhouse" grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc" ) @@ -22,16 +23,12 @@ type Config struct { Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"` Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` Cassandra map[string]cassandra.Options `mapstructure:"cassandra"` + ClickHouse map[string]clickhouse.Config `mapstructure:"clickhouse"` // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? // Option: instead of looking for specific name, check interface. } -type MemoryStorage struct { - Name string `mapstructure:"name"` - memoryCfg.Configuration -} - func (cfg *Config) Validate() error { emptyCfg := createDefaultConfig().(*Config) if reflect.DeepEqual(*cfg, *emptyCfg) { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 9b5d11fcac1..767a7b58d18 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -19,6 +19,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" + ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse" "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/plugin/storage/grpc" "github.com/jaegertracing/jaeger/plugin/storage/memory" @@ -64,13 +65,19 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error return f, nil } -func GetStorageFactoryV2(name string, host component.Host) (spanstore.Factory, error) { +func GetStorageFactoryV2(name string, host component.Host) (bool, spanstore.Factory, error) { f, err := GetStorageFactory(name, host) if err != nil { - return nil, err + return false, nil, err } - return factoryadapter.NewFactory(f), nil + var clickhouse bool + switch f.(type) { + case *ch.Factory: + clickhouse = true + } + + return clickhouse, factoryadapter.NewFactory(f), nil } func newStorageExt(config *Config, otel component.TelemetrySettings) *storageExt { @@ -82,24 +89,31 @@ func newStorageExt(config *Config, otel component.TelemetrySettings) *storageExt } type starter[Config any, Factory storage.Factory] struct { - ext *storageExt - storageKind string - cfg map[string]Config - builder func(Config, metrics.Factory, *zap.Logger) (Factory, error) + ext *storageExt + storageKind string + cfg map[string]Config + builder func(Config, metrics.Factory, *zap.Logger) (Factory, error) + clickhouseBuilder func(context.Context, Config, *zap.Logger) Factory } -func (s *starter[Config, Factory]) build(_ context.Context, _ component.Host) error { +func (s *starter[Config, Factory]) build(ctx context.Context, _ component.Host) error { for name, cfg := range s.cfg { if _, ok := s.ext.factories[name]; ok { return fmt.Errorf("duplicate %s storage name %s", s.storageKind, name) } - factory, err := s.builder( - cfg, - metrics.NullFactory, - s.ext.logger.With(zap.String("storage_name", name)), - ) - if err != nil { - return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err) + var factory Factory + if s.clickhouseBuilder != nil { + factory = s.clickhouseBuilder(ctx, cfg, s.ext.logger.With(zap.String("storage_name", name))) + } else { + var err error + factory, err = s.builder( + cfg, + metrics.NullFactory, + s.ext.logger.With(zap.String("storage_name", name)), + ) + if err != nil { + return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err) + } } s.ext.factories[name] = factory } @@ -150,6 +164,12 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { cfg: s.config.Cassandra, builder: cassandra.NewFactoryWithConfig, } + clickhouseStarter := &starter[ch.Config, *ch.Factory]{ + ext: s, + storageKind: "clickhouse", + cfg: s.config.ClickHouse, + clickhouseBuilder: ch.NewFactory, + } builders := []func(ctx context.Context, host component.Host) error{ memStarter.build, @@ -158,6 +178,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { esStarter.build, osStarter.build, cassandraStarter.build, + clickhouseStarter.build, // TODO add support for other backends } for _, builder := range builders { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index 399358dbd7a..c19e3dead2d 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -116,7 +116,7 @@ func TestStorageFactoryBadShutdownError(t *testing.T) { func TestStorageFactoryV2Error(t *testing.T) { host := componenttest.NewNopHost() - _, err := GetStorageFactoryV2("something", host) + _, _, err := GetStorageFactoryV2("something", host) require.ErrorContains(t, err, "cannot find extension") } @@ -127,7 +127,7 @@ func TestStorageExtension(t *testing.T) { require.NoError(t, err) require.NotNil(t, f) - f2, err := GetStorageFactoryV2(name, host) + _, f2, err := GetStorageFactoryV2(name, host) require.NoError(t, err) require.NotNil(t, f2) } diff --git a/cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go b/cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go index 57b2cece3ca..d34b82480cb 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go @@ -7,6 +7,8 @@ import ( "context" "io" + "go.opentelemetry.io/collector/pdata/ptrace" + storage_v1 "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage_v2/spanstore" ) @@ -26,6 +28,10 @@ func (*Factory) Initialize(_ context.Context) error { panic("not implemented") } +func (*Factory) ChExportSpans(_ context.Context, _ ptrace.Traces) error { + panic("not implemented") +} + // Close implements spanstore.Factory. func (f *Factory) Close(_ context.Context) error { if closer, ok := f.ss.(io.Closer); ok { diff --git a/config.yaml b/config.yaml new file mode 100644 index 00000000000..6d362d280e3 --- /dev/null +++ b/config.yaml @@ -0,0 +1,53 @@ +service: + extensions: [jaeger_storage, jaeger_query] + pipelines: + traces: + receivers: [otlp, jaeger, zipkin] + processors: [batch] + exporters: [jaeger_storage_exporter] + +extensions: + # health_check: + # pprof: + # endpoint: 0.0.0.0:1777 + # zpages: + # endpoint: 0.0.0.0:55679 + + jaeger_query: + trace_storage: ch_store + ui_config: ./cmd/jaeger/config-ui.json + + jaeger_storage: + memory: + memstore: + max_traces: 100000 + memstore_archive: + max_traces: 100000 + clickhouse: + ch_store: + endpoint: tcp://127.0.0.1:9000?dial_timeout=10s&compress=lz4 + spans_table_name: jaeger_spans + +receivers: + otlp: + protocols: + grpc: + endpoint: 127.0.0.1:4317 + http: + endpoint: 127.0.0.1:4318 + + jaeger: + protocols: + grpc: + thrift_binary: + thrift_compact: + thrift_http: + + zipkin: + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: ch_store diff --git a/go.mod b/go.mod index d45b4b4711d..fb979624480 100644 --- a/go.mod +++ b/go.mod @@ -88,7 +88,10 @@ require ( ) require ( + github.com/ClickHouse/ch-go v0.58.2 // indirect + github.com/ClickHouse/clickhouse-go/v2 v2.15.0 github.com/IBM/sarama v1.43.2 // indirect + github.com/andybalholm/brotli v1.0.6 // indirect github.com/aws/aws-sdk-go v1.53.11 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -96,12 +99,14 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/eapache/go-resiliency v1.6.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.6.1 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect @@ -156,6 +161,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.103.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/openzipkin/zipkin-go v0.4.3 // indirect + github.com/paulmach/orb v0.10.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect @@ -170,8 +176,10 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/segmentio/asm v1.2.0 // indirect github.com/shirou/gopsutil/v4 v4.24.5 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/shopspring/decimal v1.3.1 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect diff --git a/go.sum b/go.sum index dd882121efa..b3ceaa84e57 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,10 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/ClickHouse/ch-go v0.58.2 h1:jSm2szHbT9MCAB1rJ3WuCJqmGLi5UTjlNu+f530UTS0= +github.com/ClickHouse/ch-go v0.58.2/go.mod h1:Ap/0bEmiLa14gYjCiRkYGbXvbe8vwdrfTYWhsuQ99aw= +github.com/ClickHouse/clickhouse-go/v2 v2.15.0 h1:G0hTKyO8fXXR1bGnZ0DY3vTG01xYfOGW76zgjg5tmC4= +github.com/ClickHouse/clickhouse-go/v2 v2.15.0/go.mod h1:kXt1SRq0PIRa6aKZD7TnFnY9PQKmc2b13sHtOYcK6cQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw= @@ -11,6 +15,8 @@ github.com/Shopify/sarama v1.33.0/go.mod h1:lYO7LwEBkE0iAeTl94UfPSrDaavFzSFlmn+5 github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= +github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI= +github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/apache/thrift v0.20.0 h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI= github.com/apache/thrift v0.20.0/go.mod h1:hOk1BQqcp2OLzGsyVXdfMk7YFlMxK3aoEVhjD06QhB8= github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= @@ -49,8 +55,9 @@ github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWa github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= @@ -79,6 +86,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= +github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= @@ -116,8 +127,10 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -128,8 +141,10 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -197,6 +212,7 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= @@ -234,6 +250,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mLPqKctH7Uo//I= github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= @@ -286,6 +303,9 @@ github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7sjsSdg= github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c= +github.com/paulmach/orb v0.10.0 h1:guVYVqzxHE/CQ1KpfGO077TR0ATHSNjp4s6XGLn3W9s= +github.com/paulmach/orb v0.10.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= @@ -328,12 +348,16 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shirou/gopsutil/v4 v4.24.5 h1:gGsArG5K6vmsh5hcFOHaPm87UD003CaDMkAOweSQjhM= github.com/shirou/gopsutil/v4 v4.24.5/go.mod h1:aoebb2vxetJ/yIDZISmduFvVNPHqXQ9SEJwRXxkf0RA= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= @@ -369,6 +393,7 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tilinna/clock v1.1.0 h1:6IQQQCo6KoBxVudv6gwtY8o4eDfhHo8ojA5dP0MfhSs= github.com/tilinna/clock v1.1.0/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= @@ -388,11 +413,13 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/collector v0.103.0 h1:mssWo1y31p1F/SRsSBnVUX6YocgawCqM1blpE+hkWog= @@ -534,6 +561,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= @@ -585,6 +613,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= @@ -673,6 +702,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/plugin/storage/clickhouse/config.go b/plugin/storage/clickhouse/config.go new file mode 100644 index 00000000000..3ba74ac6945 --- /dev/null +++ b/plugin/storage/clickhouse/config.go @@ -0,0 +1,84 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/url" + + "github.com/ClickHouse/clickhouse-go/v2" +) + +type Config struct { + Endpoint string `mapstructure:"endpoint"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + Database string `mapstructure:"database"` + SpansTableName string `mapstructure:"spans_table_name"` + // Materialized views' names? +} + +var driverName = "clickhouse" + +const ( + defaultDatabase = "default" + defaultUsername = "default" + defaultPassword = "" +) + +func (cfg *Config) NewClient(ctx context.Context) (*sql.DB, error) { + if cfg.Endpoint == "" { + return nil, errors.New("no endpoints specified") + } + + dsnURL, err := url.Parse(cfg.Endpoint) + if err != nil { + return nil, err + } + + queryParams := dsnURL.Query() + + if dsnURL.Scheme == "https" { + queryParams.Set("secure", "true") + } + + if cfg.Database == "" { + cfg.Database = defaultDatabase + } + dsnURL.Path = cfg.Database + + if cfg.Username == "" { + cfg.Username = defaultUsername + } + + if cfg.Password == "" { + cfg.Password = defaultPassword + } + + dsnURL.User = url.UserPassword(cfg.Username, cfg.Password) + + dsnURL.RawQuery = queryParams.Encode() + + dsn := dsnURL.String() + + if _, err = clickhouse.ParseDSN(dsn); err != nil { + return nil, err + } + + db, err := sql.Open(driverName, dsn) + if err != nil { + return nil, err + } + + createDBquery := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database) + _, err = db.ExecContext(ctx, createDBquery) + if err != nil { + return nil, err + } + + return db, nil +} diff --git a/plugin/storage/clickhouse/factory.go b/plugin/storage/clickhouse/factory.go new file mode 100644 index 00000000000..8a35f27a4ec --- /dev/null +++ b/plugin/storage/clickhouse/factory.go @@ -0,0 +1,69 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "context" + "database/sql" + + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/metrics" + chSpanStore "github.com/jaegertracing/jaeger/plugin/storage/clickhouse/spanstore" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +type Factory struct { + client *sql.DB + spansTableName string + logger *zap.Logger +} + +func NewFactory(ctx context.Context, cfg Config, logger *zap.Logger) *Factory { + f := &Factory{ + logger: logger, + spansTableName: cfg.SpansTableName, + } + + client, err := cfg.NewClient(ctx) + if err != nil { + f.logger.Error("failed to create ClickHouse client", zap.Error(err)) + } else { + f.client = client + } + + if err = chSpanStore.CreateSpansTable(ctx, f.client, f.spansTableName); err != nil { + f.logger.Error("failed to create spans table", zap.Error(err)) + } + + return f + + // TODO: Move some steps to Initialize() +} + +func (f *Factory) ChExportSpans(ctx context.Context, td ptrace.Traces) error { + if err := chSpanStore.ExportSpans(ctx, f.client, f.spansTableName, td); err != nil { + return err + } + + return nil +} + +func (*Factory) Initialize(_ metrics.Factory, _ *zap.Logger) error { + return nil +} + +func (*Factory) CreateSpanReader() (spanstore.Reader, error) { + return nil, nil +} + +func (*Factory) CreateSpanWriter() (spanstore.Writer, error) { + return nil, nil +} + +func (*Factory) CreateDependencyReader() (dependencystore.Reader, error) { + return nil, nil +} diff --git a/plugin/storage/clickhouse/factory_test.go b/plugin/storage/clickhouse/factory_test.go new file mode 100644 index 00000000000..c08a0f1f110 --- /dev/null +++ b/plugin/storage/clickhouse/factory_test.go @@ -0,0 +1,145 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "context" + "database/sql" + "database/sql/driver" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} + +func TestExportSpans(t *testing.T) { + t.Run("create factory and export spans", func(t *testing.T) { + var items int + initClickhouseTestServer(t, func(query string, values []driver.Value) error { + if strings.HasPrefix(query, "INSERT") { + items++ + require.Equal(t, "test-operation", values[4]) + require.Equal(t, "test-service", values[5]) + require.Equal(t, []string{"attKey0", "attKey1"}, values[6]) + require.Equal(t, []string{"attVal0", "attVal1"}, values[7]) + } + return nil + }) + + c := Config{ + Endpoint: "clickhouse://127.0.0.1:9000", + SpansTableName: "jaeger_spans", + } + + f := NewFactory(context.TODO(), c, zap.NewNop()) + require.NotNil(t, f.client) + + err := f.ChExportSpans(context.TODO(), createTraces(5)) + require.NoError(t, err) + + err = f.ChExportSpans(context.TODO(), createTraces(10)) + require.NoError(t, err) + + require.Equal(t, 15, items) + }) +} + +func createTraces(count int) ptrace.Traces { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("service.name", "test-service") + ss := rs.ScopeSpans().AppendEmpty() + for i := 0; i < count; i++ { + s := ss.Spans().AppendEmpty() + s.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + s.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + s.Attributes().PutStr("attKey0", "attVal0") + s.Attributes().PutStr("attKey1", "attVal1") + s.SetName("test-operation") + } + return traces +} + +func initClickhouseTestServer(_ *testing.T, recorder recorder) { + driverName = "test" + sql.Register(driverName, &testClickhouseDriver{ + recorder: recorder, + }) +} + +type recorder func(query string, values []driver.Value) error + +type testClickhouseDriver struct { + recorder recorder +} + +func (t *testClickhouseDriver) Open(_ string) (driver.Conn, error) { + return &testClickhouseDriverConn{ + recorder: t.recorder, + }, nil +} + +type testClickhouseDriverConn struct { + recorder recorder +} + +func (*testClickhouseDriverConn) Begin() (driver.Tx, error) { + return &testClickhouseDriverTx{}, nil +} + +func (*testClickhouseDriverConn) Close() error { + return nil +} + +func (t *testClickhouseDriverConn) Prepare(query string) (driver.Stmt, error) { + return &testClickhouseDriverStmt{ + query: query, + recorder: t.recorder, + }, nil +} + +func (*testClickhouseDriverConn) CheckNamedValue(_ *driver.NamedValue) error { + return nil +} + +type testClickhouseDriverTx struct{} + +func (*testClickhouseDriverTx) Commit() error { + return nil +} + +func (*testClickhouseDriverTx) Rollback() error { + return nil +} + +type testClickhouseDriverStmt struct { + query string + recorder recorder +} + +func (*testClickhouseDriverStmt) Close() error { + return nil +} + +func (t *testClickhouseDriverStmt) Exec(args []driver.Value) (driver.Result, error) { + return nil, t.recorder(t.query, args) +} + +func (t *testClickhouseDriverStmt) NumInput() int { + return strings.Count(t.query, "?") +} + +func (*testClickhouseDriverStmt) Query(_ []driver.Value) (driver.Rows, error) { + return nil, nil +} diff --git a/plugin/storage/clickhouse/spanstore/empty_test.go b/plugin/storage/clickhouse/spanstore/empty_test.go new file mode 100644 index 00000000000..39377b3c1bc --- /dev/null +++ b/plugin/storage/clickhouse/spanstore/empty_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/plugin/storage/clickhouse/spanstore/exporter.go b/plugin/storage/clickhouse/spanstore/exporter.go new file mode 100644 index 00000000000..aa586708911 --- /dev/null +++ b/plugin/storage/clickhouse/spanstore/exporter.go @@ -0,0 +1,130 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "context" + "database/sql" + "encoding/hex" + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.18.0" +) + +const ( + insertSpansSQL = `INSERT INTO %s ( + Timestamp, + TraceId, + SpanId, + ParentSpanId, + Operation, + Service, + Tags.keys, + Tags.values, + Duration + ) VALUES ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ) + ` +) + +func ExportSpans(ctx context.Context, db *sql.DB, tableName string, td ptrace.Traces) error { + tx, err := db.Begin() + if err != nil { + return err + } + + defer func() { + tx.Rollback() + }() + + if err = insertSpans(ctx, tx, tableName, td); err != nil { + return err + } + + return tx.Commit() +} + +func insertSpans(ctx context.Context, tx *sql.Tx, tableName string, td ptrace.Traces) error { + statement, err := tx.PrepareContext(ctx, fmt.Sprintf(insertSpansSQL, tableName)) + if err != nil { + return err + } + + defer func() { + _ = statement.Close() + }() + + for i := 0; i < td.ResourceSpans().Len(); i++ { + spans := td.ResourceSpans().At(i) + res := spans.Resource() + var serviceName string + if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { + serviceName = v.Str() + } + for j := 0; j < spans.ScopeSpans().Len(); j++ { + rs := spans.ScopeSpans().At(j).Spans() + for k := 0; k < rs.Len(); k++ { + r := rs.At(k) + tagKeys, tagValues := attributesToArrays(r.Attributes()) + _, err = statement.ExecContext(ctx, + r.StartTimestamp().AsTime(), + traceIDToHexOrEmptyString(r.TraceID()), + spanIDToHexOrEmptyString(r.SpanID()), + spanIDToHexOrEmptyString(r.ParentSpanID()), + r.Name(), + serviceName, + tagKeys, + tagValues, + uint64(r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime()).Nanoseconds()), + ) + if err != nil { + return fmt.Errorf("exec context: %w", err) + } + } + } + } + + return nil +} + +func attributesToArrays(attributes pcommon.Map) ([]string, []string) { + keys := make([]string, 0) + values := make([]string, 0) + + attributes.Range(func(k string, v pcommon.Value) bool { + keys = append(keys, k) + values = append(values, v.AsString()) + return true + }) + return keys, values +} + +// spanIDToHexOrEmptyString returns a hex string from SpanID. +// An empty string is returned, if SpanID is empty. +func spanIDToHexOrEmptyString(id pcommon.SpanID) string { + if id.IsEmpty() { + return "" + } + return hex.EncodeToString(id[:]) +} + +// traceIDToHexOrEmptyString returns a hex string from TraceID. +// An empty string is returned, if TraceID is empty. +func traceIDToHexOrEmptyString(id pcommon.TraceID) string { + if id.IsEmpty() { + return "" + } + return hex.EncodeToString(id[:]) +} diff --git a/plugin/storage/clickhouse/spanstore/schema.go b/plugin/storage/clickhouse/spanstore/schema.go new file mode 100644 index 00000000000..de0cd2ccca3 --- /dev/null +++ b/plugin/storage/clickhouse/spanstore/schema.go @@ -0,0 +1,41 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "context" + "database/sql" + "fmt" +) + +const ( + createSpansTableSQL = `CREATE TABLE IF NOT EXISTS %s ( + Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)), + TraceId String CODEC(ZSTD(1)), + SpanId String CODEC(ZSTD(1)), + ParentSpanId String CODEC(ZSTD(1)), + Operation LowCardinality(String) CODEC(ZSTD(1)), + Service LowCardinality(String) CODEC(ZSTD(1)), + Tags Nested + ( + keys LowCardinality(String), + values String + ) CODEC (ZSTD(1)), + Duration UInt64 CODEC(ZSTD(1)), + INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_tags_keys Tags.keys TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_tags_values Tags.values TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_duration Duration TYPE minmax GRANULARITY 1 + ) ENGINE MergeTree() + + PARTITION BY toDate(Timestamp) + ORDER BY (Service, Operation, toUnixTimestamp(Timestamp), Duration, TraceId) + SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; + ` +) + +func CreateSpansTable(ctx context.Context, db *sql.DB, tableName string) error { + _, err := db.ExecContext(ctx, fmt.Sprintf(createSpansTableSQL, tableName)) + return err +} diff --git a/storage_v2/spanstore/factory.go b/storage_v2/spanstore/factory.go index 3701a97ccda..4c1611da3bd 100644 --- a/storage_v2/spanstore/factory.go +++ b/storage_v2/spanstore/factory.go @@ -4,6 +4,10 @@ package spanstore import ( + "context" + + "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/storage_v2" ) @@ -17,4 +21,6 @@ type Factory interface { // CreateTraceWriter creates a spanstore.Writer. CreateTraceWriter() (Writer, error) + + ChExportSpans(ctx context.Context, td ptrace.Traces) error }