diff --git a/src/profile_service.rs b/src/profile_service.rs index e23a4b51b..0c02b12ba 100644 --- a/src/profile_service.rs +++ b/src/profile_service.rs @@ -1,17 +1,21 @@ +use reqwest::Method; + use crate::{ - proto::WebSocketRequestMessage, - push_service::{ServiceError, SignalServiceProfile}, - websocket::SignalWebSocket, + configuration::Endpoint, + prelude::PushService, + push_service::{ + HttpAuthOverride, ReqwestExt, ServiceError, SignalServiceProfile, + }, ServiceAddress, }; pub struct ProfileService { - ws: SignalWebSocket, + push_service: PushService, } impl ProfileService { - pub fn from_socket(ws: SignalWebSocket) -> Self { - ProfileService { ws } + pub fn from_socket(push_service: PushService) -> Self { + ProfileService { push_service } } pub async fn retrieve_profile_by_id( @@ -19,7 +23,7 @@ impl ProfileService { address: ServiceAddress, profile_key: Option, ) -> Result { - let endpoint = match profile_key { + let path = match profile_key { Some(key) => { let version = bincode::serialize(&key.get_profile_key_version( @@ -34,13 +38,17 @@ impl ProfileService { }, }; - let request = WebSocketRequestMessage { - path: Some(endpoint), - verb: Some("GET".into()), - // TODO: set locale to en_US - ..Default::default() - }; - - self.ws.request_json(request).await + self.push_service + .request( + Method::GET, + Endpoint::Service, + path, + HttpAuthOverride::NoOverride, + )? + .send_to_signal() + .await? + .json() + .await + .map_err(Into::into) } } diff --git a/src/sender.rs b/src/sender.rs index e755d8460..aea24d508 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -82,7 +82,6 @@ pub struct AttachmentSpec { pub blur_hash: Option, } -/// Equivalent of Java's `SignalServiceMessageSender`. #[derive(Clone)] pub struct MessageSender { identified_ws: SignalWebSocket, diff --git a/src/websocket/mod.rs b/src/websocket/mod.rs index b74d02ff1..c423bf65a 100644 --- a/src/websocket/mod.rs +++ b/src/websocket/mod.rs @@ -73,7 +73,7 @@ struct SignalWebSocketProcess { /// Signal's requests should go in here, to be delivered to the application. request_sink: mpsc::Sender, - outgoing_request_map: HashMap< + outgoing_requests: HashMap< u64, oneshot::Sender>, >, @@ -84,7 +84,6 @@ struct SignalWebSocketProcess { BoxFuture<'static, Result>, >, - // WS backend stuff ws: WebSocket, } @@ -97,23 +96,23 @@ impl SignalWebSocketProcess { let msg = WebSocketMessage::decode(Bytes::from(frame))?; if let Some(request) = &msg.request { tracing::trace!( - "decoded WebSocketMessage request {{ r#type: {:?}, verb: {:?}, path: {:?}, body: {} bytes, headers: {:?}, id: {:?} }}", - msg.r#type(), + msg_type =? msg.r#type(), + request.id, request.verb, request.path, - request.body.as_ref().map(|x| x.len()).unwrap_or(0), - request.headers, - request.id, + request_body_size_bytes = request.body.as_ref().map(|x| x.len()).unwrap_or(0), + ?request.headers, + "decoded WebSocketMessage request" ); } else if let Some(response) = &msg.response { tracing::trace!( - "decoded WebSocketMessage response {{ r#type: {:?}, status: {:?}, message: {:?}, body: {} bytes, headers: {:?}, id: {:?} }}", - msg.r#type(), + msg_type =? msg.r#type(), response.status, response.message, - response.body.as_ref().map(|x| x.len()).unwrap_or(0), - response.headers, + response_body_size_bytes = response.body.as_ref().map(|x| x.len()).unwrap_or(0), + ?response.headers, response.id, + "decoded WebSocketMessage response" ); } else { tracing::debug!("decoded {msg:?}"); @@ -142,8 +141,7 @@ impl SignalWebSocketProcess { }), (Type::Response, _, Some(response)) => { if let Some(id) = response.id { - if let Some(responder) = - self.outgoing_request_map.remove(&id) + if let Some(responder) = self.outgoing_requests.remove(&id) { if let Err(e) = responder.send(Ok(response)) { tracing::warn!( @@ -187,7 +185,7 @@ impl SignalWebSocketProcess { let mut rng = rand::thread_rng(); loop { let id = rng.gen(); - if !self.outgoing_request_map.contains_key(&id) { + if !self.outgoing_requests.contains_key(&id) { return id; } } @@ -232,19 +230,19 @@ impl SignalWebSocketProcess { request.id = Some( request .id - .filter(|x| !self.outgoing_request_map.contains_key(x)) + .filter(|x| !self.outgoing_requests.contains_key(x)) .unwrap_or_else(|| self.next_request_id()), ); tracing::trace!( - "sending WebSocketRequestMessage {{ verb: {:?}, path: {:?}, body (bytes): {:?}, headers: {:?}, id: {:?} }}", + request.id, request.verb, request.path, - request.body.as_ref().map(|x| x.len()), - request.headers, - request.id, + request_body_size_bytes = request.body.as_ref().map(|x| x.len()), + ?request.headers, + "sending WebSocketRequestMessage", ); - self.outgoing_request_map.insert(request.id.unwrap(), responder); + self.outgoing_requests.insert(request.id.unwrap(), responder); let msg = WebSocketMessage { r#type: Some(web_socket_message::Type::Request.into()), request: Some(request), @@ -255,11 +253,12 @@ impl SignalWebSocketProcess { } None => { return Err(ServiceError::WsClosing { - reason: "SignalWebSocket: end of application request stream; socket closing" + reason: "end of application request stream; socket closing" }); } } } + // Incoming websocket message web_socket_item = self.ws.next().fuse() => { use reqwest_websocket::Message; match web_socket_item { @@ -301,8 +300,8 @@ impl SignalWebSocketProcess { let buffer = msg.encode_to_vec(); self.ws.send(buffer.into()).await?; } - Some(Err(e)) => { - tracing::error!("could not generate response to a Signal request; responder was canceled: {}. Continuing.", e); + Some(Err(error)) => { + tracing::error!(%error, "could not generate response to a Signal request; responder was canceled. continuing."); } None => { unreachable!("outgoing responses should never fuse") @@ -331,7 +330,7 @@ impl SignalWebSocket { keep_alive_path, requests: outgoing_requests, request_sink: incoming_request_sink, - outgoing_request_map: HashMap::default(), + outgoing_requests: HashMap::default(), outgoing_keep_alive_set: HashSet::new(), // Initializing the FuturesUnordered with a `pending` future means it will never fuse // itself, so an "empty" FuturesUnordered will still allow new futures to be added.