Skip to content

Commit

Permalink
Use receipts event_stream_ordering instead of joins
Browse files Browse the repository at this point in the history
This should reduce IOPs incurred by joining to the events table to lookup
stream ordering, which happens in many receipt handling code paths.
  • Loading branch information
Fizzadar committed Mar 26, 2024
1 parent c900d18 commit 3026b55
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 25 deletions.
22 changes: 8 additions & 14 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ def _get_unread_counts_by_room_for_user_txn(
WITH all_receipts AS (
SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
{receipt_types_clause}
AND user_id = ?
Expand Down Expand Up @@ -621,13 +620,12 @@ def _get_thread(thread_id: str) -> NotifCounts:
SELECT notif_count, COALESCE(unread_count, 0), thread_id
FROM event_push_summary
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
Expand Down Expand Up @@ -659,13 +657,12 @@ def _get_thread(thread_id: str) -> NotifCounts:
sql = f"""
SELECT COUNT(*), thread_id FROM event_push_actions
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
Expand Down Expand Up @@ -738,13 +735,12 @@ def _get_thread(thread_id: str) -> NotifCounts:
thread_id
FROM event_push_actions
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
Expand Down Expand Up @@ -881,9 +877,8 @@ def _get_receipts_by_room_txn(
)

sql = f"""
SELECT room_id, thread_id, MAX(stream_ordering)
SELECT room_id, thread_id, MAX(event_stream_ordering)
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
WHERE {receipt_types_clause}
AND user_id = ?
GROUP BY room_id, thread_id
Expand Down Expand Up @@ -1362,9 +1357,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
)

sql = """
SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, r.event_stream_ordering
FROM receipts_linearized AS r
INNER JOIN events AS e USING (event_id)
WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
ORDER BY r.stream_id ASC
LIMIT ?
Expand Down
20 changes: 9 additions & 11 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,13 @@ def get_last_unthreaded_receipt_for_user_txn(
)

sql = f"""
SELECT event_id, stream_ordering
SELECT event_id, event_stream_ordering
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
WHERE {clause}
AND user_id = ?
AND room_id = ?
AND thread_id IS NULL
ORDER BY stream_ordering DESC
ORDER BY event_stream_ordering DESC
LIMIT 1
"""

Expand Down Expand Up @@ -394,9 +393,9 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]:

content: JsonDict = {}
for receipt_type, user_id, event_id, data in rows:
content.setdefault(event_id, {}).setdefault(receipt_type, {})[user_id] = (
db_to_json(data)
)
content.setdefault(event_id, {}).setdefault(receipt_type, {})[
user_id
] = db_to_json(data)

return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}]

Expand Down Expand Up @@ -483,9 +482,9 @@ def f(
if user_id in receipt_type_dict: # existing receipt
# is the existing receipt threaded and we are currently processing an unthreaded one?
if "thread_id" in receipt_type_dict[user_id] and not thread_id:
receipt_type_dict[user_id] = (
receipt_data # replace with unthreaded one
)
receipt_type_dict[
user_id
] = receipt_data # replace with unthreaded one
else: # receipt does not exist, just set it
receipt_type_dict[user_id] = receipt_data
if thread_id:
Expand Down Expand Up @@ -736,8 +735,7 @@ def _insert_linearized_receipt_txn(
thread_args = (thread_id,)

sql = f"""
SELECT stream_ordering, event_id FROM events
INNER JOIN receipts_linearized AS r USING (event_id, room_id)
SELECT r.event_stream_ordering, r.event_id FROM receipts_linearized AS r
WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
"""
txn.execute(
Expand Down

0 comments on commit 3026b55

Please sign in to comment.