From 75de8a6330bfef0ac286206f30906276bd92cce7 Mon Sep 17 00:00:00 2001 From: Vishal Choudhary Date: Tue, 15 Nov 2022 22:32:21 +0530 Subject: [PATCH 1/2] added decoders Signed-off-by: Vishal Choudhary --- cmd/ingester/app/builder/builder.go | 4 ++++ plugin/storage/kafka/options.go | 4 ++++ plugin/storage/kafka/unmarshaller.go | 32 ++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index d984b5dcd03..4046c7ad660 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -39,6 +39,10 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit unmarshaller = kafka.NewProtobufUnmarshaller() case kafka.EncodingZipkinThrift: unmarshaller = kafka.NewZipkinThriftUnmarshaller() + case kafka.EncodingOtlpJSON: + unmarshaller = kafka.NewOtlpJSONUnmarshaller() + case kafka.EncodingOtlpProto: + unmarshaller = kafka.NewOtlpProtoUnmarshaller() default: return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s")`, options.Encoding, strings.Join(kafka.AllEncodings, "\", \"")) diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 9efdd64784a..712e2efc2b4 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -34,6 +34,10 @@ const ( EncodingProto = "protobuf" // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" + // EncodingOtlpJSON is used for spans encoded as OTLP JSON. + EncodingOtlpJSON = "otlp-json" + // EncodingOtlpProto is used for spans encoded as OTLP Proto. + EncodingOtlpProto = "otlp-proto" configPrefix = "kafka.producer" suffixBrokers = ".brokers" diff --git a/plugin/storage/kafka/unmarshaller.go b/plugin/storage/kafka/unmarshaller.go index 32b9a52ad12..6efe6739c1c 100644 --- a/plugin/storage/kafka/unmarshaller.go +++ b/plugin/storage/kafka/unmarshaller.go @@ -19,6 +19,8 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" @@ -79,3 +81,33 @@ func (h *ZipkinThriftUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) { } return mSpans[0], err } + +type OtlpJSONUnmarshaller struct{} + +func NewOtlpJSONUnmarshaller() *OtlpJSONUnmarshaller { + return &OtlpJSONUnmarshaller{} +} + +func (OtlpJSONUnmarshaller) Unmarshal(buf []byte) (ptrace.Traces, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + if err != nil { + return ptrace.NewTraces(), err + } + return req.Traces(), nil +} + +type OtlpProtoUnmarshaller struct{} + +func NewOtlpProtoUnmarshaller() *OtlpProtoUnmarshaller { + return &OtlpProtoUnmarshaller{} +} + +func (h *OtlpProtoUnmarshaller) Unmarshal(buf []byte) (ptrace.Traces, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalProto(buf) + if err != nil { + return ptrace.NewTraces(), err + } + return req.Traces(), nil +} From cd3205095ac35e54a37e47650f6c3f47907c5bac Mon Sep 17 00:00:00 2001 From: Vishal Choudhary Date: Wed, 16 Nov 2022 20:00:56 +0530 Subject: [PATCH 2/2] converted ptrace.Traces to *model.Span Signed-off-by: Vishal Choudhary --- plugin/storage/kafka/unmarshaller.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/plugin/storage/kafka/unmarshaller.go b/plugin/storage/kafka/unmarshaller.go index 6efe6739c1c..020aba093d7 100644 --- a/plugin/storage/kafka/unmarshaller.go +++ b/plugin/storage/kafka/unmarshaller.go @@ -19,11 +19,11 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" - "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" ) // Unmarshaller decodes a byte array to a span @@ -88,13 +88,18 @@ func NewOtlpJSONUnmarshaller() *OtlpJSONUnmarshaller { return &OtlpJSONUnmarshaller{} } -func (OtlpJSONUnmarshaller) Unmarshal(buf []byte) (ptrace.Traces, error) { +func (OtlpJSONUnmarshaller) Unmarshal(buf []byte) (*model.Span, error) { req := ptraceotlp.NewExportRequest() err := req.UnmarshalJSON(buf) if err != nil { - return ptrace.NewTraces(), err + return nil, err + } + + batch, err := otlp2jaeger.ProtoFromTraces(req.Traces()) + if err != nil { + return nil, err } - return req.Traces(), nil + return batch[0].Spans[0], nil } type OtlpProtoUnmarshaller struct{} @@ -103,11 +108,16 @@ func NewOtlpProtoUnmarshaller() *OtlpProtoUnmarshaller { return &OtlpProtoUnmarshaller{} } -func (h *OtlpProtoUnmarshaller) Unmarshal(buf []byte) (ptrace.Traces, error) { +func (h *OtlpProtoUnmarshaller) Unmarshal(buf []byte) (*model.Span, error) { req := ptraceotlp.NewExportRequest() err := req.UnmarshalProto(buf) if err != nil { - return ptrace.NewTraces(), err + return nil, err + } + + batch, err := otlp2jaeger.ProtoFromTraces(req.Traces()) + if err != nil { + return nil, err } - return req.Traces(), nil + return batch[0].Spans[0], nil }