-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix V2Typing send multiple times #214
Conversation
sync2/handler2/handler.go
Outdated
@@ -493,8 +500,14 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) { | |||
|
|||
func typingHash(ephEvent json.RawMessage) uint64 { | |||
h := fnv.New64a() | |||
for _, userID := range gjson.ParseBytes(ephEvent).Get("content.user_ids").Array() { | |||
h.Write([]byte(userID.Str)) | |||
parsedUserIDs := gjson.ParseBytes(ephEvent).Get("content.user_ids").Array() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorting the userIDs should, in theory, not be needed, but ensures that we don't compare hashes for "alice, bob" with "bob, alice" which would otherwise be different.
@@ -337,6 +340,10 @@ func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.Ra | |||
|
|||
func (h *Handler) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) { | |||
next := typingHash(ephEvent) | |||
// protect typingMap with a lock, so concurrent calls to SetTyping see the correct map | |||
h.typingMu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though I couldn't get the test fail in CI, this was causing a race condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of the bits which touch typingMap
or typingDeviceHandler
should be protected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't what I was thinking. We need to be smarter by holding onto sequences of typing notifs. For example, given the "actual" typing sequence of [], [A], [], [B], [A,B], [A]
we need to send exactly 5 notifications (assuming we start with []
). In reality, due to concurrent pollers, we'll see all sort of things like:
- Poller 1: [A]
- Poller 2: [A]
- Poller 1: []
- Poller 3: [A]
- Poller 1 [B]
- Poller 2: []
- Poller 1: [A, B]
- Poller 1: [A]
- Poller 3: []
- Poller 2: [B]
- Poller 2: [A,B]
- Poller 3: [B]
- Poller 2: [A]
- Poller 3: [A,B]
- Poller 3: [A]
Currently we would treat this as a huge 14 notifications, as that is the number of times the set of typing users has changed. In all cases, the sequence each poller sees is the same. So effectively we need to remember and try to find the longest common subsequence so we can distinguish between typing notifs we've already seen and ones we have not.
@@ -169,3 +176,44 @@ func TestHandlerFreshEnsurePolling(t *testing.T) { | |||
}) | |||
|
|||
} | |||
|
|||
func TestSetTypingConcurrently(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't what I'm interested in testing. The case here should be passing without your change.
The case I'm interested in is when you have 2 pollers receiving delayed typing notifs. For example. if alice starts typing then stops typing (so [A] then []) the problem is that 1 poller may be "behind" the other, such that it has yet to see [A] whilst the other "ahead" poller has already seen [A] and []. In this scenario, we flicker with 4 operations instead of 2, as we go [A], [], [A], [], which this test is not testing, nor does the code fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't what I'm interested in testing. The case here should be passing without your change.
It doesn't always pass. With luck it does, yea, if the machine is slow enough to execute both calls.
fatal error: concurrent map writes
goroutine 246 [running]:
github.com/matrix-org/sliding-sync/sync2/handler2.(*Handler).SetTyping(0xc0003d2080, {0x0?, 0x0?}, {0xa24ad3, 0x11}, {0xc0004e8090, 0x2d, 0x2d})
github.com/sliding-sync/sync2/handler2/handler.go:344 +0x96
github.com/matrix-org/sliding-sync/sync2/handler2_test.TestSetTypingConcurrently.func2()
github.com/sliding-sync/sync2/handler2/handler_test.go:203 +0xd9
created by github.com/matrix-org/sliding-sync/sync2/handler2_test.TestSetTypingConcurrently
github.com/sliding-sync/sync2/handler2/handler_test.go:201 +0x2b0
which also means that h.typingMap[roomID]
returned 0
as the existing
value, resulting in duplicate notifications (if the machine is, again, slow enough, that the map writes aren't concurrent :D)
Yep. This PR mainly fixes the race condition, so I'd be totally fine to remove the "sorting" changes and only fix the race condition first. (which in turn hopefully reduces the amount of notifications we send) |
sync2/handler2/handler.go
Outdated
typingMu *sync.Mutex | ||
|
||
// room_id -> device_id, stores which device is allowed to update typing notifications | ||
typingDeviceHandler map[string]string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also add a mutex for this map? I believe so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Device ID alone doesn't uniquely identify a poller. Try sync2.PollerID
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs integration tests to assert that this behaviour works i.e alice and bob -> alice sees typing first, ensure we don't see dupes.
With this structure, we don't need the fnv hash logic no? So remove?
sync2/handler2/handler.go
Outdated
typingMu *sync.Mutex | ||
|
||
// room_id -> device_id, stores which device is allowed to update typing notifications | ||
typingDeviceHandler map[string]string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Device ID alone doesn't uniquely identify a poller. Try sync2.PollerID
?
@@ -337,6 +340,10 @@ func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.Ra | |||
|
|||
func (h *Handler) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) { | |||
next := typingHash(ephEvent) | |||
// protect typingMap with a lock, so concurrent calls to SetTyping see the correct map | |||
h.typingMu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of the bits which touch typingMap
or typingDeviceHandler
should be protected.
State: sync2.EventsResponse{ | ||
Events: roomState, | ||
}, | ||
Ephemeral: sync2.EventsResponse{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need something in Timeline
as well, else it isn't mimicing Synapse. Put joinEv
in it.
// We expect only Alice typing, as only Alice Poller is "allowed" | ||
// to update typing notifications. | ||
m.MatchResponse(t, res, m.MatchTyping(roomA, []string{alice})) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And then test that if you update the typing event to be bob then it comes through please.
roomState = append(roomState, joinEv) | ||
|
||
// Queue the response with Alice typing | ||
v2.queueResponse(aliceToken, sync2.SyncResponse{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do this before you call mustDoV3Request
.
tests-integration/extensions_test.go
Outdated
joinEv := testutils.NewStateEvent(t, "m.room.member", bob, alice, map[string]interface{}{ | ||
"membership": "join", | ||
}) | ||
roomState = append(roomState, joinEv) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not required, you're adding this in the timeline.
tests-integration/extensions_test.go
Outdated
|
||
// Queue another response for bob, with bob typing. | ||
// Since Bobs poller isn't allowed to update typing notifications, we should only see | ||
// Alice typing below. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment makes no sense as that hasn't been decided yet. Move it down to when you start doing v3 reqs.
}) | ||
|
||
// start the pollers | ||
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that alice polls first is why alice is in charge of typing notifs.
} | ||
|
||
// Queue the response with Bob typing | ||
v2.queueResponse(aliceToken, sync2.SyncResponse{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Queue up a response with bobToken
with some random user typing please to make sure that we're still ignoring bob's poller.
}, | ||
}, | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to v2.waitUntilEmpty(t, aliceToken)
and bobToken
to make sure we do the request after the proxy has processed the v2 response.
With multiple pollers for the same user or different users with the same room, it may happen that
SetTyping
is called concurrently or in such short sequence that the map holding the hashes hasn't been updated yet.This can result in unneeded published notifications.
With this change, we assign a deviceID for every room, this devices is then "responsible" to send typing notifications. If the owning user leaves a given room, it is unassigned.