Merge pull request #405 from matrix-org/kegan/spans
Add more spans for live updates; change when we early return from buffered events
kegsay committed Feb 26, 2024
2 parents a95f3c7 + 8ccb018 commit 30f8c5b
Showing 4 changed files with 17 additions and 4 deletions.
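
All four diffs below lean on the repo's internal.StartSpan helper, which is not itself part of this change. As a point of reference, here is a minimal stand-in sketch of the call shape the hunks rely on: a helper returning a (possibly derived) context plus a handle whose End() closes the span. The startSpan and span names and the printing body are hypothetical stand-ins, not the actual internal package, which wires into a real tracer.

package main

import (
	"context"
	"fmt"
	"time"
)

// span is a hypothetical stand-in for whatever internal.StartSpan returns;
// the only property the diffs below rely on is that End() closes the span.
type span struct {
	name  string
	start time.Time
}

func (s *span) End() {
	fmt.Printf("span %q lasted %v\n", s.name, time.Since(s.start))
}

// startSpan mirrors the call shape used in every hunk below:
//
//	ctx, span := internal.StartSpan(ctx, "name")
//	defer span.End()
//
// A real tracer would also derive a child context so that nested
// StartSpan calls become child spans; this stub just passes ctx through.
func startSpan(ctx context.Context, name string) (context.Context, *span) {
	return ctx, &span{name: name, start: time.Now()}
}

func main() {
	ctx, sp := startSpan(context.Background(), "AttemptToFetchPrevBatch")
	defer sp.End()
	_ = ctx // in the real code this is threaded down to the storage call
	time.Sleep(5 * time.Millisecond)
}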
4 changes: 3 additions & 1 deletion sync3/caches/user.go
@@ -428,7 +428,9 @@ func (c *UserCache) Invites() map[string]UserRoomData {
 }
 
 // AttemptToFetchPrevBatch tries to find a prev_batch value for the given event. This may not always succeed.
-func (c *UserCache) AttemptToFetchPrevBatch(roomID string, firstTimelineEvent *EventData) (prevBatch string) {
+func (c *UserCache) AttemptToFetchPrevBatch(ctx context.Context, roomID string, firstTimelineEvent *EventData) (prevBatch string) {
+	_, span := internal.StartSpan(ctx, "AttemptToFetchPrevBatch")
+	defer span.End()
 	return c.store.GetClosestPrevBatch(roomID, firstTimelineEvent.NID)
 }
 
2 changes: 2 additions & 0 deletions sync3/conn.go
@@ -135,6 +135,7 @@ func (c *Conn) isOutstanding(pos int64) bool {
 // client. It will NOT be reported to Sentry---this should happen as close as possible
 // to the creation of the error (or else Sentry cannot provide a meaningful traceback.)
 func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.Time) (resp *Response, herr *internal.HandlerError) {
+	ctx, span := internal.StartSpan(ctx, "OnIncomingRequest.AcquireMutex")
 	c.cancelOutstandingRequestMu.Lock()
 	if c.cancelOutstandingRequest != nil {
 		c.cancelOutstandingRequest()
@@ -149,6 +150,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T
 	// it's intentional for the lock to be held whilst inside HandleIncomingRequest
 	// as it guarantees linearisation of data within a single connection
 	defer c.mu.Unlock()
+	span.End()
 
 	isFirstRequest := req.pos == 0
 	isRetransmit := !isFirstRequest && c.lastClientRequest.pos == req.pos
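
Worth noting in the conn.go hunks: span.End() is called explicitly once the mutex is held, rather than being deferred like the Unlock, so the OnIncomingRequest.AcquireMutex span covers cancelling any outstanding request and acquiring the connection lock, not the request handling that follows. A self-contained sketch of that measure-the-wait pattern, with the same hypothetical startSpan stub as above and a cut-down handler in place of the real Conn:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type span struct {
	name  string
	start time.Time
}

func (s *span) End() { fmt.Printf("span %q lasted %v\n", s.name, time.Since(s.start)) }

func startSpan(ctx context.Context, name string) (context.Context, *span) {
	return ctx, &span{name: name, start: time.Now()}
}

// handle mimics the shape of OnIncomingRequest: the span starts before
// Lock() and is ended explicitly right after the lock is acquired, so
// its duration is time blocked on the mutex, not time doing work.
func handle(ctx context.Context, mu *sync.Mutex, wg *sync.WaitGroup) {
	defer wg.Done()
	_, sp := startSpan(ctx, "OnIncomingRequest.AcquireMutex")
	mu.Lock()
	defer mu.Unlock()
	sp.End() // deliberately not deferred: the work below is out of scope

	time.Sleep(20 * time.Millisecond) // stand-in for request handling
}

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go handle(context.Background(), &mu, &wg)
	}
	wg.Wait() // the later two goroutines report roughly 20ms and 40ms waits
}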
4 changes: 3 additions & 1 deletion sync3/extensions/extensions.go
@@ -318,7 +318,9 @@ func (h *Handler) HandleLiveUpdate(ctx context.Context, update caches.Update, re
 	extCtx.Handler = h
 	exts := req.EnabledExtensions()
 	for _, ext := range exts {
-		ext.AppendLive(ctx, res, extCtx, update)
+		childCtx, region := internal.StartSpan(ctx, "extension_live_"+ext.Name())
+		ext.AppendLive(childCtx, res, extCtx, update)
+		region.End()
 	}
 }
 
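
This hunk gives each enabled extension its own child span, named extension_live_<extension name>, so a slow extension is attributable in a trace rather than lumped into HandleLiveUpdate as a whole. A toy standalone version of the same fan-out; the Extension interface and the two stub extensions here are illustrative, not the repo's real types:

package main

import (
	"context"
	"fmt"
	"time"
)

type span struct {
	name  string
	start time.Time
}

func (s *span) End() { fmt.Printf("span %q lasted %v\n", s.name, time.Since(s.start)) }

func startSpan(ctx context.Context, name string) (context.Context, *span) {
	return ctx, &span{name: name, start: time.Now()}
}

// Extension is a cut-down stand-in for the handler's extension interface.
type Extension interface {
	Name() string
	AppendLive(ctx context.Context)
}

type fastExt struct{}

func (fastExt) Name() string                   { return "fast" }
func (fastExt) AppendLive(ctx context.Context) { time.Sleep(time.Millisecond) }

type slowExt struct{}

func (slowExt) Name() string                   { return "slow" }
func (slowExt) AppendLive(ctx context.Context) { time.Sleep(8 * time.Millisecond) }

func main() {
	ctx := context.Background()
	for _, ext := range []Extension{fastExt{}, slowExt{}} {
		// One child span per extension, named after it, mirroring the
		// loop body in HandleLiveUpdate.
		childCtx, region := startSpan(ctx, "extension_live_"+ext.Name())
		ext.AppendLive(childCtx)
		region.End()
	}
}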
11 changes: 9 additions & 2 deletions sync3/handler/connstate_live.go
@@ -61,6 +61,7 @@ func (s *connStateLive) liveUpdate(
 	// block until we get a new event, with appropriate timeout
 	startTime := time.Now()
 	hasLiveStreamed := false
+	numProcessedUpdates := 0
 	for response.ListOps() == 0 && len(response.Rooms) == 0 && !response.Extensions.HasData(isInitial) {
 		hasLiveStreamed = true
 		timeToWait := time.Duration(req.TimeoutMSecs()) * time.Millisecond
@@ -82,10 +83,12 @@ func (s *connStateLive) liveUpdate(
 			return
 		case update := <-s.updates:
 			s.processUpdate(ctx, update, response, ex)
+			numProcessedUpdates++
 			// if there's more updates and we don't have lots stacked up already, go ahead and process another
-			for len(s.updates) > 0 && response.ListOps() < 50 {
+			for len(s.updates) > 0 && numProcessedUpdates < 100 {
 				update = <-s.updates
 				s.processUpdate(ctx, update, response, ex)
+				numProcessedUpdates++
 			}
 		}
 	}
@@ -128,6 +131,8 @@ func (s *connStateLive) processUpdate(ctx context.Context, update caches.Update,
 }
 
 func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, response *sync3.Response) bool {
+	_, span := internal.StartSpan(ctx, "processLiveUpdate")
+	defer span.End()
 	internal.AssertWithContext(ctx, "processLiveUpdate: response list length != internal list length", s.lists.Len() == len(response.Lists))
 	internal.AssertWithContext(ctx, "processLiveUpdate: request list length != internal list length", s.lists.Len() == len(s.muxedReq.Lists))
 	roomUpdate, _ := up.(caches.RoomUpdate)
@@ -236,7 +241,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
 		})
 		if len(r.Timeline) == 0 && r.PrevBatch == "" {
 			// attempt to fill in the prev_batch value for this room
-			prevBatch := s.userCache.AttemptToFetchPrevBatch(roomEventUpdate.RoomID(), roomEventUpdate.EventData)
+			prevBatch := s.userCache.AttemptToFetchPrevBatch(ctx, roomEventUpdate.RoomID(), roomEventUpdate.EventData)
 			if prevBatch != "" {
 				r.PrevBatch = prevBatch
 			}
@@ -246,11 +251,13 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
 			sender := roomEventUpdate.EventData.Sender
 			if s.lazyCache.IsLazyLoading(roomID) && !s.lazyCache.IsSet(roomID, sender) {
 				// load the state event
+				_, span := internal.StartSpan(ctx, "LazyLoadingMemberEvent")
 				memberEvent := s.globalCache.LoadStateEvent(context.Background(), roomID, s.loadPositions[roomID], "m.room.member", sender)
 				if memberEvent != nil {
 					r.RequiredState = append(r.RequiredState, memberEvent)
 					s.lazyCache.AddUser(roomID, sender)
 				}
+				span.End()
 			}
 		}
 	}
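
The non-tracing change sits in liveUpdate's drain loop and is what the commit message calls "change when we early return from buffered events". Previously the inner loop stopped early only once the response had accumulated 50 list operations, a bound that updates producing no list ops never reach; it now counts every processed update and stops at 100, presumably so a stream of op-less updates cannot keep the drain going while the channel refills. A standalone sketch of the capped drain; the cap of 100 and the loop shape mirror the diff, while the channel element type and process func are stand-ins:

package main

import "fmt"

// drainUpdates mirrors the loop shape in liveUpdate after this change:
// handle the update that woke us, then keep draining the buffered
// channel, but stop once a fixed number of updates have been processed
// so one response cannot grow without bound.
func drainUpdates(updates chan int, process func(int)) int {
	numProcessedUpdates := 0
	update := <-updates
	process(update)
	numProcessedUpdates++
	// The bound counts updates processed, not ops produced, so updates
	// that change nothing visible still move us toward the early return.
	for len(updates) > 0 && numProcessedUpdates < 100 {
		update = <-updates
		process(update)
		numProcessedUpdates++
	}
	return numProcessedUpdates
}

func main() {
	updates := make(chan int, 256)
	for i := 0; i < 256; i++ {
		updates <- i
	}
	n := drainUpdates(updates, func(int) {})
	fmt.Println("processed:", n, "still buffered:", len(updates)) // 100, 156
}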
