diff --git a/CHANGES.md b/CHANGES.md index 168e29f1b235..ed9cca73bc31 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,42 @@ +# Synapse 1.105.0 (2024-04-16) + +No significant changes since 1.105.0rc1. + + + + +# Synapse 1.105.0rc1 (2024-04-11) + +### Features + +- Stabilize support for [MSC4010](https://github.com/matrix-org/matrix-spec-proposals/pull/4010) which clarifies the interaction of push rules and account data. Contributed by @clokep. ([\#17022](https://github.com/element-hq/synapse/issues/17022)) +- Stabilize support for [MSC3981](https://github.com/matrix-org/matrix-spec-proposals/pull/3981): `/relations` recursion. Contributed by @clokep. ([\#17023](https://github.com/element-hq/synapse/issues/17023)) +- Add support for moving `/pushrules` off of main process. ([\#17037](https://github.com/element-hq/synapse/issues/17037), [\#17038](https://github.com/element-hq/synapse/issues/17038)) + +### Bugfixes + +- Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations. ([\#16930](https://github.com/element-hq/synapse/issues/16930), [\#16932](https://github.com/element-hq/synapse/issues/16932), [\#16942](https://github.com/element-hq/synapse/issues/16942), [\#17064](https://github.com/element-hq/synapse/issues/17064), [\#17065](https://github.com/element-hq/synapse/issues/17065), [\#17066](https://github.com/element-hq/synapse/issues/17066)) +- Fix server notice rooms not always being created as unencrypted rooms, even when `encryption_enabled_by_default_for_room_type` is in use (server notices are always unencrypted). ([\#17033](https://github.com/element-hq/synapse/issues/17033)) +- Fix the `.m.rule.encrypted_room_one_to_one` and `.m.rule.room_one_to_one` default underride push rules being in the wrong order. Contributed by @Sumpy1. ([\#17043](https://github.com/element-hq/synapse/issues/17043)) + +### Internal Changes + +- Refactor auth chain fetching to reduce duplication. ([\#17044](https://github.com/element-hq/synapse/issues/17044)) +- Improve database performance by adding a missing index to `access_tokens.refresh_token_id`. ([\#17045](https://github.com/element-hq/synapse/issues/17045), [\#17054](https://github.com/element-hq/synapse/issues/17054)) +- Improve database performance by reducing number of receipts fetched when sending push notifications. ([\#17049](https://github.com/element-hq/synapse/issues/17049)) + + + +### Updates to locked dependencies + +* Bump packaging from 23.2 to 24.0. ([\#17027](https://github.com/element-hq/synapse/issues/17027)) +* Bump regex from 1.10.3 to 1.10.4. ([\#17028](https://github.com/element-hq/synapse/issues/17028)) +* Bump ruff from 0.3.2 to 0.3.5. ([\#17060](https://github.com/element-hq/synapse/issues/17060)) +* Bump serde_json from 1.0.114 to 1.0.115. ([\#17041](https://github.com/element-hq/synapse/issues/17041)) +* Bump types-pillow from 10.2.0.20240125 to 10.2.0.20240406. ([\#17061](https://github.com/element-hq/synapse/issues/17061)) +* Bump types-requests from 2.31.0.20240125 to 2.31.0.20240406. ([\#17063](https://github.com/element-hq/synapse/issues/17063)) +* Bump typing-extensions from 4.9.0 to 4.11.0. ([\#17062](https://github.com/element-hq/synapse/issues/17062)) + # Synapse 1.104.0 (2024-04-02) ### Bugfixes diff --git a/Cargo.lock b/Cargo.lock index b2af0440548d..630d38c2f4d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -306,9 +306,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", @@ -367,9 +367,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.114" +version = "1.0.115" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" +checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd" dependencies = [ "itoa", "ryu", diff --git a/debian/changelog b/debian/changelog index 28451044ab23..49c9b3b497ea 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,15 @@ +matrix-synapse-py3 (1.105.0) stable; urgency=medium + + * New Synapse release 1.105.0. + + -- Synapse Packaging team Tue, 16 Apr 2024 15:53:23 +0100 + +matrix-synapse-py3 (1.105.0~rc1) stable; urgency=medium + + * New Synapse release 1.105.0rc1. + + -- Synapse Packaging team Thu, 11 Apr 2024 12:15:49 +0100 + matrix-synapse-py3 (1.104.0) stable; urgency=medium * New Synapse release 1.104.0. diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 3917d9ae7ebe..77534a4f4fd2 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -310,6 +310,13 @@ "shared_extra_conf": {}, "worker_extra_conf": "", }, + "push_rules": { + "app": "synapse.app.generic_worker", + "listener_resources": ["client", "replication"], + "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/"], + "shared_extra_conf": {}, + "worker_extra_conf": "", + }, } # Templates for sections that may be inserted multiple times in config files @@ -401,6 +408,7 @@ def add_worker_roles_to_shared_config( "receipts", "to_device", "typing", + "push_rules", ] # Worker-type specific sharding config. Now a single worker can fulfill multiple diff --git a/docs/workers.md b/docs/workers.md index d19f1a9dea46..ab9c1db86b6c 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -532,6 +532,13 @@ the stream writer for the `presence` stream: ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/ +##### The `push_rules` stream + +The following endpoints should be routed directly to the worker configured as +the stream writer for the `push` stream: + + ^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/ + #### Restrict outbound federation traffic to a specific set of workers The diff --git a/poetry.lock b/poetry.lock index 5332774705be..814877b70a53 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "alabaster" @@ -1602,13 +1602,13 @@ tests = ["Sphinx", "doubles", "flake8", "flake8-quotes", "gevent", "mock", "pyte [[package]] name = "packaging" -version = "23.2" +version = "24.0" description = "Core utilities for Python packages" optional = false python-versions = ">=3.7" files = [ - {file = "packaging-23.2-py3-none-any.whl", hash = "sha256:8c491190033a9af7e1d931d0b5dacc2ef47509b34dd0de67ed209b5203fc88c7"}, - {file = "packaging-23.2.tar.gz", hash = "sha256:048fb0e9405036518eaaf48a55953c750c11e1a1b68e0dd1a9d62ed0c092cfc5"}, + {file = "packaging-24.0-py3-none-any.whl", hash = "sha256:2ddfb553fdf02fb784c234c7ba6ccc288296ceabec964ad2eae3777778130bc5"}, + {file = "packaging-24.0.tar.gz", hash = "sha256:eb82c5e3e56209074766e6885bb04b8c38a0c015d0a30036ebe7ece34c9989e9"}, ] [[package]] @@ -2444,28 +2444,28 @@ files = [ [[package]] name = "ruff" -version = "0.3.2" +version = "0.3.5" description = "An extremely fast Python linter and code formatter, written in Rust." optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.3.2-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:77f2612752e25f730da7421ca5e3147b213dca4f9a0f7e0b534e9562c5441f01"}, - {file = "ruff-0.3.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:9966b964b2dd1107797be9ca7195002b874424d1d5472097701ae8f43eadef5d"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b83d17ff166aa0659d1e1deaf9f2f14cbe387293a906de09bc4860717eb2e2da"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:bb875c6cc87b3703aeda85f01c9aebdce3d217aeaca3c2e52e38077383f7268a"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:be75e468a6a86426430373d81c041b7605137a28f7014a72d2fc749e47f572aa"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:967978ac2d4506255e2f52afe70dda023fc602b283e97685c8447d036863a302"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1231eacd4510f73222940727ac927bc5d07667a86b0cbe822024dd00343e77e9"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2c6d613b19e9a8021be2ee1d0e27710208d1603b56f47203d0abbde906929a9b"}, - {file = "ruff-0.3.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c8439338a6303585d27b66b4626cbde89bb3e50fa3cae86ce52c1db7449330a7"}, - {file = "ruff-0.3.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:de8b480d8379620cbb5ea466a9e53bb467d2fb07c7eca54a4aa8576483c35d36"}, - {file = "ruff-0.3.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:b74c3de9103bd35df2bb05d8b2899bf2dbe4efda6474ea9681280648ec4d237d"}, - {file = "ruff-0.3.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:f380be9fc15a99765c9cf316b40b9da1f6ad2ab9639e551703e581a5e6da6745"}, - {file = "ruff-0.3.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:0ac06a3759c3ab9ef86bbeca665d31ad3aa9a4b1c17684aadb7e61c10baa0df4"}, - {file = "ruff-0.3.2-py3-none-win32.whl", hash = "sha256:9bd640a8f7dd07a0b6901fcebccedadeb1a705a50350fb86b4003b805c81385a"}, - {file = "ruff-0.3.2-py3-none-win_amd64.whl", hash = "sha256:0c1bdd9920cab5707c26c8b3bf33a064a4ca7842d91a99ec0634fec68f9f4037"}, - {file = "ruff-0.3.2-py3-none-win_arm64.whl", hash = "sha256:5f65103b1d76e0d600cabd577b04179ff592064eaa451a70a81085930e907d0b"}, - {file = "ruff-0.3.2.tar.gz", hash = "sha256:fa78ec9418eb1ca3db392811df3376b46471ae93792a81af2d1cbb0e5dcb5142"}, + {file = "ruff-0.3.5-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:aef5bd3b89e657007e1be6b16553c8813b221ff6d92c7526b7e0227450981eac"}, + {file = "ruff-0.3.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:89b1e92b3bd9fca249153a97d23f29bed3992cff414b222fcd361d763fc53f12"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5e55771559c89272c3ebab23326dc23e7f813e492052391fe7950c1a5a139d89"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:dabc62195bf54b8a7876add6e789caae0268f34582333cda340497c886111c39"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a05f3793ba25f194f395578579c546ca5d83e0195f992edc32e5907d142bfa3"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:dfd3504e881082959b4160ab02f7a205f0fadc0a9619cc481982b6837b2fd4c0"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:87258e0d4b04046cf1d6cc1c56fadbf7a880cc3de1f7294938e923234cf9e498"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:712e71283fc7d9f95047ed5f793bc019b0b0a29849b14664a60fd66c23b96da1"}, + {file = "ruff-0.3.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a532a90b4a18d3f722c124c513ffb5e5eaff0cc4f6d3aa4bda38e691b8600c9f"}, + {file = "ruff-0.3.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:122de171a147c76ada00f76df533b54676f6e321e61bd8656ae54be326c10296"}, + {file = "ruff-0.3.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:d80a6b18a6c3b6ed25b71b05eba183f37d9bc8b16ace9e3d700997f00b74660b"}, + {file = "ruff-0.3.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:a7b6e63194c68bca8e71f81de30cfa6f58ff70393cf45aab4c20f158227d5936"}, + {file = "ruff-0.3.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:a759d33a20c72f2dfa54dae6e85e1225b8e302e8ac655773aff22e542a300985"}, + {file = "ruff-0.3.5-py3-none-win32.whl", hash = "sha256:9d8605aa990045517c911726d21293ef4baa64f87265896e491a05461cae078d"}, + {file = "ruff-0.3.5-py3-none-win_amd64.whl", hash = "sha256:dc56bb16a63c1303bd47563c60482a1512721053d93231cf7e9e1c6954395a0e"}, + {file = "ruff-0.3.5-py3-none-win_arm64.whl", hash = "sha256:faeeae9905446b975dcf6d4499dc93439b131f1443ee264055c5716dd947af55"}, + {file = "ruff-0.3.5.tar.gz", hash = "sha256:a067daaeb1dc2baf9b82a32dae67d154d95212080c80435eb052d95da647763d"}, ] [[package]] @@ -3109,13 +3109,13 @@ files = [ [[package]] name = "types-pillow" -version = "10.2.0.20240125" +version = "10.2.0.20240406" description = "Typing stubs for Pillow" optional = false python-versions = ">=3.8" files = [ - {file = "types-Pillow-10.2.0.20240125.tar.gz", hash = "sha256:c449b2c43b9fdbe0494a7b950e6b39a4e50516091213fec24ef3f33c1d017717"}, - {file = "types_Pillow-10.2.0.20240125-py3-none-any.whl", hash = "sha256:322dbae32b4b7918da5e8a47c50ac0f24b0aa72a804a23857620f2722b03c858"}, + {file = "types-Pillow-10.2.0.20240406.tar.gz", hash = "sha256:62e0cc1f17caba40e72e7154a483f4c7f3bea0e1c34c0ebba9de3c7745bc306d"}, + {file = "types_Pillow-10.2.0.20240406-py3-none-any.whl", hash = "sha256:5ac182e8afce53de30abca2fdf9cbec7b2500e549d0be84da035a729a84c7c47"}, ] [[package]] @@ -3156,13 +3156,13 @@ files = [ [[package]] name = "types-requests" -version = "2.31.0.20240125" +version = "2.31.0.20240406" description = "Typing stubs for requests" optional = false python-versions = ">=3.8" files = [ - {file = "types-requests-2.31.0.20240125.tar.gz", hash = "sha256:03a28ce1d7cd54199148e043b2079cdded22d6795d19a2c2a6791a4b2b5e2eb5"}, - {file = "types_requests-2.31.0.20240125-py3-none-any.whl", hash = "sha256:9592a9a4cb92d6d75d9b491a41477272b710e021011a2a3061157e2fb1f1a5d1"}, + {file = "types-requests-2.31.0.20240406.tar.gz", hash = "sha256:4428df33c5503945c74b3f42e82b181e86ec7b724620419a2966e2de604ce1a1"}, + {file = "types_requests-2.31.0.20240406-py3-none-any.whl", hash = "sha256:6216cdac377c6b9a040ac1c0404f7284bd13199c0e1bb235f4324627e8898cf5"}, ] [package.dependencies] @@ -3181,13 +3181,13 @@ files = [ [[package]] name = "typing-extensions" -version = "4.9.0" +version = "4.11.0" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" files = [ - {file = "typing_extensions-4.9.0-py3-none-any.whl", hash = "sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd"}, - {file = "typing_extensions-4.9.0.tar.gz", hash = "sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783"}, + {file = "typing_extensions-4.11.0-py3-none-any.whl", hash = "sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a"}, + {file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"}, ] [[package]] @@ -3451,4 +3451,4 @@ user-search = ["pyicu"] [metadata] lock-version = "2.0" python-versions = "^3.8.0" -content-hash = "b510fa05f4ea33194bec079f5d04ebb3f9ffbb5c1ea96a0341d57ba770ef81e6" +content-hash = "4abda113a01f162bb3978b0372956d569364533aa39f57863c234363f8449a4f" diff --git a/pyproject.toml b/pyproject.toml index 9a645079c30c..f0f025645fd4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust" [tool.poetry] name = "matrix-synapse" -version = "1.104.0" +version = "1.105.0" description = "Homeserver for the Matrix decentralised comms protocol" authors = ["Matrix.org Team and Contributors "] license = "AGPL-3.0-or-later" @@ -321,7 +321,7 @@ all = [ # This helps prevents merge conflicts when running a batch of dependabot updates. isort = ">=5.10.1" black = ">=22.7.0" -ruff = "0.3.2" +ruff = "0.3.5" # Type checking only works with the pydantic.v1 compat module from pydantic v2 pydantic = "^2" diff --git a/rust/src/push/base_rules.rs b/rust/src/push/base_rules.rs index b00390f7e4b8..74f02d600138 100644 --- a/rust/src/push/base_rules.rs +++ b/rust/src/push/base_rules.rs @@ -304,12 +304,12 @@ pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[ default_enabled: true, }, PushRule { - rule_id: Cow::Borrowed("global/underride/.m.rule.room_one_to_one"), + rule_id: Cow::Borrowed("global/underride/.m.rule.encrypted_room_one_to_one"), priority_class: 1, conditions: Cow::Borrowed(&[ Condition::Known(KnownCondition::EventMatch(EventMatchCondition { key: Cow::Borrowed("type"), - pattern: Cow::Borrowed("m.room.message"), + pattern: Cow::Borrowed("m.room.encrypted"), })), Condition::Known(KnownCondition::RoomMemberCount { is: Some(Cow::Borrowed("2")), @@ -320,12 +320,12 @@ pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[ default_enabled: true, }, PushRule { - rule_id: Cow::Borrowed("global/underride/.m.rule.encrypted_room_one_to_one"), + rule_id: Cow::Borrowed("global/underride/.m.rule.room_one_to_one"), priority_class: 1, conditions: Cow::Borrowed(&[ Condition::Known(KnownCondition::EventMatch(EventMatchCondition { key: Cow::Borrowed("type"), - pattern: Cow::Borrowed("m.room.encrypted"), + pattern: Cow::Borrowed("m.room.message"), })), Condition::Known(KnownCondition::RoomMemberCount { is: Some(Cow::Borrowed("2")), diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index a533cad5ae79..15507372a4cc 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -60,7 +60,7 @@ ) from synapse.notifier import ReplicationNotifier from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn -from synapse.storage.databases.main import FilteringWorkerStore, PushRuleStore +from synapse.storage.databases.main import FilteringWorkerStore from synapse.storage.databases.main.account_data import AccountDataWorkerStore from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore @@ -77,10 +77,8 @@ ) from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore from synapse.storage.databases.main.profile import ProfileWorkerStore -from synapse.storage.databases.main.pusher import ( - PusherBackgroundUpdatesStore, - PusherWorkerStore, -) +from synapse.storage.databases.main.push_rule import PusherWorkerStore +from synapse.storage.databases.main.pusher import PusherBackgroundUpdatesStore from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore from synapse.storage.databases.main.registration import ( RegistrationBackgroundUpdateStore, @@ -245,7 +243,6 @@ class Store( AccountDataWorkerStore, FilteringWorkerStore, ProfileWorkerStore, - PushRuleStore, PusherWorkerStore, PusherBackgroundUpdatesStore, PresenceBackgroundUpdateStore, diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 0bd3befdc2ec..fcc78d2d81d4 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -393,11 +393,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: # MSC3967: Do not require UIA when first uploading cross signing keys self.msc3967_enabled = experimental.get("msc3967_enabled", False) - # MSC3981: Recurse relations - self.msc3981_recurse_relations = experimental.get( - "msc3981_recurse_relations", False - ) - # MSC3861: Matrix architecture change to delegate authentication via OIDC try: self.msc3861 = MSC3861(**experimental.get("msc3861", {})) @@ -409,11 +404,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: # Check that none of the other config options conflict with MSC3861 when enabled self.msc3861.check_config_conflicts(self.root) - # MSC4010: Do not allow setting m.push_rules account data. - self.msc4010_push_rules_account_data = experimental.get( - "msc4010_push_rules_account_data", False - ) - self.msc4028_push_encrypted_events = experimental.get( "msc4028_push_encrypted_events", False ) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index e9c67807e5df..7ecf349e4ad7 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -156,6 +156,8 @@ class WriterLocations: can only be a single instance. presence: The instances that write to the presence stream. Currently can only be a single instance. + push_rules: The instances that write to the push stream. Currently + can only be a single instance. """ events: List[str] = attr.ib( @@ -182,6 +184,10 @@ class WriterLocations: default=["master"], converter=_instance_to_list_converter, ) + push_rules: List[str] = attr.ib( + default=["master"], + converter=_instance_to_list_converter, + ) @attr.s(auto_attribs=True) @@ -341,6 +347,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: "account_data", "receipts", "presence", + "push_rules", ): instances = _instance_to_list_converter(getattr(self.writers, stream)) for instance in instances: @@ -378,6 +385,11 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: "Must only specify one instance to handle `presence` messages." ) + if len(self.writers.push_rules) != 1: + raise ConfigError( + "Must only specify one instance to handle `push` messages." + ) + self.events_shard_config = RoutableShardedWorkerHandlingConfig( self.writers.events ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 8b5ffb135e30..5e81a51638a8 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -956,6 +956,7 @@ async def create_room( room_alias=room_alias, power_level_content_override=power_level_content_override, creator_join_profile=creator_join_profile, + ignore_forced_encryption=ignore_forced_encryption, ) # we avoid dropping the lock between invites, as otherwise joins can diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 9e9f6cd06258..601d37341b6d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -51,6 +51,7 @@ from synapse.logging import opentracing from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.http.push import ReplicationCopyPusherRestServlet from synapse.storage.databases.main.state_deltas import StateDelta from synapse.types import ( JsonDict, @@ -181,6 +182,12 @@ def __init__(self, hs: "HomeServer"): hs.config.server.forgotten_room_retention_period ) + self._is_push_writer = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) + self._push_writer = hs.config.worker.writers.push_rules[0] + self._copy_push_client = ReplicationCopyPusherRestServlet.make_client(hs) + def _on_user_joined_room(self, event_id: str, room_id: str) -> None: """Notify the rate limiter that a room join has occurred. @@ -1301,9 +1308,17 @@ async def copy_user_state_on_room_upgrade( old_room_id, new_room_id, user_id ) # Copy over push rules - await self.store.copy_push_rules_from_room_to_room_for_user( - old_room_id, new_room_id, user_id - ) + if self._is_push_writer: + await self.store.copy_push_rules_from_room_to_room_for_user( + old_room_id, new_room_id, user_id + ) + else: + await self._copy_push_client( + instance_name=self._push_writer, + user_id=user_id, + old_room_id=old_room_id, + new_room_id=new_room_id, + ) except Exception: logger.exception( "Error copying tags and/or push rules from rooms %s to %s for user %s. " diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3aa2e2b7ba51..a6d54ee4b81e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -953,7 +953,7 @@ async def compute_state_delta( batch: TimelineBatch, sync_config: SyncConfig, since_token: Optional[StreamToken], - now_token: StreamToken, + end_token: StreamToken, full_state: bool, ) -> MutableStateMap[EventBase]: """Works out the difference in state between the end of the previous sync and @@ -964,7 +964,9 @@ async def compute_state_delta( batch: The timeline batch for the room that will be sent to the user. sync_config: since_token: Token of the end of the previous batch. May be `None`. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. full_state: Whether to force returning the full state. `lazy_load_members` still applies when `full_state` is `True`. @@ -1044,7 +1046,7 @@ async def compute_state_delta( room_id, sync_config.user, batch, - now_token, + end_token, members_to_fetch, timeline_state, ) @@ -1058,7 +1060,7 @@ async def compute_state_delta( room_id, batch, since_token, - now_token, + end_token, members_to_fetch, timeline_state, ) @@ -1130,7 +1132,7 @@ async def _compute_state_delta_for_full_sync( room_id: str, syncing_user: UserID, batch: TimelineBatch, - now_token: StreamToken, + end_token: StreamToken, members_to_fetch: Optional[Set[str]], timeline_state: StateMap[str], ) -> StateMap[str]: @@ -1143,7 +1145,9 @@ async def _compute_state_delta_for_full_sync( room_id: The room we are calculating for. syncing_user: The user that is calling `/sync`. batch: The timeline batch for the room that will be sent to the user. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. members_to_fetch: If lazy-loading is enabled, the memberships needed for events in the timeline. timeline_state: The contribution to the room state from state events in @@ -1183,15 +1187,16 @@ async def _compute_state_delta_for_full_sync( await_full_state = True lazy_load_members = False - if batch: - state_at_timeline_end = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + if batch: + # Strictly speaking, this returns the state *after* the first event in the + # timeline, but that is good enough here. state_at_timeline_start = ( await self._state_storage_controller.get_state_ids_for_event( batch.events[0].event_id, @@ -1200,13 +1205,6 @@ async def _compute_state_delta_for_full_sync( ) ) else: - state_at_timeline_end = await self.get_state_at( - room_id, - stream_position=now_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) - state_at_timeline_start = state_at_timeline_end state_ids = _calculate_state( @@ -1223,7 +1221,7 @@ async def _compute_state_delta_for_incremental_sync( room_id: str, batch: TimelineBatch, since_token: StreamToken, - now_token: StreamToken, + end_token: StreamToken, members_to_fetch: Optional[Set[str]], timeline_state: StateMap[str], ) -> StateMap[str]: @@ -1239,7 +1237,9 @@ async def _compute_state_delta_for_incremental_sync( room_id: The room we are calculating for. batch: The timeline batch for the room that will be sent to the user. since_token: Token of the end of the previous batch. - now_token: Token of the end of the current batch. + end_token: Token of the end of the current batch. Normally this will be + the same as the global "now_token", but if the user has left the room, + the point just after their leave event. members_to_fetch: If lazy-loading is enabled, the memberships needed for events in the timeline. Otherwise, `None`. timeline_state: The contribution to the room state from state events in @@ -1259,25 +1259,70 @@ async def _compute_state_delta_for_incremental_sync( await_full_state = True lazy_load_members = False - if batch.limited: - if batch: - state_at_timeline_start = ( - await self._state_storage_controller.get_state_ids_for_event( + # For a non-gappy sync if the events in the timeline are simply a linear + # chain (i.e. no merging/branching of the graph), then we know the state + # delta between the end of the previous sync and start of the new one is + # empty. + # + # c.f. #16941 for an example of why we can't do this for all non-gappy + # syncs. + is_linear_timeline = True + if batch.events: + # We need to make sure the first event in our batch points to the + # last event in the previous batch. + last_event_id_prev_batch = ( + await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=since_token.room_key, + ) + ) + + prev_event_id = last_event_id_prev_batch + for e in batch.events: + if e.prev_event_ids() != [prev_event_id]: + is_linear_timeline = False + break + prev_event_id = e.event_id + + if is_linear_timeline and not batch.limited: + state_ids: StateMap[str] = {} + if lazy_load_members: + if members_to_fetch and batch.events: + # We're lazy-loading, so the client might need some more + # member events to understand the events in this timeline. + # So we fish out all the member events corresponding to the + # timeline here. The caller will then dedupe any redundant + # ones. + + state_ids = await self._state_storage_controller.get_state_ids_for_event( batch.events[0].event_id, - state_filter=state_filter, - await_full_state=await_full_state, + # we only want members! + state_filter=StateFilter.from_types( + (EventTypes.Member, member) for member in members_to_fetch + ), + await_full_state=False, ) - ) - else: - # We can get here if the user has ignored the senders of all - # the recent events. - state_at_timeline_start = await self.get_state_at( - room_id, - stream_position=now_token, + return state_ids + + if batch: + state_at_timeline_start = ( + await self._state_storage_controller.get_state_ids_for_event( + batch.events[0].event_id, state_filter=state_filter, await_full_state=await_full_state, ) + ) + else: + # We can get here if the user has ignored the senders of all + # the recent events. + state_at_timeline_start = await self.get_state_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + if batch.limited: # for now, we disable LL for gappy syncs - see # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346 # N.B. this slows down incr syncs as we are now processing way @@ -1292,58 +1337,28 @@ async def _compute_state_delta_for_incremental_sync( # about them). state_filter = StateFilter.all() - state_at_previous_sync = await self.get_state_at( - room_id, - stream_position=since_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) + state_at_previous_sync = await self.get_state_at( + room_id, + stream_position=since_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) - if batch: - state_at_timeline_end = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) - else: - # We can get here if the user has ignored the senders of all - # the recent events. - state_at_timeline_end = await self.get_state_at( - room_id, - stream_position=now_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=end_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) - state_ids = _calculate_state( - timeline_contains=timeline_state, - timeline_start=state_at_timeline_start, - timeline_end=state_at_timeline_end, - previous_timeline_end=state_at_previous_sync, - lazy_load_members=lazy_load_members, - ) - else: - state_ids = {} - if lazy_load_members: - if members_to_fetch and batch.events: - # We're returning an incremental sync, with no - # "gap" since the previous sync, so normally there would be - # no state to return. - # But we're lazy-loading, so the client might need some more - # member events to understand the events in this timeline. - # So we fish out all the member events corresponding to the - # timeline here. The caller will then dedupe any redundant ones. + state_ids = _calculate_state( + timeline_contains=timeline_state, + timeline_start=state_at_timeline_start, + timeline_end=state_at_timeline_end, + previous_timeline_end=state_at_previous_sync, + lazy_load_members=lazy_load_members, + ) - state_ids = await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, - # we only want members! - state_filter=StateFilter.from_types( - (EventTypes.Member, member) for member in members_to_fetch - ), - await_full_state=False, - ) return state_ids async def _find_missing_partial_state_memberships( @@ -2344,6 +2359,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=since_token, upto_token=leave_token, + end_token=leave_token, out_of_band=leave_event.internal_metadata.is_out_of_band_membership(), ) ) @@ -2381,6 +2397,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=None if newly_joined else since_token, upto_token=prev_batch_token, + end_token=now_token, ) else: entry = RoomSyncResultBuilder( @@ -2391,6 +2408,7 @@ async def _get_room_changes_for_incremental_sync( full_state=False, since_token=since_token, upto_token=since_token, + end_token=now_token, ) room_entries.append(entry) @@ -2449,6 +2467,7 @@ async def _get_room_changes_for_initial_sync( full_state=True, since_token=since_token, upto_token=now_token, + end_token=now_token, ) ) elif event.membership == Membership.INVITE: @@ -2478,6 +2497,7 @@ async def _get_room_changes_for_initial_sync( full_state=True, since_token=since_token, upto_token=leave_token, + end_token=leave_token, ) ) @@ -2548,6 +2568,7 @@ async def _generate_room_entry( { "since_token": since_token, "upto_token": upto_token, + "end_token": room_builder.end_token, } ) @@ -2621,7 +2642,7 @@ async def _generate_room_entry( batch, sync_config, since_token, - now_token, + room_builder.end_token, full_state=full_state, ) else: @@ -2781,6 +2802,61 @@ def _calculate_state( e for t, e in timeline_start.items() if t[0] == EventTypes.Member ) + # Naively, we would just return the difference between the state at the start + # of the timeline (`timeline_start_ids`) and that at the end of the previous sync + # (`previous_timeline_end_ids`). However, that fails in the presence of forks in + # the DAG. + # + # For example, consider a DAG such as the following: + # + # E1 + # ↗ ↖ + # | S2 + # | ↑ + # --|------|---- + # | | + # E3 | + # ↖ / + # E4 + # + # ... and a filter that means we only return 2 events, represented by the dashed + # horizontal line. Assuming S2 was *not* included in the previous sync, we need to + # include it in the `state` section. + # + # Note that the state at the start of the timeline (E3) does not include S2. So, + # to make sure it gets included in the calculation here, we actually look at + # the state at the *end* of the timeline, and subtract any events that are present + # in the timeline. + # + # ---------- + # + # Aside 1: You may then wonder if we need to include `timeline_start` in the + # calculation. Consider a linear DAG: + # + # E1 + # ↑ + # S2 + # ↑ + # ----|------ + # | + # E3 + # ↑ + # S4 + # ↑ + # E5 + # + # ... where S2 and S4 change the same piece of state; and where we have a filter + # that returns 3 events (E3, S4, E5). We still need to tell the client about S2, + # because it might affect the display of E3. However, the state at the end of the + # timeline only tells us about S4; if we don't inspect `timeline_start` we won't + # find out about S2. + # + # (There are yet more complicated cases in which a state event is excluded from the + # timeline, but whose effect actually lands in the DAG in the *middle* of the + # timeline. We have no way to represent that in the /sync response, and we don't + # even try; it is ether omitted or plonked into `state` as if it were at the start + # of the timeline, depending on what else is in the timeline.) + state_ids = ( (timeline_end_ids | timeline_start_ids) - previous_timeline_end_ids @@ -2883,13 +2959,30 @@ class RoomSyncResultBuilder: Attributes: room_id + rtype: One of `"joined"` or `"archived"` + events: List of events to include in the room (more events may be added when generating result). + newly_joined: If the user has newly joined the room + full_state: Whether the full state should be sent in result + since_token: Earliest point to return events from, or None - upto_token: Latest point to return events from. + + upto_token: Latest point to return events from. If `events` is populated, + this is set to the token at the start of `events` + + end_token: The last point in the timeline that the client should see events + from. Normally this will be the same as the global `now_token`, but in + the case of rooms where the user has left the room, this will be the point + just after their leave event. + + This is used in the calculation of the state which is returned in `state`: + any state changes *up to* `end_token` (and not beyond!) which are not + reflected in the timeline need to be returned in `state`. + out_of_band: whether the events in the room are "out of band" events and the server isn't in the room. """ @@ -2901,5 +2994,5 @@ class RoomSyncResultBuilder: full_state: bool since_token: Optional[StreamToken] upto_token: StreamToken - + end_token: StreamToken out_of_band: bool = False diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py index 8e5641707a2b..de07e75b469a 100644 --- a/synapse/replication/http/push.py +++ b/synapse/replication/http/push.py @@ -77,5 +77,46 @@ async def _handle_request( # type: ignore[override] return 200, {} +class ReplicationCopyPusherRestServlet(ReplicationEndpoint): + """Copies push rules from an old room to new room. + + Request format: + + POST /_synapse/replication/copy_push_rules/:user_id/:old_room_id/:new_room_id + + {} + + """ + + NAME = "copy_push_rules" + PATH_ARGS = ("user_id", "old_room_id", "new_room_id") + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._store = hs.get_datastores().main + + @staticmethod + async def _serialize_payload(user_id: str, old_room_id: str, new_room_id: str) -> JsonDict: # type: ignore[override] + return {} + + async def _handle_request( # type: ignore[override] + self, + request: Request, + content: JsonDict, + user_id: str, + old_room_id: str, + new_room_id: str, + ) -> Tuple[int, JsonDict]: + + await self._store.copy_push_rules_from_room_to_room_for_user( + old_room_id, new_room_id, user_id + ) + + return 200, {} + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ReplicationRemovePusherRestServlet(hs).register(http_server) + ReplicationCopyPusherRestServlet(hs).register(http_server) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index ecc12c0b2828..72a42cb6cc4c 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -66,6 +66,7 @@ FederationStream, PresenceFederationStream, PresenceStream, + PushRulesStream, ReceiptsStream, Stream, ToDeviceStream, @@ -178,6 +179,12 @@ def __init__(self, hs: "HomeServer"): continue + if isinstance(stream, PushRulesStream): + if hs.get_instance_name() in hs.config.worker.writers.push_rules: + self._streams_to_replicate.append(stream) + + continue + # Only add any other streams if we're on master. if hs.config.worker.worker_app is not None: continue diff --git a/synapse/rest/client/account_data.py b/synapse/rest/client/account_data.py index 12ffca984f82..0ee24081fac6 100644 --- a/synapse/rest/client/account_data.py +++ b/synapse/rest/client/account_data.py @@ -81,8 +81,7 @@ async def on_PUT( raise AuthError(403, "Cannot add account data for other users.") # Raise an error if the account data type cannot be set directly. - if self._hs.config.experimental.msc4010_push_rules_account_data: - _check_can_set_account_data_type(account_data_type) + _check_can_set_account_data_type(account_data_type) body = parse_json_object_from_request(request) @@ -108,10 +107,7 @@ async def on_GET( raise AuthError(403, "Cannot get account data for other users.") # Push rules are stored in a separate table and must be queried separately. - if ( - self._hs.config.experimental.msc4010_push_rules_account_data - and account_data_type == AccountDataTypes.PUSH_RULES - ): + if account_data_type == AccountDataTypes.PUSH_RULES: account_data: Optional[JsonMapping] = ( await self._push_rules_handler.push_rules_for_user(requester.user) ) @@ -162,8 +158,7 @@ async def on_DELETE( raise AuthError(403, "Cannot delete account data for other users.") # Raise an error if the account data type cannot be set directly. - if self._hs.config.experimental.msc4010_push_rules_account_data: - _check_can_set_account_data_type(account_data_type) + _check_can_set_account_data_type(account_data_type) await self.handler.remove_account_data_for_user(user_id, account_data_type) @@ -209,15 +204,7 @@ async def on_PUT( ) # Raise an error if the account data type cannot be set directly. - if self._hs.config.experimental.msc4010_push_rules_account_data: - _check_can_set_account_data_type(account_data_type) - elif account_data_type == ReceiptTypes.FULLY_READ: - raise SynapseError( - 405, - "Cannot set m.fully_read through this API." - " Use /rooms/!roomId:server.name/read_markers", - Codes.BAD_JSON, - ) + _check_can_set_account_data_type(account_data_type) body = parse_json_object_from_request(request) @@ -256,10 +243,7 @@ async def on_GET( ) # Room-specific push rules are not currently supported. - if ( - self._hs.config.experimental.msc4010_push_rules_account_data - and account_data_type == AccountDataTypes.PUSH_RULES - ): + if account_data_type == AccountDataTypes.PUSH_RULES: account_data: Optional[JsonMapping] = {} else: account_data = await self.store.get_account_data_for_room_and_type( @@ -317,8 +301,7 @@ async def on_DELETE( ) # Raise an error if the account data type cannot be set directly. - if self._hs.config.experimental.msc4010_push_rules_account_data: - _check_can_set_account_data_type(account_data_type) + _check_can_set_account_data_type(account_data_type) await self.handler.remove_account_data_for_room( user_id, room_id, account_data_type diff --git a/synapse/rest/client/push_rule.py b/synapse/rest/client/push_rule.py index 7d58611abbf4..af042504c9b9 100644 --- a/synapse/rest/client/push_rule.py +++ b/synapse/rest/client/push_rule.py @@ -59,12 +59,14 @@ def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() self.store = hs.get_datastores().main self.notifier = hs.get_notifier() - self._is_worker = hs.config.worker.worker_app is not None + self._is_push_worker = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) self._push_rules_handler = hs.get_push_rules_handler() self._push_rule_linearizer = Linearizer(name="push_rules") async def on_PUT(self, request: SynapseRequest, path: str) -> Tuple[int, JsonDict]: - if self._is_worker: + if not self._is_push_worker: raise Exception("Cannot handle PUT /push_rules on worker") requester = await self.auth.get_user_by_req(request) @@ -137,7 +139,7 @@ async def handle_put( async def on_DELETE( self, request: SynapseRequest, path: str ) -> Tuple[int, JsonDict]: - if self._is_worker: + if not self._is_push_worker: raise Exception("Cannot handle DELETE /push_rules on worker") requester = await self.auth.get_user_by_req(request) diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 42da017f3752..49943cf0c34f 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -55,7 +55,6 @@ def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() self._store = hs.get_datastores().main self._relations_handler = hs.get_relations_handler() - self._support_recurse = hs.config.experimental.msc3981_recurse_relations async def on_GET( self, @@ -70,12 +69,9 @@ async def on_GET( pagination_config = await PaginationConfig.from_request( self._store, request, default_limit=5, default_dir=Direction.BACKWARDS ) - if self._support_recurse: - recurse = parse_boolean(request, "recurse", default=False) or parse_boolean( - request, "org.matrix.msc3981.recurse", default=False - ) - else: - recurse = False + recurse = parse_boolean(request, "recurse", default=False) or parse_boolean( + request, "org.matrix.msc3981.recurse", default=False + ) # The unstable version of this API returns an extra field for client # compatibility, see https://github.com/matrix-org/synapse/issues/12930. diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 32db274f32bd..c46d4fe8cf3a 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -132,7 +132,8 @@ def on_GET(self, request: Request) -> Tuple[int, JsonDict]: # Adds support for relation-based redactions as per MSC3912. "org.matrix.msc3912": self.config.experimental.msc3912_enabled, # Whether recursively provide relations is supported. - "org.matrix.msc3981": self.config.experimental.msc3981_recurse_relations, + # TODO This is no longer needed once unstable MSC3981 does not need to be supported. + "org.matrix.msc3981": True, # Adds support for deleting account data. "org.matrix.msc3391": self.config.experimental.msc3391_enabled, # Allows clients to inhibit profile update propagation. diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index bf779587d953..586e84f2a4db 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -63,7 +63,7 @@ from .presence import PresenceStore from .profile import ProfileStore from .purge_events import PurgeEventsStore -from .push_rule import PushRuleStore +from .push_rule import PushRulesWorkerStore from .pusher import PusherStore from .receipts import ReceiptsStore from .registration import RegistrationStore @@ -130,7 +130,6 @@ class DataStore( RejectionsStore, FilteringWorkerStore, PusherStore, - PushRuleStore, ApplicationServiceTransactionStore, EventPushActionsStore, ServerMetricsStore, @@ -140,6 +139,7 @@ class DataStore( SearchStore, TagsStore, AccountDataStore, + PushRulesWorkerStore, StreamWorkerStore, OpenIdStore, ClientIpWorkerStore, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 846c3f363a59..fb132ef09090 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -27,6 +27,7 @@ Collection, Dict, FrozenSet, + Generator, Iterable, List, Optional, @@ -279,64 +280,16 @@ def _get_auth_chain_ids_using_cover_index_txn( # Now we look up all links for the chains we have, adding chains that # are reachable from any event. - # - # This query is structured to first get all chain IDs reachable, and - # then pull out all links from those chains. This does pull out more - # rows than is strictly necessary, however there isn't a way of - # structuring the recursive part of query to pull out the links without - # also returning large quantities of redundant data (which can make it a - # lot slower). - sql = """ - WITH RECURSIVE links(chain_id) AS ( - SELECT - DISTINCT origin_chain_id - FROM event_auth_chain_links WHERE %s - UNION - SELECT - target_chain_id - FROM event_auth_chain_links - INNER JOIN links ON (chain_id = origin_chain_id) - ) - SELECT - origin_chain_id, origin_sequence_number, - target_chain_id, target_sequence_number - FROM links - INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id) - """ # A map from chain ID to max sequence number *reachable* from any event ID. chains: Dict[int, int] = {} - - # Add all linked chains reachable from initial set of chains. - chains_to_fetch = set(event_chains.keys()) - while chains_to_fetch: - batch2 = tuple(itertools.islice(chains_to_fetch, 1000)) - chains_to_fetch.difference_update(batch2) - clause, args = make_in_list_sql_clause( - txn.database_engine, "origin_chain_id", batch2 - ) - txn.execute(sql % (clause,), args) - - links: Dict[int, List[Tuple[int, int, int]]] = {} - - for ( - origin_chain_id, - origin_sequence_number, - target_chain_id, - target_sequence_number, - ) in txn: - links.setdefault(origin_chain_id, []).append( - (origin_sequence_number, target_chain_id, target_sequence_number) - ) - + for links in self._get_chain_links(txn, set(event_chains.keys())): for chain_id in links: if chain_id not in event_chains: continue _materialize(chain_id, event_chains[chain_id], links, chains) - chains_to_fetch.difference_update(chains) - # Add the initial set of chains, excluding the sequence corresponding to # initial event. for chain_id, seq_no in event_chains.items(): @@ -380,6 +333,68 @@ def _get_auth_chain_ids_using_cover_index_txn( return results + @classmethod + def _get_chain_links( + cls, txn: LoggingTransaction, chains_to_fetch: Set[int] + ) -> Generator[Dict[int, List[Tuple[int, int, int]]], None, None]: + """Fetch all auth chain links from the given set of chains, and all + links from those chains, recursively. + + Note: This may return links that are not reachable from the given + chains. + + Returns a generator that produces dicts from origin chain ID to 3-tuple + of origin sequence number, target chain ID and target sequence number. + """ + + # This query is structured to first get all chain IDs reachable, and + # then pull out all links from those chains. This does pull out more + # rows than is strictly necessary, however there isn't a way of + # structuring the recursive part of query to pull out the links without + # also returning large quantities of redundant data (which can make it a + # lot slower). + sql = """ + WITH RECURSIVE links(chain_id) AS ( + SELECT + DISTINCT origin_chain_id + FROM event_auth_chain_links WHERE %s + UNION + SELECT + target_chain_id + FROM event_auth_chain_links + INNER JOIN links ON (chain_id = origin_chain_id) + ) + SELECT + origin_chain_id, origin_sequence_number, + target_chain_id, target_sequence_number + FROM links + INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id) + """ + + while chains_to_fetch: + batch2 = tuple(itertools.islice(chains_to_fetch, 1000)) + chains_to_fetch.difference_update(batch2) + clause, args = make_in_list_sql_clause( + txn.database_engine, "origin_chain_id", batch2 + ) + txn.execute(sql % (clause,), args) + + links: Dict[int, List[Tuple[int, int, int]]] = {} + + for ( + origin_chain_id, + origin_sequence_number, + target_chain_id, + target_sequence_number, + ) in txn: + links.setdefault(origin_chain_id, []).append( + (origin_sequence_number, target_chain_id, target_sequence_number) + ) + + chains_to_fetch.difference_update(links) + + yield links + def _get_auth_chain_ids_txn( self, txn: LoggingTransaction, event_ids: Collection[str], include_given: bool ) -> Set[str]: @@ -564,53 +579,9 @@ def fetch_chain_info(events_to_fetch: Collection[str]) -> None: # Now we look up all links for the chains we have, adding chains that # are reachable from any event. - # - # This query is structured to first get all chain IDs reachable, and - # then pull out all links from those chains. This does pull out more - # rows than is strictly necessary, however there isn't a way of - # structuring the recursive part of query to pull out the links without - # also returning large quantities of redundant data (which can make it a - # lot slower). - sql = """ - WITH RECURSIVE links(chain_id) AS ( - SELECT - DISTINCT origin_chain_id - FROM event_auth_chain_links WHERE %s - UNION - SELECT - target_chain_id - FROM event_auth_chain_links - INNER JOIN links ON (chain_id = origin_chain_id) - ) - SELECT - origin_chain_id, origin_sequence_number, - target_chain_id, target_sequence_number - FROM links - INNER JOIN event_auth_chain_links ON (chain_id = origin_chain_id) - """ - - # (We need to take a copy of `seen_chains` as we want to mutate it in - # the loop) - chains_to_fetch = set(seen_chains) - while chains_to_fetch: - batch2 = tuple(itertools.islice(chains_to_fetch, 1000)) - clause, args = make_in_list_sql_clause( - txn.database_engine, "origin_chain_id", batch2 - ) - txn.execute(sql % (clause,), args) - - links: Dict[int, List[Tuple[int, int, int]]] = {} - - for ( - origin_chain_id, - origin_sequence_number, - target_chain_id, - target_sequence_number, - ) in txn: - links.setdefault(origin_chain_id, []).append( - (origin_sequence_number, target_chain_id, target_sequence_number) - ) + # (We need to take a copy of `seen_chains` as the function mutates it) + for links in self._get_chain_links(txn, set(seen_chains)): for chains in set_to_chain: for chain_id in links: if chain_id not in chains: @@ -618,7 +589,6 @@ def fetch_chain_info(events_to_fetch: Collection[str]) -> None: _materialize(chain_id, chains[chain_id], links, chains) - chains_to_fetch.difference_update(chains) seen_chains.update(chains) # Now for each chain we figure out the maximum sequence number reachable diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3a5666cd9b0f..40bf000e9cb9 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -106,7 +106,7 @@ ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -859,37 +859,86 @@ def f(txn: LoggingTransaction) -> List[str]: return await self.db_pool.runInteraction("get_push_action_users_in_range", f) - def _get_receipts_by_room_txn( - self, txn: LoggingTransaction, user_id: str + def _get_receipts_for_room_and_threads_txn( + self, + txn: LoggingTransaction, + user_id: str, + room_ids: StrCollection, + thread_ids: StrCollection, ) -> Dict[str, _RoomReceipt]: """ - Generate a map of room ID to the latest stream ordering that has been - read by the given user. + Get (private) read receipts for a user in each of the given room IDs + and thread IDs. - Args: - txn: - user_id: The user to fetch receipts for. + Note: The corresponding room ID for each thread must appear in + `room_ids` arg. Returns: A map including all rooms the user is in with a receipt. It maps room IDs to _RoomReceipt instances """ - receipt_types_clause, args = make_in_list_sql_clause( + + receipt_types_clause, receipts_args = make_in_list_sql_clause( self.database_engine, "receipt_type", (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) + thread_ids_clause, thread_ids_args = make_in_list_sql_clause( + self.database_engine, + "thread_id", + thread_ids, + ) + + room_ids_clause, room_ids_args = make_in_list_sql_clause( + self.database_engine, + "room_id", + room_ids, + ) + + # We use the union of two (almost identical) queries here, the first to + # fetch the specific thread receipts and the second to fetch the + # unthreaded receipts. + # + # This SQL is optimized to use the indices we have on + # `receipts_linearized`. + # + # We compare room ID and thread IDs independently due to the above, + # which means that this query might return more rows than we need if the + # same thread ID appears across different rooms (e.g. 'main' thread ID). + # This doesn't cause any logic issues, and isn't a performance concern + # given this function generally gets called with only one room and + # thread ID. sql = f""" SELECT room_id, thread_id, MAX(stream_ordering) FROM receipts_linearized INNER JOIN events USING (room_id, event_id) WHERE {receipt_types_clause} + AND {thread_ids_clause} + AND {room_ids_clause} + AND user_id = ? + GROUP BY room_id, thread_id + + UNION ALL + + SELECT room_id, thread_id, MAX(stream_ordering) + FROM receipts_linearized + INNER JOIN events USING (room_id, event_id) + WHERE {receipt_types_clause} + AND {room_ids_clause} + AND thread_id IS NULL AND user_id = ? GROUP BY room_id, thread_id """ - args.extend((user_id,)) + args = list(receipts_args) + args.extend(thread_ids_args) + args.extend(room_ids_args) + args.append(user_id) + args.extend(receipts_args) + args.extend(room_ids_args) + args.append(user_id) + txn.execute(sql, args) result: Dict[str, _RoomReceipt] = {} @@ -925,12 +974,6 @@ async def get_unread_push_actions_for_user_in_range_for_http( The list will have between 0~limit entries. """ - receipts_by_room = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http_receipts", - self._get_receipts_by_room_txn, - user_id=user_id, - ) - def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, str, int, str, bool]]: @@ -952,6 +995,27 @@ def get_push_actions_txn( "get_unread_push_actions_for_user_in_range_http", get_push_actions_txn ) + room_ids = set() + thread_ids = [] + for ( + _, + room_id, + thread_id, + _, + _, + _, + ) in push_actions: + room_ids.add(room_id) + thread_ids.append(thread_id) + + receipts_by_room = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_http_receipts", + self._get_receipts_for_room_and_threads_txn, + user_id=user_id, + room_ids=room_ids, + thread_ids=thread_ids, + ) + notifs = [ HttpPushAction( event_id=event_id, @@ -998,12 +1062,6 @@ async def get_unread_push_actions_for_user_in_range_for_email( The list will have between 0~limit entries. """ - receipts_by_room = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_receipts", - self._get_receipts_by_room_txn, - user_id=user_id, - ) - def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, str, int, str, bool, int]]: @@ -1026,6 +1084,28 @@ def get_push_actions_txn( "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn ) + room_ids = set() + thread_ids = [] + for ( + _, + room_id, + thread_id, + _, + _, + _, + _, + ) in push_actions: + room_ids.add(room_id) + thread_ids.append(thread_id) + + receipts_by_room = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_email_receipts", + self._get_receipts_for_room_and_threads_txn, + user_id=user_id, + room_ids=room_ids, + thread_ids=thread_ids, + ) + # Make a list of dicts from the two sets of results. notifs = [ EmailPushAction( diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 91beca6ffcf3..660c8345181d 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -53,11 +53,7 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException -from synapse.storage.util.id_generators import ( - AbstractStreamIdGenerator, - IdGenerator, - StreamIdGenerator, -) +from synapse.storage.util.id_generators import IdGenerator, StreamIdGenerator from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules from synapse.types import JsonDict from synapse.util import json_encoder, unwrapFirstError @@ -130,6 +126,8 @@ class PushRulesWorkerStore( `get_max_push_rules_stream_id` which can be called in the initializer. """ + _push_rules_stream_id_gen: StreamIdGenerator + def __init__( self, database: DatabasePool, @@ -138,6 +136,10 @@ def __init__( ): super().__init__(database, db_conn, hs) + self._is_push_writer = ( + hs.get_instance_name() in hs.config.worker.writers.push_rules + ) + # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. self._push_rules_stream_id_gen = StreamIdGenerator( @@ -145,7 +147,7 @@ def __init__( hs.get_replication_notifier(), "push_rules_stream", "stream_id", - is_writer=hs.config.worker.worker_app is None, + is_writer=self._is_push_writer, ) push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict( @@ -162,6 +164,9 @@ def __init__( prefilled_cache=push_rules_prefill, ) + self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") + self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") + def get_max_push_rules_stream_id(self) -> int: """Get the position of the push rules stream. @@ -383,23 +388,6 @@ def get_all_push_rule_updates_txn( "get_all_push_rule_updates", get_all_push_rule_updates_txn ) - -class PushRuleStore(PushRulesWorkerStore): - # Because we have write access, this will be a StreamIdGenerator - # (see PushRulesWorkerStore.__init__) - _push_rules_stream_id_gen: AbstractStreamIdGenerator - - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - super().__init__(database, db_conn, hs) - - self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") - self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") - async def add_push_rule( self, user_id: str, @@ -410,6 +398,9 @@ async def add_push_rule( before: Optional[str] = None, after: Optional[str] = None, ) -> None: + if not self._is_push_writer: + raise Exception("Not a push writer") + conditions_json = json_encoder.encode(conditions) actions_json = json_encoder.encode(actions) async with self._push_rules_stream_id_gen.get_next() as stream_id: @@ -455,6 +446,9 @@ def _add_push_rule_relative_txn( before: str, after: str, ) -> None: + if not self._is_push_writer: + raise Exception("Not a push writer") + relative_to_rule = before or after sql = """ @@ -524,6 +518,9 @@ def _add_push_rule_highest_priority_txn( conditions_json: str, actions_json: str, ) -> None: + if not self._is_push_writer: + raise Exception("Not a push writer") + if isinstance(self.database_engine, PostgresEngine): # Postgres doesn't do FOR UPDATE on aggregate functions, so select the rows first # then re-select the count/max below. @@ -575,6 +572,9 @@ def _upsert_push_rule_txn( actions_json: str, update_stream: bool = True, ) -> None: + if not self._is_push_writer: + raise Exception("Not a push writer") + """Specialised version of simple_upsert_txn that picks a push_rule_id using the _push_rule_id_gen if it needs to insert the rule. It assumes that the "push_rules" table is locked""" @@ -653,6 +653,8 @@ async def delete_push_rule(self, user_id: str, rule_id: str) -> None: user_id: The matrix ID of the push rule owner rule_id: The rule_id of the rule to be deleted """ + if not self._is_push_writer: + raise Exception("Not a push writer") def delete_push_rule_txn( txn: LoggingTransaction, @@ -704,6 +706,9 @@ async def set_push_rule_enabled( Raises: RuleNotFoundException if the rule does not exist. """ + if not self._is_push_writer: + raise Exception("Not a push writer") + async with self._push_rules_stream_id_gen.get_next() as stream_id: event_stream_ordering = self._stream_id_gen.get_current_token() await self.db_pool.runInteraction( @@ -727,6 +732,9 @@ def _set_push_rule_enabled_txn( enabled: bool, is_default_rule: bool, ) -> None: + if not self._is_push_writer: + raise Exception("Not a push writer") + new_id = self._push_rules_enable_id_gen.get_next() if not is_default_rule: @@ -796,6 +804,9 @@ async def set_push_rule_actions( Raises: RuleNotFoundException if the rule does not exist. """ + if not self._is_push_writer: + raise Exception("Not a push writer") + actions_json = json_encoder.encode(actions) def set_push_rule_actions_txn( @@ -865,6 +876,9 @@ def _insert_push_rules_update_txn( op: str, data: Optional[JsonDict] = None, ) -> None: + if not self._is_push_writer: + raise Exception("Not a push writer") + values = { "stream_id": stream_id, "event_stream_ordering": event_stream_ordering, @@ -882,9 +896,6 @@ def _insert_push_rules_update_txn( self.push_rules_stream_cache.entity_has_changed, user_id, stream_id ) - def get_max_push_rules_stream_id(self) -> int: - return self._push_rules_stream_id_gen.get_current_token() - async def copy_push_rule_from_room_to_room( self, new_room_id: str, user_id: str, rule: PushRule ) -> None: @@ -895,6 +906,9 @@ async def copy_push_rule_from_room_to_room( user_id : ID of user the push rule belongs to. rule: A push rule. """ + if not self._is_push_writer: + raise Exception("Not a push writer") + # Create new rule id rule_id_scope = "/".join(rule.rule_id.split("/")[:-1]) new_rule_id = rule_id_scope + "/" + new_room_id @@ -930,6 +944,9 @@ async def copy_push_rules_from_room_to_room_for_user( new_room_id: ID of the new room. user_id: ID of user to copy push rules for. """ + if not self._is_push_writer: + raise Exception("Not a push writer") + # Retrieve push rules for this user user_push_rules = await self.get_push_rules_for_user(user_id) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index d939ade42716..29bf47befc99 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -2108,6 +2108,13 @@ def __init__( unique=False, ) + self.db_pool.updates.register_background_index_update( + update_name="access_tokens_refresh_token_id_idx", + index_name="access_tokens_refresh_token_id_idx", + table="access_tokens", + columns=("refresh_token_id",), + ) + async def _background_update_set_deactivated_flag( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/schema/main/delta/84/04_access_token_index.sql b/synapse/storage/schema/main/delta/84/04_access_token_index.sql new file mode 100644 index 000000000000..d2fa945c8fa3 --- /dev/null +++ b/synapse/storage/schema/main/delta/84/04_access_token_index.sql @@ -0,0 +1,15 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2023 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8404, 'access_tokens_refresh_token_id_idx', '{}'); diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 1b36324b8fc5..2780d29cadaf 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -17,14 +17,16 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import Collection, List, Optional +from typing import Collection, ContextManager, List, Optional from unittest.mock import AsyncMock, Mock, patch +from parameterized import parameterized + from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes, JoinRules from synapse.api.errors import Codes, ResourceLimitError -from synapse.api.filtering import Filtering +from synapse.api.filtering import FilterCollection, Filtering from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.events import EventBase from synapse.events.snapshot import EventContext @@ -33,7 +35,7 @@ from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer -from synapse.types import UserID, create_requester +from synapse.types import JsonDict, UserID, create_requester from synapse.util import Clock import tests.unittest @@ -258,13 +260,7 @@ def test_ban_wins_race_with_join(self) -> None: # Eve tries to join the room. We monkey patch the internal logic which selects # the prev_events used when creating the join event, such that the ban does not # precede the join. - mocked_get_prev_events = patch.object( - self.hs.get_datastores().main, - "get_prev_events_for_room", - new_callable=AsyncMock, - return_value=[last_room_creation_event_id], - ) - with mocked_get_prev_events: + with self._patch_get_latest_events([last_room_creation_event_id]): self.helper.join(room_id, eve, tok=eve_token) # Eve makes a second, incremental sync. @@ -288,6 +284,460 @@ def test_ban_wins_race_with_join(self) -> None: ) self.assertEqual(eve_initial_sync_after_join.joined, []) + def test_state_includes_changes_on_forks(self) -> None: + """State changes that happen on a fork of the DAG must be included in `state` + + Given the following DAG: + + E1 + ↗ ↖ + | S2 + | ↑ + --|------|---- + | | + E3 | + ↖ / + E4 + + ... and a filter that means we only return 2 events, represented by the dashed + horizontal line: `S2` must be included in the `state` section. + """ + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync as Alice to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Send a final event, joining the two branches of the dag + e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"] + + # do an incremental sync, with a filter that will ensure we only get two of + # the three new events. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 2}}} + ), + ), + since_token=initial_sync_result.next_batch, + ) + ) + + # The state event should appear in the 'state' section of the response. + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertTrue(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event, e4_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + + def test_state_includes_changes_on_forks_when_events_excluded(self) -> None: + """A variation on the previous test, but where one event is filtered + + The DAG is the same as the previous test, but E4 is excluded by the filter. + + E1 + ↗ ↖ + | S2 + | ↑ + --|------|---- + | | + E3 | + ↖ / + (E4) + + """ + + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync as Alice to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Send a final event, joining the two branches of the dag + self.helper.send(room_id, "e4", type="not_a_normal_message", tok=alice_tok)[ + "event_id" + ] + + # do an incremental sync, with a filter that will only return E3, excluding S2 + # and E4. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, + { + "room": { + "timeline": { + "limit": 1, + "not_types": ["not_a_normal_message"], + } + } + }, + ), + ), + since_token=initial_sync_result.next_batch, + ) + ) + + # The state event should appear in the 'state' section of the response. + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertTrue(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + + def test_state_includes_changes_on_long_lived_forks(self) -> None: + """State changes that happen on a fork of the DAG must be included in `state` + + Given the following DAG: + + E1 + ↗ ↖ + | S2 + | ↑ + --|------|---- + E3 | + --|------|---- + | E4 + | | + + ... and a filter that means we only return 1 event, represented by the dashed + horizontal lines: `S2` must be included in the `state` section on the second sync. + """ + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync as Alice to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Do an incremental sync, this will return E3 but *not* S2 at this + # point. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 1}}} + ), + ), + since_token=initial_sync_result.next_batch, + ) + ) + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertTrue(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [], + ) + + # Now send another event that points to S2, but not E3. + with self._patch_get_latest_events([s2_event]): + e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"] + + # Doing an incremental sync should return S2 in state. + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 1}}} + ), + ), + since_token=incremental_sync.next_batch, + ) + ) + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertFalse(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e4_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + + def test_state_includes_changes_on_ungappy_syncs(self) -> None: + """Test `state` where the sync is not gappy. + + We start with a DAG like this: + + E1 + ↗ ↖ + | S2 + | + --|--- + | + E3 + + ... and initialsync with `limit=1`, represented by the horizontal dashed line. + At this point, we do not expect S2 to appear in the response at all (since + it is excluded from the timeline by the `limit`, and the state is based on the + state after the most recent event before the sync token (E3), which doesn't + include S2. + + Now more events arrive, and we do an incremental sync: + + E1 + ↗ ↖ + | S2 + | ↑ + E3 | + ↑ | + --|------|---- + | | + E4 | + ↖ / + E5 + + This is the last chance for us to tell the client about S2, so it *must* be + included in the response. + """ + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + alice_requester = create_requester(alice) + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + + # Do an initial sync to get a known starting point. + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, generate_sync_config(alice) + ) + ) + last_room_creation_event_id = ( + initial_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Send a state event, and a regular event, both using the same prev ID + with self._patch_get_latest_events([last_room_creation_event_id]): + s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[ + "event_id" + ] + e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"] + + # Another initial sync, with limit=1 + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config( + alice, + filter_collection=FilterCollection( + self.hs, {"room": {"timeline": {"limit": 1}}} + ), + ), + ) + ) + room_sync = initial_sync_result.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e3_event], + ) + self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()]) + + # More events, E4 and E5 + with self._patch_get_latest_events([e3_event]): + e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"] + e5_event = self.helper.send(room_id, "e5", tok=alice_tok)["event_id"] + + # Now incremental sync + incremental_sync = self.get_success( + self.sync_handler.wait_for_sync_for_user( + alice_requester, + generate_sync_config(alice), + since_token=initial_sync_result.next_batch, + ) + ) + + # The state event should appear in the 'state' section of the response. + room_sync = incremental_sync.joined[0] + self.assertEqual(room_sync.room_id, room_id) + self.assertFalse(room_sync.timeline.limited) + self.assertEqual( + [e.event_id for e in room_sync.timeline.events], + [e4_event, e5_event], + ) + self.assertEqual( + [e.event_id for e in room_sync.state.values()], + [s2_event], + ) + + @parameterized.expand( + [ + (False, False), + (True, False), + (False, True), + (True, True), + ] + ) + def test_archived_rooms_do_not_include_state_after_leave( + self, initial_sync: bool, empty_timeline: bool + ) -> None: + """If the user leaves the room, state changes that happen after they leave are not returned. + + We try with both a zero and a normal timeline limit, + and we try both an initial sync and an incremental sync for both. + """ + if empty_timeline and not initial_sync: + # FIXME synapse doesn't return the room at all in this situation! + self.skipTest("Synapse does not correctly handle this case") + + # Alice creates the room, and bob joins. + alice = self.register_user("alice", "password") + alice_tok = self.login(alice, "password") + + bob = self.register_user("bob", "password") + bob_tok = self.login(bob, "password") + bob_requester = create_requester(bob) + + room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok) + self.helper.join(room_id, bob, tok=bob_tok) + + initial_sync_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + bob_requester, generate_sync_config(bob) + ) + ) + + # Alice sends a message and a state + before_message_event = self.helper.send(room_id, "before", tok=alice_tok)[ + "event_id" + ] + before_state_event = self.helper.send_state( + room_id, "test_state", {"body": "before"}, tok=alice_tok + )["event_id"] + + # Bob leaves + leave_event = self.helper.leave(room_id, bob, tok=bob_tok)["event_id"] + + # Alice sends some more stuff + self.helper.send(room_id, "after", tok=alice_tok)["event_id"] + self.helper.send_state(room_id, "test_state", {"body": "after"}, tok=alice_tok)[ + "event_id" + ] + + # And now, Bob resyncs. + filter_dict: JsonDict = {"room": {"include_leave": True}} + if empty_timeline: + filter_dict["room"]["timeline"] = {"limit": 0} + sync_room_result = self.get_success( + self.sync_handler.wait_for_sync_for_user( + bob_requester, + generate_sync_config( + bob, filter_collection=FilterCollection(self.hs, filter_dict) + ), + since_token=None if initial_sync else initial_sync_result.next_batch, + ) + ).archived[0] + + if empty_timeline: + # The timeline should be empty + self.assertEqual(sync_room_result.timeline.events, []) + + # And the state should include the leave event... + self.assertEqual( + sync_room_result.state[("m.room.member", bob)].event_id, leave_event + ) + # ... and the state change before he left. + self.assertEqual( + sync_room_result.state[("test_state", "")].event_id, before_state_event + ) + else: + # The last three events in the timeline should be those leading up to the + # leave + self.assertEqual( + [e.event_id for e in sync_room_result.timeline.events[-3:]], + [before_message_event, before_state_event, leave_event], + ) + # ... And the state should be empty + self.assertEqual(sync_room_result.state, {}) + + def _patch_get_latest_events(self, latest_events: List[str]) -> ContextManager: + """Monkey-patch `get_prev_events_for_room` + + Returns a context manager which will replace the implementation of + `get_prev_events_for_room` with one which returns `latest_events`. + """ + return patch.object( + self.hs.get_datastores().main, + "get_prev_events_for_room", + new_callable=AsyncMock, + return_value=latest_events, + ) + def test_call_invite_in_public_room_not_returned(self) -> None: user = self.register_user("alice", "password") tok = self.login(user, "password") @@ -401,14 +851,26 @@ async def _check_sigs_and_hash_for_pulled_events_and_fetch( def generate_sync_config( - user_id: str, device_id: Optional[str] = "device_id" + user_id: str, + device_id: Optional[str] = "device_id", + filter_collection: Optional[FilterCollection] = None, ) -> SyncConfig: - """Generate a sync config (with a unique request key).""" + """Generate a sync config (with a unique request key). + + Args: + user_id: user who is syncing. + device_id: device that is syncing. Defaults to "device_id". + filter_collection: filter to apply. Defaults to the default filter (ie, + return everything, with a default limit) + """ + if filter_collection is None: + filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION + global _request_key _request_key += 1 return SyncConfig( user=UserID.from_string(user_id), - filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION, + filter_collection=filter_collection, is_guest=False, request_key=("request_key", _request_key), device_id=device_id, diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index c05b3fc781c6..f5a7602d0a5d 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -35,7 +35,6 @@ from tests import unittest from tests.server import FakeChannel from tests.test_utils.event_injection import inject_event -from tests.unittest import override_config class BaseRelationsTestCase(unittest.HomeserverTestCase): @@ -957,7 +956,6 @@ def test_pagination_from_sync_and_messages(self) -> None: class RecursiveRelationTestCase(BaseRelationsTestCase): - @override_config({"experimental_features": {"msc3981_recurse_relations": True}}) def test_recursive_relations(self) -> None: """Generate a complex, multi-level relationship tree and query it.""" # Create a thread with a few messages in it. @@ -1003,7 +1001,7 @@ def test_recursive_relations(self) -> None: channel = self.make_request( "GET", f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}" - "?dir=f&limit=20&org.matrix.msc3981.recurse=true", + "?dir=f&limit=20&recurse=true", access_token=self.user_token, ) self.assertEqual(200, channel.code, channel.json_body) @@ -1024,7 +1022,6 @@ def test_recursive_relations(self) -> None: ], ) - @override_config({"experimental_features": {"msc3981_recurse_relations": True}}) def test_recursive_relations_with_filter(self) -> None: """The event_type and rel_type still apply.""" # Create a thread with a few messages in it. @@ -1052,7 +1049,7 @@ def test_recursive_relations_with_filter(self) -> None: channel = self.make_request( "GET", f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}/{RelationTypes.ANNOTATION}" - "?dir=f&limit=20&org.matrix.msc3981.recurse=true", + "?dir=f&limit=20&recurse=true", access_token=self.user_token, ) self.assertEqual(200, channel.code, channel.json_body) @@ -1065,7 +1062,7 @@ def test_recursive_relations_with_filter(self) -> None: channel = self.make_request( "GET", f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}/{RelationTypes.ANNOTATION}/m.reaction" - "?dir=f&limit=20&org.matrix.msc3981.recurse=true", + "?dir=f&limit=20&recurse=true", access_token=self.user_token, ) self.assertEqual(200, channel.code, channel.json_body) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index daa68d78b931..fe00afe1986c 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -170,8 +170,8 @@ def invite( targ: Optional[str] = None, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=src, targ=targ, @@ -189,8 +189,8 @@ def join( appservice_user_id: Optional[str] = None, expect_errcode: Optional[Codes] = None, expect_additional_fields: Optional[dict] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=user, targ=user, @@ -242,8 +242,8 @@ def leave( user: Optional[str] = None, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, - ) -> None: - self.change_membership( + ) -> JsonDict: + return self.change_membership( room=room, src=user, targ=user, @@ -282,7 +282,7 @@ def change_membership( expect_code: int = HTTPStatus.OK, expect_errcode: Optional[str] = None, expect_additional_fields: Optional[dict] = None, - ) -> None: + ) -> JsonDict: """ Send a membership state event into a room. @@ -298,6 +298,9 @@ def change_membership( using an application service access token in `tok`. expect_code: The expected HTTP response code expect_errcode: The expected Matrix error code + + Returns: + The JSON response """ temp_id = self.auth_user_id self.auth_user_id = src @@ -356,6 +359,7 @@ def change_membership( ) self.auth_user_id = temp_id + return channel.json_body def send( self,