Skip to content

Commit

Permalink
syncv3: read receipts extension (MSC3960)
Browse files Browse the repository at this point in the history
  • Loading branch information
morguldir committed Aug 30, 2024
1 parent bf9d498 commit 9fde835
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 16 deletions.
25 changes: 21 additions & 4 deletions src/api/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use ruma::{
state_res::Event,
uint, DeviceId, EventId, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
};
use service::rooms::read_receipt::pack_receipts;
use tracing::{Instrument as _, Span};

use crate::{
Expand Down Expand Up @@ -1168,6 +1169,10 @@ pub(crate) async fn sync_events_v4_route(
let mut device_list_changes = HashSet::new();
let mut device_list_left = HashSet::new();

let mut receipts = sync_events::v4::Receipts {
rooms: BTreeMap::new(),
};

let mut account_data = sync_events::v4::AccountData {
global: Vec::new(),
rooms: BTreeMap::new(),
Expand Down Expand Up @@ -1509,7 +1514,21 @@ pub(crate) async fn sync_events_v4_route(
.collect(),
);

if roomsince != &0 && timeline_pdus.is_empty() && account_data.rooms.get(room_id).is_some_and(Vec::is_empty) {
let room_receipts = services
.rooms
.read_receipt
.readreceipts_since(room_id, *roomsince);
let vector: Vec<_> = room_receipts.into_iter().collect();
let receipt_size = vector.len();
receipts
.rooms
.insert(room_id.clone(), pack_receipts(Box::new(vector.into_iter())));

if roomsince != &0
&& timeline_pdus.is_empty()
&& account_data.rooms.get(room_id).is_some_and(Vec::is_empty)
&& receipt_size == 0
{
continue;
}

Expand Down Expand Up @@ -1723,9 +1742,7 @@ pub(crate) async fn sync_events_v4_route(
device_unused_fallback_key_types: None,
},
account_data,
receipts: sync_events::v4::Receipts {
rooms: BTreeMap::new(),
},
receipts,
typing: sync_events::v4::Typing {
rooms: BTreeMap::new(),
},
Expand Down
10 changes: 2 additions & 8 deletions src/service/rooms/read_receipt/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,11 @@ use std::{mem::size_of, sync::Arc};

use conduit::{utils, Error, Result};
use database::Map;
use ruma::{
events::{receipt::ReceiptEvent, AnySyncEphemeralRoomEvent},
serde::Raw,
CanonicalJsonObject, OwnedUserId, RoomId, UserId,
};
use ruma::{events::receipt::ReceiptEvent, serde::Raw, CanonicalJsonObject, RoomId, UserId};

use super::AnySyncEphemeralRoomEventIter;
use crate::{globals, Dep};

type AnySyncEphemeralRoomEventIter<'a> =
Box<dyn Iterator<Item = Result<(OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>)>> + 'a>;

pub(super) struct Data {
roomuserid_privateread: Arc<Map>,
roomuserid_lastprivatereadupdate: Arc<Map>,
Expand Down
41 changes: 37 additions & 4 deletions src/service/rooms/read_receipt/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
mod data;

use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

use conduit::Result;
use conduit::{debug, Result};
use data::Data;
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
use ruma::{
events::{
receipt::{ReceiptEvent, ReceiptEventContent},
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
},
serde::Raw,
OwnedUserId, RoomId, UserId,
};

use crate::{sending, Dep};

Expand All @@ -17,6 +24,9 @@ struct Services {
sending: Dep<sending::Service>,
}

type AnySyncEphemeralRoomEventIter<'a> =
Box<dyn Iterator<Item = Result<(OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>)>> + 'a>;

impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
Expand Down Expand Up @@ -44,7 +54,7 @@ impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub fn readreceipts_since<'a>(
&'a self, room_id: &RoomId, since: u64,
) -> impl Iterator<Item = Result<(OwnedUserId, u64, Raw<ruma::events::AnySyncEphemeralRoomEvent>)>> + 'a {
) -> impl Iterator<Item = Result<(OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>)>> + 'a {
self.db.readreceipts_since(room_id, since)
}

Expand All @@ -65,3 +75,26 @@ impl Service {
self.db.last_privateread_update(user_id, room_id)
}
}

#[must_use]
pub fn pack_receipts(receipts: AnySyncEphemeralRoomEventIter<'_>) -> Raw<SyncEphemeralRoomEvent<ReceiptEventContent>> {
let mut json = BTreeMap::new();
for (_user, _count, value) in receipts.flatten() {
let receipt = serde_json::from_str::<SyncEphemeralRoomEvent<ReceiptEventContent>>(value.json().get());
if let Ok(value) = receipt {
for (event, receipt) in value.content {
json.insert(event, receipt);
}
} else {
debug!("failed to parse receipt: {:?}", receipt);
}
}
let content = ReceiptEventContent::from_iter(json);

Raw::from_json(
serde_json::value::to_raw_value(&SyncEphemeralRoomEvent {
content,
})
.expect("received valid json"),
)
}

0 comments on commit 9fde835

Please sign in to comment.