diff --git a/Sources/SwiftCentrifuge/Client.swift b/Sources/SwiftCentrifuge/Client.swift index f75f45e..896ec9f 100644 --- a/Sources/SwiftCentrifuge/Client.swift +++ b/Sources/SwiftCentrifuge/Client.swift @@ -30,7 +30,7 @@ public protocol CentrifugeConnectionTokenGetter { } public struct CentrifugeClientConfig { - public init(timeout: Double = 5.0, headers: [String : String] = [String:String](), tlsSkipVerify: Bool = false, minReconnectDelay: Double = 0.5, maxReconnectDelay: Double = 20.0, maxServerPingDelay: Double = 10.0, name: String = "swift", version: String = "", token: String = "", data: Data? = nil, debug: Bool = false, tokenGetter: CentrifugeConnectionTokenGetter? = nil, logger: CentrifugeLogger? = nil) { + public init(timeout: Double = 5.0, headers: [String : String] = [String:String](), tlsSkipVerify: Bool = false, minReconnectDelay: Double = 0.5, maxReconnectDelay: Double = 20.0, maxServerPingDelay: Double = 10.0, name: String = "swift", version: String = "", token: String = "", data: Data? = nil, debug: Bool = false, useNativeWebSocket: Bool = false, tokenGetter: CentrifugeConnectionTokenGetter? = nil, logger: CentrifugeLogger? = nil) { self.timeout = timeout self.headers = headers self.tlsSkipVerify = tlsSkipVerify @@ -42,8 +42,9 @@ public struct CentrifugeClientConfig { self.token = token self.data = data self.debug = debug + self.useNativeWebSocket = useNativeWebSocket self.tokenGetter = tokenGetter - self.logger = logger + self.logger = logger } public var timeout = 5.0 @@ -59,6 +60,7 @@ public struct CentrifugeClientConfig { public var data: Data? = nil public var debug: Bool = false public var logger: CentrifugeLogger? + public var useNativeWebSocket: Bool = false } public enum CentrifugeClientState { @@ -77,7 +79,7 @@ public class CentrifugeClient { //MARK - fileprivate(set) var internalState: CentrifugeClientState = .disconnected - fileprivate var conn: WebSocket? + fileprivate var conn: WebSocketInterface? fileprivate var client: String? fileprivate var token: String? fileprivate var data: Data? @@ -85,7 +87,7 @@ public class CentrifugeClient { fileprivate var commandIdLock: NSLock = NSLock() fileprivate var opCallbacks: [UInt32: ((CentrifugeResolveData) -> ())] = [:] fileprivate var connectCallbacks: [String: ((Error?) -> ())] = [:] - fileprivate var subscriptionsLock = NSLock() + fileprivate let subscriptionsLock = NSLock() fileprivate var subscriptions = [CentrifugeSubscription]() fileprivate var serverSubs = [String: ServerSubscription]() fileprivate var reconnectAttempts = 0 @@ -100,7 +102,7 @@ public class CentrifugeClient { static let barrierQueue = DispatchQueue(label: "com.centrifugal.centrifuge-swift.barrier<\(UUID().uuidString)>", attributes: .concurrent) - public var state: CentrifugeClientState { + public private(set) var state: CentrifugeClientState { get { return CentrifugeClient.barrierQueue.sync { internalState } } @@ -134,71 +136,46 @@ public class CentrifugeClient { for (key, value) in self.config.headers { request.addValue(value, forHTTPHeaderField: key) } - - let ws = WebSocket(request: request, protocols: ["centrifuge-protobuf"]) - if self.config.tlsSkipVerify { - ws.disableSSLCertValidation = true + + let ws: WebSocketInterface + if #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *), config.useNativeWebSocket { + log.info("Using NativeWebSocket") + ws = NativeWebSocket(request: request, queue: syncQueue, log: log) + } else { + log.info("Using StarscreamWebSocket") + ws = StarscreamWebSocket(request: request, tlsSkipVerify: self.config.tlsSkipVerify, queue: syncQueue, log: log) } ws.onConnect = { [weak self] in guard let strongSelf = self else { return } strongSelf.log.trace("WebSocket connected") strongSelf.onTransportOpen() } - ws.onDisconnect = { [weak self] (error: Error?) in + ws.onDisconnect = { [weak self] disconnect, error in guard let strongSelf = self else { return } strongSelf.log.trace("WebSocket disconnected") - strongSelf.syncQueue.async { [weak self] in - guard let strongSelf = self else { return } - - if strongSelf.state == .disconnected { - return - } - - if (strongSelf.state == .connecting) { - if let err = error { - guard let strongSelf = self else { return } - strongSelf.delegate?.onError( - strongSelf, - CentrifugeErrorEvent(error: CentrifugeError.transportError(error: err)) - ) - } - } - - var disconnect: CentrifugeDisconnectOptions - - // We act according to Disconnect code semantics. - // See https://github.com/centrifugal/centrifuge/blob/master/disconnect.go. - if let err = error as? WSError { - var code: UInt32 = UInt32(err.code) - var reconnect = code < 3500 || code >= 5000 || (code >= 4000 && code < 4500) - if code < 3000 { - // We expose codes defined by Centrifuge protocol, hiding details - // about transport-specific error codes. We may have extra optional - // transportCode field in the future. - if code == 1009 { - code = disconnectCodeMessageSizeLimit - reconnect = false - } else { - code = connectingCodeTransportClosed - } - } - disconnect = CentrifugeDisconnectOptions(code: UInt32(code), reason: err.message, reconnect: reconnect) - } else { - disconnect = CentrifugeDisconnectOptions(code: connectingCodeTransportClosed, reason: "transport closed", reconnect: true) - } - - if strongSelf.state != .disconnected { - strongSelf.processDisconnect(code: disconnect.code, reason: disconnect.reason, reconnect: disconnect.reconnect) - } - - if strongSelf.state == .connecting { - strongSelf.scheduleReconnect() - } + assertIsOnQueue(strongSelf.syncQueue) + + if strongSelf.state == .disconnected { + return + } + + if strongSelf.state == .connecting, let error = error { + strongSelf.delegate?.onError( + strongSelf, + CentrifugeErrorEvent(error: CentrifugeError.transportError(error: error)) + ) + } + + if strongSelf.state != .disconnected { + strongSelf.processDisconnect(code: disconnect.code, reason: disconnect.reason, reconnect: disconnect.reconnect) + } + + if strongSelf.state == .connecting { + strongSelf.scheduleReconnect() } } ws.onData = { [weak self] data in - guard let strongSelf = self else { return } - strongSelf.onData(data: data) + self?.onData(data: data) } self.conn = ws } @@ -207,7 +184,7 @@ public class CentrifugeClient { Connect to server. */ public func connect() { - self.syncQueue.async{ [weak self] in + self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } guard strongSelf.state != .connecting else { return } guard strongSelf.state != .connected else { return } @@ -249,7 +226,7 @@ public class CentrifugeClient { public func newSubscription(channel: String, delegate: CentrifugeSubscriptionDelegate, config: CentrifugeSubscriptionConfig? = nil) throws -> CentrifugeSubscription { defer { subscriptionsLock.unlock() } subscriptionsLock.lock() - guard self.subscriptions.filter({ $0.channel == channel }).count == 0 else { throw CentrifugeError.duplicateSub } + guard !self.subscriptions.contains(where: { $0.channel == channel }) else { throw CentrifugeError.duplicateSub } let sub = CentrifugeSubscription( centrifuge: self, channel: channel, @@ -552,14 +529,14 @@ internal extension CentrifugeClient { fileprivate extension CentrifugeClient { func onTransportOpen() { - self.syncQueue.async { [weak self] in - guard let strongSelf = self else { return } - guard strongSelf.state == .connecting else { return } - - if strongSelf.refreshRequired || (strongSelf.token == "" && strongSelf.config.tokenGetter != nil) { - strongSelf.getConnectionToken(completion: { [weak self] result in - guard let strongSelf = self, strongSelf.state == .connecting else { return } - switch result { + assertIsOnQueue(syncQueue) + + guard self.state == .connecting else { return } + + if refreshRequired || (token == "" && config.tokenGetter != nil) { + getConnectionToken(completion: { [weak self] result in + guard let strongSelf = self, strongSelf.state == .connecting else { return } + switch result { case .success(let token): strongSelf.syncQueue.async { [weak self] in guard let strongSelf = self, strongSelf.state == .connecting else { return } @@ -587,15 +564,14 @@ fileprivate extension CentrifugeClient { ) strongSelf.conn?.disconnect() return - } - }) - } else { - strongSelf.sendConnect(completion: { [weak self] res, error in - guard let strongSelf = self else { return } - guard strongSelf.state == .connecting else { return } - strongSelf.handleConnectResult(res: res, error: error) - }) - } + } + }) + } else { + sendConnect(completion: { [weak self] res, error in + guard let strongSelf = self else { return } + guard strongSelf.state == .connecting else { return } + strongSelf.handleConnectResult(res: res, error: error) + }) } } @@ -674,10 +650,8 @@ fileprivate extension CentrifugeClient { } func onData(data: Data) { - self.syncQueue.async { [weak self] in - guard let strongSelf = self else { return } - strongSelf.handleData(data: data) - } + assertIsOnQueue(syncQueue) + handleData(data: data) } private func nextCommandId() -> UInt32 { @@ -757,29 +731,25 @@ fileprivate extension CentrifugeClient { } private func scheduleReconnect() { - self.syncQueue.async { [weak self] in + assertIsOnQueue(syncQueue) + + guard self.state == .connecting else { return } + + let delay = self.getBackoffDelay( + step: self.reconnectAttempts, + minDelay: self.config.minReconnectDelay, + maxDelay: self.config.maxReconnectDelay + ) + self.reconnectAttempts += 1 + self.reconnectTask?.cancel() + self.reconnectTask = DispatchWorkItem { [weak self] in guard let strongSelf = self else { return } guard strongSelf.state == .connecting else { return } - let delay = strongSelf.getBackoffDelay( - step: strongSelf.reconnectAttempts, - minDelay: strongSelf.config.minReconnectDelay, - maxDelay: strongSelf.config.maxReconnectDelay - ) - strongSelf.reconnectAttempts += 1 - strongSelf.reconnectTask?.cancel() - strongSelf.reconnectTask = DispatchWorkItem { [weak self] in - guard let strongSelf = self else { return } - guard strongSelf.state == .connecting else { return } - strongSelf.syncQueue.async { [weak self] in - guard let strongSelf = self else { return } - guard strongSelf.state == .connecting else { return } - strongSelf.log.debug("start reconnecting") - strongSelf.conn?.connect() - } - } - strongSelf.log.debug("schedule reconnect in \(delay) seconds") - strongSelf.syncQueue.asyncAfter(deadline: .now() + delay, execute: strongSelf.reconnectTask!) + strongSelf.log.debug("start reconnecting") + strongSelf.conn?.connect() } + self.log.debug("schedule reconnect in \(delay) seconds") + self.syncQueue.asyncAfter(deadline: .now() + delay, execute: self.reconnectTask!) } private func handlePub(channel: String, pub: Centrifugal_Centrifuge_Protocol_Publication) { @@ -940,14 +910,11 @@ fileprivate extension CentrifugeClient { if self.pingInterval == 0 { return } - self.pingTimer = DispatchSource.makeTimerSource() + self.pingTimer = DispatchSource.makeTimerSource(queue: syncQueue) self.pingTimer?.setEventHandler { [weak self] in guard let strongSelf = self else { return } - strongSelf.syncQueue.async { [weak self] in - guard let strongSelf = self else { return } - guard strongSelf.state == .connected else { return } - strongSelf.processDisconnect(code: connectingCodeNoPing, reason: "no ping", reconnect: true) - } + guard strongSelf.state == .connected else { return } + strongSelf.processDisconnect(code: connectingCodeNoPing, reason: "no ping", reconnect: true) } self.pingTimer?.schedule(deadline: .now() + Double(self.pingInterval) + self.config.maxServerPingDelay) self.pingTimer?.resume() @@ -1005,7 +972,8 @@ fileprivate extension CentrifugeClient { // Caller must synchronize access. private func processDisconnect(code: UInt32, reason: String, reconnect: Bool) { - if (self.state == .disconnected) { + assertIsOnQueue(syncQueue) + if self.state == .disconnected { return } @@ -1023,12 +991,12 @@ fileprivate extension CentrifugeClient { for resolveFunc in self.opCallbacks.values { resolveFunc(CentrifugeResolveData(error: CentrifugeError.clientDisconnected, reply: nil)) } - self.opCallbacks.removeAll(keepingCapacity: false) + self.opCallbacks.removeAll() for resolveFunc in self.connectCallbacks.values { resolveFunc(CentrifugeError.clientDisconnected) } - self.connectCallbacks.removeAll(keepingCapacity: false) + self.connectCallbacks.removeAll() self.stopReconnect() self.stopWaitPing() @@ -1047,8 +1015,8 @@ fileprivate extension CentrifugeClient { } } - if (needEvent) { - if (self.state == .disconnected) { + if needEvent { + if self.state == .disconnected { self.delegate?.onDisconnected( self, CentrifugeDisconnectedEvent(code: code, reason: reason) @@ -1060,7 +1028,7 @@ fileprivate extension CentrifugeClient { ) } } - + self.conn?.disconnect() } diff --git a/Sources/SwiftCentrifuge/Codes.swift b/Sources/SwiftCentrifuge/Codes.swift index a70765b..b34a2c9 100644 --- a/Sources/SwiftCentrifuge/Codes.swift +++ b/Sources/SwiftCentrifuge/Codes.swift @@ -7,20 +7,40 @@ import Foundation -let disconnectedCodeDisconnectCalled: UInt32 = 0; -let disconnectedCodeUnauthorized: UInt32 = 1; -let disconnectCodeBadProtocol: UInt32 = 2; -let disconnectCodeMessageSizeLimit: UInt32 = 3; +let disconnectedCodeDisconnectCalled: UInt32 = 0 +let disconnectedCodeUnauthorized: UInt32 = 1 +let disconnectCodeBadProtocol: UInt32 = 2 +let disconnectCodeMessageSizeLimit: UInt32 = 3 -let connectingCodeConnectCalled: UInt32 = 0; -let connectingCodeTransportClosed: UInt32 = 1; -let connectingCodeNoPing: UInt32 = 2; -let connectingCodeSubscribeTimeout: UInt32 = 3; -let connectingCodeUnsubscribeError: UInt32 = 4; +let connectingCodeConnectCalled: UInt32 = 0 +let connectingCodeTransportClosed: UInt32 = 1 +let connectingCodeNoPing: UInt32 = 2 +let connectingCodeSubscribeTimeout: UInt32 = 3 +let connectingCodeUnsubscribeError: UInt32 = 4 -let subscribingCodeSubscribeCalled: UInt32 = 0; -let subscribingCodeTransportClosed: UInt32 = 1; +let subscribingCodeSubscribeCalled: UInt32 = 0 +let subscribingCodeTransportClosed: UInt32 = 1 -let unsubscribedCodeUnsubscribeCalled: UInt32 = 0; -let unsubscribedCodeUnauthorized: UInt32 = 1; -let unsubscribedCodeClientClosed: UInt32 = 2; +let unsubscribedCodeUnsubscribeCalled: UInt32 = 0 +let unsubscribedCodeUnauthorized: UInt32 = 1 +let unsubscribedCodeClientClosed: UInt32 = 2 + +func interpretCloseCode(_ code: UInt32) -> (code: UInt32, reconnect: Bool) { + // We act according to Disconnect code semantics. + // See https://github.com/centrifugal/centrifuge/blob/master/disconnect.go. + var code = code + var reconnect = code < 3500 || code >= 5000 || (code >= 4000 && code < 4500) + if code < 3000 { + // We expose codes defined by Centrifuge protocol, hiding details + // about transport-specific error codes. We may have extra optional + // transportCode field in the future. + if code == 1009 { + code = disconnectCodeMessageSizeLimit + reconnect = false + } else { + code = connectingCodeTransportClosed + } + } + + return (code, reconnect) +} diff --git a/Sources/SwiftCentrifuge/Helpers.swift b/Sources/SwiftCentrifuge/Helpers.swift index f60f293..9e1ea82 100644 --- a/Sources/SwiftCentrifuge/Helpers.swift +++ b/Sources/SwiftCentrifuge/Helpers.swift @@ -62,3 +62,11 @@ internal enum CentrifugeSerializer { return commands } } + +func assertIsOnQueue(_ queue: DispatchQueue) { +#if DEBUG + if #available(macOS 10.12, iOS 10.0, watchOS 3.0, tvOS 10.0, *) { + dispatchPrecondition(condition: .onQueue(queue)) + } +#endif +} diff --git a/Sources/SwiftCentrifuge/WebSocket/NativeWebSocket.swift b/Sources/SwiftCentrifuge/WebSocket/NativeWebSocket.swift new file mode 100644 index 0000000..70c4b21 --- /dev/null +++ b/Sources/SwiftCentrifuge/WebSocket/NativeWebSocket.swift @@ -0,0 +1,214 @@ +// +// NativeWebSocket.swift +// SwiftCentrifuge +// +// Created by Anton Selyanin on 03.05.2023. +// + +import Foundation + + +import Foundation + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +final class NativeWebSocket: NSObject, WebSocketInterface, URLSessionWebSocketDelegate { + + var onConnect: (() -> Void)? + + var onDisconnect: ((CentrifugeDisconnectOptions, Error?) -> Void)? + + var onData: ((Data) -> Void)? + + private var session: URLSession? + + private let log: CentrifugeLogger + private let request: URLRequest + private let queue: DispatchQueue + + /// The websocket is considered 'active' when `task` is not nil + private var task: URLSessionWebSocketTask? + + init(request: URLRequest, queue: DispatchQueue, log: CentrifugeLogger) { + var request = request + request.setValue("Sec-WebSocket-Protocol", forHTTPHeaderField: "centrifuge-protobuf") + self.request = request + self.log = log + self.queue = queue + } + + deinit { + session?.invalidateAndCancel() + } + + func connect() { + assertIsOnQueue(queue) + if let task = task { + log.warning("Creating a new connection while the previous is active, socket state: \(task.state.asString)") + } + + log.debug("Connecting...") + + task = getOrCreateSession().webSocketTask(with: request) + doRead() + task?.resume() + } + + func disconnect() { + assertIsOnQueue(queue) + + guard task != nil else { return } + + log.debug("Disconnecting...") + // This will trigger "did close" delegate method invocation + task?.cancel(with: .goingAway, reason: nil) + } + + func write(data: Data) { + assertIsOnQueue(queue) + guard let task = task else { + log.warning("Attempted to write to an inactive websocket connection") + return + } + + task.send(.data(data), completionHandler: { [weak self] error in + guard let error = error else { return } + self?.log.trace("Failed to send message, error: \(error)") + }) + } + + private func doRead() { + task?.receive { [weak self] (result) in + guard let self = self else { return } + assertIsOnQueue(self.queue) + + switch result { + case .success(let message): + switch message { + case .string: + self.log.warning("Received unexpected string packet") + case .data(let data): + self.onData?(data) + + @unknown default: + break + } + + case .failure(let error): + self.log.trace("Read error: \(error)") + } + + self.doRead() + } + } + + private func getOrCreateSession() -> URLSession { + if let session = session { + return session + } + + let operationQueue = OperationQueue() + operationQueue.underlyingQueue = queue + + // For some reason, `URLSessionWebSocketTask` will only respect the proxy + // configuration if started with a URL and not a URLRequest. As a temporary + // workaround, port header information from the request to the session. + // + // We copied this workaround from Signal-iOS web socket implementation + let configuration = URLSessionConfiguration.default + configuration.httpAdditionalHeaders = request.allHTTPHeaderFields + + let delegate = URLSessionDelegateBox(delegate: self) + + let session = URLSession( + configuration: configuration, delegate: delegate, delegateQueue: operationQueue) + self.session = session + + return session + } + + func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + assertIsOnQueue(queue) + log.debug("Connected") + self.onConnect?() + } + + func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + assertIsOnQueue(queue) + guard let task = self.task, task === webSocketTask else { + // Ignore callbacks from obsolete tasks + return + } + + handleTaskClose(task: task, code: closeCode, reason: reason, error: nil) + } + + func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { + assertIsOnQueue(queue) + guard let task = task as? URLSessionWebSocketTask, task === self.task else { + // Ignore callbacks from obsolete tasks + return + } + + handleTaskClose(task: task, code: task.closeCode, reason: task.closeReason, error: error) + } + + private func handleTaskClose(task: URLSessionWebSocketTask, + code: URLSessionWebSocketTask.CloseCode, reason: Data?, error: Error?) { + let reason = reason.flatMap { String(data: $0, encoding: .utf8) } ?? "transport closed" + + log.debug("WebSocket closed, code: \(code.rawValue), reason: \(reason)") + + let (code, reconnect) = interpretCloseCode(UInt32(code.rawValue)) + let disconnect = CentrifugeDisconnectOptions(code: code, reason: reason, reconnect: reconnect) + + self.log.trace("Socket disconnected, code: \(task.closeCode.rawValue), reconnect: \(reconnect)") + + self.task?.cancel() + self.task = nil + self.onDisconnect?(disconnect, error) + } +} + + +/// URLSession holds it's delegate by strong reference. +/// We need a wrapper to break a reference cycle between `NativeWebSocket` and `URLSession` +/// +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +private final class URLSessionDelegateBox: NSObject, URLSessionWebSocketDelegate { + private weak var delegate: URLSessionWebSocketDelegate? + + init(delegate: URLSessionWebSocketDelegate?) { + self.delegate = delegate + } + + func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, + didOpenWithProtocol protocol: String?) { + delegate?.urlSession?( + session, webSocketTask: webSocketTask, didOpenWithProtocol: `protocol`) + } + + func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, + didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + delegate?.urlSession?( + session, webSocketTask: webSocketTask, didCloseWith: closeCode, reason: reason) + } + + func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { + delegate?.urlSession?(session, task: task, didCompleteWithError: error) + } +} + +private extension URLSessionTask.State { + var asString: String { + switch self { + case .running: + return "running" + case .suspended: + return "suspended" + case .canceling: + return "cancelling" + case .completed: + return "completed" + } + } +} diff --git a/Sources/SwiftCentrifuge/WebSocket/StarscreamWebSocket.swift b/Sources/SwiftCentrifuge/WebSocket/StarscreamWebSocket.swift new file mode 100644 index 0000000..b23aed7 --- /dev/null +++ b/Sources/SwiftCentrifuge/WebSocket/StarscreamWebSocket.swift @@ -0,0 +1,70 @@ +// +// StarscreamWebSocket.swift +// SwiftCentrifuge +// +// Created by Anton Selyanin on 03.05.2023. +// + +import Foundation + +final class StarscreamWebSocket: WebSocketInterface { + private let log: CentrifugeLogger + + var onConnect: (() -> Void)? + var onDisconnect: ((CentrifugeDisconnectOptions, Error?) -> Void)? + var onData: ((Data) -> Void)? + + private let socket: WebSocket + private let queue: DispatchQueue + + init(request: URLRequest, tlsSkipVerify: Bool, queue: DispatchQueue, log: CentrifugeLogger) { + self.socket = WebSocket(request: request, protocols: ["centrifuge-protobuf"]) + self.log = log + self.queue = queue + + self.socket.callbackQueue = queue + self.socket.disableSSLCertValidation = tlsSkipVerify + setup() + } + + func connect() { + assertIsOnQueue(queue) + log.debug("Connecting...") + socket.connect() + } + + func disconnect() { + assertIsOnQueue(queue) + log.debug("Disconnecting...") + socket.disconnect() + } + + func write(data: Data) { + assertIsOnQueue(queue) + socket.write(data: data) + } + + private func setup() { + socket.onConnect = { [weak self] in + self?.onConnect?() + } + + socket.onDisconnect = { [weak self] error in + guard let self = self else { return } + + let disconnect: CentrifugeDisconnectOptions + + if let error = error as? WSError { + let (code, reconnect) = interpretCloseCode(UInt32(error.code)) + disconnect = CentrifugeDisconnectOptions(code: code, reason: error.message, reconnect: reconnect) + } else { + disconnect = CentrifugeDisconnectOptions(code: connectingCodeTransportClosed, reason: "transport closed", reconnect: true) + } + self.onDisconnect?(disconnect, error) + } + + socket.onData = { [weak self] data in + self?.onData?(data) + } + } +} diff --git a/Sources/SwiftCentrifuge/WebSocket.swift b/Sources/SwiftCentrifuge/WebSocket/WebSocket.swift similarity index 100% rename from Sources/SwiftCentrifuge/WebSocket.swift rename to Sources/SwiftCentrifuge/WebSocket/WebSocket.swift diff --git a/Sources/SwiftCentrifuge/WebSocket/WebSocketInterface.swift b/Sources/SwiftCentrifuge/WebSocket/WebSocketInterface.swift new file mode 100644 index 0000000..7a5bf49 --- /dev/null +++ b/Sources/SwiftCentrifuge/WebSocket/WebSocketInterface.swift @@ -0,0 +1,19 @@ +// +// WebSocketInterface.swift +// SwiftCentrifuge +// +// Created by Anton Selyanin on 03.05.2023. +// + +import Foundation + + +protocol WebSocketInterface: AnyObject { + var onConnect: (() -> Void)? { get set } + var onDisconnect: ((CentrifugeDisconnectOptions, Error?) -> Void)? { get set } + var onData: ((Data) -> Void)? { get set } + + func connect() + func disconnect() + func write(data: Data) +}