From 76ce7a903413e56e2c4ca46e4c61c273a872fc95 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jun 2024 15:19:21 -0500 Subject: [PATCH 01/21] Add `is_dm` filtering to Sliding Sync `/sync` Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync --- changelog.d/17244.feature | 1 + synapse/handlers/sliding_sync.py | 85 ++++++++++++++- tests/handlers/test_sliding_sync.py | 156 +++++++++++++++++++++++++++- tests/rest/client/test_sync.py | 127 ++++++++++++++++++++++ 4 files changed, 363 insertions(+), 6 deletions(-) create mode 100644 changelog.d/17244.feature diff --git a/changelog.d/17244.feature b/changelog.d/17244.feature new file mode 100644 index 00000000000..5c16342c110 --- /dev/null +++ b/changelog.d/17244.feature @@ -0,0 +1 @@ +Add `is_dm` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 34ae21ba509..08c6aadff6f 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -31,7 +31,7 @@ else: from pydantic import Extra -from synapse.api.constants import Membership +from synapse.api.constants import AccountDataTypes, Membership from synapse.events import EventBase from synapse.rest.client.models import SlidingSyncBody from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID @@ -332,11 +332,15 @@ async def current_sync_for_user( lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} if sync_config.lists: for list_key, list_config in sync_config.lists.items(): - # TODO: Apply filters - # - # TODO: Exclude partially stated rooms unless the `required_state` has - # `["m.room.member", "$LAZY"]` + # Apply filters filtered_room_ids = room_id_set + if list_config.filters is not None: + # TODO: To be absolutely correct, this could also take into account + # from/to tokens but some of the streams don't support looking back + # in time (like global account_data). + filtered_room_ids = await self.filter_rooms( + sync_config.user, room_id_set, list_config.filters + ) # TODO: Apply sorts sorted_room_ids = sorted(filtered_room_ids) @@ -608,3 +612,74 @@ async def get_sync_room_ids_for_user( sync_room_id_set.add(room_id) return sync_room_id_set + + async def filter_rooms( + self, + user: UserID, + room_id_set: AbstractSet[str], + filters: SlidingSyncConfig.SlidingSyncList.Filters, + ) -> AbstractSet[str]: + """ + Filter rooms based on the sync request. + """ + user_id = user.to_string() + + # TODO: Apply filters + # + # TODO: Exclude partially stated rooms unless the `required_state` has + # `["m.room.member", "$LAZY"]` + + filtered_room_id_set = set(room_id_set) + + # Filter for Direct-Message (DM) rooms + if filters.is_dm is not None: + # We're using global account data (`m.direct`) instead of checking for + # `is_direct` on membership events because that property only appears for + # the invitee membership event (doesn't show up for the inviter). Account + # data is set by the client so it needs to be scrutinized. + dm_map = await self.store.get_global_account_data_by_type_for_user( + user_id, AccountDataTypes.DIRECT + ) + logger.warn("dm_map: %s", dm_map) + # Flatten out the map + dm_room_id_set = set() + if dm_map: + for room_ids in dm_map.values(): + # Account data should be a list of room IDs. Ignore anything else + if isinstance(room_ids, list): + for room_id in room_ids: + if isinstance(room_id, str): + dm_room_id_set.add(room_id) + + if filters.is_dm: + # Only DM rooms please + filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set) + else: + # Only non-DM rooms please + filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set) + + if filters.spaces: + raise NotImplementedError() + + if filters.is_encrypted: + raise NotImplementedError() + + if filters.is_invite: + raise NotImplementedError() + + if filters.room_types: + raise NotImplementedError() + + if filters.not_room_types: + raise NotImplementedError() + + if filters.room_name_like: + raise NotImplementedError() + + if filters.tags: + raise NotImplementedError() + + if filters.not_tags: + raise NotImplementedError() + + return filtered_room_id_set diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 5c27474b966..220683b9d65 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -22,7 +22,7 @@ from twisted.test.proto_helpers import MemoryReactor -from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership from synapse.api.room_versions import RoomVersions from synapse.rest import admin from synapse.rest.client import knock, login, room @@ -1116,3 +1116,157 @@ def test_sharded_event_persisters(self) -> None: room_id3, }, ) + + +class FilterRoomsTestCase(HomeserverTestCase): + """ + Tests Sliding Sync handler `filter_rooms()` to make sure it includes/excludes rooms + correctly. + """ + + servlets = [ + admin.register_servlets, + knock.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + + def _create_dm_room( + self, + inviter_user_id: str, + inviter_tok: str, + invitee_user_id: str, + invitee_tok: str, + ) -> str: + """ + Helper to create a DM room as the "inviter" and invite the "invitee" user to the room. The + "invitee" user also will join the room. The `m.direct` account data will be set + for both users. + """ + + # Create a room and send an invite the other user + room_id = self.helper.create_room_as( + inviter_user_id, + is_public=False, + tok=inviter_tok, + ) + self.helper.invite( + room_id, + src=inviter_user_id, + targ=invitee_user_id, + tok=inviter_tok, + extra_data={"is_direct": True}, + ) + # Person that was invited joins the room + self.helper.join(room_id, invitee_user_id, tok=invitee_tok) + + # Mimic the client setting the room as a direct message in the global account + # data + self.get_success( + self.store.add_account_data_for_user( + invitee_user_id, + AccountDataTypes.DIRECT, + {inviter_user_id: [room_id]}, + ) + ) + self.get_success( + self.store.add_account_data_for_user( + inviter_user_id, + AccountDataTypes.DIRECT, + {invitee_user_id: [room_id]}, + ) + ) + + return room_id + + def test_filter_dm_rooms(self) -> None: + """ + Test filter for DM rooms + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create a normal room + room_id = self.helper.create_room_as( + user1_id, + is_public=False, + tok=user1_tok, + ) + + # Create a DM room + dm_room_id = self._create_dm_room( + inviter_user_id=user1_id, + inviter_tok=user1_tok, + invitee_user_id=user2_id, + invitee_tok=user2_tok, + ) + + # TODO: Better way to avoid the circular import? (see + # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) + from synapse.handlers.sliding_sync import SlidingSyncConfig + + filters = SlidingSyncConfig.SlidingSyncList.Filters( + is_dm=True, + ) + + # Try filtering the rooms + filtered_room_ids = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), {room_id, dm_room_id}, filters + ) + ) + + self.assertEqual(filtered_room_ids, {dm_room_id}) + + def test_filter_non_dm_rooms(self) -> None: + """ + Test filter for non-DM rooms + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create a normal room + room_id = self.helper.create_room_as( + user1_id, + is_public=False, + tok=user1_tok, + ) + + # Create a DM room + dm_room_id = self._create_dm_room( + inviter_user_id=user1_id, + inviter_tok=user1_tok, + invitee_user_id=user2_id, + invitee_tok=user2_tok, + ) + + # TODO: Better way to avoid the circular import? (see + # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) + from synapse.handlers.sliding_sync import SlidingSyncConfig + + filters = SlidingSyncConfig.SlidingSyncList.Filters( + is_dm=False, + ) + + # Try filtering the rooms + filtered_room_ids = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), {room_id, dm_room_id}, filters + ) + ) + + self.assertEqual(filtered_room_ids, {room_id}) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index a20a3fb40d2..40870b2cfe7 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -27,6 +27,7 @@ import synapse.rest.admin from synapse.api.constants import ( + AccountDataTypes, EventContentFields, EventTypes, ReceiptTypes, @@ -1226,10 +1227,59 @@ def default_config(self) -> JsonDict: return config def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync" self.store = hs.get_datastores().main self.event_sources = hs.get_event_sources() + def _create_dm_room( + self, + inviter_user_id: str, + inviter_tok: str, + invitee_user_id: str, + invitee_tok: str, + ) -> str: + """ + Helper to create a DM room as the "inviter" and invite the "invitee" user to the + room. The "invitee" user also will join the room. The `m.direct` account data + will be set for both users. + """ + + # Create a room and send an invite the other user + room_id = self.helper.create_room_as( + inviter_user_id, + is_public=False, + tok=inviter_tok, + ) + self.helper.invite( + room_id, + src=inviter_user_id, + targ=invitee_user_id, + tok=inviter_tok, + extra_data={"is_direct": True}, + ) + # Person that was invited joins the room + self.helper.join(room_id, invitee_user_id, tok=invitee_tok) + + # Mimic the client setting the room as a direct message in the global account + # data + self.get_success( + self.store.add_account_data_for_user( + invitee_user_id, + AccountDataTypes.DIRECT, + {inviter_user_id: [room_id]}, + ) + ) + self.get_success( + self.store.add_account_data_for_user( + inviter_user_id, + AccountDataTypes.DIRECT, + {invitee_user_id: [room_id]}, + ) + ) + + return room_id + def test_sync_list(self) -> None: """ Test that room IDs show up in the Sliding Sync lists @@ -1336,3 +1386,80 @@ def test_wait_for_sync_token(self) -> None: self.assertEqual( channel.json_body["next_pos"], future_position_token_serialized ) + + def test_filter_list(self) -> None: + """ + Test that filters apply to lists + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create a DM room + dm_room_id = self._create_dm_room( + inviter_user_id=user1_id, + inviter_tok=user1_tok, + invitee_user_id=user2_id, + invitee_tok=user2_tok, + ) + + # Create a normal room + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "dms": { + "ranges": [[0, 99]], + "sort": ["by_recency"], + "required_state": [], + "timeline_limit": 1, + "filters": {"is_dm": True}, + }, + "foo-list": { + "ranges": [[0, 99]], + "sort": ["by_recency"], + "required_state": [], + "timeline_limit": 1, + "filters": {"is_dm": False}, + }, + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure it has the foo-list we requested + self.assertListEqual( + list(channel.json_body["lists"].keys()), + ["dms", "foo-list"], + channel.json_body["lists"].keys(), + ) + + # Make sure the list includes the room we are joined to + self.assertListEqual( + list(channel.json_body["lists"]["dms"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 99], + "room_ids": [dm_room_id], + } + ], + list(channel.json_body["lists"]["dms"]), + ) + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 99], + "room_ids": [room_id], + } + ], + list(channel.json_body["lists"]["foo-list"]), + ) From 360f05cc6ebc8e49f4d71b4645dee5f6f349be3d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jun 2024 15:21:06 -0500 Subject: [PATCH 02/21] Move changelog --- changelog.d/{17244.feature => 17277.feature} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{17244.feature => 17277.feature} (100%) diff --git a/changelog.d/17244.feature b/changelog.d/17277.feature similarity index 100% rename from changelog.d/17244.feature rename to changelog.d/17277.feature From d8e2b1d6d52dabee7ccb18731a3ceb9904eec298 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jun 2024 15:30:43 -0500 Subject: [PATCH 03/21] Add docstring --- synapse/rest/client/models.py | 47 +++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py index 5433ed91efc..129fca22037 100644 --- a/synapse/rest/client/models.py +++ b/synapse/rest/client/models.py @@ -238,6 +238,53 @@ class SlidingSyncList(CommonRoomParameters): """ class Filters(RequestBodyModel): + """ + All fields are applied with AND operators, hence if `is_dm: True` and + `is_encrypted: True` then only Encrypted DM rooms will be returned. The + absence of fields implies no filter on that criteria: it does NOT imply + `False`. These fields may be expanded through use of extensions. + + Attributes: + is_dm: Flag which only returns rooms present (or not) in the DM section + of account data. If unset, both DM rooms and non-DM rooms are returned. + If False, only non-DM rooms are returned. If True, only DM rooms are + returned. + spaces: Filter the room based on the space they belong to according to + `m.space.child` state events. If multiple spaces are present, a room can + be part of any one of the listed spaces (OR'd). The server will inspect + the `m.space.child` state events for the JOINED space room IDs given. + Servers MUST NOT navigate subspaces. It is up to the client to give a + complete list of spaces to navigate. Only rooms directly mentioned as + `m.space.child` events in these spaces will be returned. Unknown spaces + or spaces the user is not joined to will be ignored. + is_encrypted: Flag which only returns rooms which have an + `m.room.encryption` state event. If unset, both encrypted and + unencrypted rooms are returned. If `False`, only unencrypted rooms are + returned. If `True`, only encrypted rooms are returned. + is_invite: Flag which only returns rooms the user is currently invited + to. If unset, both invited and joined rooms are returned. If `False`, no + invited rooms are returned. If `True`, only invited rooms are returned. + room_types: If specified, only rooms where the `m.room.create` event has + a `type` matching one of the strings in this array will be returned. If + this field is unset, all rooms are returned regardless of type. This can + be used to get the initial set of spaces for an account. For rooms which + do not have a room type, use `null`/`None` to include them. + not_room_types: Same as `room_types` but inverted. This can be used to + filter out spaces from the room list. If a type is in both `room_types` + and `not_room_types`, then `not_room_types` wins and they are not included + in the result. + room_name_like: Filter the room name. Case-insensitive partial matching + e.g 'foo' matches 'abFooab'. The term 'like' is inspired by SQL 'LIKE', + and the text here is similar to '%foo%'. + tags: Filter the room based on its room tags. If multiple tags are + present, a room can have any one of the listed tags (OR'd). + not_tags: Filter the room based on its room tags. Takes priority over + `tags`. For example, a room with tags A and B with filters `tags: [A]` + `not_tags: [B]` would NOT be included because `not_tags` takes priority over + `tags`. This filter is useful if your rooms list does NOT include the + list of favourite rooms again. + """ + is_dm: Optional[StrictBool] = None spaces: Optional[List[StrictStr]] = None is_encrypted: Optional[StrictBool] = None From dd439386c792ca84218b4135d086ec9e7c0cdb5a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jun 2024 17:20:17 -0500 Subject: [PATCH 04/21] Reference actual filter code --- tests/handlers/test_sliding_sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 220683b9d65..9201b34523a 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1191,7 +1191,7 @@ def _create_dm_room( def test_filter_dm_rooms(self) -> None: """ - Test filter for DM rooms + Test `filter.is_dm` for DM rooms """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1232,7 +1232,7 @@ def test_filter_dm_rooms(self) -> None: def test_filter_non_dm_rooms(self) -> None: """ - Test filter for non-DM rooms + Test `filter.is_dm` for non-DM rooms """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") From 88fe201f00e0c7d4ae83dd3df5c723e7b89bf48f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jun 2024 17:23:43 -0500 Subject: [PATCH 05/21] Condense true/false tests --- tests/handlers/test_sliding_sync.py | 52 ++++++++--------------------- 1 file changed, 14 insertions(+), 38 deletions(-) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 9201b34523a..621b554be60 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1217,56 +1217,32 @@ def test_filter_dm_rooms(self) -> None: # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) from synapse.handlers.sliding_sync import SlidingSyncConfig - filters = SlidingSyncConfig.SlidingSyncList.Filters( + # Try with `is_dm=True` + # ----------------------------- + truthy_filters = SlidingSyncConfig.SlidingSyncList.Filters( is_dm=True, ) - # Try filtering the rooms - filtered_room_ids = self.get_success( + # Filter the rooms + truthy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( - UserID.from_string(user1_id), {room_id, dm_room_id}, filters + UserID.from_string(user1_id), {room_id, dm_room_id}, truthy_filters ) ) - self.assertEqual(filtered_room_ids, {dm_room_id}) + self.assertEqual(truthy_filtered_room_ids, {dm_room_id}) - def test_filter_non_dm_rooms(self) -> None: - """ - Test `filter.is_dm` for non-DM rooms - """ - user1_id = self.register_user("user1", "pass") - user1_tok = self.login(user1_id, "pass") - user2_id = self.register_user("user2", "pass") - user2_tok = self.login(user2_id, "pass") - - # Create a normal room - room_id = self.helper.create_room_as( - user1_id, - is_public=False, - tok=user1_tok, - ) - - # Create a DM room - dm_room_id = self._create_dm_room( - inviter_user_id=user1_id, - inviter_tok=user1_tok, - invitee_user_id=user2_id, - invitee_tok=user2_tok, - ) - - # TODO: Better way to avoid the circular import? (see - # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) - from synapse.handlers.sliding_sync import SlidingSyncConfig - - filters = SlidingSyncConfig.SlidingSyncList.Filters( + # Try with `is_dm=False` + # ----------------------------- + falsy_filters = SlidingSyncConfig.SlidingSyncList.Filters( is_dm=False, ) - # Try filtering the rooms - filtered_room_ids = self.get_success( + # Filter the rooms + falsy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( - UserID.from_string(user1_id), {room_id, dm_room_id}, filters + UserID.from_string(user1_id), {room_id, dm_room_id}, falsy_filters ) ) - self.assertEqual(filtered_room_ids, {room_id}) + self.assertEqual(falsy_filtered_room_ids, {room_id}) From 44088bd4af49411febf0a80b1571e2f6174fd25c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jun 2024 17:40:16 -0500 Subject: [PATCH 06/21] Add `is_encrypted` filtering to Sliding Sync `/sync` Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync --- changelog.d/17249.feature | 1 + synapse/handlers/sliding_sync.py | 21 ++++++++-- tests/handlers/test_sliding_sync.py | 65 +++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 3 deletions(-) create mode 100644 changelog.d/17249.feature diff --git a/changelog.d/17249.feature b/changelog.d/17249.feature new file mode 100644 index 00000000000..fce512692cb --- /dev/null +++ b/changelog.d/17249.feature @@ -0,0 +1 @@ +Add `is_encrypted` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 08c6aadff6f..ecd41d3ecef 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -31,7 +31,7 @@ else: from pydantic import Extra -from synapse.api.constants import AccountDataTypes, Membership +from synapse.api.constants import AccountDataTypes, EventTypes, Membership from synapse.events import EventBase from synapse.rest.client.models import SlidingSyncBody from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID @@ -661,8 +661,23 @@ async def filter_rooms( if filters.spaces: raise NotImplementedError() - if filters.is_encrypted: - raise NotImplementedError() + # Filter for encrypted rooms + if filters.is_encrypted is not None: + # Make a copy so we don't run into an error: `Set changed size during iteration` + for room_id in list(filtered_room_id_set): + # TODO: Is there a good method to look up all rooms at once? (N+1 query problem) + is_encrypted = ( + await self.storage_controllers.state.get_current_state_event( + room_id, EventTypes.RoomEncryption, "" + ) + ) + + # If we're looking for encrypted rooms, filter out rooms that are not + # encrypted and vice versa + if (filters.is_encrypted and not is_encrypted) or ( + not filters.is_encrypted and is_encrypted + ): + filtered_room_id_set.remove(room_id) if filters.is_invite: raise NotImplementedError() diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 621b554be60..599e3dbc108 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1246,3 +1246,68 @@ def test_filter_dm_rooms(self) -> None: ) self.assertEqual(falsy_filtered_room_ids, {room_id}) + + def test_filter_encrypted_rooms(self) -> None: + """ + Test `filter.is_encrypted` for encrypted rooms + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a normal room + room_id = self.helper.create_room_as( + user1_id, + is_public=False, + tok=user1_tok, + ) + + # Create an encrypted room + encrypted_room_id = self.helper.create_room_as( + user1_id, + is_public=False, + tok=user1_tok, + ) + self.helper.send_state( + encrypted_room_id, + EventTypes.RoomEncryption, + {"algorithm": "m.megolm.v1.aes-sha2"}, + tok=user1_tok, + ) + + # TODO: Better way to avoid the circular import? (see + # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) + from synapse.handlers.sliding_sync import SlidingSyncConfig + + # Try with `is_encrypted=True` + # ----------------------------- + truthy_filters = SlidingSyncConfig.SlidingSyncList.Filters( + is_encrypted=True, + ) + + # Filter the rooms + truthy_filtered_room_ids = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + {room_id, encrypted_room_id}, + truthy_filters, + ) + ) + + self.assertEqual(truthy_filtered_room_ids, {encrypted_room_id}) + + # Try with `is_encrypted=False` + # ----------------------------- + falsy_filters = SlidingSyncConfig.SlidingSyncList.Filters( + is_encrypted=False, + ) + + # Filter the rooms + falsy_filtered_room_ids = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + {room_id, encrypted_room_id}, + falsy_filters, + ) + ) + + self.assertEqual(falsy_filtered_room_ids, {room_id}) From 4412dbd22c4a2811fd23458435acc306f283318a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jun 2024 17:41:40 -0500 Subject: [PATCH 07/21] Update changelog number --- changelog.d/{17249.feature => 17281.feature} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{17249.feature => 17281.feature} (100%) diff --git a/changelog.d/17249.feature b/changelog.d/17281.feature similarity index 100% rename from changelog.d/17249.feature rename to changelog.d/17281.feature From 35b18be8591c3e687dd1400d8672766e1b755cc1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 6 Jun 2024 17:42:58 -0500 Subject: [PATCH 08/21] Fix lints --- synapse/handlers/sliding_sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index ecd41d3ecef..bc39e36b20b 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -226,6 +226,7 @@ class SlidingSyncHandler: def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.store = hs.get_datastores().main + self.storage_controllers = hs.get_storage_controllers() self.auth_blocking = hs.get_auth_blocking() self.notifier = hs.get_notifier() self.event_sources = hs.get_event_sources() From 61f86e0d390b2c04b14b687cf78f9e332d90c177 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 12:20:41 -0500 Subject: [PATCH 09/21] Add future todo --- synapse/handlers/sliding_sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index bc39e36b20b..6fde273ce5d 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -668,6 +668,7 @@ async def filter_rooms( for room_id in list(filtered_room_id_set): # TODO: Is there a good method to look up all rooms at once? (N+1 query problem) is_encrypted = ( + # TODO: Get state at the `to_token` instead of the current state await self.storage_controllers.state.get_current_state_event( room_id, EventTypes.RoomEncryption, "" ) From 578b44af4cdc417dde22a3d148bbe7f3d62764dc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 14:24:47 -0500 Subject: [PATCH 10/21] Move get_state_at() to area we can share from --- synapse/handlers/sync.py | 95 ++-------------------------- synapse/storage/controllers/state.py | 87 ++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 90 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 39964726c5b..d91367f4326 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -981,89 +981,6 @@ async def _load_filtered_recents( bundled_aggregations=bundled_aggregations, ) - async def get_state_after_event( - self, - event_id: str, - state_filter: Optional[StateFilter] = None, - await_full_state: bool = True, - ) -> StateMap[str]: - """ - Get the room state after the given event - - Args: - event_id: event of interest - state_filter: The state filter used to fetch state from the database. - await_full_state: if `True`, will block if we do not yet have complete state - at the event and `state_filter` is not satisfied by partial state. - Defaults to `True`. - """ - state_ids = await self._state_storage_controller.get_state_ids_for_event( - event_id, - state_filter=state_filter or StateFilter.all(), - await_full_state=await_full_state, - ) - - # using get_metadata_for_events here (instead of get_event) sidesteps an issue - # with redactions: if `event_id` is a redaction event, and we don't have the - # original (possibly because it got purged), get_event will refuse to return - # the redaction event, which isn't terribly helpful here. - # - # (To be fair, in that case we could assume it's *not* a state event, and - # therefore we don't need to worry about it. But still, it seems cleaner just - # to pull the metadata.) - m = (await self.store.get_metadata_for_events([event_id]))[event_id] - if m.state_key is not None and m.rejection_reason is None: - state_ids = dict(state_ids) - state_ids[(m.event_type, m.state_key)] = event_id - - return state_ids - - async def get_state_at( - self, - room_id: str, - stream_position: StreamToken, - state_filter: Optional[StateFilter] = None, - await_full_state: bool = True, - ) -> StateMap[str]: - """Get the room state at a particular stream position - - Args: - room_id: room for which to get state - stream_position: point at which to get state - state_filter: The state filter used to fetch state from the database. - await_full_state: if `True`, will block if we do not yet have complete state - at the last event in the room before `stream_position` and - `state_filter` is not satisfied by partial state. Defaults to `True`. - """ - # FIXME: This gets the state at the latest event before the stream ordering, - # which might not be the same as the "current state" of the room at the time - # of the stream token if there were multiple forward extremities at the time. - last_event_id = await self.store.get_last_event_in_room_before_stream_ordering( - room_id, - end_token=stream_position.room_key, - ) - - if last_event_id: - state = await self.get_state_after_event( - last_event_id, - state_filter=state_filter or StateFilter.all(), - await_full_state=await_full_state, - ) - - else: - # no events in this room - so presumably no state - state = {} - - # (erikj) This should be rarely hit, but we've had some reports that - # we get more state down gappy syncs than we should, so let's add - # some logging. - logger.info( - "Failed to find any events in room %s at %s", - room_id, - stream_position.room_key, - ) - return state - async def compute_summary( self, room_id: str, @@ -1437,7 +1354,7 @@ async def _compute_state_delta_for_full_sync( await_full_state = True lazy_load_members = False - state_at_timeline_end = await self.get_state_at( + state_at_timeline_end = await self._state_storage_controller.get_state_at( room_id, stream_position=end_token, state_filter=state_filter, @@ -1565,7 +1482,7 @@ async def _compute_state_delta_for_incremental_sync( else: # We can get here if the user has ignored the senders of all # the recent events. - state_at_timeline_start = await self.get_state_at( + state_at_timeline_start = await self._state_storage_controller.get_state_at( room_id, stream_position=end_token, state_filter=state_filter, @@ -1587,14 +1504,14 @@ async def _compute_state_delta_for_incremental_sync( # about them). state_filter = StateFilter.all() - state_at_previous_sync = await self.get_state_at( + state_at_previous_sync = await self._state_storage_controller.get_state_at( room_id, stream_position=since_token, state_filter=state_filter, await_full_state=await_full_state, ) - state_at_timeline_end = await self.get_state_at( + state_at_timeline_end = await self._state_storage_controller.get_state_at( room_id, stream_position=end_token, state_filter=state_filter, @@ -2593,7 +2510,7 @@ async def _get_room_changes_for_incremental_sync( continue if room_id in sync_result_builder.joined_room_ids or has_join: - old_state_ids = await self.get_state_at( + old_state_ids = await self._state_storage_controller.get_state_at( room_id, since_token, state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]), @@ -2623,7 +2540,7 @@ async def _get_room_changes_for_incremental_sync( newly_left_rooms.append(room_id) else: if not old_state_ids: - old_state_ids = await self.get_state_at( + old_state_ids = await self._state_storage_controller.get_state_at( room_id, since_token, state_filter=StateFilter.from_types( diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index f9eced23bff..69aeeafde47 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -45,7 +45,7 @@ PartialStateEventsTracker, ) from synapse.synapse_rust.acl import ServerAclEvaluator -from synapse.types import MutableStateMap, StateMap, get_domain_from_id +from synapse.types import MutableStateMap, StreamToken, StateMap, get_domain_from_id from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string @@ -372,6 +372,91 @@ async def get_state_ids_for_event( ) return state_map[event_id] + async def get_state_after_event( + self, + event_id: str, + state_filter: Optional[StateFilter] = None, + await_full_state: bool = True, + ) -> StateMap[str]: + """ + Get the room state after the given event + + Args: + event_id: event of interest + state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at the event and `state_filter` is not satisfied by partial state. + Defaults to `True`. + """ + state_ids = await self.get_state_ids_for_event( + event_id, + state_filter=state_filter or StateFilter.all(), + await_full_state=await_full_state, + ) + + # using get_metadata_for_events here (instead of get_event) sidesteps an issue + # with redactions: if `event_id` is a redaction event, and we don't have the + # original (possibly because it got purged), get_event will refuse to return + # the redaction event, which isn't terribly helpful here. + # + # (To be fair, in that case we could assume it's *not* a state event, and + # therefore we don't need to worry about it. But still, it seems cleaner just + # to pull the metadata.) + m = (await self.stores.main.get_metadata_for_events([event_id]))[event_id] + if m.state_key is not None and m.rejection_reason is None: + state_ids = dict(state_ids) + state_ids[(m.event_type, m.state_key)] = event_id + + return state_ids + + async def get_state_at( + self, + room_id: str, + stream_position: StreamToken, + state_filter: Optional[StateFilter] = None, + await_full_state: bool = True, + ) -> StateMap[str]: + """Get the room state at a particular stream position + + Args: + room_id: room for which to get state + stream_position: point at which to get state + state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at the last event in the room before `stream_position` and + `state_filter` is not satisfied by partial state. Defaults to `True`. + """ + # FIXME: This gets the state at the latest event before the stream ordering, + # which might not be the same as the "current state" of the room at the time + # of the stream token if there were multiple forward extremities at the time. + last_event_id = ( + await self.stores.main.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=stream_position.room_key, + ) + ) + + if last_event_id: + state = await self.get_state_after_event( + last_event_id, + state_filter=state_filter or StateFilter.all(), + await_full_state=await_full_state, + ) + + else: + # no events in this room - so presumably no state + state = {} + + # (erikj) This should be rarely hit, but we've had some reports that + # we get more state down gappy syncs than we should, so let's add + # some logging. + logger.info( + "Failed to find any events in room %s at %s", + room_id, + stream_position.room_key, + ) + return state + @trace @tag_args async def get_state_for_groups( From 7dec9307dc2efb9eb64690e8bb897ac97b7602ee Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 14:33:04 -0500 Subject: [PATCH 11/21] Filter based on state at to_token --- synapse/handlers/sliding_sync.py | 24 +++++++++++++------ tests/handlers/test_sliding_sync.py | 37 ++++++++++++++++------------- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 6fde273ce5d..b5ffd8e67d2 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -35,6 +35,7 @@ from synapse.events import EventBase from synapse.rest.client.models import SlidingSyncBody from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID +from synapse.types.state import StateFilter if TYPE_CHECKING: from synapse.server import HomeServer @@ -340,7 +341,7 @@ async def current_sync_for_user( # from/to tokens but some of the streams don't support looking back # in time (like global account_data). filtered_room_ids = await self.filter_rooms( - sync_config.user, room_id_set, list_config.filters + sync_config.user, room_id_set, list_config.filters, to_token ) # TODO: Apply sorts sorted_room_ids = sorted(filtered_room_ids) @@ -619,9 +620,15 @@ async def filter_rooms( user: UserID, room_id_set: AbstractSet[str], filters: SlidingSyncConfig.SlidingSyncList.Filters, + to_token: StreamToken, ) -> AbstractSet[str]: """ Filter rooms based on the sync request. + + Args: + user: + room_id_set: The set of room IDs to filter down + filters: The filters to apply """ user_id = user.to_string() @@ -664,15 +671,18 @@ async def filter_rooms( # Filter for encrypted rooms if filters.is_encrypted is not None: - # Make a copy so we don't run into an error: `Set changed size during iteration` + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items for room_id in list(filtered_room_id_set): # TODO: Is there a good method to look up all rooms at once? (N+1 query problem) - is_encrypted = ( - # TODO: Get state at the `to_token` instead of the current state - await self.storage_controllers.state.get_current_state_event( - room_id, EventTypes.RoomEncryption, "" - ) + state_at_to_token = await self.storage_controllers.state.get_state_at( + room_id, + to_token, + state_filter=StateFilter.from_types( + [(EventTypes.RoomEncryption, "")] + ), ) + is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, "")) # If we're looking for encrypted rooms, filter out rooms that are not # encrypted and vice versa diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 599e3dbc108..0c0f229f2e4 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1140,6 +1140,7 @@ def default_config(self) -> JsonDict: def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.sliding_sync_handler = self.hs.get_sliding_sync_handler() self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() def _create_dm_room( self, @@ -1213,6 +1214,8 @@ def test_filter_dm_rooms(self) -> None: invitee_tok=user2_tok, ) + after_rooms_token = self.event_sources.get_current_token() + # TODO: Better way to avoid the circular import? (see # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) from synapse.handlers.sliding_sync import SlidingSyncConfig @@ -1226,7 +1229,10 @@ def test_filter_dm_rooms(self) -> None: # Filter the rooms truthy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( - UserID.from_string(user1_id), {room_id, dm_room_id}, truthy_filters + UserID.from_string(user1_id), + {room_id, dm_room_id}, + truthy_filters, + after_rooms_token, ) ) @@ -1241,7 +1247,10 @@ def test_filter_dm_rooms(self) -> None: # Filter the rooms falsy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( - UserID.from_string(user1_id), {room_id, dm_room_id}, falsy_filters + UserID.from_string(user1_id), + {room_id, dm_room_id}, + falsy_filters, + after_rooms_token, ) ) @@ -1274,39 +1283,35 @@ def test_filter_encrypted_rooms(self) -> None: tok=user1_tok, ) + after_rooms_token = self.event_sources.get_current_token() + # TODO: Better way to avoid the circular import? (see # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) from synapse.handlers.sliding_sync import SlidingSyncConfig # Try with `is_encrypted=True` - # ----------------------------- - truthy_filters = SlidingSyncConfig.SlidingSyncList.Filters( - is_encrypted=True, - ) - - # Filter the rooms truthy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( UserID.from_string(user1_id), {room_id, encrypted_room_id}, - truthy_filters, + SlidingSyncConfig.SlidingSyncList.Filters( + is_encrypted=True, + ), + after_rooms_token, ) ) self.assertEqual(truthy_filtered_room_ids, {encrypted_room_id}) # Try with `is_encrypted=False` - # ----------------------------- - falsy_filters = SlidingSyncConfig.SlidingSyncList.Filters( - is_encrypted=False, - ) - - # Filter the rooms falsy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( UserID.from_string(user1_id), {room_id, encrypted_room_id}, - falsy_filters, + SlidingSyncConfig.SlidingSyncList.Filters( + is_encrypted=False, + ), + after_rooms_token, ) ) From 945197bc74dba66357db3081cab975e699bff28c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 14:35:42 -0500 Subject: [PATCH 12/21] Update docstring --- synapse/handlers/sliding_sync.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index b5ffd8e67d2..46ed9e6279a 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -626,9 +626,10 @@ async def filter_rooms( Filter rooms based on the sync request. Args: - user: - room_id_set: The set of room IDs to filter down - filters: The filters to apply + user: User to filter rooms for + room_id_set: Set of room IDs to filter down + filters: Filters to apply + to_token: We filter based on the state of the room at this token """ user_id = user.to_string() From 48eca7dbb78c5340227191f907d9bf47ad7d241f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 14:40:31 -0500 Subject: [PATCH 13/21] Less test bulk --- tests/handlers/test_sliding_sync.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 621b554be60..e0c8a031a20 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1218,30 +1218,26 @@ def test_filter_dm_rooms(self) -> None: from synapse.handlers.sliding_sync import SlidingSyncConfig # Try with `is_dm=True` - # ----------------------------- - truthy_filters = SlidingSyncConfig.SlidingSyncList.Filters( - is_dm=True, - ) - - # Filter the rooms truthy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( - UserID.from_string(user1_id), {room_id, dm_room_id}, truthy_filters + UserID.from_string(user1_id), + {room_id, dm_room_id}, + SlidingSyncConfig.SlidingSyncList.Filters( + is_dm=True, + ), ) ) self.assertEqual(truthy_filtered_room_ids, {dm_room_id}) # Try with `is_dm=False` - # ----------------------------- - falsy_filters = SlidingSyncConfig.SlidingSyncList.Filters( - is_dm=False, - ) - - # Filter the rooms falsy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( - UserID.from_string(user1_id), {room_id, dm_room_id}, falsy_filters + UserID.from_string(user1_id), + {room_id, dm_room_id}, + SlidingSyncConfig.SlidingSyncList.Filters( + is_dm=False, + ), ) ) From 7aa051958956232461766388f5b88d73c89ab3c5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 14:43:00 -0500 Subject: [PATCH 14/21] Incorporate `to_token` to filters --- synapse/handlers/sliding_sync.py | 12 ++++++++---- tests/handlers/test_sliding_sync.py | 5 +++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 08c6aadff6f..e452c31cbab 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -335,11 +335,8 @@ async def current_sync_for_user( # Apply filters filtered_room_ids = room_id_set if list_config.filters is not None: - # TODO: To be absolutely correct, this could also take into account - # from/to tokens but some of the streams don't support looking back - # in time (like global account_data). filtered_room_ids = await self.filter_rooms( - sync_config.user, room_id_set, list_config.filters + sync_config.user, room_id_set, list_config.filters, to_token ) # TODO: Apply sorts sorted_room_ids = sorted(filtered_room_ids) @@ -618,9 +615,16 @@ async def filter_rooms( user: UserID, room_id_set: AbstractSet[str], filters: SlidingSyncConfig.SlidingSyncList.Filters, + to_token: StreamToken, ) -> AbstractSet[str]: """ Filter rooms based on the sync request. + + Args: + user: User to filter rooms for + room_id_set: Set of room IDs to filter down + filters: Filters to apply + to_token: We filter based on the state of the room at this token """ user_id = user.to_string() diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index e0c8a031a20..a84d4553c6b 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1140,6 +1140,7 @@ def default_config(self) -> JsonDict: def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.sliding_sync_handler = self.hs.get_sliding_sync_handler() self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() def _create_dm_room( self, @@ -1213,6 +1214,8 @@ def test_filter_dm_rooms(self) -> None: invitee_tok=user2_tok, ) + after_rooms_token = self.event_sources.get_current_token() + # TODO: Better way to avoid the circular import? (see # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) from synapse.handlers.sliding_sync import SlidingSyncConfig @@ -1225,6 +1228,7 @@ def test_filter_dm_rooms(self) -> None: SlidingSyncConfig.SlidingSyncList.Filters( is_dm=True, ), + after_rooms_token, ) ) @@ -1238,6 +1242,7 @@ def test_filter_dm_rooms(self) -> None: SlidingSyncConfig.SlidingSyncList.Filters( is_dm=False, ), + after_rooms_token, ) ) From a6e5798dd36c6e2fcfce7a8ff52a5e2e94acf301 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 14:44:29 -0500 Subject: [PATCH 15/21] Explain why no `to_token` for global account data --- synapse/handlers/sliding_sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index e452c31cbab..95edf6c54f8 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -641,6 +641,9 @@ async def filter_rooms( # `is_direct` on membership events because that property only appears for # the invitee membership event (doesn't show up for the inviter). Account # data is set by the client so it needs to be scrutinized. + # + # We're unable to take `to_token` into account for global account data since + # we only keep track of the latest account data for the user. dm_map = await self.store.get_global_account_data_by_type_for_user( user_id, AccountDataTypes.DIRECT ) From 5dd6d3770d7e3edb1e9e600fff5e4e2a7e081725 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 15:03:00 -0500 Subject: [PATCH 16/21] Add docstring --- synapse/handlers/sliding_sync.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 95edf6c54f8..39a42079e3e 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -238,9 +238,19 @@ async def wait_for_sync_for_user( from_token: Optional[StreamToken] = None, timeout_ms: int = 0, ) -> SlidingSyncResult: - """Get the sync for a client if we have new data for it now. Otherwise + """ + Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. + + Args: + requester: The user making the request + sync_config: Sync configuration + from_token: The point in the stream to sync from. Token of the end of the + previous batch. May be `None` if this is the initial sync request. + timeout_ms: The time in milliseconds to wait for new data to arrive. If 0, + we will immediately but there might not be any new data so we just return an + empty response. """ # If the user is not part of the mau group, then check that limits have # not been exceeded (if not part of the group by this point, almost certain @@ -312,6 +322,14 @@ async def current_sync_for_user( """ Generates the response body of a Sliding Sync result, represented as a `SlidingSyncResult`. + + We fetch data according to the token range (> `from_token` and <= `to_token`). + + Args: + sync_config: Sync configuration + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. Token of the end of the + previous batch. May be `None` if this is the initial sync request. """ user_id = sync_config.user.to_string() app_service = self.store.get_app_service_by_user_id(user_id) @@ -387,6 +405,12 @@ async def get_sync_room_ids_for_user( `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way to tell when a room was forgotten at the moment so we can't factor it into the from/to range. + + + Args: + user: User to fetch rooms for + to_token: The token to fetch rooms up to. + from_token: The point in the stream to sync from. """ user_id = user.to_string() From 271ae6f8e7e74bc2748590d0504bc8f9afa20077 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 15:06:55 -0500 Subject: [PATCH 17/21] Remove import workaround --- tests/handlers/test_sliding_sync.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index a84d4553c6b..da1ec87bf5e 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -24,6 +24,7 @@ from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership from synapse.api.room_versions import RoomVersions +from synapse.handlers.sliding_sync import SlidingSyncConfig from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer @@ -1216,10 +1217,6 @@ def test_filter_dm_rooms(self) -> None: after_rooms_token = self.event_sources.get_current_token() - # TODO: Better way to avoid the circular import? (see - # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) - from synapse.handlers.sliding_sync import SlidingSyncConfig - # Try with `is_dm=True` truthy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( From 355de36d9389996b1ddf46c0a0845cf7c4fb3697 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 15:10:35 -0500 Subject: [PATCH 18/21] Remove import workaround --- tests/handlers/test_sliding_sync.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 70f9d4e1942..5fa95ff1143 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1274,10 +1274,6 @@ def test_filter_encrypted_rooms(self) -> None: after_rooms_token = self.event_sources.get_current_token() - # TODO: Better way to avoid the circular import? (see - # https://github.com/element-hq/synapse/pull/17187#discussion_r1619492779) - from synapse.handlers.sliding_sync import SlidingSyncConfig - # Try with `is_encrypted=True` truthy_filtered_room_ids = self.get_success( self.sliding_sync_handler.filter_rooms( From f69d1c50a5deadb0ede62ee701f91d674ba8a37d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 15:19:24 -0500 Subject: [PATCH 19/21] Remove sneaky log --- synapse/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 759fab165a0..883e5740cea 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -502,7 +502,7 @@ async def filter_rooms( dm_map = await self.store.get_global_account_data_by_type_for_user( user_id, AccountDataTypes.DIRECT ) - logger.warn("dm_map: %s", dm_map) + # Flatten out the map dm_room_id_set = set() if dm_map: From d752b8ae196ba9392f2a910e23634fef2ab19229 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 18:27:41 -0500 Subject: [PATCH 20/21] Comment no longer as useful Would be hard to craft a query to do all rooms at once since we have to query for rows between the min/max stream_ordering and filter on the client anyway. Would be easier if we could query for one thing for each room. --- synapse/handlers/sliding_sync.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 597a58e2df8..7163f43c342 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -530,7 +530,6 @@ async def filter_rooms( # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items for room_id in list(filtered_room_id_set): - # TODO: Is there a good method to look up all rooms at once? (N+1 query problem) state_at_to_token = await self.storage_controllers.state.get_state_at( room_id, to_token, From c73391db8c975519951be802e9cdfc2840110e2c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 17 Jun 2024 11:36:00 -0500 Subject: [PATCH 21/21] Fix tests --- tests/handlers/test_sliding_sync.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 3d32425375d..0358239c7f4 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1282,11 +1282,20 @@ def test_filter_encrypted_rooms(self) -> None: after_rooms_token = self.event_sources.get_current_token() + # Get the rooms the user should be syncing with + sync_room_map = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=None, + to_token=after_rooms_token, + ) + ) + # Try with `is_encrypted=True` - truthy_filtered_room_ids = self.get_success( + truthy_filtered_room_map = self.get_success( self.sliding_sync_handler.filter_rooms( UserID.from_string(user1_id), - {room_id, encrypted_room_id}, + sync_room_map, SlidingSyncConfig.SlidingSyncList.Filters( is_encrypted=True, ), @@ -1294,13 +1303,13 @@ def test_filter_encrypted_rooms(self) -> None: ) ) - self.assertEqual(truthy_filtered_room_ids, {encrypted_room_id}) + self.assertEqual(truthy_filtered_room_map.keys(), {encrypted_room_id}) # Try with `is_encrypted=False` - falsy_filtered_room_ids = self.get_success( + falsy_filtered_room_map = self.get_success( self.sliding_sync_handler.filter_rooms( UserID.from_string(user1_id), - {room_id, encrypted_room_id}, + sync_room_map, SlidingSyncConfig.SlidingSyncList.Filters( is_encrypted=False, ), @@ -1308,7 +1317,7 @@ def test_filter_encrypted_rooms(self) -> None: ) ) - self.assertEqual(falsy_filtered_room_ids, {room_id}) + self.assertEqual(falsy_filtered_room_map.keys(), {room_id}) class SortRoomsTestCase(HomeserverTestCase):