Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add alternative websocket implementation via URLSessionWebSocketTask #84

Merged
merged 9 commits into from
Aug 3, 2023
196 changes: 82 additions & 114 deletions Sources/SwiftCentrifuge/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public protocol CentrifugeConnectionTokenGetter {
}

public struct CentrifugeClientConfig {
public init(timeout: Double = 5.0, headers: [String : String] = [String:String](), tlsSkipVerify: Bool = false, minReconnectDelay: Double = 0.5, maxReconnectDelay: Double = 20.0, maxServerPingDelay: Double = 10.0, name: String = "swift", version: String = "", token: String = "", data: Data? = nil, debug: Bool = false, tokenGetter: CentrifugeConnectionTokenGetter? = nil, logger: CentrifugeLogger? = nil) {
public init(timeout: Double = 5.0, headers: [String : String] = [String:String](), tlsSkipVerify: Bool = false, minReconnectDelay: Double = 0.5, maxReconnectDelay: Double = 20.0, maxServerPingDelay: Double = 10.0, name: String = "swift", version: String = "", token: String = "", data: Data? = nil, debug: Bool = false, useNativeWebSocket: Bool = false, tokenGetter: CentrifugeConnectionTokenGetter? = nil, logger: CentrifugeLogger? = nil) {
self.timeout = timeout
self.headers = headers
self.tlsSkipVerify = tlsSkipVerify
Expand All @@ -42,8 +42,9 @@ public struct CentrifugeClientConfig {
self.token = token
self.data = data
self.debug = debug
self.useNativeWebSocket = useNativeWebSocket
self.tokenGetter = tokenGetter
self.logger = logger
self.logger = logger
}

public var timeout = 5.0
Expand All @@ -59,6 +60,7 @@ public struct CentrifugeClientConfig {
public var data: Data? = nil
public var debug: Bool = false
public var logger: CentrifugeLogger?
public var useNativeWebSocket: Bool = false
}

public enum CentrifugeClientState {
Expand All @@ -77,15 +79,15 @@ public class CentrifugeClient {

//MARK -
fileprivate(set) var internalState: CentrifugeClientState = .disconnected
fileprivate var conn: WebSocket?
fileprivate var conn: WebSocketInterface?
fileprivate var client: String?
fileprivate var token: String?
fileprivate var data: Data?
fileprivate var commandId: UInt32 = 0
fileprivate var commandIdLock: NSLock = NSLock()
fileprivate var opCallbacks: [UInt32: ((CentrifugeResolveData) -> ())] = [:]
fileprivate var connectCallbacks: [String: ((Error?) -> ())] = [:]
fileprivate var subscriptionsLock = NSLock()
fileprivate let subscriptionsLock = NSLock()
fileprivate var subscriptions = [CentrifugeSubscription]()
fileprivate var serverSubs = [String: ServerSubscription]()
fileprivate var reconnectAttempts = 0
Expand All @@ -100,7 +102,7 @@ public class CentrifugeClient {

static let barrierQueue = DispatchQueue(label: "com.centrifugal.centrifuge-swift.barrier<\(UUID().uuidString)>", attributes: .concurrent)

public var state: CentrifugeClientState {
public private(set) var state: CentrifugeClientState {
get {
return CentrifugeClient.barrierQueue.sync { internalState }
}
Expand Down Expand Up @@ -134,71 +136,46 @@ public class CentrifugeClient {
for (key, value) in self.config.headers {
request.addValue(value, forHTTPHeaderField: key)
}

let ws = WebSocket(request: request, protocols: ["centrifuge-protobuf"])
if self.config.tlsSkipVerify {
ws.disableSSLCertValidation = true

let ws: WebSocketInterface
if #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *), config.useNativeWebSocket {
log.info("Using NativeWebSocket")
ws = NativeWebSocket(request: request, queue: syncQueue, log: log)
} else {
log.info("Using StarscreamWebSocket")
ws = StarscreamWebSocket(request: request, tlsSkipVerify: self.config.tlsSkipVerify, queue: syncQueue, log: log)
}
ws.onConnect = { [weak self] in
guard let strongSelf = self else { return }
strongSelf.log.trace("WebSocket connected")
strongSelf.onTransportOpen()
}
ws.onDisconnect = { [weak self] (error: Error?) in
ws.onDisconnect = { [weak self] disconnect, error in
guard let strongSelf = self else { return }
strongSelf.log.trace("WebSocket disconnected")
strongSelf.syncQueue.async { [weak self] in
guard let strongSelf = self else { return }

if strongSelf.state == .disconnected {
return
}

if (strongSelf.state == .connecting) {
if let err = error {
guard let strongSelf = self else { return }
strongSelf.delegate?.onError(
strongSelf,
CentrifugeErrorEvent(error: CentrifugeError.transportError(error: err))
)
}
}

var disconnect: CentrifugeDisconnectOptions

// We act according to Disconnect code semantics.
// See https://github.com/centrifugal/centrifuge/blob/master/disconnect.go.
if let err = error as? WSError {
var code: UInt32 = UInt32(err.code)
var reconnect = code < 3500 || code >= 5000 || (code >= 4000 && code < 4500)
if code < 3000 {
// We expose codes defined by Centrifuge protocol, hiding details
// about transport-specific error codes. We may have extra optional
// transportCode field in the future.
if code == 1009 {
code = disconnectCodeMessageSizeLimit
reconnect = false
} else {
code = connectingCodeTransportClosed
}
}
disconnect = CentrifugeDisconnectOptions(code: UInt32(code), reason: err.message, reconnect: reconnect)
} else {
disconnect = CentrifugeDisconnectOptions(code: connectingCodeTransportClosed, reason: "transport closed", reconnect: true)
}

if strongSelf.state != .disconnected {
strongSelf.processDisconnect(code: disconnect.code, reason: disconnect.reason, reconnect: disconnect.reconnect)
}

if strongSelf.state == .connecting {
strongSelf.scheduleReconnect()
}
assertIsOnQueue(strongSelf.syncQueue)

if strongSelf.state == .disconnected {
return
}

if strongSelf.state == .connecting, let error = error {
strongSelf.delegate?.onError(
strongSelf,
CentrifugeErrorEvent(error: CentrifugeError.transportError(error: error))
)
}

if strongSelf.state != .disconnected {
strongSelf.processDisconnect(code: disconnect.code, reason: disconnect.reason, reconnect: disconnect.reconnect)
}

if strongSelf.state == .connecting {
strongSelf.scheduleReconnect()
}
}
ws.onData = { [weak self] data in
guard let strongSelf = self else { return }
strongSelf.onData(data: data)
self?.onData(data: data)
}
self.conn = ws
}
Expand All @@ -207,7 +184,7 @@ public class CentrifugeClient {
Connect to server.
*/
public func connect() {
self.syncQueue.async{ [weak self] in
self.syncQueue.async { [weak self] in
guard let strongSelf = self else { return }
guard strongSelf.state != .connecting else { return }
guard strongSelf.state != .connected else { return }
Expand Down Expand Up @@ -249,7 +226,7 @@ public class CentrifugeClient {
public func newSubscription(channel: String, delegate: CentrifugeSubscriptionDelegate, config: CentrifugeSubscriptionConfig? = nil) throws -> CentrifugeSubscription {
defer { subscriptionsLock.unlock() }
subscriptionsLock.lock()
guard self.subscriptions.filter({ $0.channel == channel }).count == 0 else { throw CentrifugeError.duplicateSub }
guard !self.subscriptions.contains(where: { $0.channel == channel }) else { throw CentrifugeError.duplicateSub }
let sub = CentrifugeSubscription(
centrifuge: self,
channel: channel,
Expand Down Expand Up @@ -552,14 +529,14 @@ internal extension CentrifugeClient {

fileprivate extension CentrifugeClient {
func onTransportOpen() {
self.syncQueue.async { [weak self] in
guard let strongSelf = self else { return }
guard strongSelf.state == .connecting else { return }
if strongSelf.refreshRequired || (strongSelf.token == "" && strongSelf.config.tokenGetter != nil) {
strongSelf.getConnectionToken(completion: { [weak self] result in
guard let strongSelf = self, strongSelf.state == .connecting else { return }
switch result {
assertIsOnQueue(syncQueue)

guard self.state == .connecting else { return }

if refreshRequired || (token == "" && config.tokenGetter != nil) {
getConnectionToken(completion: { [weak self] result in
guard let strongSelf = self, strongSelf.state == .connecting else { return }
switch result {
case .success(let token):
strongSelf.syncQueue.async { [weak self] in
guard let strongSelf = self, strongSelf.state == .connecting else { return }
Expand Down Expand Up @@ -587,15 +564,14 @@ fileprivate extension CentrifugeClient {
)
strongSelf.conn?.disconnect()
return
}
})
} else {
strongSelf.sendConnect(completion: { [weak self] res, error in
guard let strongSelf = self else { return }
guard strongSelf.state == .connecting else { return }
strongSelf.handleConnectResult(res: res, error: error)
})
}
}
})
} else {
sendConnect(completion: { [weak self] res, error in
guard let strongSelf = self else { return }
guard strongSelf.state == .connecting else { return }
strongSelf.handleConnectResult(res: res, error: error)
})
}
}

Expand Down Expand Up @@ -674,10 +650,8 @@ fileprivate extension CentrifugeClient {
}

func onData(data: Data) {
self.syncQueue.async { [weak self] in
guard let strongSelf = self else { return }
strongSelf.handleData(data: data)
}
assertIsOnQueue(syncQueue)
handleData(data: data)
}

private func nextCommandId() -> UInt32 {
Expand Down Expand Up @@ -757,29 +731,25 @@ fileprivate extension CentrifugeClient {
}

private func scheduleReconnect() {
self.syncQueue.async { [weak self] in
assertIsOnQueue(syncQueue)

guard self.state == .connecting else { return }

let delay = self.getBackoffDelay(
step: self.reconnectAttempts,
minDelay: self.config.minReconnectDelay,
maxDelay: self.config.maxReconnectDelay
)
self.reconnectAttempts += 1
self.reconnectTask?.cancel()
self.reconnectTask = DispatchWorkItem { [weak self] in
guard let strongSelf = self else { return }
guard strongSelf.state == .connecting else { return }
let delay = strongSelf.getBackoffDelay(
step: strongSelf.reconnectAttempts,
minDelay: strongSelf.config.minReconnectDelay,
maxDelay: strongSelf.config.maxReconnectDelay
)
strongSelf.reconnectAttempts += 1
strongSelf.reconnectTask?.cancel()
strongSelf.reconnectTask = DispatchWorkItem { [weak self] in
guard let strongSelf = self else { return }
guard strongSelf.state == .connecting else { return }
strongSelf.syncQueue.async { [weak self] in
guard let strongSelf = self else { return }
guard strongSelf.state == .connecting else { return }
strongSelf.log.debug("start reconnecting")
strongSelf.conn?.connect()
}
}
strongSelf.log.debug("schedule reconnect in \(delay) seconds")
strongSelf.syncQueue.asyncAfter(deadline: .now() + delay, execute: strongSelf.reconnectTask!)
strongSelf.log.debug("start reconnecting")
strongSelf.conn?.connect()
}
self.log.debug("schedule reconnect in \(delay) seconds")
self.syncQueue.asyncAfter(deadline: .now() + delay, execute: self.reconnectTask!)
}

private func handlePub(channel: String, pub: Centrifugal_Centrifuge_Protocol_Publication) {
Expand Down Expand Up @@ -940,14 +910,11 @@ fileprivate extension CentrifugeClient {
if self.pingInterval == 0 {
return
}
self.pingTimer = DispatchSource.makeTimerSource()
self.pingTimer = DispatchSource.makeTimerSource(queue: syncQueue)
self.pingTimer?.setEventHandler { [weak self] in
guard let strongSelf = self else { return }
strongSelf.syncQueue.async { [weak self] in
guard let strongSelf = self else { return }
guard strongSelf.state == .connected else { return }
strongSelf.processDisconnect(code: connectingCodeNoPing, reason: "no ping", reconnect: true)
}
guard strongSelf.state == .connected else { return }
strongSelf.processDisconnect(code: connectingCodeNoPing, reason: "no ping", reconnect: true)
}
self.pingTimer?.schedule(deadline: .now() + Double(self.pingInterval) + self.config.maxServerPingDelay)
self.pingTimer?.resume()
Expand Down Expand Up @@ -1005,7 +972,8 @@ fileprivate extension CentrifugeClient {

// Caller must synchronize access.
private func processDisconnect(code: UInt32, reason: String, reconnect: Bool) {
if (self.state == .disconnected) {
assertIsOnQueue(syncQueue)
if self.state == .disconnected {
return
}

Expand All @@ -1023,12 +991,12 @@ fileprivate extension CentrifugeClient {
for resolveFunc in self.opCallbacks.values {
resolveFunc(CentrifugeResolveData(error: CentrifugeError.clientDisconnected, reply: nil))
}
self.opCallbacks.removeAll(keepingCapacity: false)
self.opCallbacks.removeAll()

for resolveFunc in self.connectCallbacks.values {
resolveFunc(CentrifugeError.clientDisconnected)
}
self.connectCallbacks.removeAll(keepingCapacity: false)
self.connectCallbacks.removeAll()

self.stopReconnect()
self.stopWaitPing()
Expand All @@ -1047,8 +1015,8 @@ fileprivate extension CentrifugeClient {
}
}

if (needEvent) {
if (self.state == .disconnected) {
if needEvent {
if self.state == .disconnected {
self.delegate?.onDisconnected(
self,
CentrifugeDisconnectedEvent(code: code, reason: reason)
Expand All @@ -1060,7 +1028,7 @@ fileprivate extension CentrifugeClient {
)
}
}

self.conn?.disconnect()
}

Expand Down
48 changes: 34 additions & 14 deletions Sources/SwiftCentrifuge/Codes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,40 @@

import Foundation

let disconnectedCodeDisconnectCalled: UInt32 = 0;
let disconnectedCodeUnauthorized: UInt32 = 1;
let disconnectCodeBadProtocol: UInt32 = 2;
let disconnectCodeMessageSizeLimit: UInt32 = 3;
let disconnectedCodeDisconnectCalled: UInt32 = 0
let disconnectedCodeUnauthorized: UInt32 = 1
let disconnectCodeBadProtocol: UInt32 = 2
let disconnectCodeMessageSizeLimit: UInt32 = 3

let connectingCodeConnectCalled: UInt32 = 0;
let connectingCodeTransportClosed: UInt32 = 1;
let connectingCodeNoPing: UInt32 = 2;
let connectingCodeSubscribeTimeout: UInt32 = 3;
let connectingCodeUnsubscribeError: UInt32 = 4;
let connectingCodeConnectCalled: UInt32 = 0
let connectingCodeTransportClosed: UInt32 = 1
let connectingCodeNoPing: UInt32 = 2
let connectingCodeSubscribeTimeout: UInt32 = 3
let connectingCodeUnsubscribeError: UInt32 = 4

let subscribingCodeSubscribeCalled: UInt32 = 0;
let subscribingCodeTransportClosed: UInt32 = 1;
let subscribingCodeSubscribeCalled: UInt32 = 0
let subscribingCodeTransportClosed: UInt32 = 1

let unsubscribedCodeUnsubscribeCalled: UInt32 = 0;
let unsubscribedCodeUnauthorized: UInt32 = 1;
let unsubscribedCodeClientClosed: UInt32 = 2;
let unsubscribedCodeUnsubscribeCalled: UInt32 = 0
let unsubscribedCodeUnauthorized: UInt32 = 1
let unsubscribedCodeClientClosed: UInt32 = 2

func interpretCloseCode(_ code: UInt32) -> (code: UInt32, reconnect: Bool) {
// We act according to Disconnect code semantics.
// See https://github.com/centrifugal/centrifuge/blob/master/disconnect.go.
var code = code
var reconnect = code < 3500 || code >= 5000 || (code >= 4000 && code < 4500)
if code < 3000 {
// We expose codes defined by Centrifuge protocol, hiding details
// about transport-specific error codes. We may have extra optional
// transportCode field in the future.
if code == 1009 {
code = disconnectCodeMessageSizeLimit
reconnect = false
} else {
code = connectingCodeTransportClosed
}
}

return (code, reconnect)
}
Loading
Loading