From 7b4d7429f8e655fd5a1c3a65e6347577e8b49784 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Feb 2024 13:24:11 +0000 Subject: [PATCH] Don't invalidate the entire event cache when we purge history (#16905) We do this by adding support to the LRU cache for "extra indices" based on the cached value. This allows us to efficiently map from room ID to the cached events and only invalidate those. --- changelog.d/16905.misc | 1 + synapse/storage/databases/main/cache.py | 2 +- .../storage/databases/main/events_worker.py | 14 +++-- synapse/util/caches/lrucache.py | 59 +++++++++++++++++++ tests/util/test_lrucache.py | 31 ++++++++++ 5 files changed, 100 insertions(+), 7 deletions(-) create mode 100644 changelog.d/16905.misc diff --git a/changelog.d/16905.misc b/changelog.d/16905.misc new file mode 100644 index 00000000000..c5f47eb3e94 --- /dev/null +++ b/changelog.d/16905.misc @@ -0,0 +1 @@ +Don't invalidate the entire event cache when we purge history. diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 7314d87404b..bfd492d95d3 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -373,7 +373,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None: deleted. """ - self._invalidate_local_get_event_cache_all() # type: ignore[attr-defined] + self._invalidate_local_get_event_cache_room_id(room_id) # type: ignore[attr-defined] self._attempt_to_invalidate_cache("have_seen_event", (room_id,)) self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,)) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 1fd458b5102..9c3775bb7c4 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -268,6 +268,8 @@ def __init__( ] = AsyncLruCache( cache_name="*getEvent*", max_size=hs.config.caches.event_cache_size, + # `extra_index_cb` Returns a tuple as that is the key type + extra_index_cb=lambda _, v: (v.event.room_id,), ) # Map from event ID to a deferred that will result in a map from event @@ -782,9 +784,9 @@ async def get_unredacted_events_from_cache_or_db( if missing_events_ids: - async def get_missing_events_from_cache_or_db() -> Dict[ - str, EventCacheEntry - ]: + async def get_missing_events_from_cache_or_db() -> ( + Dict[str, EventCacheEntry] + ): """Fetches the events in `missing_event_ids` from the database. Also creates entries in `self._current_event_fetches` to allow @@ -910,12 +912,12 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None: self._event_ref.pop(event_id, None) self._current_event_fetches.pop(event_id, None) - def _invalidate_local_get_event_cache_all(self) -> None: - """Clears the in-memory get event caches. + def _invalidate_local_get_event_cache_room_id(self, room_id: str) -> None: + """Clears the in-memory get event caches for a room. Used when we purge room history. """ - self._get_event_cache.clear() + self._get_event_cache.invalidate_on_extra_index_local((room_id,)) self._event_ref.clear() self._current_event_fetches.clear() diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 6e8c1e84acf..a1b4f5b6a7c 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -35,6 +35,7 @@ Iterable, List, Optional, + Set, Tuple, Type, TypeVar, @@ -386,6 +387,7 @@ def __init__( apply_cache_factor_from_config: bool = True, clock: Optional[Clock] = None, prune_unread_entries: bool = True, + extra_index_cb: Optional[Callable[[KT, VT], KT]] = None, ): """ Args: @@ -416,6 +418,20 @@ def __init__( prune_unread_entries: If True, cache entries that haven't been read recently will be evicted from the cache in the background. Set to False to opt-out of this behaviour. + + extra_index_cb: If provided, the cache keeps a second index from a + (different) key to a cache entry based on the return value of + the callback. This can then be used to invalidate entries based + on the second type of key. + + For example, for the event cache this would be a callback that + maps an event to its room ID, allowing invalidation of all + events in a given room. + + Note: Though the two types of key have the same type, they are + in different namespaces. + + Note: The new key does not have to be unique. """ # Default `clock` to something sensible. Note that we rename it to # `real_clock` so that mypy doesn't think its still `Optional`. @@ -463,6 +479,8 @@ def __init__( lock = threading.Lock() + extra_index: Dict[KT, Set[KT]] = {} + def evict() -> None: while cache_len() > self.max_size: # Get the last node in the list (i.e. the oldest node). @@ -521,6 +539,11 @@ def add_node( if size_callback: cached_cache_len[0] += size_callback(node.value) + if extra_index_cb: + index_key = extra_index_cb(node.key, node.value) + mapped_keys = extra_index.setdefault(index_key, set()) + mapped_keys.add(node.key) + if caches.TRACK_MEMORY_USAGE and metrics: metrics.inc_memory_usage(node.memory) @@ -537,6 +560,14 @@ def delete_node(node: _Node[KT, VT]) -> int: node.run_and_clear_callbacks() + if extra_index_cb: + index_key = extra_index_cb(node.key, node.value) + mapped_keys = extra_index.get(index_key) + if mapped_keys is not None: + mapped_keys.discard(node.key) + if not mapped_keys: + extra_index.pop(index_key, None) + if caches.TRACK_MEMORY_USAGE and metrics: metrics.dec_memory_usage(node.memory) @@ -748,6 +779,8 @@ def cache_clear() -> None: if size_callback: cached_cache_len[0] = 0 + extra_index.clear() + if caches.TRACK_MEMORY_USAGE and metrics: metrics.clear_memory_usage() @@ -755,6 +788,28 @@ def cache_clear() -> None: def cache_contains(key: KT) -> bool: return key in cache + @synchronized + def cache_invalidate_on_extra_index(index_key: KT) -> None: + """Invalidates all entries that match the given extra index key. + + This can only be called when `extra_index_cb` was specified. + """ + + assert extra_index_cb is not None + + keys = extra_index.pop(index_key, None) + if not keys: + return + + for key in keys: + node = cache.pop(key, None) + if not node: + continue + + evicted_len = delete_node(node) + if metrics: + metrics.inc_evictions(EvictionReason.invalidation, evicted_len) + # make sure that we clear out any excess entries after we get resized. self._on_resize = evict @@ -771,6 +826,7 @@ def cache_contains(key: KT) -> bool: self.len = synchronized(cache_len) self.contains = cache_contains self.clear = cache_clear + self.invalidate_on_extra_index = cache_invalidate_on_extra_index def __getitem__(self, key: KT) -> VT: result = self.get(key, _Sentinel.sentinel) @@ -864,6 +920,9 @@ async def invalidate(self, key: KT) -> None: # This method should invalidate any external cache and then invalidate the LruCache. return self._lru_cache.invalidate(key) + def invalidate_on_extra_index_local(self, index_key: KT) -> None: + self._lru_cache.invalidate_on_extra_index(index_key) + def invalidate_local(self, key: KT) -> None: """Remove an entry from the local cache diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py index dcc2b4be89d..3f0d8139f8e 100644 --- a/tests/util/test_lrucache.py +++ b/tests/util/test_lrucache.py @@ -383,3 +383,34 @@ def test_evict_memory(self, jemalloc_interface: Mock) -> None: # the items should still be in the cache self.assertEqual(cache.get("key1"), 1) self.assertEqual(cache.get("key2"), 2) + + +class ExtraIndexLruCacheTestCase(unittest.HomeserverTestCase): + def test_invalidate_simple(self) -> None: + cache: LruCache[str, int] = LruCache(10, extra_index_cb=lambda k, v: str(v)) + cache["key1"] = 1 + cache["key2"] = 2 + + cache.invalidate_on_extra_index("key1") + self.assertEqual(cache.get("key1"), 1) + self.assertEqual(cache.get("key2"), 2) + + cache.invalidate_on_extra_index("1") + self.assertEqual(cache.get("key1"), None) + self.assertEqual(cache.get("key2"), 2) + + def test_invalidate_multi(self) -> None: + cache: LruCache[str, int] = LruCache(10, extra_index_cb=lambda k, v: str(v)) + cache["key1"] = 1 + cache["key2"] = 1 + cache["key3"] = 2 + + cache.invalidate_on_extra_index("key1") + self.assertEqual(cache.get("key1"), 1) + self.assertEqual(cache.get("key2"), 1) + self.assertEqual(cache.get("key3"), 2) + + cache.invalidate_on_extra_index("1") + self.assertEqual(cache.get("key1"), None) + self.assertEqual(cache.get("key2"), None) + self.assertEqual(cache.get("key3"), 2)