Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix V2Typing send multiple times #214

Merged
merged 14 commits into from
Aug 2, 2023
Merged

Fix V2Typing send multiple times #214

merged 14 commits into from
Aug 2, 2023

Conversation

S7evinK
Copy link
Collaborator

@S7evinK S7evinK commented Jul 20, 2023

With multiple pollers for the same user or different users with the same room, it may happen that SetTyping is called concurrently or in such short sequence that the map holding the hashes hasn't been updated yet.
This can result in unneeded published notifications.

With this change, we assign a deviceID for every room, this devices is then "responsible" to send typing notifications. If the owning user leaves a given room, it is unassigned.

@@ -493,8 +500,14 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) {

func typingHash(ephEvent json.RawMessage) uint64 {
h := fnv.New64a()
for _, userID := range gjson.ParseBytes(ephEvent).Get("content.user_ids").Array() {
h.Write([]byte(userID.Str))
parsedUserIDs := gjson.ParseBytes(ephEvent).Get("content.user_ids").Array()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorting the userIDs should, in theory, not be needed, but ensures that we don't compare hashes for "alice, bob" with "bob, alice" which would otherwise be different.

@@ -337,6 +340,10 @@ func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.Ra

func (h *Handler) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
next := typingHash(ephEvent)
// protect typingMap with a lock, so concurrent calls to SetTyping see the correct map
h.typingMu.Lock()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though I couldn't get the test fail in CI, this was causing a race condition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the bits which touch typingMap or typingDeviceHandler should be protected.

Copy link
Member

@kegsay kegsay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't what I was thinking. We need to be smarter by holding onto sequences of typing notifs. For example, given the "actual" typing sequence of [], [A], [], [B], [A,B], [A] we need to send exactly 5 notifications (assuming we start with []). In reality, due to concurrent pollers, we'll see all sort of things like:

  • Poller 1: [A]
  • Poller 2: [A]
  • Poller 1: []
  • Poller 3: [A]
  • Poller 1 [B]
  • Poller 2: []
  • Poller 1: [A, B]
  • Poller 1: [A]
  • Poller 3: []
  • Poller 2: [B]
  • Poller 2: [A,B]
  • Poller 3: [B]
  • Poller 2: [A]
  • Poller 3: [A,B]
  • Poller 3: [A]

Currently we would treat this as a huge 14 notifications, as that is the number of times the set of typing users has changed. In all cases, the sequence each poller sees is the same. So effectively we need to remember and try to find the longest common subsequence so we can distinguish between typing notifs we've already seen and ones we have not.

@@ -169,3 +176,44 @@ func TestHandlerFreshEnsurePolling(t *testing.T) {
})

}

func TestSetTypingConcurrently(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't what I'm interested in testing. The case here should be passing without your change.

The case I'm interested in is when you have 2 pollers receiving delayed typing notifs. For example. if alice starts typing then stops typing (so [A] then []) the problem is that 1 poller may be "behind" the other, such that it has yet to see [A] whilst the other "ahead" poller has already seen [A] and []. In this scenario, we flicker with 4 operations instead of 2, as we go [A], [], [A], [], which this test is not testing, nor does the code fix.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't what I'm interested in testing. The case here should be passing without your change.

It doesn't always pass. With luck it does, yea, if the machine is slow enough to execute both calls.

fatal error: concurrent map writes

goroutine 246 [running]:
github.com/matrix-org/sliding-sync/sync2/handler2.(*Handler).SetTyping(0xc0003d2080, {0x0?, 0x0?}, {0xa24ad3, 0x11}, {0xc0004e8090, 0x2d, 0x2d})
        github.com/sliding-sync/sync2/handler2/handler.go:344 +0x96
github.com/matrix-org/sliding-sync/sync2/handler2_test.TestSetTypingConcurrently.func2()
       github.com/sliding-sync/sync2/handler2/handler_test.go:203 +0xd9
created by github.com/matrix-org/sliding-sync/sync2/handler2_test.TestSetTypingConcurrently
       github.com/sliding-sync/sync2/handler2/handler_test.go:201 +0x2b0

which also means that h.typingMap[roomID] returned 0 as the existing value, resulting in duplicate notifications (if the machine is, again, slow enough, that the map writes aren't concurrent :D)

@S7evinK
Copy link
Collaborator Author

S7evinK commented Jul 20, 2023

Yep. This PR mainly fixes the race condition, so I'd be totally fine to remove the "sorting" changes and only fix the race condition first. (which in turn hopefully reduces the amount of notifications we send)

@S7evinK S7evinK requested a review from kegsay July 24, 2023 08:03
typingMu *sync.Mutex

// room_id -> device_id, stores which device is allowed to update typing notifications
typingDeviceHandler map[string]string
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add a mutex for this map? I believe so.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Device ID alone doesn't uniquely identify a poller. Try sync2.PollerID?

Copy link
Member

@kegsay kegsay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs integration tests to assert that this behaviour works i.e alice and bob -> alice sees typing first, ensure we don't see dupes.

With this structure, we don't need the fnv hash logic no? So remove?

typingMu *sync.Mutex

// room_id -> device_id, stores which device is allowed to update typing notifications
typingDeviceHandler map[string]string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Device ID alone doesn't uniquely identify a poller. Try sync2.PollerID?

@@ -337,6 +340,10 @@ func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.Ra

func (h *Handler) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
next := typingHash(ephEvent)
// protect typingMap with a lock, so concurrent calls to SetTyping see the correct map
h.typingMu.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the bits which touch typingMap or typingDeviceHandler should be protected.

State: sync2.EventsResponse{
Events: roomState,
},
Ephemeral: sync2.EventsResponse{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need something in Timeline as well, else it isn't mimicing Synapse. Put joinEv in it.

// We expect only Alice typing, as only Alice Poller is "allowed"
// to update typing notifications.
m.MatchResponse(t, res, m.MatchTyping(roomA, []string{alice}))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then test that if you update the typing event to be bob then it comes through please.

roomState = append(roomState, joinEv)

// Queue the response with Alice typing
v2.queueResponse(aliceToken, sync2.SyncResponse{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do this before you call mustDoV3Request.

joinEv := testutils.NewStateEvent(t, "m.room.member", bob, alice, map[string]interface{}{
"membership": "join",
})
roomState = append(roomState, joinEv)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not required, you're adding this in the timeline.


// Queue another response for bob, with bob typing.
// Since Bobs poller isn't allowed to update typing notifications, we should only see
// Alice typing below.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment makes no sense as that hasn't been decided yet. Move it down to when you start doing v3 reqs.

})

// start the pollers
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that alice polls first is why alice is in charge of typing notifs.

}

// Queue the response with Bob typing
v2.queueResponse(aliceToken, sync2.SyncResponse{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queue up a response with bobToken with some random user typing please to make sure that we're still ignoring bob's poller.

},
},
})

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to v2.waitUntilEmpty(t, aliceToken) and bobToken to make sure we do the request after the proxy has processed the v2 response.

@S7evinK S7evinK merged commit 9862fad into main Aug 2, 2023
6 checks passed
@S7evinK S7evinK deleted the s7evink/typing branch August 22, 2023 07:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants