From 9f81c521ab720ad52826793a10bc3ef84158286b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Sat, 16 Dec 2023 13:19:23 +0100 Subject: [PATCH] Enable keep-alive for provisioning socket (#269) --- libsignal-service-actix/src/push_service.rs | 8 ++++++-- libsignal-service-hyper/src/push_service.rs | 5 +++-- libsignal-service/src/provisioning/manager.rs | 7 ++++++- libsignal-service/src/push_service.rs | 2 +- libsignal-service/src/receiver.rs | 7 ++++++- libsignal-service/src/websocket.rs | 17 +++++++---------- 6 files changed, 29 insertions(+), 17 deletions(-) diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 2457831d4..639a5a25f 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -625,9 +625,9 @@ impl PushService for AwcPushService { async fn ws( &mut self, path: &str, + keep_alive_path: &str, additional_headers: &[(&str, &str)], credentials: Option, - keep_alive: bool, ) -> Result { let (ws, stream) = AwcWebSocket::with_client( &mut self.client, @@ -637,7 +637,11 @@ impl PushService for AwcPushService { credentials.as_ref(), ) .await?; - let (ws, task) = SignalWebSocket::from_socket(ws, stream, keep_alive); + let (ws, task) = SignalWebSocket::from_socket( + ws, + stream, + keep_alive_path.to_owned(), + ); actix_rt::spawn(task); Ok(ws) } diff --git a/libsignal-service-hyper/src/push_service.rs b/libsignal-service-hyper/src/push_service.rs index b035da88e..f13648cb3 100644 --- a/libsignal-service-hyper/src/push_service.rs +++ b/libsignal-service-hyper/src/push_service.rs @@ -582,9 +582,9 @@ impl PushService for HyperPushService { async fn ws( &mut self, path: &str, + keepalive_path: &str, additional_headers: &[(&str, &str)], credentials: Option, - keep_alive: bool, ) -> Result { let (ws, stream) = TungsteniteWebSocket::with_tls_config( Self::tls_config(&self.cfg), @@ -594,7 +594,8 @@ impl PushService for HyperPushService { credentials.as_ref(), ) .await?; - let (ws, task) = SignalWebSocket::from_socket(ws, stream, keep_alive); + let (ws, task) = + SignalWebSocket::from_socket(ws, stream, keepalive_path.to_owned()); tokio::task::spawn(task); Ok(ws) } diff --git a/libsignal-service/src/provisioning/manager.rs b/libsignal-service/src/provisioning/manager.rs index 6f1fa3278..d18ce351e 100644 --- a/libsignal-service/src/provisioning/manager.rs +++ b/libsignal-service/src/provisioning/manager.rs @@ -136,7 +136,12 @@ impl LinkingManager

{ // open a websocket without authentication, to receive a tsurl:// let ws = self .push_service - .ws("/v1/websocket/provisioning/", &[], None, false) + .ws( + "/v1/websocket/provisioning/", + "/v1/keepalive/provisioning", + &[], + None, + ) .await?; let registration_id = csprng.gen_range(1..256); diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 2fdfbf7b0..d5c2cb8d4 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -625,9 +625,9 @@ pub trait PushService: MaybeSend { async fn ws( &mut self, path: &str, + keepalive_path: &str, additional_headers: &[(&str, &str)], credentials: Option, - keep_alive: bool, ) -> Result; /// Fetches a list of all devices tied to the authenticated account. diff --git a/libsignal-service/src/receiver.rs b/libsignal-service/src/receiver.rs index f98a78392..cef614745 100644 --- a/libsignal-service/src/receiver.rs +++ b/libsignal-service/src/receiver.rs @@ -53,7 +53,12 @@ impl MessageReceiver { )]; let ws = self .service - .ws("/v1/websocket/", headers, Some(credentials.clone()), true) + .ws( + "/v1/websocket/", + "/v1/keepalive", + headers, + Some(credentials.clone()), + ) .await?; Ok(MessagePipe::from_socket(ws, credentials)) } diff --git a/libsignal-service/src/websocket.rs b/libsignal-service/src/websocket.rs index d9f62f57c..165d21bc9 100644 --- a/libsignal-service/src/websocket.rs +++ b/libsignal-service/src/websocket.rs @@ -62,8 +62,8 @@ struct SignalWebSocketInner { } struct SignalWebSocketProcess { - /// Whether to enable keep-alive or not - keep_alive: bool, + /// Whether to enable keep-alive or not (and send a request to this path) + keep_alive_path: String, /// Receives requests from the application, which we forward to Signal. requests: mpsc::Receiver<( @@ -207,13 +207,13 @@ impl SignalWebSocketProcess { Some(WebSocketStreamItem::Message(frame)) => { self.process_frame(frame).await?; } - Some(WebSocketStreamItem::KeepAliveRequest) if self.keep_alive => { + Some(WebSocketStreamItem::KeepAliveRequest) => { // XXX: would be nicer if we could drop this request into the request // queue above. log::debug!("Sending keep alive upon request"); let request = WebSocketRequestMessage { id: Some(self.next_request_id()), - path: Some("/v1/keepalive".into()), + path: Some(self.keep_alive_path.clone()), verb: Some("GET".into()), ..Default::default() }; @@ -224,10 +224,7 @@ impl SignalWebSocketProcess { ..Default::default() }; let buffer = msg.encode_to_vec(); - self.ws.send_message(buffer.into()).await? - } - Some(WebSocketStreamItem::KeepAliveRequest) => { - log::trace!("keep alive is disabled: ignoring request"); + self.ws.send_message(buffer.into()).await?; } None => { return Err(ServiceError::WsError { @@ -270,14 +267,14 @@ impl SignalWebSocket { pub fn from_socket( ws: WS, stream: WS::Stream, - keep_alive: bool, + keep_alive_path: String, ) -> (Self, impl Future) { // Create process let (incoming_request_sink, incoming_request_stream) = mpsc::channel(1); let (outgoing_request_sink, outgoing_requests) = mpsc::channel(1); let process = SignalWebSocketProcess { - keep_alive, + keep_alive_path, requests: outgoing_requests, request_sink: incoming_request_sink, outgoing_request_map: HashMap::default(),