diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml new file mode 100644 index 00000000000..7297f7c4009 --- /dev/null +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -0,0 +1,38 @@ +service: + extensions: [jaeger_storage, jaeger_query] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger_storage_exporter] + +extensions: + jaeger_query: + trace_storage: es_main + trace_storage_archive: es_archive + ui_config: ./cmd/jaeger/config-ui.json + + jaeger_storage: + elasticsearch: + es_main: + server_urls: http://localhost:9200 + log_level: "error" + index_prefix: "jaeger-main" + use_aliases: true + es_archive: + server_urls: http://localhost:9200 + log_level: "error" + index_prefix: "jaeger-archive" + use_aliases: true +receivers: + otlp: + protocols: + grpc: + http: + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: es_main diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index db32f6c79cd..795331eab2a 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -7,14 +7,16 @@ import ( "fmt" "reflect" + esCfg "github.com/jaegertracing/jaeger/pkg/es/config" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" ) // Config has the configuration for jaeger-query, type Config struct { - Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` - Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` + Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` + Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` + Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? // Option: instead of looking for specific name, check interface. diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index a1ac028e772..fcecd0c0229 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -13,10 +13,12 @@ import ( "go.opentelemetry.io/collector/extension" "go.uber.org/zap" + esCfg "github.com/jaegertracing/jaeger/pkg/es/config" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/badger" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" + "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" ) @@ -107,10 +109,17 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { cfg: s.config.Badger, builder: badger.NewFactoryWithConfig, } + esStarter := &starter[esCfg.Configuration, *es.Factory]{ + ext: s, + storageKind: "elasticsearch", + cfg: s.config.Elasticsearch, + builder: es.NewFactoryWithConfig, + } builders := []func(ctx context.Context, host component.Host) error{ memStarter.build, badgerStarter.build, + esStarter.build, // TODO add support for other backends } for _, builder := range builders { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index 9f65f782895..0ee7ed1c173 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -6,6 +6,8 @@ package jaegerstorage import ( "context" "fmt" + "net/http" + "net/http/httptest" "testing" "github.com/stretchr/testify/require" @@ -16,6 +18,7 @@ import ( nooptrace "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" + esCfg "github.com/jaegertracing/jaeger/pkg/es/config" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/metrics" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" @@ -151,6 +154,46 @@ func TestBadgerStorageExtensionError(t *testing.T) { require.ErrorContains(t, err, "/bad/path") } +func TestESStorageExtension(t *testing.T) { + mockEsServerResponse := []byte(` + { + "Version": { + "Number": "6" + } + } + `) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write(mockEsServerResponse) + })) + defer server.Close() + storageExtension := makeStorageExtenion(t, &Config{ + Elasticsearch: map[string]esCfg.Configuration{ + "foo": { + Servers: []string{server.URL}, + LogLevel: "error", + }, + }, + }) + ctx := context.Background() + err := storageExtension.Start(ctx, componenttest.NewNopHost()) + require.NoError(t, err) + require.NoError(t, storageExtension.Shutdown(ctx)) +} + +func TestESStorageExtensionError(t *testing.T) { + ext := makeStorageExtenion(t, &Config{ + Elasticsearch: map[string]esCfg.Configuration{ + "foo": { + Servers: []string{"http://badurl"}, + LogLevel: "error", + }, + }, + }) + err := ext.Start(context.Background(), componenttest.NewNopHost()) + require.ErrorContains(t, err, "failed to initialize elasticsearch storage") + require.ErrorContains(t, err, "badurl") +} + func noopTelemetrySettings() component.TelemetrySettings { return component.TelemetrySettings{ Logger: zap.L(), diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 1549156cff6..6fc76db5615 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -29,6 +29,7 @@ import ( "sync" "time" + "github.com/asaskevich/govalidator" esV8 "github.com/elastic/go-elasticsearch/v8" "github.com/olivere/elastic" "go.uber.org/zap" @@ -45,7 +46,7 @@ import ( // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { - Servers []string `mapstructure:"server_urls"` + Servers []string `mapstructure:"server_urls" valid:"required,url"` RemoteReadClusters []string `mapstructure:"remote_read_clusters"` Username string `mapstructure:"username"` Password string `mapstructure:"password" json:"-"` @@ -54,14 +55,14 @@ type Configuration struct { AllowTokenFromContext bool `mapstructure:"-"` Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` - MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query - MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads - NumShards int64 `yaml:"shards" mapstructure:"num_shards"` - NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"` - PrioritySpanTemplate int64 `yaml:"priority_span_template" mapstructure:"priority_span_template"` - PriorityServiceTemplate int64 `yaml:"priority_service_template" mapstructure:"priority_service_template"` - PriorityDependenciesTemplate int64 `yaml:"priority_dependencies_template" mapstructure:"priority_dependencies_template"` - Timeout time.Duration `validate:"min=500" mapstructure:"-"` + MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query + MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads + NumShards int64 `mapstructure:"num_shards"` + NumReplicas int64 `mapstructure:"num_replicas"` + PrioritySpanTemplate int64 `mapstructure:"priority_span_template"` + PriorityServiceTemplate int64 `mapstructure:"priority_service_template"` + PriorityDependenciesTemplate int64 `mapstructure:"priority_dependencies_template"` + Timeout time.Duration `mapstructure:"-"` BulkSize int `mapstructure:"-"` BulkWorkers int `mapstructure:"-"` BulkActions int `mapstructure:"-"` @@ -467,3 +468,8 @@ func loadTokenFromFile(path string) (string, error) { } return strings.TrimRight(string(b), "\r\n"), nil } + +func (c *Configuration) Validate() error { + _, err := govalidator.ValidateStruct(c) + return err +} diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 8ce0de4f386..5187ac5db7a 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -83,6 +83,39 @@ func NewFactory() *Factory { } } +func NewFactoryWithConfig( + cfg config.Configuration, + metricsFactory metrics.Factory, + logger *zap.Logger, +) (*Factory, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + cfg.MaxDocCount = defaultMaxDocCount + cfg.Enabled = true + + archive := make(map[string]*namespaceConfig) + archive[archiveNamespace] = &namespaceConfig{ + Configuration: cfg, + namespace: archiveNamespace, + } + + f := NewFactory() + f.InitFromOptions(Options{ + Primary: namespaceConfig{ + Configuration: cfg, + namespace: primaryNamespace, + }, + others: archive, + }) + err := f.Initialize(metricsFactory, logger) + if err != nil { + return nil, err + } + return f, nil +} + // AddFlags implements plugin.Configurable func (f *Factory) AddFlags(flagSet *flag.FlagSet) { f.Options.AddFlags(flagSet) diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index d3e1d34ab4a..039552c574b 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -263,6 +263,63 @@ func TestInitFromOptions(t *testing.T) { assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig) } +func TestESStorageFactoryWithConfig(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write(mockEsServerResponse) + })) + defer server.Close() + cfg := escfg.Configuration{ + Servers: []string{server.URL}, + LogLevel: "error", + } + factory, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) + require.NoError(t, err) + defer factory.Close() +} + +func TestConfigurationValidation(t *testing.T) { + testCases := []struct { + name string + cfg escfg.Configuration + wantErr bool + }{ + { + name: "valid configuration", + cfg: escfg.Configuration{ + Servers: []string{"http://localhost:9200"}, + }, + wantErr: false, + }, + { + name: "missing servers", + cfg: escfg.Configuration{}, + wantErr: true, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + err := test.cfg.Validate() + if test.wantErr { + require.Error(t, err) + _, err = NewFactoryWithConfig(test.cfg, metrics.NullFactory, zap.NewNop()) + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestESStorageFactoryWithConfigError(t *testing.T) { + cfg := escfg.Configuration{ + Servers: []string{"http://badurl"}, + LogLevel: "error", + } + _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) + require.Error(t, err) + require.ErrorContains(t, err, "failed to create primary Elasticsearch client") +} + func TestPasswordFromFile(t *testing.T) { defer testutils.VerifyGoLeaksOnce(t) t.Run("primary client", func(t *testing.T) {