diff --git a/AirMessage/Connection/Connect/DataProxyConnect.swift b/AirMessage/Connection/Connect/DataProxyConnect.swift index e6131c6..97e85e2 100644 --- a/AirMessage/Connection/Connect/DataProxyConnect.swift +++ b/AirMessage/Connection/Connect/DataProxyConnect.swift @@ -26,9 +26,13 @@ class DataProxyConnect: DataProxy { private var webSocket: WebSocket? private var handshakeTimer: DispatchSourceTimer? + private var pingTimer: DispatchSourceTimer? private static let pingFrequency: TimeInterval = 5 * 60 + private var pingResponseTimer: DispatchSourceTimer? + private static let pingResponseTimeout: TimeInterval = 60 + private var connectionRecoveryTimer: DispatchSourceTimer? private var connectionRecoveryCount = 0 //The max num of attempts before capping the delay time - not before giving up @@ -94,7 +98,7 @@ class DataProxyConnect: DataProxy { self.onWSConnect() } - //Set listener + //Set the listeners webSocket.onBinary { [weak self] _, byteBuffer in self?.processingQueue.async { [weak self] in self?.onWSReceive(data: Data(byteBuffer.readableBytesView)) @@ -110,6 +114,11 @@ class DataProxyConnect: DataProxy { } } } + webSocket.onPong { [weak self] _ in + self?.processingQueue.async { [weak self] in + self?.onWSPong() + } + } }).whenFailure { [weak self] error in self?.processingQueue.async { [weak self] in self?.onWSError(error: error) @@ -264,6 +273,45 @@ class DataProxyConnect: DataProxy { //Ping webSocket?.sendPing() + + //Wait for a pong + startPingResponseTimer() + } + + private func startPingResponseTimer() { + //Make sure we're on the processing queue + assertDispatchQueue(processingQueue) + + //Cancel the old timer + pingResponseTimer?.cancel() + + //Create the new timer + let timer = DispatchSource.makeTimerSource(queue: processingQueue) + timer.schedule(deadline: .now() + DataProxyConnect.pingResponseTimeout, repeating: .never) + timer.setEventHandler(handler: onPingResponseTimer) + timer.resume() + pingResponseTimer = timer + } + + private func stopPingResponseTimer() { + //Make sure we're on the processing queue + assertDispatchQueue(processingQueue) + + pingResponseTimer?.cancel() + pingResponseTimer = nil + } + + private func onPingResponseTimer() { + //Make sure we're on the processing queue + assertDispatchQueue(processingQueue) + + LogManager.log("Didn't receive a pong response from Connect proxy, disconnecting", level: .info) + + //Didn't receive a pong from the server in time! Disconnect + onWSDisconnect(withCode: .normalClosure) + + //Disconnect + _ = webSocket?.close() } //MARK: Handshake Timer @@ -352,10 +400,9 @@ class DataProxyConnect: DataProxy { private func onWSDisconnect(withCode code: WebSocketErrorCode) { LogManager.log("Connection to Connect relay lost: \(code)", level: .info) - //Stop the ping timer + //Stop timers stopPingTimer() - - //Cancel the handshake timer + stopPingResponseTimer() stopHandshakeTimer() //Update the active state @@ -524,6 +571,11 @@ class DataProxyConnect: DataProxy { onWSDisconnect(withCode: .normalClosure) } + private func onWSPong() { + //Stop the ping response timer + stopPingResponseTimer() + } + //MARK: Message handling private func completeHandshake() {