diff --git a/sync3/caches/user.go b/sync3/caches/user.go index 51f18a22..3a7ec0c1 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -428,7 +428,9 @@ func (c *UserCache) Invites() map[string]UserRoomData { } // AttemptToFetchPrevBatch tries to find a prev_batch value for the given event. This may not always succeed. -func (c *UserCache) AttemptToFetchPrevBatch(roomID string, firstTimelineEvent *EventData) (prevBatch string) { +func (c *UserCache) AttemptToFetchPrevBatch(ctx context.Context, roomID string, firstTimelineEvent *EventData) (prevBatch string) { + _, span := internal.StartSpan(ctx, "AttemptToFetchPrevBatch") + defer span.End() return c.store.GetClosestPrevBatch(roomID, firstTimelineEvent.NID) } diff --git a/sync3/conn.go b/sync3/conn.go index 5b8e95c5..7fcb32e0 100644 --- a/sync3/conn.go +++ b/sync3/conn.go @@ -135,6 +135,7 @@ func (c *Conn) isOutstanding(pos int64) bool { // client. It will NOT be reported to Sentry---this should happen as close as possible // to the creation of the error (or else Sentry cannot provide a meaningful traceback.) func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.Time) (resp *Response, herr *internal.HandlerError) { + ctx, span := internal.StartSpan(ctx, "OnIncomingRequest.AcquireMutex") c.cancelOutstandingRequestMu.Lock() if c.cancelOutstandingRequest != nil { c.cancelOutstandingRequest() @@ -149,6 +150,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T // it's intentional for the lock to be held whilst inside HandleIncomingRequest // as it guarantees linearisation of data within a single connection defer c.mu.Unlock() + span.End() isFirstRequest := req.pos == 0 isRetransmit := !isFirstRequest && c.lastClientRequest.pos == req.pos diff --git a/sync3/extensions/extensions.go b/sync3/extensions/extensions.go index be0cdc92..4eebd4c9 100644 --- a/sync3/extensions/extensions.go +++ b/sync3/extensions/extensions.go @@ -318,7 +318,9 @@ func (h *Handler) HandleLiveUpdate(ctx context.Context, update caches.Update, re extCtx.Handler = h exts := req.EnabledExtensions() for _, ext := range exts { - ext.AppendLive(ctx, res, extCtx, update) + childCtx, region := internal.StartSpan(ctx, "extension_live_"+ext.Name()) + ext.AppendLive(childCtx, res, extCtx, update) + region.End() } } diff --git a/sync3/handler/connstate_live.go b/sync3/handler/connstate_live.go index ef2a0da8..70da78db 100644 --- a/sync3/handler/connstate_live.go +++ b/sync3/handler/connstate_live.go @@ -61,6 +61,7 @@ func (s *connStateLive) liveUpdate( // block until we get a new event, with appropriate timeout startTime := time.Now() hasLiveStreamed := false + numProcessedUpdates := 0 for response.ListOps() == 0 && len(response.Rooms) == 0 && !response.Extensions.HasData(isInitial) { hasLiveStreamed = true timeToWait := time.Duration(req.TimeoutMSecs()) * time.Millisecond @@ -82,10 +83,12 @@ func (s *connStateLive) liveUpdate( return case update := <-s.updates: s.processUpdate(ctx, update, response, ex) + numProcessedUpdates++ // if there's more updates and we don't have lots stacked up already, go ahead and process another - for len(s.updates) > 0 && response.ListOps() < 50 { + for len(s.updates) > 0 && numProcessedUpdates < 100 { update = <-s.updates s.processUpdate(ctx, update, response, ex) + numProcessedUpdates++ } } } @@ -128,6 +131,8 @@ func (s *connStateLive) processUpdate(ctx context.Context, update caches.Update, } func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, response *sync3.Response) bool { + _, span := internal.StartSpan(ctx, "processLiveUpdate") + defer span.End() internal.AssertWithContext(ctx, "processLiveUpdate: response list length != internal list length", s.lists.Len() == len(response.Lists)) internal.AssertWithContext(ctx, "processLiveUpdate: request list length != internal list length", s.lists.Len() == len(s.muxedReq.Lists)) roomUpdate, _ := up.(caches.RoomUpdate) @@ -236,7 +241,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, }) if len(r.Timeline) == 0 && r.PrevBatch == "" { // attempt to fill in the prev_batch value for this room - prevBatch := s.userCache.AttemptToFetchPrevBatch(roomEventUpdate.RoomID(), roomEventUpdate.EventData) + prevBatch := s.userCache.AttemptToFetchPrevBatch(ctx, roomEventUpdate.RoomID(), roomEventUpdate.EventData) if prevBatch != "" { r.PrevBatch = prevBatch } @@ -246,11 +251,13 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, sender := roomEventUpdate.EventData.Sender if s.lazyCache.IsLazyLoading(roomID) && !s.lazyCache.IsSet(roomID, sender) { // load the state event + _, span := internal.StartSpan(ctx, "LazyLoadingMemberEvent") memberEvent := s.globalCache.LoadStateEvent(context.Background(), roomID, s.loadPositions[roomID], "m.room.member", sender) if memberEvent != nil { r.RequiredState = append(r.RequiredState, memberEvent) s.lazyCache.AddUser(roomID, sender) } + span.End() } } }