Commit
group and dedupe by hashcode
Signed-off-by: Chris Danis <[email protected]>
cdanis committed Sep 23, 2024
1 parent b36de4c commit 7a64877
Showing 3 changed files with 38 additions and 20 deletions.
cmd/query/app/querysvc/adjusters.go (2 changes: 1 addition & 1 deletion)
@@ -15,7 +15,7 @@ import (
 func StandardAdjusters(maxClockSkewAdjust time.Duration) []adjuster.Adjuster {
     return []adjuster.Adjuster{
         adjuster.ZipkinSpanIDUniquifier(),
-        adjuster.DedupeBySpanID(),
+        adjuster.DedupeBySpanHash(),
         adjuster.ClockSkew(maxClockSkewAdjust),
         adjuster.IPTagAdjuster(),
         adjuster.OTelTagAdjuster(),
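
For context on how this list is consumed: below is a minimal, illustrative sketch (not part of this commit) of applying an adjuster chain such as the one returned by StandardAdjusters to a trace. It relies only on the adjuster.Adjuster interface whose Adjust(*model.Trace) (*model.Trace, error) method is exercised by the tests further down; the ApplyAdjusters helper and the example package name are hypothetical.

package example

import (
    "github.com/jaegertracing/jaeger/model"
    "github.com/jaegertracing/jaeger/model/adjuster"
)

// ApplyAdjusters is a hypothetical helper: it runs each adjuster in order,
// threading the trace through the chain, e.g. the slice returned by
// querysvc.StandardAdjusters(maxClockSkewAdjust).
func ApplyAdjusters(trace *model.Trace, adjusters []adjuster.Adjuster) (*model.Trace, error) {
    var err error
    for _, a := range adjusters {
        if trace, err = a.Adjust(trace); err != nil {
            return nil, err
        }
    }
    return trace, nil
}
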
model/adjuster/span_hash_deduper.go (32 changes: 25 additions & 7 deletions)
@@ -8,21 +8,39 @@ import (
     "github.com/jaegertracing/jaeger/model"
 )

-// DedupeBySpanID returns an adjuster that removes all but one span with the same SpanID.
+// DedupeBySpanHash returns an adjuster that removes all but one span with the same hashcode.
 // This is useful for when spans are duplicated in archival storage, as happens with
 // ElasticSearch archival.
-func DedupeBySpanID() Adjuster {
+func DedupeBySpanHash() Adjuster {
     return Func(func(trace *model.Trace) (*model.Trace, error) {
-        deduper := &spanIDDeduper{trace: trace}
-        deduper.groupSpansByID()
-        deduper.dedupeSpansByID()
+        deduper := &spanHashDeduper{trace: trace}
+        deduper.groupSpansByHash()
+        deduper.dedupeSpansByHash()
         return deduper.trace, nil
     })
 }

-func (d *spanIDDeduper) dedupeSpansByID() {
+type spanHashDeduper struct {
+    trace       *model.Trace
+    spansByHash map[uint64][]*model.Span
+}
+
+func (d *spanHashDeduper) groupSpansByHash() {
+    spansByHash := make(map[uint64][]*model.Span)
+    for _, span := range d.trace.Spans {
+        hash, _ := model.HashCode(span)
+        if spans, ok := spansByHash[hash]; ok {
+            spansByHash[hash] = append(spans, span)
+        } else {
+            spansByHash[hash] = []*model.Span{span}
+        }
+    }
+    d.spansByHash = spansByHash
+}
+
+func (d *spanHashDeduper) dedupeSpansByHash() {
     d.trace.Spans = nil
-    for _, spans := range d.spansByID {
+    for _, spans := range d.spansByHash {
         d.trace.Spans = append(d.trace.Spans, spans[0])
     }
 }
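
A rough usage sketch of the new adjuster (not part of this commit): spans with identical contents hash to the same value via model.HashCode, so only one of each duplicate group survives. model.NewTraceID appears in the tests below; model.NewSpanID and the literal field values here are assumptions for illustration only.

package main

import (
    "fmt"

    "github.com/jaegertracing/jaeger/model"
    "github.com/jaegertracing/jaeger/model/adjuster"
)

func main() {
    // Two byte-for-byte identical spans plus one distinct span.
    mkSpan := func(id uint64) *model.Span {
        return &model.Span{
            TraceID:       model.NewTraceID(0, 1),
            SpanID:        model.NewSpanID(id),
            OperationName: "GET /api",
        }
    }
    trace := &model.Trace{Spans: []*model.Span{mkSpan(1), mkSpan(1), mkSpan(2)}}

    trace, err := adjuster.DedupeBySpanHash().Adjust(trace)
    if err != nil {
        panic(err)
    }
    fmt.Println(len(trace.Spans)) // expected: 2 (the duplicate pair collapsed to one)
}
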
model/adjuster/span_hash_deduper_test.go (24 changes: 12 additions & 12 deletions)
@@ -50,16 +50,15 @@ func newUniqueSpansTrace() *model.Trace {
         Spans: []*model.Span{
             {
                 TraceID: traceID,
-                SpanID:  clientSpanID,
+                SpanID:  anotherSpanID,
                 Tags: model.KeyValues{
                     // span.kind = server
                     model.String(keySpanKind, trace.SpanKindServer.String()),
                 },
             },
             {
-                // some other span, child of server span
                 TraceID:    traceID,
-                SpanID:     anotherSpanID,
+                SpanID:     anotherSpanID, // same ID as before, but different metadata
                 References: []model.SpanRef{model.NewChildOfRef(traceID, clientSpanID)},
             },
         },
@@ -74,9 +73,9 @@ func getSpanIDs(spans []*model.Span) []int {
     return ids
 }

-func TestDedupeBySpanIDTriggers(t *testing.T) {
+func TestDedupeBySpanHashTriggers(t *testing.T) {
     trace := newDuplicatedSpansTrace()
-    deduper := DedupeBySpanID()
+    deduper := DedupeBySpanHash()
     trace, err := deduper.Adjust(trace)
     require.NoError(t, err)

@@ -86,28 +85,29 @@ func TestDedupeBySpanIDTriggers(t *testing.T) {
     assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs")
 }

-func TestDedupeBySpanIDNotTriggered(t *testing.T) {
+func TestDedupeBySpanHashNotTriggered(t *testing.T) {
     trace := newUniqueSpansTrace()
-    deduper := DedupeBySpanID()
+    deduper := DedupeBySpanHash()
     trace, err := deduper.Adjust(trace)
     require.NoError(t, err)

     assert.Len(t, trace.Spans, 2, "should not dedupe spans")

     ids := getSpanIDs(trace.Spans)
-    assert.ElementsMatch(t, []int{int(clientSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs")
+    assert.ElementsMatch(t, []int{int(anotherSpanID), int(anotherSpanID)}, ids, "should keep unique span IDs")
+    assert.NotEqual(t, trace.Spans[0], trace.Spans[1], "should keep unique hashcodes")
 }

-func TestDedupeBySpanIDEmpty(t *testing.T) {
+func TestDedupeBySpanHashEmpty(t *testing.T) {
     trace := &model.Trace{}
-    deduper := DedupeBySpanID()
+    deduper := DedupeBySpanHash()
     trace, err := deduper.Adjust(trace)
     require.NoError(t, err)

     assert.Empty(t, trace.Spans, "should be empty")
 }

-func TestDedupeBySpanIDManyManySpans(t *testing.T) {
+func TestDedupeBySpanHashManyManySpans(t *testing.T) {
     traceID := model.NewTraceID(0, 42)
     spans := make([]*model.Span, 0, 100)
     const distinctSpanIDs = 10
@@ -118,7 +118,7 @@ func TestDedupeBySpanIDManyManySpans(t *testing.T) {
         })
     }
     trace := &model.Trace{Spans: spans}
-    deduper := DedupeBySpanID()
+    deduper := DedupeBySpanHash()
     trace, err := deduper.Adjust(trace)
     require.NoError(t, err)

