Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Align Cassandra Storage Config With OTEL #5949

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,26 @@ extensions:
backends:
some_storage:
cassandra:
keyspace: "jaeger_v1_dc1"
username: "cassandra"
password: "cassandra"
schema:
keyspace: "jaeger_v1_dc1"
connection:
auth:
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
basic:
username: "cassandra"
password: "cassandra"
tls:
insecure: true
another_storage:
cassandra:
keyspace: "jaeger_v1_dc1"
username: "cassandra"
password: "cassandra"
schema:
keyspace: "jaeger_v1_dc1"
connection:
auth:
basic:
username: "cassandra"
password: "cassandra"
tls:
insecure: true
receivers:
otlp:
protocols:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ backends:
`)
cfg := createDefaultConfig().(*Config)
require.NoError(t, conf.Unmarshal(cfg))
assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Servers)
assert.NotEmpty(t, cfg.Backends["some_storage"].Cassandra.Primary.Connection.Servers)
}

func TestConfigDefaultElasticsearch(t *testing.T) {
Expand Down
203 changes: 119 additions & 84 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,99 +5,138 @@
package config

import (
"context"
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.uber.org/zap"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/cassandra"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
)

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
// Configuration describes the configuration properties needed to connect to a Cassandra cluster.
type Configuration struct {
Servers []string `valid:"required,url" mapstructure:"servers"`
Keyspace string `mapstructure:"keyspace"`
LocalDC string `mapstructure:"local_dc"`
ConnectionsPerHost int `mapstructure:"connections_per_host"`
Timeout time.Duration `mapstructure:"-"`
ConnectTimeout time.Duration `mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
ProtoVersion int `mapstructure:"proto_version"`
Consistency string `mapstructure:"consistency"`
DisableCompression bool `mapstructure:"disable_compression"`
Port int `mapstructure:"port"`
Authenticator Authenticator `mapstructure:",squash"`
DisableAutoDiscovery bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
Schema Schema `mapstructure:"schema"`
Connection Connection `mapstructure:"connection"`
Query Query `mapstructure:"query"`
}

