Skip to content

Commit

Permalink
exp: Support primary and archive storage (#4873)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #4843
- Prototype defining primary and archive storage used by query service

## Description of the changes
- Introduce `trace_storage_archive` argument to extension/jaegerquery
  - This is temporary, need to design a better shape for the config
- Implement initialization of archive storage. Tested manually.
- Changed memstore to implement archive storage (maybe not a good idea
in retrospect)
- Change how no-config condition is detected as the previous changes
worked for all-in-one but broke with-config mode
  - Need to start adding some basic integration tests
- There is a better way than using default configs for all in one -
override `--config` flag with `yaml: ....` string an provide
configuration as a file (embed the file into the binary)
- Some refactoring of querysvc/app to be able to reuse
querysvc.QueryOptions in v2 config

## How was this change tested?
- Run all-in-one
- Run with config
- Run with custom UI config to enable Archive button (it should really
be dynamically determined by backend capability)

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed Oct 21, 2023
1 parent 089fc3c commit 244b4be
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 63 deletions.
3 changes: 3 additions & 0 deletions cmd/jaeger-v2/config-ui.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"archiveEnabled": true
}
4 changes: 4 additions & 0 deletions cmd/jaeger-v2/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ extensions:

jaeger_query:
trace_storage: memstore
trace_storage_archive: memstore_archive
ui_config: ./cmd/jaeger-v2/config-ui.json

jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000


receivers:
Expand Down
12 changes: 3 additions & 9 deletions cmd/jaeger-v2/internal/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package internal

