Skip to content

Commit

Permalink
Use PushService instead of websocket like in libsignal.kt
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon committed Oct 17, 2024
1 parent dc8d574 commit 49baa11
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 40 deletions.
38 changes: 23 additions & 15 deletions src/profile_service.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
use reqwest::Method;

use crate::{
proto::WebSocketRequestMessage,
push_service::{ServiceError, SignalServiceProfile},
websocket::SignalWebSocket,
configuration::Endpoint,
prelude::PushService,
push_service::{
HttpAuthOverride, ReqwestExt, ServiceError, SignalServiceProfile,
},
ServiceAddress,
};

pub struct ProfileService {
ws: SignalWebSocket,
push_service: PushService,
}

impl ProfileService {
pub fn from_socket(ws: SignalWebSocket) -> Self {
ProfileService { ws }
pub fn from_socket(push_service: PushService) -> Self {
ProfileService { push_service }
}

pub async fn retrieve_profile_by_id(
&mut self,
address: ServiceAddress,
profile_key: Option<zkgroup::profiles::ProfileKey>,
) -> Result<SignalServiceProfile, ServiceError> {
let endpoint = match profile_key {
let path = match profile_key {
Some(key) => {
let version =
bincode::serialize(&key.get_profile_key_version(
Expand All @@ -34,13 +38,17 @@ impl ProfileService {
},
};

let request = WebSocketRequestMessage {
path: Some(endpoint),
verb: Some("GET".into()),
// TODO: set locale to en_US
..Default::default()
};

self.ws.request_json(request).await
self.push_service
.request(
Method::GET,
Endpoint::Service,
path,
HttpAuthOverride::NoOverride,
)?
.send_to_signal()
.await?
.json()
.await
.map_err(Into::into)
}
}
1 change: 0 additions & 1 deletion src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ pub struct AttachmentSpec {
pub blur_hash: Option<String>,
}

/// Equivalent of Java's `SignalServiceMessageSender`.
#[derive(Clone)]
pub struct MessageSender<S, R> {
identified_ws: SignalWebSocket,
Expand Down
47 changes: 23 additions & 24 deletions src/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ struct SignalWebSocketProcess {
/// Signal's requests should go in here, to be delivered to the application.
request_sink: mpsc::Sender<RequestStreamItem>,

outgoing_request_map: HashMap<
outgoing_requests: HashMap<
u64,
oneshot::Sender<Result<WebSocketResponseMessage, ServiceError>>,
>,
Expand All @@ -84,7 +84,6 @@ struct SignalWebSocketProcess {
BoxFuture<'static, Result<WebSocketResponseMessage, Canceled>>,
>,

// WS backend stuff
ws: WebSocket,
}

Expand All @@ -97,23 +96,23 @@ impl SignalWebSocketProcess {
let msg = WebSocketMessage::decode(Bytes::from(frame))?;
if let Some(request) = &msg.request {
tracing::trace!(
"decoded WebSocketMessage request {{ r#type: {:?}, verb: {:?}, path: {:?}, body: {} bytes, headers: {:?}, id: {:?} }}",
msg.r#type(),
msg_type =? msg.r#type(),
request.id,
request.verb,
request.path,
request.body.as_ref().map(|x| x.len()).unwrap_or(0),
request.headers,
request.id,
request_body_size_bytes = request.body.as_ref().map(|x| x.len()).unwrap_or(0),
?request.headers,
"decoded WebSocketMessage request"
);
} else if let Some(response) = &msg.response {
tracing::trace!(
"decoded WebSocketMessage response {{ r#type: {:?}, status: {:?}, message: {:?}, body: {} bytes, headers: {:?}, id: {:?} }}",
msg.r#type(),
msg_type =? msg.r#type(),
response.status,
response.message,
response.body.as_ref().map(|x| x.len()).unwrap_or(0),
response.headers,
response_body_size_bytes = response.body.as_ref().map(|x| x.len()).unwrap_or(0),
?response.headers,
response.id,
"decoded WebSocketMessage response"
);
} else {
tracing::debug!("decoded {msg:?}");
Expand Down Expand Up @@ -142,8 +141,7 @@ impl SignalWebSocketProcess {
}),
(Type::Response, _, Some(response)) => {
if let Some(id) = response.id {
if let Some(responder) =
self.outgoing_request_map.remove(&id)
if let Some(responder) = self.outgoing_requests.remove(&id)
{
if let Err(e) = responder.send(Ok(response)) {
tracing::warn!(
Expand Down Expand Up @@ -187,7 +185,7 @@ impl SignalWebSocketProcess {
let mut rng = rand::thread_rng();
loop {
let id = rng.gen();
if !self.outgoing_request_map.contains_key(&id) {
if !self.outgoing_requests.contains_key(&id) {
return id;
}
}
Expand Down Expand Up @@ -232,19 +230,19 @@ impl SignalWebSocketProcess {
request.id = Some(
request
.id
.filter(|x| !self.outgoing_request_map.contains_key(x))
.filter(|x| !self.outgoing_requests.contains_key(x))
.unwrap_or_else(|| self.next_request_id()),
);
tracing::trace!(
"sending WebSocketRequestMessage {{ verb: {:?}, path: {:?}, body (bytes): {:?}, headers: {:?}, id: {:?} }}",
request.id,
request.verb,
request.path,
request.body.as_ref().map(|x| x.len()),
request.headers,
request.id,
request_body_size_bytes = request.body.as_ref().map(|x| x.len()),
?request.headers,
"sending WebSocketRequestMessage",
);

self.outgoing_request_map.insert(request.id.unwrap(), responder);
self.outgoing_requests.insert(request.id.unwrap(), responder);
let msg = WebSocketMessage {
r#type: Some(web_socket_message::Type::Request.into()),
request: Some(request),
Expand All @@ -255,11 +253,12 @@ impl SignalWebSocketProcess {
}
None => {
return Err(ServiceError::WsClosing {
reason: "SignalWebSocket: end of application request stream; socket closing"
reason: "end of application request stream; socket closing"
});
}
}
}
// Incoming websocket message
web_socket_item = self.ws.next().fuse() => {
use reqwest_websocket::Message;
match web_socket_item {
Expand Down Expand Up @@ -301,8 +300,8 @@ impl SignalWebSocketProcess {
let buffer = msg.encode_to_vec();
self.ws.send(buffer.into()).await?;
}
Some(Err(e)) => {
tracing::error!("could not generate response to a Signal request; responder was canceled: {}. Continuing.", e);
Some(Err(error)) => {
tracing::error!(%error, "could not generate response to a Signal request; responder was canceled. continuing.");
}
None => {
unreachable!("outgoing responses should never fuse")
Expand Down Expand Up @@ -331,7 +330,7 @@ impl SignalWebSocket {
keep_alive_path,
requests: outgoing_requests,
request_sink: incoming_request_sink,
outgoing_request_map: HashMap::default(),
outgoing_requests: HashMap::default(),
outgoing_keep_alive_set: HashSet::new(),
// Initializing the FuturesUnordered with a `pending` future means it will never fuse
// itself, so an "empty" FuturesUnordered will still allow new futures to be added.
Expand Down

0 comments on commit 49baa11

Please sign in to comment.