func DefaultConfiguration() Configuration {
return Configuration{
Servers: []string{"127.0.0.1"},
Port: 9042,
MaxRetryAttempts: 3,
Keyspace: "jaeger_v1_test",
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
}
type Connection struct {
// Servers contains a list of hosts that are used to connect to the cluster.
Servers []string `mapstructure:"servers" valid:"required,url"`
// LocalDC contains the name of the local Data Center (DC) for DC-aware host selection
LocalDC string `mapstructure:"local_dc"`
// The port used when dialing to a cluster.
Port int `mapstructure:"port"`
// DisableAutoDiscovery, if set to true, will disable the cluster's auto-discovery features.
DisableAutoDiscovery bool `mapstructure:"disable_auto_discovery"`
// ConnectionsPerHost contains the maximum number of open connections for each host on the cluster.
ConnectionsPerHost int `mapstructure:"connections_per_host"`
// ReconnectInterval contains the regular interval after which the driver tries to connect to
// nodes that are down.
ReconnectInterval time.Duration `mapstructure:"reconnect_interval"`
// SocketKeepAlive contains the keep alive period for the default dialer to the cluster.
SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"`
// TLS contains the TLS configuration for the connection to the cluster.
TLS configtls.ClientConfig `mapstructure:"tls"`
// Timeout contains the maximum time spent to connect to a cluster.
Timeout time.Duration `mapstructure:"timeout"`
// Authenticator contains the details of the authentication mechanism that is used for
// connecting to a cluster.
Authenticator Authenticator `mapstructure:"auth"`
// ProtoVersion contains the version of the native protocol to use when connecting to a cluster.
ProtoVersion int `mapstructure:"proto_version"`
}

type Schema struct {
// Keyspace contains the namespace where Jaeger data will be stored.
Keyspace string `mapstructure:"keyspace"`
// DisableCompression, if set to true, disables the use of the default Snappy Compression
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
}

type Query struct {
// Timeout contains the maximum time spent executing a query.
Timeout time.Duration `mapstructure:"timeout"`
// MaxRetryAttempts indicates the maximum number of times a query will be retried for execution.
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
// Consistency specifies the consistency level which needs to be satisified before responding
// to a query.
Consistency string `mapstructure:"consistency"`
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster
// Authenticator holds the authentication properties needed to connect to a Cassandra cluster.
type Authenticator struct {
Basic BasicAuthenticator `yaml:"basic" mapstructure:",squash"`
Basic BasicAuthenticator `mapstructure:"basic"`
// TODO: add more auth types
}

// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster
// BasicAuthenticator holds the username and password for a password authenticator for a Cassandra cluster.
type BasicAuthenticator struct {
Username string `yaml:"username" mapstructure:"username"`
Password string `yaml:"password" mapstructure:"password" json:"-"`
AllowedAuthenticators []string `yaml:"allowed_authenticators" mapstructure:"allowed_authenticators"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
AllowedAuthenticators []string `mapstructure:"allowed_authenticators"`
}

func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Port: 9042,
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
},
Query: Query{
MaxRetryAttempts: 3,
},
}
}

// ApplyDefaults copies settings from source unless its own value is non-zero.
func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.ConnectionsPerHost == 0 {
c.ConnectionsPerHost = source.ConnectionsPerHost
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace

Check warning on line 107 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L107

Added line #L107 was not covered by tests
}
if c.MaxRetryAttempts == 0 {
c.MaxRetryAttempts = source.MaxRetryAttempts
if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
if c.Timeout == 0 {
c.Timeout = source.Timeout
if c.Connection.ReconnectInterval == 0 {
c.Connection.ReconnectInterval = source.Connection.ReconnectInterval
}
if c.ReconnectInterval == 0 {
c.ReconnectInterval = source.ReconnectInterval
if c.Connection.Port == 0 {
c.Connection.Port = source.Connection.Port
}
if c.Port == 0 {
c.Port = source.Port
if c.Connection.ProtoVersion == 0 {
c.Connection.ProtoVersion = source.Connection.ProtoVersion
}
if c.Keyspace == "" {
c.Keyspace = source.Keyspace
if c.Connection.SocketKeepAlive == 0 {
c.Connection.SocketKeepAlive = source.Connection.SocketKeepAlive
}
if c.ProtoVersion == 0 {
c.ProtoVersion = source.ProtoVersion
if c.Query.MaxRetryAttempts == 0 {
c.Query.MaxRetryAttempts = source.Query.MaxRetryAttempts
}
if c.SocketKeepAlive == 0 {
c.SocketKeepAlive = source.SocketKeepAlive
if c.Query.Timeout == 0 {
c.Query.Timeout = source.Query.Timeout
}
}

// SessionBuilder creates new cassandra.Session
type SessionBuilder interface {
NewSession(logger *zap.Logger) (cassandra.Session, error)
NewSession() (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error) {
cluster, err := c.NewCluster(logger)
func (c *Configuration) NewSession() (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}
Expand All @@ -109,68 +148,64 @@
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Servers...)
cluster.Keyspace = c.Keyspace
cluster.NumConns = c.ConnectionsPerHost
cluster.Timeout = c.Timeout
cluster.ConnectTimeout = c.ConnectTimeout
cluster.ReconnectInterval = c.ReconnectInterval
cluster.SocketKeepalive = c.SocketKeepAlive
if c.ProtoVersion > 0 {
cluster.ProtoVersion = c.ProtoVersion
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
cluster.Keyspace = c.Schema.Keyspace
cluster.NumConns = c.Connection.ConnectionsPerHost
cluster.ConnectTimeout = c.Connection.Timeout
cluster.ReconnectInterval = c.Connection.ReconnectInterval
cluster.SocketKeepalive = c.Connection.SocketKeepAlive
cluster.Timeout = c.Query.Timeout
if c.Connection.ProtoVersion > 0 {
cluster.ProtoVersion = c.Connection.ProtoVersion
}
if c.MaxRetryAttempts > 1 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.MaxRetryAttempts - 1}
if c.Query.MaxRetryAttempts > 1 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: c.Query.MaxRetryAttempts - 1}
}
if c.Port != 0 {
cluster.Port = c.Port
if c.Connection.Port != 0 {
cluster.Port = c.Connection.Port
}

if !c.DisableCompression {
if !c.Schema.DisableCompression {
cluster.Compressor = gocql.SnappyCompressor{}
}

if c.Consistency == "" {
if c.Query.Consistency == "" {
cluster.Consistency = gocql.LocalOne
} else {
cluster.Consistency = gocql.ParseConsistency(c.Consistency)
cluster.Consistency = gocql.ParseConsistency(c.Query.Consistency)

Check warning on line 176 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L176

Added line #L176 was not covered by tests
}

fallbackHostSelectionPolicy := gocql.RoundRobinHostPolicy()
if c.LocalDC != "" {
fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.LocalDC)
if c.Connection.LocalDC != "" {
fallbackHostSelectionPolicy = gocql.DCAwareRoundRobinPolicy(c.Connection.LocalDC)

Check warning on line 181 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L181

Added line #L181 was not covered by tests
}
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallbackHostSelectionPolicy, gocql.ShuffleReplicas())

if c.Authenticator.Basic.Username != "" && c.Authenticator.Basic.Password != "" {
if c.Connection.Authenticator.Basic.Username != "" && c.Connection.Authenticator.Basic.Password != "" {
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: c.Authenticator.Basic.Username,
Password: c.Authenticator.Basic.Password,
AllowedAuthenticators: c.Authenticator.Basic.AllowedAuthenticators,
Username: c.Connection.Authenticator.Basic.Username,
Password: c.Connection.Authenticator.Basic.Password,
AllowedAuthenticators: c.Connection.Authenticator.Basic.AllowedAuthenticators,
}
}
tlsCfg, err := c.TLS.Config(logger)
tlsCfg, err := c.Connection.TLS.LoadTLSConfig(context.Background())
if err != nil {
return nil, err
}
if c.TLS.Enabled {
if !c.Connection.TLS.Insecure {
cluster.SslOpts = &gocql.SslOptions{
Config: tlsCfg,
}
}
// If tunneling connection to C*, disable cluster autodiscovery features.
if c.DisableAutoDiscovery {
if c.Connection.DisableAutoDiscovery {
cluster.DisableInitialHostLookup = true
cluster.IgnorePeerAddr = true
}
return cluster, nil
}

func (c *Configuration) Close() error {
return c.TLS.Close()
}

func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/cassandra/config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package config

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidate_ReturnsErrorWhenInvalid(t *testing.T) {
tests := []struct {
name string
cfg *Configuration
}{
{
name: "missing required fields",
cfg: &Configuration{},
},
{
name: "require fields in invalid format",
cfg: &Configuration{
Connection: Connection{
Servers: []string{"not a url"},
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := test.cfg.Validate()
require.Error(t, err)
})
}
}

func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) {
cfg := Configuration{
Connection: Connection{
Servers: []string{"localhost:9200"},
},
}

err := cfg.Validate()
require.NoError(t, err)
}
Loading
Loading