import (
"log"
"strings"

"github.com/spf13/cobra"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -45,14 +44,9 @@ func Command() *cobra.Command {
// back to the official RunE.
otelRunE := cmd.RunE
cmd.RunE = func(cmd *cobra.Command, args []string) error {
configProvided := false
for _, arg := range args {
if strings.HasPrefix(arg, "--config") {
configProvided = true
break
}
}
if configProvided {
// a bit of a hack to check if '--config' flag was present
configFlag := cmd.Flag("config").Value.String()
if configFlag != "" && configFlag != "[]" {
return otelRunE(cmd, args)
}
log.Print("No '--config' flags detected, using default All-in-One configuration with memory storage.")
Expand Down
6 changes: 5 additions & 1 deletion cmd/jaeger-v2/internal/extension/jaegerquery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"

queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

var _ component.ConfigValidator = (*Config)(nil)

// Config represents the configuration for jaeger-query,
type Config struct {
TraceStorage string `valid:"required" mapstructure:"trace_storage"`
queryApp.QueryOptionsBase `mapstructure:",squash"`

TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"`
TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"`
confighttp.HTTPServerSettings `mapstructure:",squash"`
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger-v2/internal/extension/jaegerquery/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewFactory() extension.Factory {

func createDefaultConfig() component.Config {
return &Config{
TraceStorage: jaegerstorage.DefaultMemoryStore,
TraceStoragePrimary: jaegerstorage.DefaultMemoryStore,
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: ports.PortToHostPort(ports.QueryHTTP),
},
Expand Down
35 changes: 29 additions & 6 deletions cmd/jaeger-v2/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func newServer(config *Config, otel component.TelemetrySettings) *server {
}

func (s *server) Start(ctx context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactory(s.config.TraceStorage, host)
f, err := jaegerstorage.GetStorageFactory(s.config.TraceStoragePrimary, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
return fmt.Errorf("cannot find primary storage %s: %w", s.config.TraceStoragePrimary, err)
}

spanReader, err := f.CreateSpanReader()
Expand All @@ -53,7 +53,11 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("cannot create dependencies reader: %w", err)
}

qs := querysvc.NewQueryService(spanReader, depReader, querysvc.QueryServiceOptions{})
var opts querysvc.QueryServiceOptions
if err := s.addArchiveStorage(&opts, host); err != nil {
return err
}
qs := querysvc.NewQueryService(spanReader, depReader, opts)
metricsQueryService, _ := disabled.NewMetricsReader()
tm := tenancy.NewManager(&s.config.Tenancy)

Expand All @@ -63,7 +67,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
s.logger,
qs,
metricsQueryService,
makeQueryOptions(),
s.makeQueryOptions(),
tm,
jtracer.NoOp(),
)
Expand All @@ -78,9 +82,28 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return nil
}

func makeQueryOptions() *queryApp.QueryOptions {
func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host component.Host) error {
if s.config.TraceStorageArchive == "" {
s.logger.Info("Archive storage not configured")
return nil
}

f, err := jaegerstorage.GetStorageFactory(s.config.TraceStorageArchive, host)
if err != nil {
return fmt.Errorf("cannot find archive storage factory: %w", err)
}

if !opts.InitArchiveStorage(f, s.logger) {
s.logger.Info("Archive storage not initialized")
}
return nil
}

func (s *server) makeQueryOptions() *queryApp.QueryOptions {
return &queryApp.QueryOptions{
// TODO
QueryOptionsBase: s.config.QueryOptionsBase,

// TODO expose via config
HTTPHostPort: ports.PortToHostPort(ports.QueryHTTP),
GRPCHostPort: ports.PortToHostPort(ports.QueryGRPC),
}
Expand Down
49 changes: 31 additions & 18 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,25 @@ var tlsHTTPFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "query.http",
}

// QueryOptions holds configuration for query service
type QueryOptions struct {
// HTTPHostPort is the host:port address that the query service listens in on for http requests
HTTPHostPort string
// GRPCHostPort is the host:port address that the query service listens in on for gRPC requests
GRPCHostPort string
// BasePath is the prefix for all UI and API HTTP routes
// QueryOptionsStaticAssets contains configuration for handling static assets
type QueryOptionsStaticAssets struct {
// Path is the path for the static assets for the UI (https://github.com/uber/jaeger-ui)
Path string `valid:"optional" mapstructure:"path"`
// LogAccess tells static handler to log access to static assets, useful in debugging
LogAccess bool `valid:"optional" mapstructure:"log_access"`
}

// QueryOptionsBase holds configuration for query service shared with jaeger-v2
type QueryOptionsBase struct {
// BasePath is the base path for all HTTP routes
BasePath string
// StaticAssets is the path for the static assets for the UI (https://github.com/uber/jaeger-ui)
StaticAssets string
// LogStaticAssetsAccess tells static handler to log access to static assets, useful in debugging
LogStaticAssetsAccess bool

StaticAssets QueryOptionsStaticAssets `valid:"optional" mapstructure:"static_assets"`

// UIConfig is the path to a configuration file for the UI
UIConfig string
UIConfig string `valid:"optional" mapstructure:"ui_config"`
// BearerTokenPropagation activate/deactivate bearer token propagation to storage
BearerTokenPropagation bool
// TLSGRPC configures secure transport (Consumer to Query service GRPC API)
TLSGRPC tlscfg.Options
// TLSHTTP configures secure transport (Consumer to Query service HTTP API)
TLSHTTP tlscfg.Options
// AdditionalHeaders
AdditionalHeaders http.Header
// MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span
Expand All @@ -88,6 +87,20 @@ type QueryOptions struct {
EnableTracing bool
}

// QueryOptions holds configuration for query service
type QueryOptions struct {
QueryOptionsBase

// HTTPHostPort is the host:port address that the query service listens in on for http requests
HTTPHostPort string
// GRPCHostPort is the host:port address that the query service listens in on for gRPC requests
GRPCHostPort string
// TLSGRPC configures secure transport (Consumer to Query service GRPC API)
TLSGRPC tlscfg.Options
// TLSHTTP configures secure transport (Consumer to Query service HTTP API)
TLSHTTP tlscfg.Options
}

// AddFlags adds flags for QueryOptions
func AddFlags(flagSet *flag.FlagSet) {
flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`)
Expand Down Expand Up @@ -119,8 +132,8 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
return qOpts, fmt.Errorf("failed to process HTTP TLS options: %w", err)
}
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.LogStaticAssetsAccess = v.GetBool(queryLogStaticAssetsAccess)
qOpts.StaticAssets.Path = v.GetString(queryStaticFiles)
qOpts.StaticAssets.LogAccess = v.GetBool(queryLogStaticAssetsAccess)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)

Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func TestQueryBuilderFlags(t *testing.T) {
})
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.Equal(t, "/dev/null", qOpts.StaticAssets)
assert.True(t, qOpts.LogStaticAssetsAccess)
assert.Equal(t, "/dev/null", qOpts.StaticAssets.Path)
assert.True(t, qOpts.StaticAssets.LogAccess)
assert.Equal(t, "some.json", qOpts.UIConfig)
assert.Equal(t, "/jaeger", qOpts.BasePath)
assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort)
Expand Down
62 changes: 44 additions & 18 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,13 @@ func TestServerHTTPTLS(t *testing.T) {
}

serverOptions := &QueryOptions{
GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""),
HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""),
TLSHTTP: test.TLS,
TLSGRPC: TLSGRPC,
BearerTokenPropagation: true,
GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""),
HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""),
TLSHTTP: test.TLS,
TLSGRPC: TLSGRPC,
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
}
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
Expand Down Expand Up @@ -483,11 +485,13 @@ func TestServerGRPCTLS(t *testing.T) {
TLSHTTP = enabledTLSCfg
}
serverOptions := &QueryOptions{
GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""),
HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""),
TLSHTTP: TLSHTTP,
TLSGRPC: test.TLS,
BearerTokenPropagation: true,
GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""),
HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""),
TLSHTTP: TLSHTTP,
TLSGRPC: test.TLS,
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
}
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
Expand Down Expand Up @@ -553,13 +557,25 @@ func TestServerGRPCTLS(t *testing.T) {

func TestServerBadHostPort(t *testing.T) {
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true},
&QueryOptions{
HTTPHostPort: "8080",
GRPCHostPort: "127.0.0.1:8081",
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())

assert.NotNil(t, err)
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true},
&QueryOptions{
HTTPHostPort: "127.0.0.1:8081",
GRPCHostPort: "9123",
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())

Expand Down Expand Up @@ -587,9 +603,11 @@ func TestServerInUseHostPort(t *testing.T) {
&querysvc.QueryService{},
nil,
&QueryOptions{
HTTPHostPort: tc.httpHostPort,
GRPCHostPort: tc.grpcHostPort,
BearerTokenPropagation: true,
HTTPHostPort: tc.httpHostPort,
GRPCHostPort: tc.grpcHostPort,
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp(),
Expand Down Expand Up @@ -620,7 +638,13 @@ func TestServerSinglePort(t *testing.T) {

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, querySvc, nil,
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true},
&QueryOptions{
GRPCHostPort: hostPort,
HTTPHostPort: hostPort,
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())
assert.Nil(t, err)
Expand Down Expand Up @@ -747,8 +771,10 @@ func TestServerHTTPTenancy(t *testing.T) {
serverOptions := &QueryOptions{
HTTPHostPort: ":8080",
GRPCHostPort: ":8080",
Tenancy: tenancy.Options{
Enabled: true,
QueryOptionsBase: QueryOptionsBase{
Tenancy: tenancy.Options{
Enabled: true,
},
},
}
tenancyMgr := tenancy.NewManager(&serverOptions.Tenancy)
Expand Down
5 changes: 3 additions & 2 deletions cmd/query/app/static_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ var (

// RegisterStaticHandler adds handler for static assets to the router.
func RegisterStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *QueryOptions) {
staticHandler, err := NewStaticAssetsHandler(qOpts.StaticAssets, StaticAssetsHandlerOptions{
staticHandler, err := NewStaticAssetsHandler(qOpts.StaticAssets.Path, StaticAssetsHandlerOptions{
BasePath: qOpts.BasePath,
UIConfigPath: qOpts.UIConfig,
Logger: logger,
LogAccess: qOpts.LogStaticAssetsAccess,
LogAccess: qOpts.StaticAssets.LogAccess,
})
if err != nil {
logger.Panic("Could not create static assets handler", zap.Error(err))
Expand Down Expand Up @@ -100,6 +100,7 @@ func NewStaticAssetsHandler(staticAssetsRoot string, options StaticAssetsHandler
assetsFS: assetsFS,
}

options.Logger.Info("Using UI configuration", zap.String("path", options.UIConfigPath))
watcher, err := fswatcher.New([]string{options.UIConfigPath}, h.reloadUIConfig, h.options.Logger)
if err != nil {
return nil, err
Expand Down
24 changes: 19 additions & 5 deletions cmd/query/app/static_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ func TestNotExistingUiConfig(t *testing.T) {
func TestRegisterStaticHandlerPanic(t *testing.T) {
logger, buf := testutils.NewLogger()
assert.Panics(t, func() {
RegisterStaticHandler(mux.NewRouter(), logger, &QueryOptions{StaticAssets: "/foo/bar"})
RegisterStaticHandler(
mux.NewRouter(),
logger,
&QueryOptions{
QueryOptionsBase: QueryOptionsBase{
StaticAssets: QueryOptionsStaticAssets{
Path: "/foo/bar",
},
},
},
)
})
assert.Contains(t, buf.String(), "Could not create static assets handler")
assert.Contains(t, buf.String(), "no such file or directory")
Expand Down Expand Up @@ -99,10 +109,14 @@ func TestRegisterStaticHandler(t *testing.T) {
r = r.PathPrefix(testCase.basePath).Subrouter()
}
RegisterStaticHandler(r, logger, &QueryOptions{
StaticAssets: "fixture",
BasePath: testCase.basePath,
UIConfig: testCase.UIConfigPath,
LogStaticAssetsAccess: testCase.logAccess,
QueryOptionsBase: QueryOptionsBase{
StaticAssets: QueryOptionsStaticAssets{
Path: "fixture",
LogAccess: testCase.logAccess,
},
BasePath: testCase.basePath,
UIConfig: testCase.UIConfigPath,
},
})

server := httptest.NewServer(r)
Expand Down
Loading

0 comments on commit 244b4be

Please sign in to comment.