Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra init #6072

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/cassandra"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema"
)

// Configuration describes the configuration properties needed to connect to a Cassandra cluster.
Expand Down Expand Up @@ -144,6 +145,7 @@ func (c *Configuration) NewSession() (cassandra.Session, error) {
if err != nil {
return nil, err
}
mappings.SchemaInit(session)
return gocqlw.WrapCQLSession(session), nil
}

Expand Down
216 changes: 216 additions & 0 deletions plugin/storage/cassandra/schema/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package mappings

import (
"bytes"
"errors"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"text/template"

"github.com/gocql/gocql"
//"github.com/jaegertracing/jaeger/pkg/cassandra"
)

type MappingBuilder struct {
Mode string
Datacentre string
Keyspace string
Replication string
TraceTTL int
DependenciesTTL int
CompactionWindowSize int
CompactionWindowUnit string
}

func (mb *MappingBuilder) renderTemplate(templatePath string) (string, error) {
tmpl, err := template.ParseFiles(templatePath)
if err != nil {
return "", err

Check warning on line 34 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L34

Added line #L34 was not covered by tests
}

var renderedOutput bytes.Buffer
if err := tmpl.Execute(&renderedOutput, mb); err != nil {
return "", err

Check warning on line 39 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L39

Added line #L39 was not covered by tests
}

return renderedOutput.String(), nil
}

func RenderCQLTemplate(params MappingBuilder, cqlOutput string) (string, error) {
commentRegex := regexp.MustCompile(`--.*`)
cqlOutput = commentRegex.ReplaceAllString(cqlOutput, "")

lines := strings.Split(cqlOutput, "\n")
var filteredLines []string

for _, line := range lines {
if strings.TrimSpace(line) != "" {
filteredLines = append(filteredLines, line)
}
}

cqlOutput = strings.Join(filteredLines, "\n")

replacements := map[string]string{
"${keyspace}": params.Keyspace,
"${replication}": params.Replication,
"${trace_ttl}": fmt.Sprintf("%d", params.TraceTTL),
"${dependencies_ttl}": fmt.Sprintf("%d", params.DependenciesTTL),
"${compaction_window_size}": fmt.Sprintf("%d", params.CompactionWindowSize),
"${compaction_window_unit}": params.CompactionWindowUnit,
}

for placeholder, value := range replacements {
cqlOutput = strings.ReplaceAll(cqlOutput, placeholder, value)
}

return cqlOutput, nil
}

func getEnv(key, defaultValue string) string {
if value, exists := os.LookupEnv(key); exists {
return value
}
return defaultValue
}

func isValidKeyspace(keyspace string) bool {
match, _ := regexp.MatchString(`^[a-zA-Z0-9_]+$`, keyspace)
return match
}

func getCompactionWindow(traceTTL int, compactionWindow string) (int, string, error) {
var compactionWindowSize int
var compactionWindowUnit string

if compactionWindow != "" {
matched, err := regexp.MatchString(`^[0-9]+[mhd]$`, compactionWindow)
if err != nil {
return 0, "", err

Check warning on line 95 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L95

Added line #L95 was not covered by tests
}

if matched {
numStr := strings.TrimRight(compactionWindow, "mhd")
unitStr := strings.TrimLeft(compactionWindow, numStr)

compactionWindowSize, err = strconv.Atoi(numStr)
if err != nil {
return 0, "", errors.New("invalid compaction window size format")

Check warning on line 104 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L104

Added line #L104 was not covered by tests
}
compactionWindowUnit = unitStr
} else {
return 0, "", errors.New("invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days")
}
} else {
traceTTLMinutes := traceTTL / 60
compactionWindowSize = (traceTTLMinutes + 30 - 1) / 30
compactionWindowUnit = "m"
}

switch compactionWindowUnit {
case "m":
compactionWindowUnit = "MINUTES"
case "h":
compactionWindowUnit = "HOURS"
case "d":
compactionWindowUnit = "DAYS"
default:
return 0, "", errors.New("invalid compaction window unit")

Check warning on line 124 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L123-L124

Added lines #L123 - L124 were not covered by tests
}

return compactionWindowSize, compactionWindowUnit, nil
}

