diff --git a/cmd/query/app/querysvc/adjusters.go b/cmd/query/app/querysvc/adjusters.go index 346d0c0bca4..24e84a161df 100644 --- a/cmd/query/app/querysvc/adjusters.go +++ b/cmd/query/app/querysvc/adjusters.go @@ -14,7 +14,8 @@ import ( // before returning the data to the API clients. func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster { return []adjuster.Adjuster{ - adjuster.SpanIDDeduper(), + adjuster.ZipkinSpanIDUniquifier(), + adjuster.DedupeBySpanHash(), adjuster.ClockSkew(maxClockSkewAdjust), adjuster.IPTagAdjuster(), adjuster.OTelTagAdjuster(), diff --git a/model/adjuster/clockskew.go b/model/adjuster/clockskew.go index e7965f5a11b..30f922307dc 100644 --- a/model/adjuster/clockskew.go +++ b/model/adjuster/clockskew.go @@ -19,7 +19,7 @@ import ( // child spans do not start before or end after their parent spans. // // The algorithm assumes that all spans have unique IDs, so the trace may need -// to go through another adjuster first, such as SpanIDDeduper. +// to go through another adjuster first, such as ZipkinSpanIDUniquifier. // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. diff --git a/model/adjuster/span_hash_deduper.go b/model/adjuster/span_hash_deduper.go new file mode 100644 index 00000000000..44318246348 --- /dev/null +++ b/model/adjuster/span_hash_deduper.go @@ -0,0 +1,46 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "github.com/jaegertracing/jaeger/model" +) + +// DedupeBySpanHash returns an adjuster that removes all but one span with the same hashcode. +// This is useful for when spans are duplicated in archival storage, as happens with +// ElasticSearch archival. +func DedupeBySpanHash() Adjuster { + return Func(func(trace *model.Trace) (*model.Trace, error) { + deduper := &spanHashDeduper{trace: trace} + deduper.groupSpansByHash() + deduper.dedupeSpansByHash() + return deduper.trace, nil + }) +} + +type spanHashDeduper struct { + trace *model.Trace + spansByHash map[uint64][]*model.Span +} + +func (d *spanHashDeduper) groupSpansByHash() { + spansByHash := make(map[uint64][]*model.Span) + for _, span := range d.trace.Spans { + hash, _ := model.HashCode(span) + if spans, ok := spansByHash[hash]; ok { + spansByHash[hash] = append(spans, span) + } else { + spansByHash[hash] = []*model.Span{span} + } + } + d.spansByHash = spansByHash +} + +func (d *spanHashDeduper) dedupeSpansByHash() { + d.trace.Spans = nil + for _, spans := range d.spansByHash { + d.trace.Spans = append(d.trace.Spans, spans[0]) + } +} diff --git a/model/adjuster/span_hash_deduper_test.go b/model/adjuster/span_hash_deduper_test.go new file mode 100644 index 00000000000..ba7625a68b4 --- /dev/null +++ b/model/adjuster/span_hash_deduper_test.go @@ -0,0 +1,129 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "github.com/jaegertracing/jaeger/model" +) + +func newDuplicatedSpansTrace() *model.Trace { + traceID := model.NewTraceID(0, 42) + return &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: clientSpanID, + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + TraceID: traceID, + SpanID: clientSpanID, // shared span ID + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + // some other span, child of server span + TraceID: traceID, + SpanID: anotherSpanID, + References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)}, + }, + }, + } +} + +func newUniqueSpansTrace() *model.Trace { + traceID := model.NewTraceID(0, 42) + return &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: anotherSpanID, + Tags: model.KeyValues{ + // span.kind = server + model.String(keySpanKind, trace.SpanKindServer.String()), + }, + }, + { + TraceID: traceID, + SpanID: anotherSpanID, // same ID as before, but different metadata + References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)}, + }, + }, + } +} + +func getSpanIDs(spans []*model.Span) []int { + ids := make([]int, len(spans)) + for i, span := range spans { + ids[i] = int(span.SpanID) + } + return ids +} + +func TestDedupeBySpanHashTriggers(t *testing.T) { + trace := newDuplicatedSpansTrace() + deduper := DedupeBySpanHash() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, 2, "should dedupe spans") + + ids := getSpanIDs(trace.Spans) + assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") +} + +func TestDedupeBySpanHashNotTriggered(t *testing.T) { + trace := newUniqueSpansTrace() + deduper := DedupeBySpanHash() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, 2, "should not dedupe spans") + + ids := getSpanIDs(trace.Spans) + assert.ElementsMatch(t, []int{int(anotherSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs") + assert.NotEqual(t, trace.Spans[0], trace.Spans[1], "should keep unique hashcodes") +} + +func TestDedupeBySpanHashEmpty(t *testing.T) { + trace := &model.Trace{} + deduper := DedupeBySpanHash() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Empty(t, trace.Spans, "should be empty") +} + +func TestDedupeBySpanHashManyManySpans(t *testing.T) { + traceID := model.NewTraceID(0, 42) + spans := make([]*model.Span, 0, 100) + const distinctSpanIDs = 10 + for i := 0; i < 100; i++ { + spans = append(spans, &model.Span{ + TraceID: traceID, + SpanID: model.SpanID(i % distinctSpanIDs), + }) + } + trace := &model.Trace{Spans: spans} + deduper := DedupeBySpanHash() + trace, err := deduper.Adjust(trace) + require.NoError(t, err) + + assert.Len(t, trace.Spans, distinctSpanIDs, "should dedupe spans") + + ids := getSpanIDs(trace.Spans) + assert.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, ids, "should keep unique span IDs") +} diff --git a/model/adjuster/span_id_deduper.go b/model/adjuster/zipkin_span_id_uniquify.go similarity index 92% rename from model/adjuster/span_id_deduper.go rename to model/adjuster/zipkin_span_id_uniquify.go index 9b669a42932..5f8a86a32cc 100644 --- a/model/adjuster/span_id_deduper.go +++ b/model/adjuster/zipkin_span_id_uniquify.go @@ -11,7 +11,7 @@ import ( "github.com/jaegertracing/jaeger/model" ) -// SpanIDDeduper returns an adjuster that changes span ids for server +// ZipkinSpanIDUniquifier returns an adjuster that changes span ids for server // spans (i.e. spans with tag: span.kind == server) if there is another // client span that shares the same span ID. This is needed to deal with // Zipkin-style clients that reuse the same span ID for both client and server @@ -19,11 +19,11 @@ import ( // // This adjuster never returns any errors. Instead it records any issues // it encounters in Span.Warnings. -func SpanIDDeduper() Adjuster { +func ZipkinSpanIDUniquifier() Adjuster { return Func(func(trace *model.Trace) (*model.Trace, error) { deduper := &spanIDDeduper{trace: trace} deduper.groupSpansByID() - deduper.dedupeSpanIDs() + deduper.uniquifyServerSpanIDs() return deduper.trace, nil }) } @@ -63,7 +63,7 @@ func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool { return false } -func (d *spanIDDeduper) dedupeSpanIDs() { +func (d *spanIDDeduper) uniquifyServerSpanIDs() { oldToNewSpanIDs := make(map[model.SpanID]model.SpanID) for _, span := range d.trace.Spans { // only replace span IDs for server-side spans that share the ID with something else @@ -82,7 +82,7 @@ func (d *spanIDDeduper) dedupeSpanIDs() { } // swapParentIDs corrects ParentSpanID of all spans that are children of the server -// spans whose IDs we deduped. +// spans whose IDs we made unique. func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) { for _, span := range d.trace.Spans { if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok { diff --git a/model/adjuster/span_id_deduper_test.go b/model/adjuster/zipkin_span_id_uniquify_test.go similarity index 87% rename from model/adjuster/span_id_deduper_test.go rename to model/adjuster/zipkin_span_id_uniquify_test.go index 503d9e388db..1ad0895985f 100644 --- a/model/adjuster/span_id_deduper_test.go +++ b/model/adjuster/zipkin_span_id_uniquify_test.go @@ -20,7 +20,7 @@ var ( keySpanKind = "span.kind" ) -func newTrace() *model.Trace { +func newZipkinTrace() *model.Trace { traceID := model.NewTraceID(0, 42) return &model.Trace{ Spans: []*model.Span{ @@ -52,9 +52,9 @@ func newTrace() *model.Trace { } } -func TestSpanIDDeduperTriggered(t *testing.T) { - trace := newTrace() - deduper := SpanIDDeduper() +func TestZipkinSpanIDUniquifierTriggered(t *testing.T) { + trace := newZipkinTrace() + deduper := ZipkinSpanIDUniquifier() trace, err := deduper.Adjust(trace) require.NoError(t, err) @@ -70,11 +70,11 @@ func TestSpanIDDeduperTriggered(t *testing.T) { assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") } -func TestSpanIDDeduperNotTriggered(t *testing.T) { - trace := newTrace() +func TestZipkinSpanIDUniquifierNotTriggered(t *testing.T) { + trace := newZipkinTrace() trace.Spans = trace.Spans[1:] // remove client span - deduper := SpanIDDeduper() + deduper := ZipkinSpanIDUniquifier() trace, err := deduper.Adjust(trace) require.NoError(t, err) @@ -87,8 +87,8 @@ func TestSpanIDDeduperNotTriggered(t *testing.T) { assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent") } -func TestSpanIDDeduperError(t *testing.T) { - trace := newTrace() +func TestZipkinSpanIDUniquifierError(t *testing.T) { + trace := newZipkinTrace() maxID := int64(-1) assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1") @@ -96,7 +96,7 @@ func TestSpanIDDeduperError(t *testing.T) { deduper := &spanIDDeduper{trace: trace} deduper.groupSpansByID() deduper.maxUsedID = maxSpanID - 1 - deduper.dedupeSpanIDs() + deduper.uniquifyServerSpanIDs() if assert.Len(t, trace.Spans[1].Warnings, 1) { assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0]) } diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index d590f1ad1ce..48276aabffb 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -59,7 +59,7 @@ func newTenant(cfg Configuration) *Tenant { traces: map[model.TraceID]*model.Trace{}, services: map[string]struct{}{}, operations: map[string]map[spanstore.Operation]struct{}{}, - deduper: adjuster.SpanIDDeduper(), + deduper: adjuster.ZipkinSpanIDUniquifier(), config: cfg, } } @@ -90,7 +90,7 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback deps := map[string]*model.DependencyLink{} startTs := endTs.Add(-1 * lookback) for _, orig := range m.traces { - // SpanIDDeduper never returns an err + // ZipkinSpanIDUniquifier never returns an err trace, _ := m.deduper.Adjust(orig) if traceIsBetweenStartAndEnd(startTs, endTs, trace) { for _, s := range trace.Spans {