diff --git a/changelog.d/17187.feature b/changelog.d/17187.feature new file mode 100644 index 00000000000..50383cb4a4d --- /dev/null +++ b/changelog.d/17187.feature @@ -0,0 +1 @@ +Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 0a9123c56bb..542e4faaa1d 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -50,7 +50,7 @@ class Membership: KNOCK: Final = "knock" LEAVE: Final = "leave" BAN: Final = "ban" - LIST: Final = (INVITE, JOIN, KNOCK, LEAVE, BAN) + LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN} class PresenceState: diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py new file mode 100644 index 00000000000..34ae21ba509 --- /dev/null +++ b/synapse/handlers/sliding_sync.py @@ -0,0 +1,610 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# +import logging +from enum import Enum +from typing import TYPE_CHECKING, AbstractSet, Dict, Final, List, Optional, Tuple + +import attr +from immutabledict import immutabledict + +from synapse._pydantic_compat import HAS_PYDANTIC_V2 + +if TYPE_CHECKING or HAS_PYDANTIC_V2: + from pydantic.v1 import Extra +else: + from pydantic import Extra + +from synapse.api.constants import Membership +from synapse.events import EventBase +from synapse.rest.client.models import SlidingSyncBody +from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool: + """ + Returns True if the membership event should be included in the sync response, + otherwise False. + + Attributes: + membership: The membership state of the user in the room. + user_id: The user ID that the membership applies to + sender: The person who sent the membership event + """ + + # Everything except `Membership.LEAVE` because we want everything that's *still* + # relevant to the user. There are few more things to include in the sync response + # (newly_left) but those are handled separately. + # + # This logic includes kicks (leave events where the sender is not the same user) and + # can be read as "anything that isn't a leave or a leave with a different sender". + return membership != Membership.LEAVE or sender != user_id + + +class SlidingSyncConfig(SlidingSyncBody): + """ + Inherit from `SlidingSyncBody` since we need all of the same fields and add a few + extra fields that we need in the handler + """ + + user: UserID + device_id: Optional[str] + + # Pydantic config + class Config: + # By default, ignore fields that we don't recognise. + extra = Extra.ignore + # By default, don't allow fields to be reassigned after parsing. + allow_mutation = False + # Allow custom types like `UserID` to be used in the model + arbitrary_types_allowed = True + + +class OperationType(Enum): + """ + Represents the operation types in a Sliding Sync window. + + Attributes: + SYNC: Sets a range of entries. Clients SHOULD discard what they previous knew about + entries in this range. + INSERT: Sets a single entry. If the position is not empty then clients MUST move + entries to the left or the right depending on where the closest empty space is. + DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move + places. + INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for + offline support, but they should be treated as empty when additional operations + which concern indexes in the range arrive from the server. + """ + + SYNC: Final = "SYNC" + INSERT: Final = "INSERT" + DELETE: Final = "DELETE" + INVALIDATE: Final = "INVALIDATE" + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SlidingSyncResult: + """ + The Sliding Sync result to be serialized to JSON for a response. + + Attributes: + next_pos: The next position token in the sliding window to request (next_batch). + lists: Sliding window API. A map of list key to list results. + rooms: Room subscription API. A map of room ID to room subscription to room results. + extensions: Extensions API. A map of extension key to extension results. + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class RoomResult: + """ + Attributes: + name: Room name or calculated room name. + avatar: Room avatar + heroes: List of stripped membership events (containing `user_id` and optionally + `avatar_url` and `displayname`) for the users used to calculate the room name. + initial: Flag which is set when this is the first time the server is sending this + data on this connection. Clients can use this flag to replace or update + their local state. When there is an update, servers MUST omit this flag + entirely and NOT send "initial":false as this is wasteful on bandwidth. The + absence of this flag means 'false'. + required_state: The current state of the room + timeline: Latest events in the room. The last event is the most recent + is_dm: Flag to specify whether the room is a direct-message room (most likely + between two people). + invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state` + in sync v2, absent on joined/left rooms + prev_batch: A token that can be passed as a start parameter to the + `/rooms//messages` API to retrieve earlier messages. + limited: True if their are more events than fit between the given position and now. + Sync again to get more. + joined_count: The number of users with membership of join, including the client's + own user ID. (same as sync `v2 m.joined_member_count`) + invited_count: The number of users with membership of invite. (same as sync v2 + `m.invited_member_count`) + notification_count: The total number of unread notifications for this room. (same + as sync v2) + highlight_count: The number of unread notifications for this room with the highlight + flag set. (same as sync v2) + num_live: The number of timeline events which have just occurred and are not historical. + The last N events are 'live' and should be treated as such. This is mostly + useful to determine whether a given @mention event should make a noise or not. + Clients cannot rely solely on the absence of `initial: true` to determine live + events because if a room not in the sliding window bumps into the window because + of an @mention it will have `initial: true` yet contain a single live event + (with potentially other old events in the timeline). + """ + + name: str + avatar: Optional[str] + heroes: Optional[List[EventBase]] + initial: bool + required_state: List[EventBase] + timeline: List[EventBase] + is_dm: bool + invite_state: List[EventBase] + prev_batch: StreamToken + limited: bool + joined_count: int + invited_count: int + notification_count: int + highlight_count: int + num_live: int + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class SlidingWindowList: + """ + Attributes: + count: The total number of entries in the list. Always present if this list + is. + ops: The sliding list operations to perform. + """ + + @attr.s(slots=True, frozen=True, auto_attribs=True) + class Operation: + """ + Attributes: + op: The operation type to perform. + range: Which index positions are affected by this operation. These are + both inclusive. + room_ids: Which room IDs are affected by this operation. These IDs match + up to the positions in the `range`, so the last room ID in this list + matches the 9th index. The room data is held in a separate object. + """ + + op: OperationType + range: Tuple[int, int] + room_ids: List[str] + + count: int + ops: List[Operation] + + next_pos: StreamToken + lists: Dict[str, SlidingWindowList] + rooms: Dict[str, RoomResult] + extensions: JsonMapping + + def __bool__(self) -> bool: + """Make the result appear empty if there are no updates. This is used + to tell if the notifier needs to wait for more events when polling for + events. + """ + return bool(self.lists or self.rooms or self.extensions) + + @staticmethod + def empty(next_pos: StreamToken) -> "SlidingSyncResult": + "Return a new empty result" + return SlidingSyncResult( + next_pos=next_pos, + lists={}, + rooms={}, + extensions={}, + ) + + +class SlidingSyncHandler: + def __init__(self, hs: "HomeServer"): + self.clock = hs.get_clock() + self.store = hs.get_datastores().main + self.auth_blocking = hs.get_auth_blocking() + self.notifier = hs.get_notifier() + self.event_sources = hs.get_event_sources() + self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync + + async def wait_for_sync_for_user( + self, + requester: Requester, + sync_config: SlidingSyncConfig, + from_token: Optional[StreamToken] = None, + timeout_ms: int = 0, + ) -> SlidingSyncResult: + """Get the sync for a client if we have new data for it now. Otherwise + wait for new data to arrive on the server. If the timeout expires, then + return an empty sync result. + """ + # If the user is not part of the mau group, then check that limits have + # not been exceeded (if not part of the group by this point, almost certain + # auth_blocking will occur) + await self.auth_blocking.check_auth_blocking(requester=requester) + + # TODO: If the To-Device extension is enabled and we have a `from_token`, delete + # any to-device messages before that token (since we now know that the device + # has received them). (see sync v2 for how to do this) + + # If we're working with a user-provided token, we need to make sure to wait for + # this worker to catch up with the token so we don't skip past any incoming + # events or future events if the user is nefariously, manually modifying the + # token. + if from_token is not None: + # We need to make sure this worker has caught up with the token. If + # this returns false, it means we timed out waiting, and we should + # just return an empty response. + before_wait_ts = self.clock.time_msec() + if not await self.notifier.wait_for_stream_token(from_token): + logger.warning( + "Timed out waiting for worker to catch up. Returning empty response" + ) + return SlidingSyncResult.empty(from_token) + + # If we've spent significant time waiting to catch up, take it off + # the timeout. + after_wait_ts = self.clock.time_msec() + if after_wait_ts - before_wait_ts > 1_000: + timeout_ms -= after_wait_ts - before_wait_ts + timeout_ms = max(timeout_ms, 0) + + # We're going to respond immediately if the timeout is 0 or if this is an + # initial sync (without a `from_token`) so we can avoid calling + # `notifier.wait_for_events()`. + if timeout_ms == 0 or from_token is None: + now_token = self.event_sources.get_current_token() + result = await self.current_sync_for_user( + sync_config, + from_token=from_token, + to_token=now_token, + ) + else: + # Otherwise, we wait for something to happen and report it to the user. + async def current_sync_callback( + before_token: StreamToken, after_token: StreamToken + ) -> SlidingSyncResult: + return await self.current_sync_for_user( + sync_config, + from_token=from_token, + to_token=after_token, + ) + + result = await self.notifier.wait_for_events( + sync_config.user.to_string(), + timeout_ms, + current_sync_callback, + from_token=from_token, + ) + + return result + + async def current_sync_for_user( + self, + sync_config: SlidingSyncConfig, + to_token: StreamToken, + from_token: Optional[StreamToken] = None, + ) -> SlidingSyncResult: + """ + Generates the response body of a Sliding Sync result, represented as a + `SlidingSyncResult`. + """ + user_id = sync_config.user.to_string() + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() + + # Get all of the room IDs that the user should be able to see in the sync + # response + room_id_set = await self.get_sync_room_ids_for_user( + sync_config.user, + from_token=from_token, + to_token=to_token, + ) + + # Assemble sliding window lists + lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + if sync_config.lists: + for list_key, list_config in sync_config.lists.items(): + # TODO: Apply filters + # + # TODO: Exclude partially stated rooms unless the `required_state` has + # `["m.room.member", "$LAZY"]` + filtered_room_ids = room_id_set + # TODO: Apply sorts + sorted_room_ids = sorted(filtered_room_ids) + + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] + if list_config.ranges: + for range in list_config.ranges: + ops.append( + SlidingSyncResult.SlidingWindowList.Operation( + op=OperationType.SYNC, + range=range, + room_ids=sorted_room_ids[range[0] : range[1]], + ) + ) + + lists[list_key] = SlidingSyncResult.SlidingWindowList( + count=len(sorted_room_ids), + ops=ops, + ) + + return SlidingSyncResult( + next_pos=to_token, + lists=lists, + # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions` + rooms={}, + extensions={}, + ) + + async def get_sync_room_ids_for_user( + self, + user: UserID, + to_token: StreamToken, + from_token: Optional[StreamToken] = None, + ) -> AbstractSet[str]: + """ + Fetch room IDs that should be listed for this user in the sync response (the + full room list that will be filtered, sorted, and sliced). + + We're looking for rooms where the user has the following state in the token + range (> `from_token` and <= `to_token`): + + - `invite`, `join`, `knock`, `ban` membership events + - Kicks (`leave` membership events where `sender` is different from the + `user_id`/`state_key`) + - `newly_left` (rooms that were left during the given token range) + - In order for bans/kicks to not show up in sync, you need to `/forget` those + rooms. This doesn't modify the event itself though and only adds the + `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way + to tell when a room was forgotten at the moment so we can't factor it into the + from/to range. + """ + user_id = user.to_string() + + # First grab a current snapshot rooms for the user + # (also handles forgotten rooms) + room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is( + user_id=user_id, + # We want to fetch any kind of membership (joined and left rooms) in order + # to get the `event_pos` of the latest room membership event for the + # user. + # + # We will filter out the rooms that don't belong below (see + # `filter_membership_for_sync`) + membership_list=Membership.LIST, + excluded_rooms=self.rooms_to_exclude_globally, + ) + + # If the user has never joined any rooms before, we can just return an empty list + if not room_for_user_list: + return set() + + # Our working list of rooms that can show up in the sync response + sync_room_id_set = { + room_for_user.room_id + for room_for_user in room_for_user_list + if filter_membership_for_sync( + membership=room_for_user.membership, + user_id=user_id, + sender=room_for_user.sender, + ) + } + + # Get the `RoomStreamToken` that represents the spot we queried up to when we got + # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`. + # + # First, we need to get the max stream_ordering of each event persister instance + # that we queried events from. + instance_to_max_stream_ordering_map: Dict[str, int] = {} + for room_for_user in room_for_user_list: + instance_name = room_for_user.event_pos.instance_name + stream_ordering = room_for_user.event_pos.stream + + current_instance_max_stream_ordering = ( + instance_to_max_stream_ordering_map.get(instance_name) + ) + if ( + current_instance_max_stream_ordering is None + or stream_ordering > current_instance_max_stream_ordering + ): + instance_to_max_stream_ordering_map[instance_name] = stream_ordering + + # Then assemble the `RoomStreamToken` + membership_snapshot_token = RoomStreamToken( + # Minimum position in the `instance_map` + stream=min(instance_to_max_stream_ordering_map.values()), + instance_map=immutabledict(instance_to_max_stream_ordering_map), + ) + + # If our `to_token` is already the same or ahead of the latest room membership + # for the user, we can just straight-up return the room list (nothing has + # changed) + if membership_snapshot_token.is_before_or_eq(to_token.room_key): + return sync_room_id_set + + # Since we fetched the users room list at some point in time after the from/to + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. In particular, we need to make these fixups: + # + # - 1a) Remove rooms that the user joined after the `to_token` + # - 1b) Add back rooms that the user left after the `to_token` + # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`) + # + # Below, we're doing two separate lookups for membership changes. We could + # request everything for both fixups in one range, [`from_token.room_key`, + # `membership_snapshot_token`), but we want to avoid raw `stream_ordering` + # comparison without `instance_name` (which is flawed). We could refactor + # `event.internal_metadata` to include `instance_name` but it might turn out a + # little difficult and a bigger, broader Synapse change than we want to make. + + # 1) ----------------------------------------------------- + + # 1) Fetch membership changes that fall in the range from `to_token` up to + # `membership_snapshot_token` + membership_change_events_after_to_token = ( + await self.store.get_membership_changes_for_user( + user_id, + from_key=to_token.room_key, + to_key=membership_snapshot_token, + excluded_rooms=self.rooms_to_exclude_globally, + ) + ) + + # 1) Assemble a list of the last membership events in some given ranges. Someone + # could have left and joined multiple times during the given range but we only + # care about end-result so we grab the last one. + last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {} + # We also need the first membership event after the `to_token` so we can step + # backward to the previous membership that would apply to the from/to range. + first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {} + for event in membership_change_events_after_to_token: + last_membership_change_by_room_id_after_to_token[event.room_id] = event + # Only set if we haven't already set it + first_membership_change_by_room_id_after_to_token.setdefault( + event.room_id, event + ) + + # 1) Fixup + for ( + last_membership_change_after_to_token + ) in last_membership_change_by_room_id_after_to_token.values(): + room_id = last_membership_change_after_to_token.room_id + + # We want to find the first membership change after the `to_token` then step + # backward to know the membership in the from/to range. + first_membership_change_after_to_token = ( + first_membership_change_by_room_id_after_to_token.get(room_id) + ) + assert first_membership_change_after_to_token is not None, ( + "If there was a `last_membership_change_after_to_token` that we're iterating over, " + + "then there should be corresponding a first change. For example, even if there " + + "is only one event after the `to_token`, the first and last event will be same event. " + + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`" + + "/`first_membership_change_by_room_id_after_to_token` dicts above." + ) + # TODO: Instead of reading from `unsigned`, refactor this to use the + # `current_state_delta_stream` table in the future. Probably a new + # `get_membership_changes_for_user()` function that uses + # `current_state_delta_stream` with a join to `room_memberships`. This would + # help in state reset scenarios since `prev_content` is looking at the + # current branch vs the current room state. This is all just data given to + # the client so no real harm to data integrity, but we'd like to be nice to + # the client. Since the `current_state_delta_stream` table is new, it + # doesn't have all events in it. Since this is Sliding Sync, if we ever need + # to, we can signal the client to throw all of their state away by sending + # "operation: RESET". + prev_content = first_membership_change_after_to_token.unsigned.get( + "prev_content", {} + ) + prev_membership = prev_content.get("membership", None) + prev_sender = first_membership_change_after_to_token.unsigned.get( + "prev_sender", None + ) + + # Check if the previous membership (membership that applies to the from/to + # range) should be included in our `sync_room_id_set` + should_prev_membership_be_included = ( + prev_membership is not None + and prev_sender is not None + and filter_membership_for_sync( + membership=prev_membership, + user_id=user_id, + sender=prev_sender, + ) + ) + + # Check if the last membership (membership that applies to our snapshot) was + # already included in our `sync_room_id_set` + was_last_membership_already_included = filter_membership_for_sync( + membership=last_membership_change_after_to_token.membership, + user_id=user_id, + sender=last_membership_change_after_to_token.sender, + ) + + # 1a) Add back rooms that the user left after the `to_token` + # + # For example, if the last membership event after the `to_token` is a leave + # event, then the room was excluded from `sync_room_id_set` when we first + # crafted it above. We should add these rooms back as long as the user also + # was part of the room before the `to_token`. + if ( + not was_last_membership_already_included + and should_prev_membership_be_included + ): + sync_room_id_set.add(room_id) + # 1b) Remove rooms that the user joined (hasn't left) after the `to_token` + # + # For example, if the last membership event after the `to_token` is a "join" + # event, then the room was included `sync_room_id_set` when we first crafted + # it above. We should remove these rooms as long as the user also wasn't + # part of the room before the `to_token`. + elif ( + was_last_membership_already_included + and not should_prev_membership_be_included + ): + sync_room_id_set.discard(room_id) + + # 2) ----------------------------------------------------- + # We fix-up newly_left rooms after the first fixup because it may have removed + # some left rooms that we can figure out our newly_left in the following code + + # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` + membership_change_events_in_from_to_range = [] + if from_token: + membership_change_events_in_from_to_range = ( + await self.store.get_membership_changes_for_user( + user_id, + from_key=from_token.room_key, + to_key=to_token.room_key, + excluded_rooms=self.rooms_to_exclude_globally, + ) + ) + + # 2) Assemble a list of the last membership events in some given ranges. Someone + # could have left and joined multiple times during the given range but we only + # care about end-result so we grab the last one. + last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {} + for event in membership_change_events_in_from_to_range: + last_membership_change_by_room_id_in_from_to_range[event.room_id] = event + + # 2) Fixup + for ( + last_membership_change_in_from_to_range + ) in last_membership_change_by_room_id_in_from_to_range.values(): + room_id = last_membership_change_in_from_to_range.room_id + + # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We + # include newly_left rooms because the last event that the user should see + # is their own leave event + if last_membership_change_in_from_to_range.membership == Membership.LEAVE: + sync_room_id_set.add(room_id) + + return sync_room_id_set diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9d37e2a86f3..39964726c5b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2002,7 +2002,7 @@ async def get_sync_result_builder( """ user_id = sync_config.user.to_string() - # Note: we get the users room list *before* we get the current token, this + # Note: we get the users room list *before* we get the `now_token`, this # avoids checking back in history if rooms are joined after the token is fetched. token_before_rooms = self.event_sources.get_current_token() mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) @@ -2014,10 +2014,10 @@ async def get_sync_result_builder( now_token = self.event_sources.get_current_token() log_kv({"now_token": now_token}) - # Since we fetched the users room list before the token, there's a small window - # during which membership events may have been persisted, so we fetch these now - # and modify the joined room list for any changes between the get_rooms_for_user - # call and the get_current_token call. + # Since we fetched the users room list before calculating the `now_token` (see + # above), there's a small window during which membership events may have been + # persisted, so we fetch these now and modify the joined room list for any + # changes between the get_rooms_for_user call and the get_current_token call. membership_change_events = [] if since_token: membership_change_events = await self.store.get_membership_changes_for_user( @@ -2027,16 +2027,19 @@ async def get_sync_result_builder( self.rooms_to_exclude_globally, ) - mem_last_change_by_room_id: Dict[str, EventBase] = {} + last_membership_change_by_room_id: Dict[str, EventBase] = {} for event in membership_change_events: - mem_last_change_by_room_id[event.room_id] = event + last_membership_change_by_room_id[event.room_id] = event # For the latest membership event in each room found, add/remove the room ID # from the joined room list accordingly. In this case we only care if the # latest change is JOIN. - for room_id, event in mem_last_change_by_room_id.items(): + for room_id, event in last_membership_change_by_room_id.items(): assert event.internal_metadata.stream_ordering + # As a shortcut, skip any events that happened before we got our + # `get_rooms_for_user()` snapshot (any changes are already represented + # in that list). if ( event.internal_metadata.stream_ordering < token_before_rooms.room_key.stream diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py index fc1aed2889b..5433ed91efc 100644 --- a/synapse/rest/client/models.py +++ b/synapse/rest/client/models.py @@ -18,14 +18,30 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import TYPE_CHECKING, Dict, Optional +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union from synapse._pydantic_compat import HAS_PYDANTIC_V2 if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import Extra, StrictInt, StrictStr, constr, validator + from pydantic.v1 import ( + Extra, + StrictBool, + StrictInt, + StrictStr, + conint, + constr, + validator, + ) else: - from pydantic import Extra, StrictInt, StrictStr, constr, validator + from pydantic import ( + Extra, + StrictBool, + StrictInt, + StrictStr, + conint, + constr, + validator, + ) from synapse.rest.models import RequestBodyModel from synapse.util.threepids import validate_email @@ -97,3 +113,172 @@ class EmailRequestTokenBody(ThreepidRequestTokenBody): class MsisdnRequestTokenBody(ThreepidRequestTokenBody): country: ISO3116_1_Alpha_2 phone_number: StrictStr + + +class SlidingSyncBody(RequestBodyModel): + """ + Sliding Sync API request body. + + Attributes: + lists: Sliding window API. A map of list key to list information + (:class:`SlidingSyncList`). Max lists: 100. The list keys should be + arbitrary strings which the client is using to refer to the list. Keep this + small as it needs to be sent a lot. Max length: 64 bytes. + room_subscriptions: Room subscription API. A map of room ID to room subscription + information. Used to subscribe to a specific room. Sometimes clients know + exactly which room they want to get information about e.g by following a + permalink or by refreshing a webapp currently viewing a specific room. The + sliding window API alone is insufficient for this use case because there's + no way to say "please track this room explicitly". + extensions: Extensions API. A map of extension key to extension config. + """ + + class CommonRoomParameters(RequestBodyModel): + """ + Common parameters shared between the sliding window and room subscription APIs. + + Attributes: + required_state: Required state for each room returned. An array of event + type and state key tuples. Elements in this array are ORd together to + produce the final set of state events to return. One unique exception is + when you request all state events via `["*", "*"]`. When used, all state + events are returned by default, and additional entries FILTER OUT the + returned set of state events. These additional entries cannot use `*` + themselves. For example, `["*", "*"], ["m.room.member", + "@alice:example.com"]` will *exclude* every `m.room.member` event + *except* for `@alice:example.com`, and include every other state event. + In addition, `["*", "*"], ["m.space.child", "*"]` is an error, the + `m.space.child` filter is not required as it would have been returned + anyway. + timeline_limit: The maximum number of timeline events to return per response. + (Max 1000 messages) + include_old_rooms: Determines if `predecessor` rooms are included in the + `rooms` response. The user MUST be joined to old rooms for them to show up + in the response. + """ + + class IncludeOldRooms(RequestBodyModel): + timeline_limit: StrictInt + required_state: List[Tuple[StrictStr, StrictStr]] + + required_state: List[Tuple[StrictStr, StrictStr]] + # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 + if TYPE_CHECKING: + timeline_limit: int + else: + timeline_limit: conint(le=1000, strict=True) # type: ignore[valid-type] + include_old_rooms: Optional[IncludeOldRooms] = None + + class SlidingSyncList(CommonRoomParameters): + """ + Attributes: + ranges: Sliding window ranges. If this field is missing, no sliding window + is used and all rooms are returned in this list. Integers are + *inclusive*. + sort: How the list should be sorted on the server. The first value is + applied first, then tiebreaks are performed with each subsequent sort + listed. + + FIXME: Furthermore, it's not currently defined how servers should behave + if they encounter a filter or sort operation they do not recognise. If + the server rejects the request with an HTTP 400 then that will break + backwards compatibility with new clients vs old servers. However, the + client would be otherwise unaware that only some of the sort/filter + operations have taken effect. We may need to include a "warnings" + section to indicate which sort/filter operations are unrecognised, + allowing for some form of graceful degradation of service. + -- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions + + slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with + sliding windows). When true, the `ranges` and `sort` fields are ignored. + required_state: Required state for each room returned. An array of event + type and state key tuples. Elements in this array are ORd together to + produce the final set of state events to return. + + One unique exception is when you request all state events via `["*", + "*"]`. When used, all state events are returned by default, and + additional entries FILTER OUT the returned set of state events. These + additional entries cannot use `*` themselves. For example, `["*", "*"], + ["m.room.member", "@alice:example.com"]` will *exclude* every + `m.room.member` event *except* for `@alice:example.com`, and include + every other state event. In addition, `["*", "*"], ["m.space.child", + "*"]` is an error, the `m.space.child` filter is not required as it + would have been returned anyway. + + Room members can be lazily-loaded by using the special `$LAZY` state key + (`["m.room.member", "$LAZY"]`). Typically, when you view a room, you + want to retrieve all state events except for m.room.member events which + you want to lazily load. To get this behaviour, clients can send the + following:: + + { + "required_state": [ + // activate lazy loading + ["m.room.member", "$LAZY"], + // request all state events _except_ for m.room.member + events which are lazily loaded + ["*", "*"] + ] + } + + timeline_limit: The maximum number of timeline events to return per response. + include_old_rooms: Determines if `predecessor` rooms are included in the + `rooms` response. The user MUST be joined to old rooms for them to show up + in the response. + include_heroes: Return a stripped variant of membership events (containing + `user_id` and optionally `avatar_url` and `displayname`) for the users used + to calculate the room name. + filters: Filters to apply to the list before sorting. + bump_event_types: Allowlist of event types which should be considered recent activity + when sorting `by_recency`. By omitting event types from this field, + clients can ensure that uninteresting events (e.g. a profile rename) do + not cause a room to jump to the top of its list(s). Empty or omitted + `bump_event_types` have no effect—all events in a room will be + considered recent activity. + """ + + class Filters(RequestBodyModel): + is_dm: Optional[StrictBool] = None + spaces: Optional[List[StrictStr]] = None + is_encrypted: Optional[StrictBool] = None + is_invite: Optional[StrictBool] = None + room_types: Optional[List[Union[StrictStr, None]]] = None + not_room_types: Optional[List[StrictStr]] = None + room_name_like: Optional[StrictStr] = None + tags: Optional[List[StrictStr]] = None + not_tags: Optional[List[StrictStr]] = None + + # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 + if TYPE_CHECKING: + ranges: Optional[List[Tuple[int, int]]] = None + else: + ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None # type: ignore[valid-type] + sort: Optional[List[StrictStr]] = None + slow_get_all_rooms: Optional[StrictBool] = False + include_heroes: Optional[StrictBool] = False + filters: Optional[Filters] = None + bump_event_types: Optional[List[StrictStr]] = None + + class RoomSubscription(CommonRoomParameters): + pass + + class Extension(RequestBodyModel): + enabled: Optional[StrictBool] = False + lists: Optional[List[StrictStr]] = None + rooms: Optional[List[StrictStr]] = None + + # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 + if TYPE_CHECKING: + lists: Optional[Dict[str, SlidingSyncList]] = None + else: + lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]] = None # type: ignore[valid-type] + room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]] = None + extensions: Optional[Dict[StrictStr, Extension]] = None + + @validator("lists") + def lists_length_check( + cls, value: Optional[Dict[str, SlidingSyncList]] + ) -> Optional[Dict[str, SlidingSyncList]]: + if value is not None: + assert len(value) <= 100, f"Max lists: 100 but saw {len(value)}" + return value diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index fb4d44211e3..61fdf71a272 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -292,6 +292,9 @@ async def on_PUT( try: if event_type == EventTypes.Member: membership = content.get("membership", None) + if not isinstance(membership, str): + raise SynapseError(400, "Invalid membership (must be a string)") + event_id, _ = await self.room_member_handler.update_membership( requester, target=UserID.from_string(state_key), diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 27ea943e31e..385b102b3d2 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -33,6 +33,7 @@ format_event_raw, ) from synapse.handlers.presence import format_user_presence_state +from synapse.handlers.sliding_sync import SlidingSyncConfig, SlidingSyncResult from synapse.handlers.sync import ( ArchivedSyncResult, InvitedSyncResult, @@ -43,9 +44,16 @@ SyncVersion, ) from synapse.http.server import HttpServer -from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string +from synapse.http.servlet import ( + RestServlet, + parse_and_validate_json_object_from_request, + parse_boolean, + parse_integer, + parse_string, +) from synapse.http.site import SynapseRequest from synapse.logging.opentracing import trace_with_opname +from synapse.rest.client.models import SlidingSyncBody from synapse.types import JsonDict, Requester, StreamToken from synapse.util import json_decoder from synapse.util.caches.lrucache import LruCache @@ -735,8 +743,228 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: return 200, response +class SlidingSyncRestServlet(RestServlet): + """ + API endpoint for MSC3575 Sliding Sync `/sync`. Allows for clients to request a + subset (sliding window) of rooms, state, and timeline events (just what they need) + in order to bootstrap quickly and subscribe to only what the client cares about. + Because the client can specify what it cares about, we can respond quickly and skip + all of the work we would normally have to do with a sync v2 response. + + Request query parameters: + timeout: How long to wait for new events in milliseconds. + pos: Stream position token when asking for incremental deltas. + + Request body:: + { + // Sliding Window API + "lists": { + "foo-list": { + "ranges": [ [0, 99] ], + "sort": [ "by_notification_level", "by_recency", "by_name" ], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"] + ], + "timeline_limit": 10, + "filters": { + "is_dm": true + }, + "bump_event_types": [ "m.room.message", "m.room.encrypted" ], + } + }, + // Room Subscriptions API + "room_subscriptions": { + "!sub1:bar": { + "required_state": [ ["*","*"] ], + "timeline_limit": 10, + "include_old_rooms": { + "timeline_limit": 1, + "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ], + } + } + }, + // Extensions API + "extensions": {} + } + + Response JSON:: + { + "next_pos": "s58_224_0_13_10_1_1_16_0_1", + "lists": { + "foo-list": { + "count": 1337, + "ops": [{ + "op": "SYNC", + "range": [0, 99], + "room_ids": [ + "!foo:bar", + // ... 99 more room IDs + ] + }] + } + }, + // Aggregated rooms from lists and room subscriptions + "rooms": { + // Room from room subscription + "!sub1:bar": { + "name": "Alice and Bob", + "avatar": "mxc://...", + "initial": true, + "required_state": [ + {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}}, + {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}}, + {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}}, + {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}} + ], + "timeline": [ + {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}}, + {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}}, + {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}}, + {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}}, + {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}}, + {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}}, + ], + "prev_batch": "t111_222_333", + "joined_count": 41, + "invited_count": 1, + "notification_count": 1, + "highlight_count": 0 + }, + // rooms from list + "!foo:bar": { + "name": "The calculated room name", + "avatar": "mxc://...", + "initial": true, + "required_state": [ + {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}}, + {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}}, + {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!foo:example.com", "content":{"via":["example.com"]}}, + {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!bar:example.com", "content":{"via":["example.com"]}}, + {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!baz:example.com", "content":{"via":["example.com"]}} + ], + "timeline": [ + {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}}, + {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}}, + {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}}, + {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"C"}}, + {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"D"}}, + ], + "prev_batch": "t111_222_333", + "joined_count": 4, + "invited_count": 0, + "notification_count": 54, + "highlight_count": 3 + }, + // ... 99 more items + }, + "extensions": {} + } + """ + + PATTERNS = client_patterns( + "/org.matrix.msc3575/sync$", releases=[], v1=False, unstable=True + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastores().main + self.filtering = hs.get_filtering() + self.sliding_sync_handler = hs.get_sliding_sync_handler() + + # TODO: Update this to `on_GET` once we figure out how we want to handle params + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) + user = requester.user + device_id = requester.device_id + + timeout = parse_integer(request, "timeout", default=0) + # Position in the stream + from_token_string = parse_string(request, "pos") + + from_token = None + if from_token_string is not None: + from_token = await StreamToken.from_string(self.store, from_token_string) + + # TODO: We currently don't know whether we're going to use sticky params or + # maybe some filters like sync v2 where they are built up once and referenced + # by filter ID. For now, we will just prototype with always passing everything + # in. + body = parse_and_validate_json_object_from_request(request, SlidingSyncBody) + logger.info("Sliding sync request: %r", body) + + sync_config = SlidingSyncConfig( + user=user, + device_id=device_id, + # FIXME: Currently, we're just manually copying the fields from the + # `SlidingSyncBody` into the config. How can we gurantee into the future + # that we don't forget any? I would like something more structured like + # `copy_attributes(from=body, to=config)` + lists=body.lists, + room_subscriptions=body.room_subscriptions, + extensions=body.extensions, + ) + + sliding_sync_results = await self.sliding_sync_handler.wait_for_sync_for_user( + requester, + sync_config, + from_token, + timeout, + ) + + # The client may have disconnected by now; don't bother to serialize the + # response if so. + if request._disconnected: + logger.info("Client has disconnected; not serializing response.") + return 200, {} + + response_content = await self.encode_response(sliding_sync_results) + + return 200, response_content + + # TODO: Is there a better way to encode things? + async def encode_response( + self, + sliding_sync_result: SlidingSyncResult, + ) -> JsonDict: + response: JsonDict = defaultdict(dict) + + response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store) + serialized_lists = self.encode_lists(sliding_sync_result.lists) + if serialized_lists: + response["lists"] = serialized_lists + response["rooms"] = {} # TODO: sliding_sync_result.rooms + response["extensions"] = {} # TODO: sliding_sync_result.extensions + + return response + + def encode_lists( + self, lists: Dict[str, SlidingSyncResult.SlidingWindowList] + ) -> JsonDict: + def encode_operation( + operation: SlidingSyncResult.SlidingWindowList.Operation, + ) -> JsonDict: + return { + "op": operation.op.value, + "range": operation.range, + "room_ids": operation.room_ids, + } + + serialized_lists = {} + for list_key, list_result in lists.items(): + serialized_lists[list_key] = { + "count": list_result.count, + "ops": [encode_operation(op) for op in list_result.ops], + } + + return serialized_lists + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: SyncRestServlet(hs).register(http_server) if hs.config.experimental.msc3575_enabled: + SlidingSyncRestServlet(hs).register(http_server) SlidingSyncE2eeRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index 95e319d2e66..ae927c3904c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -109,6 +109,7 @@ from synapse.handlers.search import SearchHandler from synapse.handlers.send_email import SendEmailHandler from synapse.handlers.set_password import SetPasswordHandler +from synapse.handlers.sliding_sync import SlidingSyncHandler from synapse.handlers.sso import SsoHandler from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler @@ -554,6 +555,9 @@ def get_jwt_handler(self) -> "JwtHandler": def get_sync_handler(self) -> SyncHandler: return SyncHandler(self) + def get_sliding_sync_handler(self) -> SlidingSyncHandler: + return SlidingSyncHandler(self) + @cache_in_self def get_room_list_handler(self) -> RoomListHandler: return RoomListHandler(self) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py new file mode 100644 index 00000000000..5c27474b966 --- /dev/null +++ b/tests/handlers/test_sliding_sync.py @@ -0,0 +1,1118 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# Originally licensed under the Apache License, Version 2.0: +# . +# +# [This file includes modifications made by New Vector Limited] +# +# +import logging +from unittest.mock import patch + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.api.room_versions import RoomVersions +from synapse.rest import admin +from synapse.rest.client import knock, login, room +from synapse.server import HomeServer +from synapse.storage.util.id_generators import MultiWriterIdGenerator +from synapse.types import JsonDict, UserID +from synapse.util import Clock + +from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.unittest import HomeserverTestCase + +logger = logging.getLogger(__name__) + + +class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): + """ + Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it returns + the correct list of rooms IDs. + """ + + servlets = [ + admin.register_servlets, + knock.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def test_no_rooms(self) -> None: + """ + Test when the user has never joined any rooms before + """ + user1_id = self.register_user("user1", "pass") + # user1_tok = self.login(user1_id, "pass") + + now_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=now_token, + to_token=now_token, + ) + ) + + self.assertEqual(room_id_results, set()) + + def test_get_newly_joined_room(self) -> None: + """ + Test that rooms that the user has newly_joined show up. newly_joined is when you + join after the `from_token` and <= `to_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room_token = self.event_sources.get_current_token() + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room_token, + to_token=after_room_token, + ) + ) + + self.assertEqual(room_id_results, {room_id}) + + def test_get_already_joined_room(self) -> None: + """ + Test that rooms that the user is already joined show up. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room_token, + to_token=after_room_token, + ) + ) + + self.assertEqual(room_id_results, {room_id}) + + def test_get_invited_banned_knocked_room(self) -> None: + """ + Test that rooms that the user is invited to, banned from, and knocked on show + up. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room_token = self.event_sources.get_current_token() + + # Setup the invited room (user2 invites user1 to the room) + invited_room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.invite(invited_room_id, targ=user1_id, tok=user2_tok) + + # Setup the ban room (user2 bans user1 from the room) + ban_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(ban_room_id, user1_id, tok=user1_tok) + self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok) + + # Setup the knock room (user1 knocks on the room) + knock_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, room_version=RoomVersions.V7.identifier + ) + self.helper.send_state( + knock_room_id, + EventTypes.JoinRules, + {"join_rule": JoinRules.KNOCK}, + tok=user2_tok, + ) + # User1 knocks on the room + channel = self.make_request( + "POST", + "/_matrix/client/r0/knock/%s" % (knock_room_id,), + b"{}", + user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + after_room_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room_token, + to_token=after_room_token, + ) + ) + + # Ensure that the invited, ban, and knock rooms show up + self.assertEqual( + room_id_results, + { + invited_room_id, + ban_room_id, + knock_room_id, + }, + ) + + def test_get_kicked_room(self) -> None: + """ + Test that a room that the user was kicked from still shows up. When the user + comes back to their client, they should see that they were kicked. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Setup the kick room (user2 kicks user1 from the room) + kick_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(kick_room_id, user1_id, tok=user1_tok) + # Kick user1 from the room + self.helper.change_membership( + room=kick_room_id, + src=user2_id, + targ=user1_id, + tok=user2_tok, + membership=Membership.LEAVE, + extra_data={ + "reason": "Bad manners", + }, + ) + + after_kick_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_kick_token, + to_token=after_kick_token, + ) + ) + + # The kicked room should show up + self.assertEqual(room_id_results, {kick_room_id}) + + def test_forgotten_rooms(self) -> None: + """ + Forgotten rooms do not show up even if we forget after the from/to range. + + Ideally, we would be able to track when the `/forget` happens and apply it + accordingly in the token range but the forgotten flag is only an extra bool in + the `room_memberships` table. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Setup a normal room that we leave. This won't show up in the sync response + # because we left it before our token but is good to check anyway. + leave_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(leave_room_id, user1_id, tok=user1_tok) + self.helper.leave(leave_room_id, user1_id, tok=user1_tok) + + # Setup the ban room (user2 bans user1 from the room) + ban_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(ban_room_id, user1_id, tok=user1_tok) + self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok) + + # Setup the kick room (user2 kicks user1 from the room) + kick_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(kick_room_id, user1_id, tok=user1_tok) + # Kick user1 from the room + self.helper.change_membership( + room=kick_room_id, + src=user2_id, + targ=user1_id, + tok=user2_tok, + membership=Membership.LEAVE, + extra_data={ + "reason": "Bad manners", + }, + ) + + before_room_forgets = self.event_sources.get_current_token() + + # Forget the room after we already have our tokens. This doesn't change + # the membership event itself but will mark it internally in Synapse + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{leave_room_id}/forget", + content={}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{ban_room_id}/forget", + content={}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{kick_room_id}/forget", + content={}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room_forgets, + to_token=before_room_forgets, + ) + ) + + # We shouldn't see the room because it was forgotten + self.assertEqual(room_id_results, set()) + + def test_only_newly_left_rooms_show_up(self) -> None: + """ + Test that newly_left rooms still show up in the sync response but rooms that + were left before the `from_token` don't show up. See condition "2)" comments in + the `get_sync_room_ids_for_user` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Leave before we calculate the `from_token` + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave during the from_token/to_token range (newly_left) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room2_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room2_token, + ) + ) + + # Only the newly_left room should show up + self.assertEqual(room_id_results, {room_id2}) + + def test_no_joins_after_to_token(self) -> None: + """ + Rooms we join after the `to_token` should *not* show up. See condition "1b)" + comments in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Room join after after our `to_token` shouldn't show up + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + _ = room_id2 + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + self.assertEqual(room_id_results, {room_id1}) + + def test_join_during_range_and_left_room_after_to_token(self) -> None: + """ + Room still shows up if we left the room but were joined during the + from_token/to_token. See condition "1a)" comments in the + `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave the room after we already have our tokens + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # We should still see the room because we were joined during the + # from_token/to_token time period. + self.assertEqual(room_id_results, {room_id1}) + + def test_join_before_range_and_left_room_after_to_token(self) -> None: + """ + Room still shows up if we left the room but were joined before the `from_token` + so it should show up. See condition "1a)" comments in the + `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave the room after we already have our tokens + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # We should still see the room because we were joined before the `from_token` + self.assertEqual(room_id_results, {room_id1}) + + def test_kicked_before_range_and_left_after_to_token(self) -> None: + """ + Room still shows up if we left the room but were kicked before the `from_token` + so it should show up. See condition "1a)" comments in the + `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Setup the kick room (user2 kicks user1 from the room) + kick_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(kick_room_id, user1_id, tok=user1_tok) + # Kick user1 from the room + self.helper.change_membership( + room=kick_room_id, + src=user2_id, + targ=user1_id, + tok=user2_tok, + membership=Membership.LEAVE, + extra_data={ + "reason": "Bad manners", + }, + ) + + after_kick_token = self.event_sources.get_current_token() + + # Leave the room after we already have our tokens + # + # We have to join before we can leave (leave -> leave isn't a valid transition + # or at least it doesn't work in Synapse, 403 forbidden) + self.helper.join(kick_room_id, user1_id, tok=user1_tok) + self.helper.leave(kick_room_id, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_kick_token, + to_token=after_kick_token, + ) + ) + + # We shouldn't see the room because it was forgotten + self.assertEqual(room_id_results, {kick_room_id}) + + def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None: + """ + Newly left room should show up. But we're also testing that joining and leaving + after the `to_token` doesn't mess with the results. See condition "2)" and "1a)" + comments in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join and leave the room during the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join and leave the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should still show up because it's newly_left during the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_newly_left_during_range_and_join_after_to_token(self) -> None: + """ + Newly left room should show up. But we're also testing that joining after the + `to_token` doesn't mess with the results. See condition "2)" and "1b)" comments + in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join and leave the room during the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should still show up because it's newly_left during the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_no_from_token(self) -> None: + """ + Test that if we don't provide a `from_token`, we get all the rooms that we we're + joined to up to the `to_token`. + + Providing `from_token` only really has the effect that it adds `newly_left` + rooms to the response. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Join room1 + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Join and leave the room2 before the `to_token` + self.helper.join(room_id2, user1_id, tok=user1_tok) + self.helper.leave(room_id2, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join the room2 after we already have our tokens + self.helper.join(room_id2, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=None, + to_token=after_room1_token, + ) + ) + + # Only rooms we were joined to before the `to_token` should show up + self.assertEqual(room_id_results, {room_id1}) + + def test_from_token_ahead_of_to_token(self) -> None: + """ + Test when the provided `from_token` comes after the `to_token`. We should + basically expect the same result as having no `from_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Join room1 before `before_room_token` + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Join and leave the room2 before `before_room_token` + self.helper.join(room_id2, user1_id, tok=user1_tok) + self.helper.leave(room_id2, user1_id, tok=user1_tok) + + # Note: These are purposely swapped. The `from_token` should come after + # the `to_token` in this test + to_token = self.event_sources.get_current_token() + + # Join room2 after `before_room_token` + self.helper.join(room_id2, user1_id, tok=user1_tok) + + # -------- + + # Join room3 after `before_room_token` + self.helper.join(room_id3, user1_id, tok=user1_tok) + + # Join and leave the room4 after `before_room_token` + self.helper.join(room_id4, user1_id, tok=user1_tok) + self.helper.leave(room_id4, user1_id, tok=user1_tok) + + # Note: These are purposely swapped. The `from_token` should come after the + # `to_token` in this test + from_token = self.event_sources.get_current_token() + + # Join the room4 after we already have our tokens + self.helper.join(room_id4, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=from_token, + to_token=to_token, + ) + ) + + # Only rooms we were joined to before the `to_token` should show up + # + # There won't be any newly_left rooms because the `from_token` is ahead of the + # `to_token` and that range will give no membership changes to check. + self.assertEqual(room_id_results, {room_id1}) + + def test_leave_before_range_and_join_leave_after_to_token(self) -> None: + """ + Old left room shouldn't show up. But we're also testing that joining and leaving + after the `to_token` doesn't mess with the results. See condition "1a)" comments + in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join and leave the room before the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join and leave the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room shouldn't show up because it was left before the `from_token` + self.assertEqual(room_id_results, set()) + + def test_leave_before_range_and_join_after_to_token(self) -> None: + """ + Old left room shouldn't show up. But we're also testing that joining after the + `to_token` doesn't mess with the results. See condition "1b)" comments in the + `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join and leave the room before the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room shouldn't show up because it was left before the `from_token` + self.assertEqual(room_id_results, set()) + + def test_join_leave_multiple_times_during_range_and_after_to_token( + self, + ) -> None: + """ + Join and leave multiple times shouldn't affect rooms from showing up. It just + matters that we were joined or newly_left in the from/to range. But we're also + testing that joining and leaving after the `to_token` doesn't mess with the + results. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join, leave, join back to the room before the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave and Join the room multiple times after we already have our tokens + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because it was newly_left and joined during the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_join_leave_multiple_times_before_range_and_after_to_token( + self, + ) -> None: + """ + Join and leave multiple times before the from/to range shouldn't affect rooms + from showing up. It just matters that we were joined or newly_left in the + from/to range. But we're also testing that joining and leaving after the + `to_token` doesn't mess with the results. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join, leave, join back to the room before the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave and Join the room multiple times after we already have our tokens + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because we were joined before the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_invite_before_range_and_join_leave_after_to_token( + self, + ) -> None: + """ + Make it look like we joined after the token range but we were invited before the + from/to range so the room should still show up. See condition "1a)" comments in + the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Invited to the room before the token + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join and leave the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because we were invited before the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_multiple_rooms_are_not_confused( + self, + ) -> None: + """ + Test that multiple rooms are not confused as we fixup the list. This test is + spawning from a real world bug in the code where I was accidentally using + `event.room_id` in one of the fix-up loops but the `event` being referenced was + actually from a different loop. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Invited and left the room before the token + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + # Invited to room2 + self.helper.invite(room_id2, src=user2_id, targ=user1_id, tok=user2_tok) + + before_room3_token = self.event_sources.get_current_token() + + # Invited and left room3 during the from/to range + room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + self.helper.invite(room_id3, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.leave(room_id3, user1_id, tok=user1_tok) + + after_room3_token = self.event_sources.get_current_token() + + # Join and leave the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + # Leave room2 + self.helper.leave(room_id2, user1_id, tok=user1_tok) + # Leave room3 + self.helper.leave(room_id3, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room3_token, + to_token=after_room3_token, + ) + ) + + self.assertEqual( + room_id_results, + { + # `room_id1` shouldn't show up because we left before the from/to range + # + # Room should show up because we were invited before the from/to range + room_id2, + # Room should show up because it was newly_left during the from/to range + room_id3, + }, + ) + + +class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase): + """ + Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it works with + sharded event stream_writers enabled + """ + + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def default_config(self) -> dict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + + # Enable shared event stream_writers + config["stream_writers"] = {"events": ["worker1", "worker2", "worker3"]} + config["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, + "worker1": {"host": "testserv", "port": 1001}, + "worker2": {"host": "testserv", "port": 1002}, + "worker3": {"host": "testserv", "port": 1003}, + } + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def _create_room(self, room_id: str, user_id: str, tok: str) -> None: + """ + Create a room with a specific room_id. We use this so that that we have a + consistent room_id across test runs that hashes to the same value and will be + sharded to a known worker in the tests. + """ + + # We control the room ID generation by patching out the + # `_generate_room_id` method + with patch( + "synapse.handlers.room.RoomCreationHandler._generate_room_id" + ) as mock: + mock.side_effect = lambda: room_id + self.helper.create_room_as(user_id, tok=tok) + + def test_sharded_event_persisters(self) -> None: + """ + This test should catch bugs that would come from flawed stream position + (`stream_ordering`) comparisons or making `RoomStreamToken`'s naively. To + compare event positions properly, you need to consider both the `instance_name` + and `stream_ordering` together. + + The test creates three event persister workers and a room that is sharded to + each worker. On worker2, we make the event stream position stuck so that it lags + behind the other workers and we start getting `RoomStreamToken` that have an + `instance_map` component (i.e. q`m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). + + We then send some events to advance the stream positions of worker1 and worker3 + but worker2 is lagging behind because it's stuck. We are specifically testing + that `get_sync_room_ids_for_user(from_token=xxx, to_token=xxx)` should work + correctly in these adverse conditions. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "worker1"}, + ) + + worker_hs2 = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "worker2"}, + ) + + self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "worker3"}, + ) + + # Specially crafted room IDs that get persisted on different workers. + # + # Sharded to worker1 + room_id1 = "!fooo:test" + # Sharded to worker2 + room_id2 = "!bar:test" + # Sharded to worker3 + room_id3 = "!quux:test" + + # Create rooms on the different workers. + self._create_room(room_id1, user2_id, user2_tok) + self._create_room(room_id2, user2_id, user2_tok) + self._create_room(room_id3, user2_id, user2_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok) + # Leave room2 + self.helper.leave(room_id2, user1_id, tok=user1_tok) + join_response3 = self.helper.join(room_id3, user1_id, tok=user1_tok) + # Leave room3 + self.helper.leave(room_id3, user1_id, tok=user1_tok) + + # Ensure that the events were sharded to different workers. + pos1 = self.get_success( + self.store.get_position_for_event(join_response1["event_id"]) + ) + self.assertEqual(pos1.instance_name, "worker1") + pos2 = self.get_success( + self.store.get_position_for_event(join_response2["event_id"]) + ) + self.assertEqual(pos2.instance_name, "worker2") + pos3 = self.get_success( + self.store.get_position_for_event(join_response3["event_id"]) + ) + self.assertEqual(pos3.instance_name, "worker3") + + before_stuck_activity_token = self.event_sources.get_current_token() + + # We now gut wrench into the events stream `MultiWriterIdGenerator` on worker2 to + # mimic it getting stuck persisting an event. This ensures that when we send an + # event on worker1/worker3 we end up in a state where worker2 events stream + # position lags that on worker1/worker3, resulting in a RoomStreamToken with a + # non-empty `instance_map` component. + # + # Worker2's event stream position will not advance until we call `__aexit__` + # again. + worker_store2 = worker_hs2.get_datastores().main + assert isinstance(worker_store2._stream_id_gen, MultiWriterIdGenerator) + actx = worker_store2._stream_id_gen.get_next() + self.get_success(actx.__aenter__()) + + # For room_id1/worker1: leave and join the room to advance the stream position + # and generate membership changes. + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + # For room_id2/worker2: which is currently stuck, join the room. + join_on_worker2_response = self.helper.join(room_id2, user1_id, tok=user1_tok) + # For room_id3/worker3: leave and join the room to advance the stream position + # and generate membership changes. + self.helper.leave(room_id3, user1_id, tok=user1_tok) + join_on_worker3_response = self.helper.join(room_id3, user1_id, tok=user1_tok) + + # Get a token while things are stuck after our activity + stuck_activity_token = self.event_sources.get_current_token() + logger.info("stuck_activity_token %s", stuck_activity_token) + # Let's make sure we're working with a token that has an `instance_map` + self.assertNotEqual(len(stuck_activity_token.room_key.instance_map), 0) + + # Just double check that the join event on worker2 (that is stuck) happened + # after the position recorded for worker2 in the token but before the max + # position in the token. This is crucial for the behavior we're trying to test. + join_on_worker2_pos = self.get_success( + self.store.get_position_for_event(join_on_worker2_response["event_id"]) + ) + logger.info("join_on_worker2_pos %s", join_on_worker2_pos) + # Ensure the join technially came after our token + self.assertGreater( + join_on_worker2_pos.stream, + stuck_activity_token.room_key.get_stream_pos_for_instance("worker2"), + ) + # But less than the max stream position of some other worker + self.assertLess( + join_on_worker2_pos.stream, + # max + stuck_activity_token.room_key.get_max_stream_pos(), + ) + + # Just double check that the join event on worker3 happened after the min stream + # value in the token but still within the position recorded for worker3. This is + # crucial for the behavior we're trying to test. + join_on_worker3_pos = self.get_success( + self.store.get_position_for_event(join_on_worker3_response["event_id"]) + ) + logger.info("join_on_worker3_pos %s", join_on_worker3_pos) + # Ensure the join came after the min but still encapsulated by the token + self.assertGreaterEqual( + join_on_worker3_pos.stream, + # min + stuck_activity_token.room_key.stream, + ) + self.assertLessEqual( + join_on_worker3_pos.stream, + stuck_activity_token.room_key.get_stream_pos_for_instance("worker3"), + ) + + # We finish the fake persisting an event we started above and advance worker2's + # event stream position (unstuck worker2). + self.get_success(actx.__aexit__(None, None, None)) + + # The function under test + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_stuck_activity_token, + to_token=stuck_activity_token, + ) + ) + + self.assertEqual( + room_id_results, + { + room_id1, + # room_id2 shouldn't show up because we left before the from/to range + # and the join event during the range happened while worker2 was stuck. + # This means that from the perspective of the master, where the + # `stuck_activity_token` is generated, the stream position for worker2 + # wasn't advanced to the join yet. Looking at the `instance_map`, the + # join technically comes after `stuck_activity_token``. + # + # room_id2, + room_id3, + }, + ) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index daeb1d3ddd9..a20a3fb40d2 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -34,7 +34,7 @@ ) from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType from synapse.util import Clock from tests import unittest @@ -1204,3 +1204,135 @@ def test_incremental_sync(self) -> None: self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"]) self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"]) + + +class SlidingSyncTestCase(unittest.HomeserverTestCase): + """ + Tests regarding MSC3575 Sliding Sync `/sync` endpoint. + """ + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + sync.register_servlets, + devices.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync" + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def test_sync_list(self) -> None: + """ + Test that room IDs show up in the Sliding Sync lists + """ + alice_user_id = self.register_user("alice", "correcthorse") + alice_access_token = self.login(alice_user_id, "correcthorse") + + room_id = self.helper.create_room_as( + alice_user_id, tok=alice_access_token, is_public=True + ) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "sort": ["by_notification_level", "by_recency", "by_name"], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, + } + } + }, + access_token=alice_access_token, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure it has the foo-list we requested + self.assertListEqual( + list(channel.json_body["lists"].keys()), + ["foo-list"], + channel.json_body["lists"].keys(), + ) + + # Make sure the list includes the room we are joined to + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 99], + "room_ids": [room_id], + } + ], + channel.json_body["lists"]["foo-list"], + ) + + def test_wait_for_sync_token(self) -> None: + """ + Test that worker will wait until it catches up to the given token + """ + alice_user_id = self.register_user("alice", "correcthorse") + alice_access_token = self.login(alice_user_id, "correcthorse") + + # Create a future token that will cause us to wait. Since we never send a new + # event to reach that future stream_ordering, the worker will wait until the + # full timeout. + current_token = self.event_sources.get_current_token() + future_position_token = current_token.copy_and_replace( + StreamKeyType.ROOM, + RoomStreamToken(stream=current_token.room_key.stream + 1), + ) + + future_position_token_serialized = self.get_success( + future_position_token.to_string(self.store) + ) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + f"?pos={future_position_token_serialized}", + { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "sort": ["by_notification_level", "by_recency", "by_name"], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, + } + } + }, + access_token=alice_access_token, + await_result=False, + ) + # Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)` + # timeout + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=9900) + channel.await_result(timeout_ms=200) + self.assertEqual(channel.code, 200, channel.json_body) + + # We expect the `next_pos` in the result to be the same as what we requested + # with because we weren't able to find anything new yet. + self.assertEqual( + channel.json_body["next_pos"], future_position_token_serialized + ) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 7362bde7ab8..f0ba40a1f13 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -330,9 +330,12 @@ def change_membership( data, ) - assert channel.code == expect_code, "Expected: %d, got: %d, resp: %r" % ( + assert ( + channel.code == expect_code + ), "Expected: %d, got: %d, PUT %s -> resp: %r" % ( expect_code, channel.code, + path, channel.result["body"], )