func (mb *MappingBuilder) GetSpanServiceMappings() (spanMapping string, serviceMapping string, err error) {
traceTTL, _ := strconv.Atoi(getEnv("TRACE_TTL", "172800"))
dependenciesTTL, _ := strconv.Atoi(getEnv("DEPENDENCIES_TTL", "0"))
cas_version := getEnv("VERSION", "4")
// template := os.Args[1]
var template string
var cqlOutput string
if template == "" {
switch cas_version {
case "3":
template = "./v003.cql.tmpl"

Check warning on line 140 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L139-L140

Added lines #L139 - L140 were not covered by tests
case "4":
template = "./v004.cql.tmpl"
default:
template = "./v004.cql.tmpl"

Check warning on line 144 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L143-L144

Added lines #L143 - L144 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how you are planning to use those files. They use variable placeholders like ${keyspace}, which works in shell substitution, but will not work as a Go template. We should be using Go template the this functionality. Your could do a search/replace, something like ${keyspace} ==> {{ .Keyspace }}. But it would be easier just to copy and change the syntax. You might need to use conditional clauses, like {{- if .UseILM}} from ES templates.

}
mode := getEnv("MODE", "")

if mode == "" {
fmt.Println("missing MODE parameter")
return
}
var datacentre, replications string
// var ReplicationFactor int
if mode == "prod" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this distinction anymore. Users can provide precise values for parameters, we don't have to have Jaeger guess those parameters based on the MODE

datacentre = getEnv("DATACENTRE", "")
if datacentre == "" {
fmt.Println("missing DATACENTRE parameter for prod mode")
return

Check warning on line 158 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L155-L158

Added lines #L155 - L158 were not covered by tests
}
replicationFactor, _ := strconv.Atoi(getEnv("REPLICATION_FACTOR", "2"))
replications = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': %d}", datacentre, replicationFactor)

Check warning on line 161 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L160-L161

Added lines #L160 - L161 were not covered by tests
} else if mode == "test" {
datacentre = getEnv("DATACENTRE", "")
replicationFactor, _ := strconv.Atoi(getEnv("REPLICATION_FACTOR", "1"))
replications = fmt.Sprintf("{'class': 'SimpleStrategy', 'replication_factor': '%d'}", replicationFactor)
} else {
fmt.Printf("invaild mode: %s, expecting 'prod' or 'test'", mode)
return

Check warning on line 168 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L167-L168

Added lines #L167 - L168 were not covered by tests
}
keyspace := getEnv("KEYSPACE", fmt.Sprintf("jaeger_v1_%s", mode))
if !isValidKeyspace(keyspace) {
fmt.Printf("invaild characters in KEYSPACE=%s parameter , please use letters, digits, and underscores only", keyspace)
return

Check warning on line 173 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L172-L173

Added lines #L172 - L173 were not covered by tests
}
CompactionWindowSize, compactionWindowUnit, _ := getCompactionWindow(traceTTL, getEnv("COMPACTION_WINDOW_SIZE", ""))

params := MappingBuilder{
Mode: mode,
Datacentre: datacentre,
Keyspace: keyspace,
Replication: replications,
TraceTTL: traceTTL,
DependenciesTTL: dependenciesTTL,
CompactionWindowSize: CompactionWindowSize,
CompactionWindowUnit: compactionWindowUnit,
}
// Render the template
cqlOutput, err = params.renderTemplate(template)
if err != nil {
return "", "", err

Check warning on line 190 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L190

Added line #L190 was not covered by tests
}
cqlOutput, _ = RenderCQLTemplate(params, cqlOutput)
// fmt.Println(cqlOutput)
}
return cqlOutput, "", err
}

