Create Cassandra db schema on session initialization #5922

Draft
wants to merge 1 commit into
base: main
3 changes: 2 additions & 1 deletion pkg/cassandra/config/config.go
@@ -111,7 +111,8 @@ func (c *Configuration) NewSession(logger *zap.Logger) (cassandra.Session, error
 // NewCluster creates a new gocql cluster from the configuration
 func (c *Configuration) NewCluster(logger *zap.Logger) (*gocql.ClusterConfig, error) {
 	cluster := gocql.NewCluster(c.Servers...)
-	cluster.Keyspace = c.Keyspace
+	// Leave the keyspace unset because it may not exist yet; it is added later.
+	// cluster.Keyspace = c.Keyspace
 	cluster.NumConns = c.ConnectionsPerHost
 	cluster.Timeout = c.Timeout
 	cluster.ConnectTimeout = c.ConnectTimeout
271 changes: 271 additions & 0 deletions plugin/storage/cassandra/factory.go
@@ -5,10 +5,16 @@
package cassandra

import (
"bytes"
"context"
"embed"
"errors"
"flag"
"fmt"
"io"
"os"
"regexp"
"strconv"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
@@ -131,6 +137,262 @@ func (f *Factory) configureFromOptions(o *Options) {
}
}

const (
Member

please move the changes to a separate file, like schema.go, there's no reason for this to be in factory.go

MODE = `MODE`
DATACENTER = `DATACENTER`
TRACE_TTL = `TRACE_TTL`
DEPENDENCIES_TTL = `DEPENDENCIES_TTL`
KEYSPACE = `KEYSPACE`
REPLICATION_FACTOR = `REPLICATION_FACTOR`
VERSION = `VERSION`
COMPACTION_WINDOW = `COMPACTION_WINDOW`
)
Member

The functionality you're adding is only needed in jaeger-v2, which uses YAML config files. So there will be no env variables - you can simply use the struct below. But you should add mapstructure tags to the fields, as well as validation instructions (example: cmd/jaeger/internal/extension/jaegerquery/config.go)
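
A minimal sketch of what this suggestion could look like, assuming the parameters are bound from YAML rather than read from the environment. The `SchemaConfig` name, the field set, the tag names, and the hand-written `Validate` method are illustrative assumptions; the referenced jaegerquery config may declare its validation differently.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// SchemaConfig is a hypothetical YAML-bound replacement for the env-driven
// parameters. Field names and tag names are assumptions for illustration.
type SchemaConfig struct {
	Datacenter        string `mapstructure:"datacenter"`
	Keyspace          string `mapstructure:"keyspace"`
	TraceTTL          int    `mapstructure:"trace_ttl"`
	DependenciesTTL   int    `mapstructure:"dependencies_ttl"`
	ReplicationFactor int    `mapstructure:"replication_factor"`
	CompactionWindow  string `mapstructure:"compaction_window"`
}

var (
	// Same character set and window format that the PR checks with regexp.MatchString.
	keyspaceRe   = regexp.MustCompile(`^[a-zA-Z0-9_]+$`)
	compactionRe = regexp.MustCompile(`^[0-9]+[mhd]$`)
)

// Validate checks the keyspace and compaction window formats used in the PR.
// The replication factor and TTL bounds are extra assumptions, not taken from it.
func (c *SchemaConfig) Validate() error {
	if !keyspaceRe.MatchString(c.Keyspace) {
		return fmt.Errorf("invalid keyspace %q: use letters, digits or underscores", c.Keyspace)
	}
	if c.ReplicationFactor < 1 {
		return errors.New("replication_factor must be at least 1")
	}
	if c.TraceTTL < 0 || c.DependenciesTTL < 0 {
		return errors.New("TTL values must not be negative")
	}
	if c.CompactionWindow != "" && !compactionRe.MatchString(c.CompactionWindow) {
		return fmt.Errorf("invalid compaction_window %q: expected digits followed by m, h or d", c.CompactionWindow)
	}
	return nil
}
```

With a struct like this, the config loader fills the fields from the YAML file and `Validate` runs once after unmarshalling, so none of the `os.Getenv` parsing is needed.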


// StorageConfigParams holds the parameters required for initializing the DB schema.
type StorageConfigParams struct {
Member

maybe SchemaConfig

Contributor

@yurishkuro @akstron i'm working on a proposal to redefine the configs for cassandra in #5928 (comment). What are both of your thoughts on adding the configurations here into a mapstructure:"schema" grouping?

Member

Do you think schema and data are different?

Contributor (@mahadzaryab1, Sep 7, 2024)

Sorry I should've mentioned. I was suggesting consolidating data and schema into one under schema.

Member

Ok makes sense

Author

Looks good to me.

mode string
datacenter string
trace_ttl int
dependencies_ttl int
keyspace string
Member

this should already be defined in the main config

Author

Are you talking about this config: jaeger/cmd/jaeger/config-cassandra.yaml ? How would I know what all config key-value pairs are already defined?

Author (@akstron, Sep 11, 2024)

Hi @yurishkuro , I needed some help here. Is this: jaeger/cmd/jaeger/internal/all-in-one.yaml the example for the config file which would be used in v2? If yes, under what section should I expect the cassandra specific configs to be put? I couldn't find any example for this in the current code base.

Just to get the complete picture, would the schema configs I am adding look something like this:

schema:
    datacenter: <datacenterName>
    keyspace: <keyspace>
    .......

Member

cmd/jaeger/config-cassandra.yaml

replication_factor int
replication string
cas_version int
compaction_window_size int
compaction_window_unit string
}

func constructStorageConfigParams() (*StorageConfigParams, error) {
var err error

datacenter := os.Getenv(DATACENTER)
replication_factor_string := os.Getenv(REPLICATION_FACTOR)

var replication_factor, compaction_window_size int
var replication, compaction_window_unit string

mode := os.Getenv(MODE)
Member

do we actually need the "mode"?

Author

In v1 I suppose the mode env variable is used for simple testing scenarios, that's why I added it in this as well. Do you suggest removing it?

Member

yes, if we can


if mode == "" {
return nil, fmt.Errorf("missing MODE parameter")
}

if mode != "test" && mode != "prod" {
return nil, fmt.Errorf("invalid MODE=%s, expecting 'prod' or 'test'", mode)
}

if mode == "test" {
if datacenter == "" {
datacenter = "test"
}

replication_factor = 1
if replication_factor_string != "" {
replication_factor, err = strconv.Atoi(replication_factor_string)
if err != nil {
return nil, err
}
}

replication = fmt.Sprintf("{'class': 'SimpleStrategy', 'replication_factor': '%v'}", replication_factor)
}

if mode == "prod" {
if datacenter == "" {
return nil, fmt.Errorf("missing DATACENTER parameter for prod mode")
}

replication_factor = 2
if replication_factor_string != "" {
replication_factor, err = strconv.Atoi(replication_factor_string)
if err != nil {
return nil, err
}
}

replication = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': '%v' }", datacenter, replication_factor)
}

trace_ttl_string := os.Getenv(TRACE_TTL)
trace_ttl := 172800
if trace_ttl_string != "" {
trace_ttl, err = strconv.Atoi(trace_ttl_string)
if err != nil {
return nil, err
}
}

dependencies_ttl_string := os.Getenv(DEPENDENCIES_TTL)
dependencies_ttl := 0
if dependencies_ttl_string != "" {
dependencies_ttl, err = strconv.Atoi(dependencies_ttl_string)
if err != nil {
return nil, err
}
}

cas_version_string := os.Getenv(VERSION)
cas_version := 4
if cas_version_string != "" {
cas_version, err = strconv.Atoi(cas_version_string)
if err != nil {
return nil, err
}
}

keyspace := os.Getenv(KEYSPACE)
if keyspace == "" {
keyspace = fmt.Sprintf("jaeger_v1_%s", datacenter)
}

var isMatch bool
isMatch, err = regexp.MatchString("[^a-zA-Z0-9_]", keyspace)
if err != nil {
return nil, err
}

if isMatch {
return nil, fmt.Errorf(`invalid characters in KEYSPACE=%s parameter, please use letters, digits or underscores`, keyspace)
}

if compaction_window := os.Getenv(COMPACTION_WINDOW); compaction_window != `` {
isMatch, err = regexp.MatchString("^[0-9]+[mhd]$", compaction_window)
if err != nil {
return nil, err
}

if !isMatch {
return nil, errors.New("invalid compaction window format: use a number followed by 'm' for minutes, 'h' for hours, or 'd' for days")
}

compaction_window_size, err = strconv.Atoi(compaction_window[:len(compaction_window)-1])
if err != nil {
return nil, err
}

compaction_window_unit = compaction_window[len(compaction_window)-1:]
} else {
trace_ttl_minutes := trace_ttl / 60

compaction_window_size = (trace_ttl_minutes + 30 - 1) / 30
compaction_window_unit = "m"
}

switch compaction_window_unit {
case `m`:
compaction_window_unit = `MINUTES`
case `h`:
compaction_window_unit = `HOURS`
case `d`:
compaction_window_unit = `DAYS`
}

return &StorageConfigParams{
mode: mode,
datacenter: datacenter,
trace_ttl: trace_ttl,
dependencies_ttl: dependencies_ttl,
keyspace: keyspace,
replication_factor: replication_factor,
replication: replication,
cas_version: cas_version,
compaction_window_size: compaction_window_size,
compaction_window_unit: compaction_window_unit,
}, nil
}

// Embed all the template files in binaries

//go:embed schema/v001.cql.tmpl
//go:embed schema/v002.cql.tmpl
//go:embed schema/v003.cql.tmpl
//go:embed schema/v004.cql.tmpl
var schemaFile embed.FS
Member

why do we need to embed all versions? we keep them mostly for historical reasons, but only the latest is used.


func handleTemplateReplacements(data []byte, params *StorageConfigParams) []byte {
templateKeysValuePairs := map[string]string{
Member

It's better to rewrite the schema with Go template syntax and use that, because then the data binding can be directly against the config struct (better type safety).

`trace_ttl`: strconv.Itoa(params.trace_ttl),
`dependencies_ttl`: strconv.Itoa(params.dependencies_ttl),
`keyspace`: params.keyspace,
`replication_factor`: strconv.Itoa(params.replication_factor),
`replication`: params.replication,
`cas_version`: strconv.Itoa(params.cas_version),
`compaction_window_size`: strconv.Itoa(params.compaction_window_size),
`compaction_window_unit`: params.compaction_window_unit,
}

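// NOTE: map iteration order is random, and `replication` is a prefix of
// `replication_factor`, so the output depends on which key is replaced first.
result := data
for key, value := range templateKeysValuePairs {
result = bytes.ReplaceAll(result, []byte(key), []byte(value))
}
return result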
}
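
A minimal sketch of the Go template approach suggested in the review comment above, using the standard `text/template` package. The `schemaParams` struct, the `renderSchema` helper, and the two-statement template are hypothetical stand-ins for illustration; they are not the real v004 schema or the PR's config struct.

```go
package main

import (
	"bytes"
	"text/template"
)

// schemaParams is a hypothetical stand-in for the schema config struct.
type schemaParams struct {
	Keyspace    string
	Replication string
	TraceTTL    int
}

// schemaTmpl is an illustrative fragment, not the real schema file.
const schemaTmpl = `CREATE KEYSPACE IF NOT EXISTS {{.Keyspace}} WITH replication = {{.Replication}};
CREATE TABLE IF NOT EXISTS {{.Keyspace}}.example (id int PRIMARY KEY) WITH default_time_to_live = {{.TraceTTL}};
`

// renderSchema binds the template directly against the struct fields.
// A placeholder that names a field the struct does not have fails at
// execution time instead of being left in the output unnoticed.
func renderSchema(p schemaParams) (string, error) {
	t, err := template.New("schema").Parse(schemaTmpl)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, p); err != nil {
		return "", err
	}
	return buf.String(), nil
}
```

Because each placeholder is a delimited `{{.Field}}` action, one key can no longer match inside another (for example `replication` inside `replication_factor`), and integer fields need no `strconv.Itoa` conversion.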

func constructQueriesFromTemplateFiles(session cassandra.Session, params *StorageConfigParams) ([]cassandra.Query, error) {
Member

Is cassandra.Session not able to execute multiple queries at once?

var queries []cassandra.Query

schemaFileName := fmt.Sprintf("schema/v%03d.cql.tmpl", 4)
schemaData, err := schemaFile.ReadFile(schemaFileName)
if err != nil {
return nil, err
}

lines := bytes.Split(schemaData, []byte("\n"))
var extractedLines [][]byte

for _, line := range lines {
// Strip a trailing `--` comment: drop everything from the first `--` onward
if commentIndex := bytes.Index(line, []byte(`--`)); commentIndex != -1 {
line = line[:commentIndex]
}

// Trim before the emptiness check so whitespace-only lines are skipped too
line = bytes.TrimSpace(line)
if len(line) == 0 {
continue
}

extractedLines = append(extractedLines, handleTemplateReplacements(line, params))
}

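// Construct individual queries; a statement ends at a line terminated by `;`
var queryString string
for _, line := range extractedLines {
// Join lines with a space so tokens on adjacent lines do not run together
if queryString != "" {
queryString += " "
}
queryString += string(line)
if bytes.HasSuffix(line, []byte(";")) {
queries = append(queries, session.Query(queryString))
queryString = ""
}
}

if len(queryString) > 0 {
return nil, errors.New("invalid template: last statement is not terminated by ';'")
}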

return queries, nil
}

func (f *Factory) InitializeDB(session cassandra.Session) error {
params, err := constructStorageConfigParams()
if err != nil {
return err
}

queries, err := constructQueriesFromTemplateFiles(session, params)
if err != nil {
return err
}

for _, query := range queries {
err := query.Exec()
if err != nil {
return err
}
}

return nil
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil})
@@ -143,12 +405,21 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.primarySession = primarySession

// After creating a session, execute commands to initialize the setup if not already present
if err := f.InitializeDB(primarySession); err != nil {
Member

what happens if the schema already exists?

return err
}

if f.archiveConfig != nil {
archiveSession, err := f.archiveConfig.NewSession(logger)
if err != nil {
return err
}
f.archiveSession = archiveSession

if err := f.InitializeDB(archiveSession); err != nil {
return err
}
} else {
logger.Info("Cassandra archive storage configuration is empty, skipping")
}