Skip to content

Commit

Permalink
remove vanus sdk in go mod file
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai committed Jul 17, 2023
1 parent 14cc0d1 commit af954f9
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,48 @@ package exporter

import (
"context"
"os"

v2 "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
vanussdk "github.com/vanus-labs/sdk/golang"
"github.com/vanus-labs/vanus/client"
"github.com/vanus-labs/vanus/client/pkg/api"
"github.com/vanus-labs/vanus/observability/log"
"github.com/vanus-labs/vanus/pkg/cluster"
"github.com/vanus-labs/vanus/proto/pkg/cloudevents"
"github.com/vanus-labs/vanus/proto/pkg/codec"
"google.golang.org/grpc/credentials/insecure"

"go.opentelemetry.io/otel/attribute"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
)

func GetExporter(endpoints []string, eventbus string) tracesdk.SpanExporter {
spanExporter, err := New(context.Background(), WithEndpoints(endpoints), WithEventbus(eventbus))
if err != nil {
log.Error().Err(err).Msg("new span exporter failed")
os.Exit(-1)
}
return spanExporter
}

type Option func(*Options)

type Options struct {
Endpoints string
Endpoints []string
Eventbus string
}

func defaultOptions() *Options {
return &Options{
Endpoints: "127.0.0.1:8080",
Endpoints: []string{},
Eventbus: "event-tracing",
}
}