func SchemaInit(c *gocql.Session) {
builder := &MappingBuilder{}
schema, _, err := builder.GetSpanServiceMappings()
Comment on lines +199 to +200
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does the term "mapping" come from? If you're copying it from Elasticsearch, there "mapping" is actually a term understood by ES. No such thing in Cassandra.

if err != nil {
fmt.Println("Error:", err)

Check warning on line 202 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L202

Added line #L202 was not covered by tests
}
queries := strings.Split(schema, ";")
for _, query := range queries {
trimmedQuery := strings.TrimSpace(query)
if trimmedQuery != "" {
fmt.Println(trimmedQuery)
if err := c.Query(trimmedQuery + ";").Exec(); err != nil {
fmt.Printf("Failed to create sampling_probabilities table: %v", err)
} else {
fmt.Println("Table sampling_probabilities created successfully.")

Check warning on line 212 in plugin/storage/cassandra/schema/create.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/schema/create.go#L208-L212

Added lines #L208 - L212 were not covered by tests
}
}
}
}
113 changes: 113 additions & 0 deletions plugin/storage/cassandra/schema/create_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package mappings

import (
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

// func TestRenderTemplate(t *testing.T) {
// mb := &MappingBuilder{
// Mode: "test",
// Datacentre: "dc1",
// Keyspace: "test_keyspace",
// Replication: "{'class': 'SimpleStrategy', 'replication_factor': '1'}",
// TraceTTL: 172800,
// DependenciesTTL: 0,
// CompactionWindowSize: 30,
// CompactionWindowUnit: "MINUTES",
// }

// // Create a temporary template file
// templateContent := "CREATE KEYSPACE ${keyspace} WITH replication = ${replication};"
// tmpFile, err := os.CreateTemp("", "template.cql.tmpl")
// assert.NoError(t, err)
// defer os.Remove(tmpFile.Name())

// _, err = tmpFile.WriteString(templateContent)
// assert.NoError(t, err)
// tmpFile.Close()

// renderedOutput, err := mb.renderTemplate(tmpFile.Name())
// assert.NoError(t, err)
// expectedOutput := "CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};"
// assert.Equal(t, expectedOutput, renderedOutput)
// }

func TestRenderCQLTemplate(t *testing.T) {
params := MappingBuilder{
Keyspace: "test_keyspace",
Replication: "{'class': 'SimpleStrategy', 'replication_factor': '1'}",
TraceTTL: 172800,
DependenciesTTL: 0,
CompactionWindowSize: 30,
CompactionWindowUnit: "MINUTES",
}

cqlOutput := `
CREATE KEYSPACE ${keyspace} WITH replication = ${replication};
CREATE TABLE ${keyspace}.traces (trace_id UUID PRIMARY KEY, span_id UUID, trace_ttl int);
`

renderedOutput, err := RenderCQLTemplate(params, cqlOutput)
assert.NoError(t, err)
expectedOutput := `
CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE test_keyspace.traces (trace_id UUID PRIMARY KEY, span_id UUID, trace_ttl int);
`
assert.Equal(t, strings.TrimSpace(expectedOutput), strings.TrimSpace(renderedOutput))
}

func TestGetCompactionWindow(t *testing.T) {
tests := []struct {
traceTTL int
compactionWindow string
expectedSize int
expectedUnit string
expectError bool
}{
{172800, "30m", 30, "MINUTES", false},
{172800, "2h", 2, "HOURS", false},
{172800, "1d", 1, "DAYS", false},
{172800, "", 96, "MINUTES", false},
{172800, "invalid", 0, "", true},
}

for _, test := range tests {
size, unit, err := getCompactionWindow(test.traceTTL, test.compactionWindow)
if test.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expectedSize, size)
assert.Equal(t, test.expectedUnit, unit)
}
}
}

func TestIsValidKeyspace(t *testing.T) {
assert.True(t, isValidKeyspace("valid_keyspace"))
assert.False(t, isValidKeyspace("invalid-keyspace"))
assert.False(t, isValidKeyspace("invalid keyspace"))
}

func TestGetSpanServiceMappings(t *testing.T) {
os.Setenv("TRACE_TTL", "172800")
os.Setenv("DEPENDENCIES_TTL", "0")
os.Setenv("VERSION", "4")
os.Setenv("MODE", "test")
os.Setenv("DATACENTRE", "dc1")
os.Setenv("REPLICATION_FACTOR", "1")
os.Setenv("KEYSPACE", "test_keyspace")

builder := &MappingBuilder{}
spanMapping, serviceMapping, err := builder.GetSpanServiceMappings()
assert.NoError(t, err)
assert.NotEmpty(t, spanMapping)
assert.Empty(t, serviceMapping)
}
Loading