[bugfix] Deduplicate spans based upon their hashcode #6009

Open

wants to merge 12 commits into main

3 changes: 2 additions & 1 deletion cmd/query/app/querysvc/adjusters.go
@@ -14,7 +14,8 @@ import (
// before returning the data to the API clients.
func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster {
return []adjuster.Adjuster{
adjuster.SpanIDDeduper(),
adjuster.ZipkinSpanIDUniquifier(),
adjuster.DedupeBySpanHash(),
adjuster.ClockSkew(maxClockSkewAdjust),
adjuster.IPTagAdjuster(),
adjuster.OTelTagAdjuster(),
2 changes: 1 addition & 1 deletion model/adjuster/clockskew.go
@@ -19,7 +19,7 @@ import (
// child spans do not start before or end after their parent spans.
//
// The algorithm assumes that all spans have unique IDs, so the trace may need
// to go through another adjuster first, such as SpanIDDeduper.
// to go through another adjuster first, such as ZipkinSpanIDUniquifier.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
46 changes: 46 additions & 0 deletions model/adjuster/span_hash_deduper.go
@@ -0,0 +1,46 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"github.com/jaegertracing/jaeger/model"
)

// DedupeBySpanHash returns an adjuster that removes duplicate spans, keeping only
// one span per hash code. This is useful when spans are duplicated in archival
// storage, as happens with Elasticsearch-based archiving.
func DedupeBySpanHash() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
deduper := &spanHashDeduper{trace: trace}
deduper.groupSpansByHash()
deduper.dedupeSpansByHash()
return deduper.trace, nil
})
}

type spanHashDeduper struct {
trace *model.Trace
spansByHash map[uint64][]*model.Span
}

func (d *spanHashDeduper) groupSpansByHash() {
spansByHash := make(map[uint64][]*model.Span)
for _, span := range d.trace.Spans {
hash, _ := model.HashCode(span)
Review comment (Member):

I recall there was some issue in the past with the hash function requiring proper ordering of tags or something.

Reply (Member):

Yes, this is still the case:

func (s *Span) Hash(w io.Writer) (err error) {
	// gob is not the most efficient way, but it ensures we don't miss any fields.
	// See BenchmarkSpanHash in span_test.go
	enc := gob.NewEncoder(w)
	return enc.Encode(s)
}

You can only compare hashes if the tags are properly sorted. Surprisingly, we don't have an explicit adjuster for that, but some of that sorting happens in the ip_tag.go and sort_log_fields.go adjusters. Perhaps it would make sense to pull the sorting explicitly into a SortTagsAndLogs adjuster and make sure it is called before the DedupeBySpanHash adjuster (see the sketch after this file's diff).

if spans, ok := spansByHash[hash]; ok {
spansByHash[hash] = append(spans, span)
} else {
spansByHash[hash] = []*model.Span{span}
}
}
d.spansByHash = spansByHash
}

func (d *spanHashDeduper) dedupeSpansByHash() {
d.trace.Spans = nil
for _, spans := range d.spansByHash {
d.trace.Spans = append(d.trace.Spans, spans[0])
}
}
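
As a follow-up to the review thread above: a minimal sketch of what such a SortTagsAndLogs adjuster could look like, assuming model.KeyValues exposes an in-place Sort. The adjuster name and its placement ahead of DedupeBySpanHash come from the review comment's suggestion and are not part of this PR's diff.

package adjuster

import "github.com/jaegertracing/jaeger/model"

// SortTagsAndLogs is a hypothetical adjuster (not in this PR) that sorts span
// tags, process tags, and log fields in place, so that semantically identical
// spans encode to the same bytes and therefore produce the same gob-based hash.
// It would need to run before DedupeBySpanHash.
func SortTagsAndLogs() Adjuster {
	return Func(func(trace *model.Trace) (*model.Trace, error) {
		for _, span := range trace.Spans {
			model.KeyValues(span.Tags).Sort()
			if span.Process != nil {
				model.KeyValues(span.Process.Tags).Sort()
			}
			for i := range span.Logs {
				model.KeyValues(span.Logs[i].Fields).Sort()
			}
		}
		return trace, nil
	})
}

If adopted, it would be listed in StandardAdjusters just before adjuster.DedupeBySpanHash().
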
129 changes: 129 additions & 0 deletions model/adjuster/span_hash_deduper_test.go
@@ -0,0 +1,129 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/jaegertracing/jaeger/model"
)

func newDuplicatedSpansTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: clientSpanID,
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
TraceID: traceID,
SpanID: clientSpanID, // shared span ID
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
// some other span, child of server span
TraceID: traceID,
SpanID: anotherSpanID,
References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)},
},
},
}
}

func newUniqueSpansTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: anotherSpanID,
Tags: model.KeyValues{
// span.kind = server
model.String(keySpanKind, trace.SpanKindServer.String()),
},
},
{
TraceID: traceID,
SpanID: anotherSpanID, // same ID as before, but different metadata
References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)},
},
},
}
}

func getSpanIDs(spans []*model.Span) []int {
ids := make([]int, len(spans))
for i, span := range spans {
ids[i] = int(span.SpanID)
}
return ids
}

func TestDedupeBySpanHashTriggers(t *testing.T) {
trace := newDuplicatedSpansTrace()
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, 2, "should dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs")
}

func TestDedupeBySpanHashNotTriggered(t *testing.T) {
trace := newUniqueSpansTrace()
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, 2, "should not dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{int(anotherSpanID), int(anotherSpanID)}, ids, "should keep both spans even though they share a span ID")
assert.NotEqual(t, trace.Spans[0], trace.Spans[1], "should keep unique hashcodes")
}

func TestDedupeBySpanHashEmpty(t *testing.T) {
trace := &model.Trace{}
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Empty(t, trace.Spans, "should be empty")
}

func TestDedupeBySpanHashManyManySpans(t *testing.T) {
traceID := model.NewTraceID(0, 42)
spans := make([]*model.Span, 0, 100)
const distinctSpanIDs = 10
for i := 0; i < 100; i++ {
spans = append(spans, &model.Span{
TraceID: traceID,
SpanID: model.SpanID(i % distinctSpanIDs),
})
}
trace := &model.Trace{Spans: spans}
deduper := DedupeBySpanHash()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

assert.Len(t, trace.Spans, distinctSpanIDs, "should dedupe spans")

ids := getSpanIDs(trace.Spans)
assert.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, ids, "should keep unique span IDs")
}
@@ -11,19 +11,19 @@ import (
"github.com/jaegertracing/jaeger/model"
)

// SpanIDDeduper returns an adjuster that changes span ids for server
// ZipkinSpanIDUniquifier returns an adjuster that changes span ids for server
// spans (i.e. spans with tag: span.kind == server) if there is another
// client span that shares the same span ID. This is needed to deal with
// Zipkin-style clients that reuse the same span ID for both client and server
// side of an RPC call. Jaeger UI expects all spans to have unique IDs.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
func SpanIDDeduper() Adjuster {
func ZipkinSpanIDUniquifier() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
deduper := &spanIDDeduper{trace: trace}
deduper.groupSpansByID()
deduper.dedupeSpanIDs()
deduper.uniquifyServerSpanIDs()
return deduper.trace, nil
})
}
@@ -63,7 +63,7 @@ func (d *spanIDDeduper) isSharedWithClientSpan(spanID model.SpanID) bool {
return false
}

func (d *spanIDDeduper) dedupeSpanIDs() {
func (d *spanIDDeduper) uniquifyServerSpanIDs() {
oldToNewSpanIDs := make(map[model.SpanID]model.SpanID)
for _, span := range d.trace.Spans {
// only replace span IDs for server-side spans that share the ID with something else
@@ -82,7 +82,7 @@ }
}

// swapParentIDs corrects ParentSpanID of all spans that are children of the server
// spans whose IDs we deduped.
// spans whose IDs we made unique.
func (d *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[model.SpanID]model.SpanID) {
for _, span := range d.trace.Spans {
if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok {
@@ -20,7 +20,7 @@ var (
keySpanKind = "span.kind"
)

func newTrace() *model.Trace {
func newZipkinTrace() *model.Trace {
traceID := model.NewTraceID(0, 42)
return &model.Trace{
Spans: []*model.Span{
@@ -52,9 +52,9 @@ }
}
}

func TestSpanIDDeduperTriggered(t *testing.T) {
trace := newTrace()
deduper := SpanIDDeduper()
func TestZipkinSpanIDUniquifierTriggered(t *testing.T) {
trace := newZipkinTrace()
deduper := ZipkinSpanIDUniquifier()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

@@ -70,11 +70,11 @@ func TestSpanIDDeduperTriggered(t *testing.T) {
assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent")
}

func TestSpanIDDeduperNotTriggered(t *testing.T) {
trace := newTrace()
func TestZipkinSpanIDUniquifierNotTriggered(t *testing.T) {
trace := newZipkinTrace()
trace.Spans = trace.Spans[1:] // remove client span

deduper := SpanIDDeduper()
deduper := ZipkinSpanIDUniquifier()
trace, err := deduper.Adjust(trace)
require.NoError(t, err)

@@ -87,16 +87,16 @@ func TestSpanIDDeduperNotTriggered(t *testing.T) {
assert.Equal(t, serverSpan.SpanID, thirdSpan.ParentSpanID(), "server span should be 3rd span's parent")
}

func TestSpanIDDeduperError(t *testing.T) {
trace := newTrace()
func TestZipkinSpanIDUniquifierError(t *testing.T) {
trace := newZipkinTrace()

maxID := int64(-1)
assert.Equal(t, maxSpanID, model.NewSpanID(uint64(maxID)), "maxSpanID must be 2^64-1")

deduper := &spanIDDeduper{trace: trace}
deduper.groupSpansByID()
deduper.maxUsedID = maxSpanID - 1
deduper.dedupeSpanIDs()
deduper.uniquifyServerSpanIDs()
if assert.Len(t, trace.Spans[1].Warnings, 1) {
assert.Equal(t, "cannot assign unique span ID, too many spans in the trace", trace.Spans[1].Warnings[0])
}
4 changes: 2 additions & 2 deletions plugin/storage/memory/memory.go
@@ -59,7 +59,7 @@ func newTenant(cfg Configuration) *Tenant {
traces: map[model.TraceID]*model.Trace{},
services: map[string]struct{}{},
operations: map[string]map[spanstore.Operation]struct{}{},
deduper: adjuster.SpanIDDeduper(),
deduper: adjuster.ZipkinSpanIDUniquifier(),
config: cfg,
}
}
@@ -90,7 +90,7 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback
deps := map[string]*model.DependencyLink{}
startTs := endTs.Add(-1 * lookback)
for _, orig := range m.traces {
// SpanIDDeduper never returns an err
// ZipkinSpanIDUniquifier never returns an err
trace, _ := m.deduper.Adjust(orig)
if traceIsBetweenStartAndEnd(startTs, endTs, trace) {
for _, s := range trace.Spans {