Skip to content

Commit

Permalink
Suppress dupe account data events
Browse files Browse the repository at this point in the history
  • Loading branch information
kegsay committed Aug 2, 2023
1 parent d5ee6d1 commit 0c45b71
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions sync2/handler2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler2
import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"os"
"sync"
Expand Down Expand Up @@ -31,12 +32,14 @@ var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.C
// processing v2 data (as a sync2.V2DataReceiver) and publishing updates (pubsub.Payload to V2Listeners);
// and receiving and processing EnsurePolling events.
type Handler struct {
pMap sync2.IPollerMap
v2Store *sync2.Storage
Store *state.Storage
v2Pub pubsub.Notifier
v3Sub *pubsub.V3Sub
unreadMap map[string]struct {
pMap sync2.IPollerMap
v2Store *sync2.Storage
Store *state.Storage
v2Pub pubsub.Notifier
v3Sub *pubsub.V3Sub
// user_id+room_id => fnv_hash(last_event_bytes)
accountDataMap *sync.Map
unreadMap map[string]struct {
Highlight int
Notif int
}
Expand All @@ -63,6 +66,7 @@ func NewHandler(
Highlight int
Notif int
}),
accountDataMap: &sync.Map{},
typingMap: make(map[string]uint64),
deviceDataTicker: sync2.NewDeviceDataTicker(deviceDataUpdateDuration),
e2eeWorkerPool: internal.NewWorkerPool(500), // TODO: assign as fraction of db max conns, not hardcoded
Expand Down Expand Up @@ -433,7 +437,25 @@ func (h *Handler) UpdateUnreadCounts(ctx context.Context, roomID, userID string,
}

func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
data, err := h.Store.InsertAccountData(userID, roomID, events)
// duplicate suppression for multiple devices on the same account.
// We suppress by remembering the last bytes for a given account data, and if they match we ignore.
dedupedEvents := make([]json.RawMessage, 0, len(events))
for i := range events {
evType := gjson.GetBytes(events[i], "type").Str
key := fmt.Sprintf("%s|%s|%s", userID, roomID, evType)
thisHash := fnvHash(events[i])
last, _ := h.accountDataMap.Load(key)
if last != nil {
lastHash := last.(uint64)
if lastHash == thisHash {
continue // skip this event
}
}
dedupedEvents = append(dedupedEvents, events[i])
h.accountDataMap.Store(key, thisHash)
}

data, err := h.Store.InsertAccountData(userID, roomID, dedupedEvents)
if err != nil {
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data")
sentry.CaptureException(err)
Expand Down Expand Up @@ -506,6 +528,12 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) {
}()
}

func fnvHash(event json.RawMessage) uint64 {
h := fnv.New64a()
h.Write(event)
return h.Sum64()
}

func typingHash(ephEvent json.RawMessage) uint64 {
h := fnv.New64a()
for _, userID := range gjson.ParseBytes(ephEvent).Get("content.user_ids").Array() {
Expand Down

0 comments on commit 0c45b71

Please sign in to comment.