func WithEndpoint(endpoint string) Option {
func WithEndpoints(endpoints []string) Option {
return func(options *Options) {
options.Endpoints = endpoint
options.Endpoints = endpoints
}
}

Expand All @@ -54,9 +69,8 @@ func WithEventbus(eventbus string) Option {

// Exporter exports trace data in the OTLP wire format.
type Exporter struct {
endpoints string
client vanussdk.Client
publisher vanussdk.Publisher
endpoints []string
writer api.BusWriter
}

var _ tracesdk.SpanExporter = (*Exporter)(nil)
Expand All @@ -67,47 +81,52 @@ func New(ctx context.Context, opts ...Option) (*Exporter, error) {
apply(defaultOpts)
}

clientOpts := &vanussdk.ClientOptions{
Endpoint: defaultOpts.Endpoints,
Token: "admin",
ctrl := cluster.NewClusterController(defaultOpts.Endpoints, insecure.NewCredentials())
if err := ctrl.WaitForControllerReady(true); err != nil {
log.Error(ctx).Err(err).Msg("wait for controller ready timeout")
return nil, err
}

c, err := vanussdk.Connect(clientOpts)
eventbus, err := ctrl.EventbusService().GetEventbusByName(ctx, "default", defaultOpts.Eventbus)
if err != nil {
panic("failed to connect to Vanus cluster, error: " + err.Error())
log.Error(ctx).Err(err).Str("eventbus", defaultOpts.Eventbus).Msg("failed to get eventbus")
return nil, err
}

ebOpt := vanussdk.WithEventbus("default", defaultOpts.Eventbus)
c := client.Connect(defaultOpts.Endpoints)
bus := c.Eventbus(ctx, api.WithName(defaultOpts.Eventbus), api.WithID(eventbus.Id))
exporter := &Exporter{
endpoints: defaultOpts.Endpoints,
client: c,
publisher: c.Publisher(ebOpt),
}
_, err = c.Controller().Eventbus().Get(ctx, ebOpt)
if err != nil {
panic("failed to get tracing eventbus, error: " + err.Error())
writer: bus.Writer(),
}
return exporter, nil
}

// ExportSpans exports a batch of spans.
func (e *Exporter) ExportSpans(ctx context.Context, ss []tracesdk.ReadOnlySpan) error {
es := make([]*v2.Event, 0)
ces := make([]*cloudevents.CloudEvent, 0)
for _, span := range ss {
if span.Name() != "EventTracing" {
event := newEvent(span)
if event.Type() != "event-tracing" {
continue
}
event := newEvent(span)
es = append(es, &event)
eventpb, err := codec.ToProto(&event)
if err != nil {
log.Error(ctx).Err(err).Any("event", event).Msg("failed to proto event")
return nil
}
ces = append(ces, eventpb)
}

if len(es) == 0 {
if len(ces) == 0 {
return nil
}

err := e.publisher.Publish(ctx, es...)
ceBatch := &cloudevents.CloudEventBatch{
Events: ces,
}
_, err := e.writer.Append(ctx, ceBatch)
if err != nil {
log.Error(ctx).Err(err).Msg("failed to publish events to tracing eventbus")
log.Error(ctx).Err(err).Msg("failed to append events to tracing eventbus")
return nil
}
return nil
Expand All @@ -122,7 +141,6 @@ func newEvent(span tracesdk.ReadOnlySpan) v2.Event {
event := v2.NewEvent()
event.SetID(uuid.New().String())
event.SetSource(span.Name())
event.SetType(span.SpanKind().String())
data := make(map[string]interface{})
data["name"] = span.Name()
data["trace_id"] = span.SpanContext().TraceID().String()
Expand All @@ -136,6 +154,9 @@ func newEvent(span tracesdk.ReadOnlySpan) v2.Event {
data[string(attr.Key)] = attr.Value.AsInt64()
} else if attr.Value.Type() == attribute.STRING {
data[string(attr.Key)] = attr.Value.AsString()
if string(attr.Key) == "type" && attr.Value.AsString() == "event-tracing" {
event.SetType("event-tracing")
}
}
}
data["events"] = span.Events()
Expand Down
3 changes: 2 additions & 1 deletion cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"flag"
"os"

"github.com/vanus-labs/vanus/client/pkg/exporter"
"github.com/vanus-labs/vanus/observability"
"github.com/vanus-labs/vanus/observability/log"
"github.com/vanus-labs/vanus/observability/metrics"
Expand All @@ -39,7 +40,7 @@ func main() {

ctx := signal.SetupSignalContext()
cfg.Observability.T.ServerName = "Vanus Gateway"
_ = observability.Initialize(ctx, cfg.Observability, metrics.GetGatewayMetrics)
_ = observability.Initialize(ctx, cfg.Observability, metrics.GetGatewayMetrics, exporter.GetExporter)

ga := gateway.NewGateway(*cfg)
if err = ga.Start(ctx); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/trigger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"google.golang.org/grpc"

"github.com/vanus-labs/vanus/client/pkg/exporter"
"github.com/vanus-labs/vanus/observability"
"github.com/vanus-labs/vanus/observability/log"
"github.com/vanus-labs/vanus/observability/metrics"
Expand Down Expand Up @@ -50,7 +51,7 @@ func main() {
}
ctx := signal.SetupSignalContext()
cfg.Observability.T.ServerName = "Vanus Trigger"
_ = observability.Initialize(ctx, cfg.Observability, metrics.GetTriggerMetrics)
_ = observability.Initialize(ctx, cfg.Observability, metrics.GetTriggerMetrics, exporter.GetExporter)
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
srv := trigger.NewTriggerServer(*cfg)
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ require (
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
)

require github.com/vanus-labs/sdk/golang v0.4.7 // indirect

require (
cloud.google.com/go/compute v1.19.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
Expand Down Expand Up @@ -129,4 +127,4 @@ replace (
github.com/vanus-labs/vanus/raft => ./raft
)

// replace github.com/vanus-labs/sdk/golang => ./FORBIDDEN_DEPENDENCY
replace github.com/vanus-labs/sdk/golang => ./FORBIDDEN_DEPENDENCY
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,6 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/vanus-labs/sdk/golang v0.4.7 h1:SIWuyguOX4t0Jve9U7g6A2A4knFjoIW+9UhMsji8jIY=
github.com/vanus-labs/sdk/golang v0.4.7/go.mod h1:QpmncLBj1i1rtZmqSsoZyWN4l6odMHpSZmbad4GBuQQ=
github.com/vigneshuvi/GoDateFormat v0.0.0-20210204121036-67364dc23c79 h1:37VzBuFO88QQnCEu+G41v9IqgJNBXR+4vR9vGwVqJ00=
github.com/vigneshuvi/GoDateFormat v0.0.0-20210204121036-67364dc23c79/go.mod h1:190gFTWxRNREiiPal7zWZlNrwFSpv3BxDmOfgYqoYCY=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
8 changes: 6 additions & 2 deletions internal/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,12 @@ func (ga *ceGateway) receive(ctx context.Context, event v2.Event) (re *v2.Event,
ctx, span := ga.tracer.Start(ctx, "receive")
defer span.End()

event.SetExtension("traceid", span.SpanContext().TraceID().String())
span.SetName("EventTracing")
traceID := span.SpanContext().TraceID().String()
spanID := span.SpanContext().SpanID().String()
traceFlags := span.SpanContext().TraceFlags().String()
event.SetExtension("traceparent", fmt.Sprintf("00-%s-%s-%s", traceID, spanID, traceFlags))
span.SetName(event.ID())
span.SetAttributes(attribute.String("type", "event-tracing"))
span.SetAttributes(attribute.String("event_id", event.ID()))
span.AddEvent("received from source", trace.WithTimestamp(time.Now()))

Expand Down
11 changes: 7 additions & 4 deletions internal/trigger/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/binary"
stderr "errors"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -206,7 +207,8 @@ func (elReader *eventlogReader) loop(ctx context.Context, lr api.BusReader) erro
}
for i := range events {
_, span := elReader.newSpan(ctx, *events[i])
span.SetName("EventTracing")
span.SetName(events[i].ID())
span.SetAttributes(attribute.String("type", "event-tracing"))
span.SetAttributes(attribute.String("event_id", events[i].ID()))
span.AddEvent("read from eventbus", trace.WithTimestamp(time.Now()))
span.SetAttributes(attribute.String("eventbus_id", elReader.config.EventbusID.String()))
Expand Down Expand Up @@ -240,12 +242,13 @@ func (elReader *eventlogReader) newSpan(ctx context.Context, event ce.Event) (co
if event.Extensions() == nil {
return elReader.config.Tracer.Start(ctx, event.ID())
}
if _, ok := event.Extensions()["traceid"]; !ok {
if _, ok := event.Extensions()["traceparent"]; !ok {
return elReader.config.Tracer.Start(ctx, event.ID())
}
traceid, _ := trace.TraceIDFromHex(event.Extensions()["traceid"].(string))
tps := strings.Split(event.Extensions()["traceparent"].(string), "-")
traceID, _ := trace.TraceIDFromHex(tps[1])
_ctx := trace.ContextWithSpanContext(ctx, trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceid,
TraceID: traceID,
}))
return elReader.config.Tracer.Start(_ctx, event.ID())
}
Expand Down
17 changes: 1 addition & 16 deletions observability/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ module github.com/vanus-labs/vanus/observability
go 1.19

require (
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/google/uuid v1.3.0
github.com/prometheus/client_golang v1.15.0
github.com/rs/zerolog v1.29.1
github.com/vanus-labs/sdk/golang v0.4.7
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
Expand All @@ -21,34 +18,22 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/vanus-labs/vanus/pkg v0.0.0-20230330121558-f90ea26ae55b // indirect
github.com/vanus-labs/vanus/proto v0.7.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

// replace github.com/vanus-labs/vanus => ../FORBIDDEN_DEPENDENCY
replace github.com/vanus-labs/vanus => ../FORBIDDEN_DEPENDENCY
Loading

0 comments on commit af954f9

Please sign in to comment.