From 88ecb839e04f472f15ccdc846f6ba0e77ed357af Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Fri, 20 Sep 2024 17:27:14 -0400 Subject: [PATCH 01/12] clarify name of SpanIDDeduper Signed-off-by: Chris Danis --- cmd/query/app/querysvc/adjusters.go | 2 +- model/adjuster/clockskew.go | 2 +- model/adjuster/span_id_deduper.go | 4 ++-- model/adjuster/span_id_deduper_test.go | 10 +++++----- plugin/storage/memory/memory.go | 4 ++-- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/query/app/querysvc/adjusters.go b/cmd/query/app/querysvc/adjusters.go index 346d0c0bca4..81fdab8178c 100644 --- a/cmd/query/app/querysvc/adjusters.go +++ b/cmd/query/app/querysvc/adjusters.go @@ -14,7 +14,7 @@ import ( // before returning the data to the API clients. func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster { return []adjuster.Adjuster{ - adjuster.SpanIDDeduper(), + adjuster.ZipkinSpanIDRenamer(), adjuster.ClockSkew(maxClockSkewAdjust), adjuster.IPTagAdjuster(), adjuster.OTelTagAdjuster(), diff --git a/model/adjuster/clockskew.go b/model/adjuster/clockskew.go index e7965f5a11b..e6fec7f9dce 100644 --- a/model/adjuster/clockskew.go +++ b/model/adjuster/clockskew.go @@ -19,7 +19,7 @@ import ( // child spans do not start before or end after their parent spans. // // The algorithm assumes that all spans have unique IDs, so the trace may need -// to go through another adjuster first, such as SpanIDDeduper. +// to go through another adjuster first, such as ZipkinSpanIDRenamer. // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/span_id_deduper.go index 9b669a42932..cf36f1ddcb8 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/span_id_deduper.go @@ -11,7 +11,7 @@ import ( "github.com/jaegertracing/jaeger/model" ) -// SpanIDDeduper returns an adjuster that changes span ids for server +// ZipkinSpanIDRenamer returns an adjuster that changes span ids for server // spans (i.e. spans with tag: span.kind == server) if there is another // client span that shares the same span ID. This is needed to deal with // Zipkin-style clients that reuse the same span ID for both client and server @@ -19,7 +19,7 @@ import ( // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. -func SpanIDDeduper() Adjuster { +func ZipkinSpanIDRenamer() Adjuster { return Func(func(trace *model.Trace) (*model.Trace, error) { deduper := &spanIDDeduper{trace: trace} deduper.groupSpansByID() diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_id_deduper_test.go index 503d9e388db..270d9cc4ead 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/span_id_deduper_test.go @@ -52,9 +52,9 @@ func newTrace() *model.Trace { } } -func TestSpanIDDeduperTriggered(t *testing.T) { +func TestZipkinSpanIDRenamerTriggered(t *testing.T) { trace := newTrace() - deduper := SpanIDDeduper() + deduper := ZipkinSpanIDRenamer() trace, err := deduper.Adjust(trace) require.NoError(t, err) @@ -70,11 +70,11 @@ func TestSpanIDDeduperTriggered(t *testing.T) { assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") } -func TestSpanIDDeduperNotTriggered(t *testing.T) { +func TestZipkinSpanIDRenamerNotTriggered(t *testing.T) { trace := newTrace() trace.Spans = trace.Spans[1:] // remove client span - deduper := SpanIDDeduper() + deduper := ZipkinSpanIDRenamer() trace, err := deduper.Adjust(trace) require.NoError(t, err) @@ -87,7 +87,7 @@ func TestSpanIDDeduperNotTriggered(t *testing.T) { assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") } -func TestSpanIDDeduperError(t *testing.T) { +func TestZipkinSpanIDRenamerError(t *testing.T) { trace := newTrace() maxID := int64(-1) diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index d590f1ad1ce..52d748ff750 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -59,7 +59,7 @@ func newTenant(cfg Configuration) *Tenant { traces: map[model.TraceID]*model.Trace{}, services: map[string]struct{}{}, operations: map[string]map[spanstore.Operation]struct{}{}, - deduper: adjuster.SpanIDDeduper(), + deduper: adjuster.ZipkinSpanIDRenamer(), config: cfg, } } @@ -90,7 +90,7 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback deps := map[string]*model.DependencyLink{} startTs := endTs.Add(-1 * lookback) for _, orig := range m.traces { - // SpanIDDeduper never returns an err + // ZipkinSpanIDRenamer never returns an err trace, _ := m.deduper.Adjust(orig) if traceIsBetweenStartAndEnd(startTs, endTs, trace) { for _, s := range trace.Spans { From d5bdf1466aa9f881fad67fa58ce6440535542bb2 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Fri, 20 Sep 2024 17:28:35 -0400 Subject: [PATCH 02/12] rename zipkin-specific method as well Signed-off-by: Chris Danis --- model/adjuster/span_id_deduper.go | 4 ++-- model/adjuster/span_id_deduper_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/span_id_deduper.go index cf36f1ddcb8..2ecd53b6ceb 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/span_id_deduper.go @@ -23,7 +23,7 @@ func ZipkinSpanIDRenamer() Adjuster { return Func(func(trace *model.Trace) (*model.Trace, error) { deduper := &spanIDDeduper{trace: trace} deduper.groupSpansByID() - deduper.dedupeSpanIDs() + deduper.uniquifyServerSpanIDs() return deduper.trace, nil }) } @@ -63,7 +63,7 @@ func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool { return false } -func (d *spanIDDeduper) dedupeSpanIDs() { +func (d *spanIDDeduper) uniquifyServerSpanIDs() { oldToNewSpanIDs := make(map[model.SpanID]model.SpanID) for _, span := range d.trace.Spans { // only replace span IDs for server-side spans that share the ID with something else diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_id_deduper_test.go index 270d9cc4ead..7b3a056af04 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/span_id_deduper_test.go @@ -96,7 +96,7 @@ func TestZipkinSpanIDRenamerError(t *testing.T) { deduper := &spanIDDeduper{trace: trace} deduper.groupSpansByID() deduper.maxUsedID = maxSpanID - 1 - deduper.dedupeSpanIDs() + deduper.uniquifyServerSpanIDs() if assert.Len(t, trace.Spans[1].Warnings, 1) { assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0]) } From fdc22ca0c8d26c46d809c8ee27671a97d9034fef Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Fri, 20 Sep 2024 17:32:16 -0400 Subject: [PATCH 03/12] fix comment Signed-off-by: Chris Danis --- model/adjuster/span_id_deduper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/span_id_deduper.go index 2ecd53b6ceb..d930e980b1a 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/span_id_deduper.go @@ -82,7 +82,7 @@ func (d *spanIDDeduper) uniquifyServerSpanIDs() { } // swapParentIDs corrects ParentSpanID of all spans that are children of the server -// spans whose IDs we deduped. +// spans whose IDs we made unique. func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) { for _, span := range d.trace.Spans { if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok { From e5dc418fe021834115c2a36868683fa22cafaaa7 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Fri, 20 Sep 2024 17:34:56 -0400 Subject: [PATCH 04/12] add DedupeSpansByID Signed-off-by: Chris Danis --- model/adjuster/span_id_deduper.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/span_id_deduper.go index d930e980b1a..48713607ef1 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/span_id_deduper.go @@ -28,6 +28,16 @@ func ZipkinSpanIDRenamer() Adjuster { }) } +// DedupeBySpanID returns an adjuster that removes all but one span with the same SpanID. +func DedupeBySpanID() Adjuster { + return Func(func(trace *model.Trace) (*model.Trace, error) { + deduper := &spanIDDeduper{trace: trace} + deduper.groupSpansByID() + deduper.dedupeSpansByID() + return deduper.trace, nil + }) +} + const ( warningTooManySpans = "cannot assign unique span ID, too many spans in the trace" ) @@ -54,6 +64,13 @@ func (d *spanIDDeduper) groupSpansByID() { d.spansByID = spansByID } +func (d *spanIDDeduper) dedupeSpansByID() { + d.trace.Spans = nil + for _, spans := range d.spansByID { + d.trace.Spans = append(d.trace.Spans, spans[0]) + } +} + func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool { for _, span := range d.spansByID[spanID] { if span.IsRPCClient() { From ae4c914dfb84837d6b439e619ef3061cb06410d7 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 10:28:15 -0400 Subject: [PATCH 05/12] add comment and test DedupeBySpanID Signed-off-by: Chris Danis --- model/adjuster/span_id_deduper.go | 2 ++ model/adjuster/span_id_deduper_test.go | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/span_id_deduper.go index 48713607ef1..fbee318487d 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/span_id_deduper.go @@ -29,6 +29,8 @@ func ZipkinSpanIDRenamer() Adjuster { } // DedupeBySpanID returns an adjuster that removes all but one span with the same SpanID. +// This is useful for when spans are duplicated in archival storage, as happens with +// ElasticSearch archival. func DedupeBySpanID() Adjuster { return Func(func(trace *model.Trace) (*model.Trace, error) { deduper := &spanIDDeduper{trace: trace} diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_id_deduper_test.go index 7b3a056af04..aa571cba04d 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/span_id_deduper_test.go @@ -101,3 +101,14 @@ func TestZipkinSpanIDRenamerError(t *testing.T) { assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0]) } } + +func TestDedupeBySpanID(t *testing.T) { + trace := newTrace() + deduper := DedupeBySpanID() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, 2, "should dedupe spans") + assert.Equal(t, clientSpanID, trace.Spans[0].SpanID, "client span should be kept") + assert.Equal(t, anotherSpanID, trace.Spans[1].SpanID, "3rd span should be kept") +} From 18a9baba70b2f42ecdf4b7d7a616907e3b3ec5a4 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 10:28:32 -0400 Subject: [PATCH 06/12] add DedupeBySpanID to default adjusters Signed-off-by: Chris Danis --- cmd/query/app/querysvc/adjusters.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/query/app/querysvc/adjusters.go b/cmd/query/app/querysvc/adjusters.go index 81fdab8178c..0e5ac21d956 100644 --- a/cmd/query/app/querysvc/adjusters.go +++ b/cmd/query/app/querysvc/adjusters.go @@ -15,6 +15,7 @@ import ( func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster { return []adjuster.Adjuster{ adjuster.ZipkinSpanIDRenamer(), + adjuster.DedupeBySpanID(), adjuster.ClockSkew(maxClockSkewAdjust), adjuster.IPTagAdjuster(), adjuster.OTelTagAdjuster(), From e847055e5ed17a851a5315a4616d6b471f72ffe7 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 12:50:51 -0400 Subject: [PATCH 07/12] tweak zipkin adjuster name & move to new file Signed-off-by: Chris Danis --- cmd/query/app/querysvc/adjusters.go | 2 +- model/adjuster/clockskew.go | 2 +- model/adjuster/span_id_deduper.go | 98 ---------------- model/adjuster/span_id_deduper_test.go | 60 +--------- model/adjuster/zipkin_span_id_uniquify.go | 107 ++++++++++++++++++ .../adjuster/zipkin_span_id_uniquify_test.go | 103 +++++++++++++++++ plugin/storage/memory/memory.go | 4 +- 7 files changed, 216 insertions(+), 160 deletions(-) create mode 100644 model/adjuster/zipkin_span_id_uniquify.go create mode 100644 model/adjuster/zipkin_span_id_uniquify_test.go diff --git a/cmd/query/app/querysvc/adjusters.go b/cmd/query/app/querysvc/adjusters.go index 0e5ac21d956..be2469f4ab0 100644 --- a/cmd/query/app/querysvc/adjusters.go +++ b/cmd/query/app/querysvc/adjusters.go @@ -14,7 +14,7 @@ import ( // before returning the data to the API clients. func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster { return []adjuster.Adjuster{ - adjuster.ZipkinSpanIDRenamer(), + adjuster.ZipkinSpanIDUniquifier(), adjuster.DedupeBySpanID(), adjuster.ClockSkew(maxClockSkewAdjust), adjuster.IPTagAdjuster(), diff --git a/model/adjuster/clockskew.go b/model/adjuster/clockskew.go index e6fec7f9dce..30f922307dc 100644 --- a/model/adjuster/clockskew.go +++ b/model/adjuster/clockskew.go @@ -19,7 +19,7 @@ import ( // child spans do not start before or end after their parent spans. // // The algorithm assumes that all spans have unique IDs, so the trace may need -// to go through another adjuster first, such as ZipkinSpanIDRenamer. +// to go through another adjuster first, such as ZipkinSpanIDUniquifier. // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/span_id_deduper.go index fbee318487d..6886c464993 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/span_id_deduper.go @@ -5,29 +5,9 @@ package adjuster import ( - "errors" - "math" - "github.com/jaegertracing/jaeger/model" ) -// ZipkinSpanIDRenamer returns an adjuster that changes span ids for server -// spans (i.e. spans with tag: span.kind == server) if there is another -// client span that shares the same span ID. This is needed to deal with -// Zipkin-style clients that reuse the same span ID for both client and server -// side of an RPC call. Jaeger UI expects all spans to have unique IDs. -// -// This adjuster never returns any errors. Instead it records any issues -// it encounters in Span.Warnings. -func ZipkinSpanIDRenamer() Adjuster { - return Func(func(trace *model.Trace) (*model.Trace, error) { - deduper := &spanIDDeduper{trace: trace} - deduper.groupSpansByID() - deduper.uniquifyServerSpanIDs() - return deduper.trace, nil - }) -} - // DedupeBySpanID returns an adjuster that removes all but one span with the same SpanID. // This is useful for when spans are duplicated in archival storage, as happens with // ElasticSearch archival. @@ -40,87 +20,9 @@ func DedupeBySpanID() Adjuster { }) } -const ( - warningTooManySpans = "cannot assign unique span ID, too many spans in the trace" -) - -var maxSpanID = model.NewSpanID(math.MaxUint64) - -type spanIDDeduper struct { - trace *model.Trace - spansByID map[model.SpanID][]*model.Span - maxUsedID model.SpanID -} - -// groupSpansByID groups spans with the same ID returning a map id -> []Span -func (d *spanIDDeduper) groupSpansByID() { - spansByID := make(map[model.SpanID][]*model.Span) - for _, span := range d.trace.Spans { - if spans, ok := spansByID[span.SpanID]; ok { - // TODO maybe return an error if more than 2 spans found - spansByID[span.SpanID] = append(spans, span) - } else { - spansByID[span.SpanID] = []*model.Span{span} - } - } - d.spansByID = spansByID -} - func (d *spanIDDeduper) dedupeSpansByID() { d.trace.Spans = nil for _, spans := range d.spansByID { d.trace.Spans = append(d.trace.Spans, spans[0]) } } - -func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool { - for _, span := range d.spansByID[spanID] { - if span.IsRPCClient() { - return true - } - } - return false -} - -func (d *spanIDDeduper) uniquifyServerSpanIDs() { - oldToNewSpanIDs := make(map[model.SpanID]model.SpanID) - for _, span := range d.trace.Spans { - // only replace span IDs for server-side spans that share the ID with something else - if span.IsRPCServer() && d.isSharedWithClientSpan(span.SpanID) { - newID, err := d.makeUniqueSpanID() - if err != nil { - span.Warnings = append(span.Warnings, err.Error()) - continue - } - oldToNewSpanIDs[span.SpanID] = newID - span.ReplaceParentID(span.SpanID) // previously shared ID is the new parent - span.SpanID = newID - } - } - d.swapParentIDs(oldToNewSpanIDs) -} - -// swapParentIDs corrects ParentSpanID of all spans that are children of the server -// spans whose IDs we made unique. -func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) { - for _, span := range d.trace.Spans { - if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok { - if span.SpanID != parentID { - span.ReplaceParentID(parentID) - } - } - } -} - -// makeUniqueSpanID returns a new ID that is not used in the trace, -// or an error if such ID cannot be generated, which is unlikely, -// given that the whole space of span IDs is 2^64. -func (d *spanIDDeduper) makeUniqueSpanID() (model.SpanID, error) { - for id := d.maxUsedID + 1; id < maxSpanID; id++ { - if _, ok := d.spansByID[id]; !ok { - d.maxUsedID = id - return id, nil - } - } - return 0, errors.New(warningTooManySpans) -} diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_id_deduper_test.go index aa571cba04d..162823a9027 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/span_id_deduper_test.go @@ -14,13 +14,7 @@ import ( "github.com/jaegertracing/jaeger/model" ) -var ( - clientSpanID = model.NewSpanID(1) - anotherSpanID = model.NewSpanID(11) - keySpanKind = "span.kind" -) - -func newTrace() *model.Trace { +func newDuplicatedSpansTrace() *model.Trace { traceID := model.NewTraceID(0, 42) return &model.Trace{ Spans: []*model.Span{ @@ -52,58 +46,8 @@ func newTrace() *model.Trace { } } -func TestZipkinSpanIDRenamerTriggered(t *testing.T) { - trace := newTrace() - deduper := ZipkinSpanIDRenamer() - trace, err := deduper.Adjust(trace) - require.NoError(t, err) - - clientSpan := trace.Spans[0] - assert.Equal(t, clientSpanID, clientSpan.SpanID, "client span ID should not change") - - serverSpan := trace.Spans[1] - assert.Equal(t, clientSpanID+1, serverSpan.SpanID, "server span ID should be reassigned") - assert.Equal(t, clientSpan.SpanID, serverSpan.ParentSpanID(), "client span should be server span's parent") - - thirdSpan := trace.Spans[2] - assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change") - assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") -} - -func TestZipkinSpanIDRenamerNotTriggered(t *testing.T) { - trace := newTrace() - trace.Spans = trace.Spans[1:] // remove client span - - deduper := ZipkinSpanIDRenamer() - trace, err := deduper.Adjust(trace) - require.NoError(t, err) - - serverSpanID := clientSpanID // for better readability - serverSpan := trace.Spans[0] - assert.Equal(t, serverSpanID, serverSpan.SpanID, "server span ID should be unchanged") - - thirdSpan := trace.Spans[1] - assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change") - assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") -} - -func TestZipkinSpanIDRenamerError(t *testing.T) { - trace := newTrace() - - maxID := int64(-1) - assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1") - - deduper := &spanIDDeduper{trace: trace} - deduper.groupSpansByID() - deduper.maxUsedID = maxSpanID - 1 - deduper.uniquifyServerSpanIDs() - if assert.Len(t, trace.Spans[1].Warnings, 1) { - assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0]) - } -} - func TestDedupeBySpanID(t *testing.T) { - trace := newTrace() + trace := newZipkinTrace() deduper := DedupeBySpanID() trace, err := deduper.Adjust(trace) require.NoError(t, err) diff --git a/model/adjuster/zipkin_span_id_uniquify.go b/model/adjuster/zipkin_span_id_uniquify.go new file mode 100644 index 00000000000..5f8a86a32cc --- /dev/null +++ b/model/adjuster/zipkin_span_id_uniquify.go @@ -0,0 +1,107 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "errors" + "math" + + "github.com/jaegertracing/jaeger/model" +) + +// ZipkinSpanIDUniquifier returns an adjuster that changes span ids for server +// spans (i.e. spans with tag: span.kind == server) if there is another +// client span that shares the same span ID. This is needed to deal with +// Zipkin-style clients that reuse the same span ID for both client and server +// side of an RPC call. Jaeger UI expects all spans to have unique IDs. +// +// This adjuster never returns any errors. Instead it records any issues +// it encounters in Span.Warnings. +func ZipkinSpanIDUniquifier() Adjuster { + return Func(func(trace *model.Trace) (*model.Trace, error) { + deduper := &spanIDDeduper{trace: trace} + deduper.groupSpansByID() + deduper.uniquifyServerSpanIDs() + return deduper.trace, nil + }) +} + +const ( + warningTooManySpans = "cannot assign unique span ID, too many spans in the trace" +) + +var maxSpanID = model.NewSpanID(math.MaxUint64) + +type spanIDDeduper struct { + trace *model.Trace + spansByID map[model.SpanID][]*model.Span + maxUsedID model.SpanID +} + +// groupSpansByID groups spans with the same ID returning a map id -> []Span +func (d *spanIDDeduper) groupSpansByID() { + spansByID := make(map[model.SpanID][]*model.Span) + for _, span := range d.trace.Spans { + if spans, ok := spansByID[span.SpanID]; ok { + // TODO maybe return an error if more than 2 spans found + spansByID[span.SpanID] = append(spans, span) + } else { + spansByID[span.SpanID] = []*model.Span{span} + } + } + d.spansByID = spansByID +} + +func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool { + for _, span := range d.spansByID[spanID] { + if span.IsRPCClient() { + return true + } + } + return false +} + +func (d *spanIDDeduper) uniquifyServerSpanIDs() { + oldToNewSpanIDs := make(map[model.SpanID]model.SpanID) + for _, span := range d.trace.Spans { + // only replace span IDs for server-side spans that share the ID with something else + if span.IsRPCServer() && d.isSharedWithClientSpan(span.SpanID) { + newID, err := d.makeUniqueSpanID() + if err != nil { + span.Warnings = append(span.Warnings, err.Error()) + continue + } + oldToNewSpanIDs[span.SpanID] = newID + span.ReplaceParentID(span.SpanID) // previously shared ID is the new parent + span.SpanID = newID + } + } + d.swapParentIDs(oldToNewSpanIDs) +} + +// swapParentIDs corrects ParentSpanID of all spans that are children of the server +// spans whose IDs we made unique. +func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) { + for _, span := range d.trace.Spans { + if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok { + if span.SpanID != parentID { + span.ReplaceParentID(parentID) + } + } + } +} + +// makeUniqueSpanID returns a new ID that is not used in the trace, +// or an error if such ID cannot be generated, which is unlikely, +// given that the whole space of span IDs is 2^64. +func (d *spanIDDeduper) makeUniqueSpanID() (model.SpanID, error) { + for id := d.maxUsedID + 1; id < maxSpanID; id++ { + if _, ok := d.spansByID[id]; !ok { + d.maxUsedID = id + return id, nil + } + } + return 0, errors.New(warningTooManySpans) +} diff --git a/model/adjuster/zipkin_span_id_uniquify_test.go b/model/adjuster/zipkin_span_id_uniquify_test.go new file mode 100644 index 00000000000..1ad0895985f --- /dev/null +++ b/model/adjuster/zipkin_span_id_uniquify_test.go @@ -0,0 +1,103 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "github.com/jaegertracing/jaeger/model" +) + +var ( + clientSpanID = model.NewSpanID(1) + anotherSpanID = model.NewSpanID(11) + keySpanKind = "span.kind" +) + +func newZipkinTrace() *model.Trace { + traceID := model.NewTraceID(0, 42) + return &model.Trace{ + Spans: []*model.Span{ + { + // client span + TraceID: traceID, + SpanID: clientSpanID, + Tags: model.KeyValues{ + // span.kind = client + model.String(keySpanKind, trace.SpanKindClient.String()), + }, + }, + { + // server span + TraceID: traceID, + SpanID: clientSpanID, // shared span ID + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + // some other span, child of server span + TraceID: traceID, + SpanID: anotherSpanID, + References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)}, + }, + }, + } +} + +func TestZipkinSpanIDUniquifierTriggered(t *testing.T) { + trace := newZipkinTrace() + deduper := ZipkinSpanIDUniquifier() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + clientSpan := trace.Spans[0] + assert.Equal(t, clientSpanID, clientSpan.SpanID, "client span ID should not change") + + serverSpan := trace.Spans[1] + assert.Equal(t, clientSpanID+1, serverSpan.SpanID, "server span ID should be reassigned") + assert.Equal(t, clientSpan.SpanID, serverSpan.ParentSpanID(), "client span should be server span's parent") + + thirdSpan := trace.Spans[2] + assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change") + assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") +} + +func TestZipkinSpanIDUniquifierNotTriggered(t *testing.T) { + trace := newZipkinTrace() + trace.Spans = trace.Spans[1:] // remove client span + + deduper := ZipkinSpanIDUniquifier() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + serverSpanID := clientSpanID // for better readability + serverSpan := trace.Spans[0] + assert.Equal(t, serverSpanID, serverSpan.SpanID, "server span ID should be unchanged") + + thirdSpan := trace.Spans[1] + assert.Equal(t, anotherSpanID, thirdSpan.SpanID, "3rd span ID should not change") + assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") +} + +func TestZipkinSpanIDUniquifierError(t *testing.T) { + trace := newZipkinTrace() + + maxID := int64(-1) + assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1") + + deduper := &spanIDDeduper{trace: trace} + deduper.groupSpansByID() + deduper.maxUsedID = maxSpanID - 1 + deduper.uniquifyServerSpanIDs() + if assert.Len(t, trace.Spans[1].Warnings, 1) { + assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0]) + } +} diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index 52d748ff750..48276aabffb 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -59,7 +59,7 @@ func newTenant(cfg Configuration) *Tenant { traces: map[model.TraceID]*model.Trace{}, services: map[string]struct{}{}, operations: map[string]map[spanstore.Operation]struct{}{}, - deduper: adjuster.ZipkinSpanIDRenamer(), + deduper: adjuster.ZipkinSpanIDUniquifier(), config: cfg, } } @@ -90,7 +90,7 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback deps := map[string]*model.DependencyLink{} startTs := endTs.Add(-1 * lookback) for _, orig := range m.traces { - // ZipkinSpanIDRenamer never returns an err + // ZipkinSpanIDUniquifier never returns an err trace, _ := m.deduper.Adjust(orig) if traceIsBetweenStartAndEnd(startTs, endTs, trace) { for _, s := range trace.Spans { From 5ee481b292ff57199f8742b2fd2fa9d3f40469a0 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 13:06:32 -0400 Subject: [PATCH 08/12] fix up span id dedupe testing Signed-off-by: Chris Danis --- model/adjuster/span_id_deduper_test.go | 52 +++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_id_deduper_test.go index 162823a9027..346114ff7d8 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/span_id_deduper_test.go @@ -19,16 +19,14 @@ func newDuplicatedSpansTrace() *model.Trace { return &model.Trace{ Spans: []*model.Span{ { - // client span TraceID: traceID, SpanID: clientSpanID, Tags: model.KeyValues{ - // span.kind = client - model.String(keySpanKind, trace.SpanKindClient.String()), + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), }, }, { - // server span TraceID: traceID, SpanID: clientSpanID, // shared span ID Tags: model.KeyValues{ @@ -46,8 +44,30 @@ func newDuplicatedSpansTrace() *model.Trace { } } -func TestDedupeBySpanID(t *testing.T) { - trace := newZipkinTrace() +func newUniqueSpansTrace() *model.Trace { + traceID := model.NewTraceID(0, 42) + return &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: clientSpanID, + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + // some other span, child of server span + TraceID: traceID, + SpanID: anotherSpanID, + References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)}, + }, + }, + } +} + +func TestDedupeBySpanIDTriggers(t *testing.T) { + trace := newDuplicatedSpansTrace() deduper := DedupeBySpanID() trace, err := deduper.Adjust(trace) require.NoError(t, err) @@ -56,3 +76,23 @@ func TestDedupeBySpanID(t *testing.T) { assert.Equal(t, clientSpanID, trace.Spans[0].SpanID, "client span should be kept") assert.Equal(t, anotherSpanID, trace.Spans[1].SpanID, "3rd span should be kept") } + +func TestDedupeBySpanIDNotTriggered(t *testing.T) { + trace := newUniqueSpansTrace() + deduper := DedupeBySpanID() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, 2, "should not dedupe spans") + assert.Equal(t, clientSpanID, trace.Spans[0].SpanID, "client span should be kept") + assert.Equal(t, anotherSpanID, trace.Spans[1].SpanID, "child span should be kept") +} + +func TestDedupeBySpanIDEmpty(t *testing.T) { + trace := &model.Trace{} + deduper := DedupeBySpanID() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, 0, "should be 0 spans") +} From ec81a1085a678602b86a6a8693b5ea1a953783ca Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 13:32:16 -0400 Subject: [PATCH 09/12] More testing Signed-off-by: Chris Danis --- model/adjuster/span_id_deduper_test.go | 33 +++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_id_deduper_test.go index 346114ff7d8..242b2011d95 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/span_id_deduper_test.go @@ -84,8 +84,11 @@ func TestDedupeBySpanIDNotTriggered(t *testing.T) { require.NoError(t, err) assert.Len(t, trace.Spans, 2, "should not dedupe spans") - assert.Equal(t, clientSpanID, trace.Spans[0].SpanID, "client span should be kept") - assert.Equal(t, anotherSpanID, trace.Spans[1].SpanID, "child span should be kept") + var ids [2]int + for i, span := range trace.Spans { + ids[i] = int(span.SpanID) + } + assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") } func TestDedupeBySpanIDEmpty(t *testing.T) { @@ -94,5 +97,29 @@ func TestDedupeBySpanIDEmpty(t *testing.T) { trace, err := deduper.Adjust(trace) require.NoError(t, err) - assert.Len(t, trace.Spans, 0, "should be 0 spans") + assert.Empty(t, trace.Spans, "should be empty") +} + +func TestDedupeBySpanIDManyManySpans(t *testing.T) { + traceID := model.NewTraceID(0, 42) + spans := make([]*model.Span, 0, 100) + const distinctSpanIDs = 10 + for i := 0; i < 100; i++ { + spans = append(spans, &model.Span{ + TraceID: traceID, + SpanID: model.SpanID(i % distinctSpanIDs), + }) + } + trace := &model.Trace{Spans: spans} + deduper := DedupeBySpanID() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, distinctSpanIDs, "should dedupe spans") + + var ids [distinctSpanIDs]int + for i, span := range trace.Spans { + ids[i] = int(span.SpanID) + } + assert.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, ids, "should keep unique span IDs") } From 22ccccab1eb462dc625437527f32e58e7438e1c3 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 13:40:41 -0400 Subject: [PATCH 10/12] fix flaky tests Signed-off-by: Chris Danis --- model/adjuster/span_id_deduper_test.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_id_deduper_test.go index 242b2011d95..dc3cefcea3c 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/span_id_deduper_test.go @@ -66,6 +66,14 @@ func newUniqueSpansTrace() *model.Trace { } } +func getSpanIDs(spans []*model.Span) []int { + ids := make([]int, len(spans)) + for i, span := range spans { + ids[i] = int(span.SpanID) + } + return ids +} + func TestDedupeBySpanIDTriggers(t *testing.T) { trace := newDuplicatedSpansTrace() deduper := DedupeBySpanID() @@ -73,8 +81,9 @@ func TestDedupeBySpanIDTriggers(t *testing.T) { require.NoError(t, err) assert.Len(t, trace.Spans, 2, "should dedupe spans") - assert.Equal(t, clientSpanID, trace.Spans[0].SpanID, "client span should be kept") - assert.Equal(t, anotherSpanID, trace.Spans[1].SpanID, "3rd span should be kept") + + ids := getSpanIDs(trace.Spans) + assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") } func TestDedupeBySpanIDNotTriggered(t *testing.T) { @@ -84,10 +93,8 @@ func TestDedupeBySpanIDNotTriggered(t *testing.T) { require.NoError(t, err) assert.Len(t, trace.Spans, 2, "should not dedupe spans") - var ids [2]int - for i, span := range trace.Spans { - ids[i] = int(span.SpanID) - } + + ids := getSpanIDs(trace.Spans) assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") } @@ -117,9 +124,6 @@ func TestDedupeBySpanIDManyManySpans(t *testing.T) { assert.Len(t, trace.Spans, distinctSpanIDs, "should dedupe spans") - var ids [distinctSpanIDs]int - for i, span := range trace.Spans { - ids[i] = int(span.SpanID) - } + ids := getSpanIDs(trace.Spans) assert.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, ids, "should keep unique span IDs") } From 97eb7b599090ecbc060ff88028813d1e2b1bff98 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 15:36:44 -0400 Subject: [PATCH 11/12] rename Signed-off-by: Chris Danis --- model/adjuster/{span_id_deduper.go => span_hash_deduper.go} | 0 .../{span_id_deduper_test.go => span_hash_deduper_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename model/adjuster/{span_id_deduper.go => span_hash_deduper.go} (100%) rename model/adjuster/{span_id_deduper_test.go => span_hash_deduper_test.go} (100%) diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/span_hash_deduper.go similarity index 100% rename from model/adjuster/span_id_deduper.go rename to model/adjuster/span_hash_deduper.go diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/span_hash_deduper_test.go similarity index 100% rename from model/adjuster/span_id_deduper_test.go rename to model/adjuster/span_hash_deduper_test.go From 3aeb4e5a28b66d3e66c9eac6d53852dd7f4367c9 Mon Sep 17 00:00:00 2001 From: Chris Danis Date: Mon, 23 Sep 2024 16:15:40 -0400 Subject: [PATCH 12/12] group and dedupe by hashcode Signed-off-by: Chris Danis --- cmd/query/app/querysvc/adjusters.go | 2 +- model/adjuster/span_hash_deduper.go | 32 ++++++++++++++++++------ model/adjuster/span_hash_deduper_test.go | 24 +++++++++--------- 3 files changed, 38 insertions(+), 20 deletions(-) diff --git a/cmd/query/app/querysvc/adjusters.go b/cmd/query/app/querysvc/adjusters.go index be2469f4ab0..24e84a161df 100644 --- a/cmd/query/app/querysvc/adjusters.go +++ b/cmd/query/app/querysvc/adjusters.go @@ -15,7 +15,7 @@ import ( func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster { return []adjuster.Adjuster{ adjuster.ZipkinSpanIDUniquifier(), - adjuster.DedupeBySpanID(), + adjuster.DedupeBySpanHash(), adjuster.ClockSkew(maxClockSkewAdjust), adjuster.IPTagAdjuster(), adjuster.OTelTagAdjuster(), diff --git a/model/adjuster/span_hash_deduper.go b/model/adjuster/span_hash_deduper.go index 6886c464993..44318246348 100644 --- a/model/adjuster/span_hash_deduper.go +++ b/model/adjuster/span_hash_deduper.go @@ -8,21 +8,39 @@ import ( "github.com/jaegertracing/jaeger/model" ) -// DedupeBySpanID returns an adjuster that removes all but one span with the same SpanID. +// DedupeBySpanHash returns an adjuster that removes all but one span with the same hashcode. // This is useful for when spans are duplicated in archival storage, as happens with // ElasticSearch archival. -func DedupeBySpanID() Adjuster { +func DedupeBySpanHash() Adjuster { return Func(func(trace *model.Trace) (*model.Trace, error) { - deduper := &spanIDDeduper{trace: trace} - deduper.groupSpansByID() - deduper.dedupeSpansByID() + deduper := &spanHashDeduper{trace: trace} + deduper.groupSpansByHash() + deduper.dedupeSpansByHash() return deduper.trace, nil }) } -func (d *spanIDDeduper) dedupeSpansByID() { +type spanHashDeduper struct { + trace *model.Trace + spansByHash map[uint64][]*model.Span +} + +func (d *spanHashDeduper) groupSpansByHash() { + spansByHash := make(map[uint64][]*model.Span) + for _, span := range d.trace.Spans { + hash, _ := model.HashCode(span) + if spans, ok := spansByHash[hash]; ok { + spansByHash[hash] = append(spans, span) + } else { + spansByHash[hash] = []*model.Span{span} + } + } + d.spansByHash = spansByHash +} + +func (d *spanHashDeduper) dedupeSpansByHash() { d.trace.Spans = nil - for _, spans := range d.spansByID { + for _, spans := range d.spansByHash { d.trace.Spans = append(d.trace.Spans, spans[0]) } } diff --git a/model/adjuster/span_hash_deduper_test.go b/model/adjuster/span_hash_deduper_test.go index dc3cefcea3c..ba7625a68b4 100644 --- a/model/adjuster/span_hash_deduper_test.go +++ b/model/adjuster/span_hash_deduper_test.go @@ -50,16 +50,15 @@ func newUniqueSpansTrace() *model.Trace { Spans: []*model.Span{ { TraceID: traceID, - SpanID: clientSpanID, + SpanID: anotherSpanID, Tags: model.KeyValues{ // span.kind = server model.String(keySpanKind, trace.SpanKindServer.String()), }, }, { - // some other span, child of server span TraceID: traceID, - SpanID: anotherSpanID, + SpanID: anotherSpanID, // same ID as before, but different metadata References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)}, }, }, @@ -74,9 +73,9 @@ func getSpanIDs(spans []*model.Span) []int { return ids } -func TestDedupeBySpanIDTriggers(t *testing.T) { +func TestDedupeBySpanHashTriggers(t *testing.T) { trace := newDuplicatedSpansTrace() - deduper := DedupeBySpanID() + deduper := DedupeBySpanHash() trace, err := deduper.Adjust(trace) require.NoError(t, err) @@ -86,28 +85,29 @@ func TestDedupeBySpanIDTriggers(t *testing.T) { assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") } -func TestDedupeBySpanIDNotTriggered(t *testing.T) { +func TestDedupeBySpanHashNotTriggered(t *testing.T) { trace := newUniqueSpansTrace() - deduper := DedupeBySpanID() + deduper := DedupeBySpanHash() trace, err := deduper.Adjust(trace) require.NoError(t, err) assert.Len(t, trace.Spans, 2, "should not dedupe spans") ids := getSpanIDs(trace.Spans) - assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") + assert.ElementsMatch(t, []int{int(anotherSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") + assert.NotEqual(t, trace.Spans[0], trace.Spans[1], "should keep unique hashcodes") } -func TestDedupeBySpanIDEmpty(t *testing.T) { +func TestDedupeBySpanHashEmpty(t *testing.T) { trace := &model.Trace{} - deduper := DedupeBySpanID() + deduper := DedupeBySpanHash() trace, err := deduper.Adjust(trace) require.NoError(t, err) assert.Empty(t, trace.Spans, "should be empty") } -func TestDedupeBySpanIDManyManySpans(t *testing.T) { +func TestDedupeBySpanHashManyManySpans(t *testing.T) { traceID := model.NewTraceID(0, 42) spans := make([]*model.Span, 0, 100) const distinctSpanIDs = 10 @@ -118,7 +118,7 @@ func TestDedupeBySpanIDManyManySpans(t *testing.T) { }) } trace := &model.Trace{Spans: spans} - deduper := DedupeBySpanID() + deduper := DedupeBySpanHash() trace, err := deduper.Adjust(trace) require.NoError(t, err)