From f14fa8fa52e7da5c8927450064b5fb7fb3a593f3 Mon Sep 17 00:00:00 2001 From: Vahagn Madoyan Date: Sat, 19 Oct 2024 23:17:53 +0400 Subject: [PATCH] add data getter protocole for update token with data --- Sources/SwiftCentrifuge/Client.swift | 170 +++++++++++++-------------- 1 file changed, 85 insertions(+), 85 deletions(-) diff --git a/Sources/SwiftCentrifuge/Client.swift b/Sources/SwiftCentrifuge/Client.swift index 6148bbb..84e7672 100644 --- a/Sources/SwiftCentrifuge/Client.swift +++ b/Sources/SwiftCentrifuge/Client.swift @@ -1,4 +1,4 @@ - // +// // Client.swift // SwiftCentrifuge // @@ -65,8 +65,8 @@ public struct CentrifugeClientConfig { public weak var dataGetter: CentrifugeConnectionDataGetter? public var data: Data? = nil public var debug: Bool = false - public var logger: CentrifugeLogger? - public var useNativeWebSocket: Bool = false + public var logger: CentrifugeLogger? + public var useNativeWebSocket: Bool = false } public enum CentrifugeClientState { @@ -77,12 +77,12 @@ public enum CentrifugeClientState { public class CentrifugeClient { public weak var delegate: CentrifugeClientDelegate? - + //MARK - fileprivate(set) var url: String fileprivate(set) var syncQueue: DispatchQueue fileprivate(set) var config: CentrifugeClientConfig - + //MARK - fileprivate(set) var internalState: CentrifugeClientState = .disconnected fileprivate var conn: WebSocketInterface? @@ -107,7 +107,7 @@ public class CentrifugeClient { fileprivate var log: CentrifugeLogger static let barrierQueue = DispatchQueue(label: "com.centrifugal.centrifuge-swift.barrier<\(UUID().uuidString)>", attributes: .concurrent) - + public private(set) var state: CentrifugeClientState { get { return CentrifugeClient.barrierQueue.sync { internalState } @@ -116,7 +116,7 @@ public class CentrifugeClient { CentrifugeClient.barrierQueue.async(flags: .barrier) { self.internalState = newState } } } - + /// Initialize client. /// /// - Parameters: @@ -128,16 +128,16 @@ public class CentrifugeClient { self.config = config self.delegate = delegate self.log = config.logger ?? EmptyLogger.instance - + self.token = config.token; - + if config.data != nil { self.data = config.data; } - + let queueID = UUID().uuidString self.syncQueue = DispatchQueue(label: "com.centrifugal.centrifuge-swift.sync<\(queueID)>") - + var request = URLRequest(url: URL(string: self.url)!) for (key, value) in self.config.headers { request.addValue(value, forHTTPHeaderField: key) @@ -185,7 +185,7 @@ public class CentrifugeClient { } self.conn = ws } - + /** Connect to server. */ @@ -201,7 +201,7 @@ public class CentrifugeClient { strongSelf.conn?.connect() } } - + /** Disconnect from server. */ @@ -211,7 +211,7 @@ public class CentrifugeClient { strongSelf.processDisconnect(code: disconnectedCodeDisconnectCalled, reason: "disconnect called", reconnect: false) } } - + /** setToken allows updating connection token. - parameter token: String @@ -243,7 +243,7 @@ public class CentrifugeClient { self.subscriptions.append(sub) return sub } - + /** Try to get Subscription from internal client registry. Can return nil if Subscription does not exist yet. @@ -255,7 +255,7 @@ public class CentrifugeClient { subscriptionsLock.lock() return self.subscriptions.first(where: { $0.channel == channel }) } - + /** * Say Client that Subscription should be removed from the internal registry. Subscription will be * automatically unsubscribed before removing. @@ -271,7 +271,7 @@ public class CentrifugeClient { } self.subscriptions.removeAll(where: { $0.channel == sub.channel }) } - + /** * Get a map with all client-side suscriptions in client's internal registry. */ @@ -284,7 +284,7 @@ public class CentrifugeClient { } return subs } - + /** Send raw asynchronous (without waiting for a response) message to server. - parameter data: Data @@ -303,7 +303,7 @@ public class CentrifugeClient { }) } } - + /** Publish message Data to channel. - parameter channel: String channel name @@ -330,7 +330,7 @@ public class CentrifugeClient { }) } } - + /** Send RPC command. - parameter method: String @@ -356,7 +356,7 @@ public class CentrifugeClient { }) } } - + public func presence(channel: String, completion: @escaping (Result)->()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -370,7 +370,7 @@ public class CentrifugeClient { }) } } - + public func presenceStats(channel: String, completion: @escaping (Result)->()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -384,7 +384,7 @@ public class CentrifugeClient { }) } } - + public func history(channel: String, limit: Int32 = 0, since: CentrifugeStreamPosition? = nil, reverse: Bool = false, completion: @escaping (Result)->()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -401,7 +401,7 @@ public class CentrifugeClient { } internal extension CentrifugeClient { - + func refreshWithToken(token: String = "", or data: Data = Data()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -439,7 +439,7 @@ internal extension CentrifugeClient { }) } } - + func getBackoffDelay(step: Int, minDelay: Double, maxDelay: Double) -> Double { // Full jitter technique, details: // https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ @@ -458,12 +458,12 @@ internal extension CentrifugeClient { if (currentStep > 31) { currentStep = 31 } return min(maxDelay, minDelay + Double.random(in: 0 ... min(maxDelay, minDelay * pow(2, Double(currentStep))))); } - + func sendSubRefresh(token: String, channel: String, completion: @escaping (Centrifugal_Centrifuge_Protocol_SubRefreshResult?, Error?)->()) { var req = Centrifugal_Centrifuge_Protocol_SubRefreshRequest() req.token = token req.channel = channel - + var command = Centrifugal_Centrifuge_Protocol_Command() command.id = self.nextCommandId() command.subRefresh = req @@ -482,7 +482,7 @@ internal extension CentrifugeClient { } }) } - + func getConnectionToken(completion: @escaping (Result)->()) { self.syncQueue.async { [weak self] in guard let strongSelf = self else { return } @@ -519,7 +519,7 @@ internal extension CentrifugeClient { func getClient() -> String? { return self.client; } - + func unsubscribe(sub: CentrifugeSubscription) { let channel = sub.channel if self.state == .connected { @@ -533,7 +533,7 @@ internal extension CentrifugeClient { }) } } - + func resubscribe() { subscriptionsLock.lock() for sub in self.subscriptions { @@ -541,11 +541,11 @@ internal extension CentrifugeClient { } subscriptionsLock.unlock() } - + func subscribe(channel: String, token: String, data: Data?, recover: Bool, streamPosition: StreamPosition, positioned: Bool, recoverable: Bool, joinLeave: Bool, completion: @escaping (Centrifugal_Centrifuge_Protocol_SubscribeResult?, Error?)->()) { self.sendSubscribe(channel: channel, token: token, data: data, recover: recover, streamPosition: streamPosition, positioned: positioned, recoverable: recoverable, joinLeave: joinLeave, completion: completion) } - + func reconnect(code: UInt32, reason: String) { self.processDisconnect(code: code, reason: reason, reconnect: true) } @@ -556,11 +556,11 @@ fileprivate extension CentrifugeClient { assertIsOnQueue(syncQueue) guard self.state == .connecting else { return } - let updateData = data == nil && config.dataGetter != nil - let updateToken = token.isEmpty && config.tokenGetter != nil if refreshRequired || - updateData || - updateToken { + (data == nil && config.dataGetter != nil) || + (!token.isEmpty && config.tokenGetter != nil) { + let updateData = config.dataGetter != nil + let updateToken = config.tokenGetter != nil if updateData { getConnectionData { [weak self] result in guard let strongSelf = self, strongSelf.state == .connecting else { return } @@ -636,7 +636,7 @@ fileprivate extension CentrifugeClient { }) } } - + func handleConnectResult(res: Centrifugal_Centrifuge_Protocol_ConnectResult?, error: Error?) { if let err = error { defer { @@ -663,7 +663,7 @@ fileprivate extension CentrifugeClient { return } } - + if let result = res { self.state = .connected self.reconnectAttempts = 0 @@ -673,7 +673,7 @@ fileprivate extension CentrifugeClient { cb(nil) } self.connectCallbacks.removeAll(keepingCapacity: false) - + // Process server-side subscriptions. for (channel, subResult) in result.subs { self.serverSubs[channel] = ServerSubscription(recoverable: subResult.recoverable, offset: subResult.offset, epoch: subResult.epoch) @@ -696,26 +696,26 @@ fileprivate extension CentrifugeClient { } // Resubscribe to client-side subscriptions. self.resubscribe() - + // Start reacting on pings from a server. if result.ping > 0 { self.pingInterval = result.ping self.sendPong = result.pong self.startWaitPing() } - + // Periodically refresh connection token. if result.expires { self.startConnectionRefresh(ttl: result.ttl) } } } - + func onData(data: Data) { assertIsOnQueue(syncQueue) handleData(data: data) } - + private func nextCommandId() -> UInt32 { self.commandIdLock.lock() self.commandId += 1 @@ -723,7 +723,7 @@ fileprivate extension CentrifugeClient { self.commandIdLock.unlock() return cid } - + private func sendCommand(command: Centrifugal_Centrifuge_Protocol_Command, completion: @escaping (Centrifugal_Centrifuge_Protocol_Reply?, Error?)->()) { let strongSelf = self let commands: [Centrifugal_Centrifuge_Protocol_Command] = [command] @@ -736,13 +736,13 @@ fileprivate extension CentrifugeClient { return } } - + private func sendCommandAsync(command: Centrifugal_Centrifuge_Protocol_Command) throws { let commands: [Centrifugal_Centrifuge_Protocol_Command] = [command] let data = try CentrifugeSerializer.serializeCommands(commands: commands) self.conn?.write(data: data) } - + private func waitForReply(id: UInt32, completion: @escaping (Centrifugal_Centrifuge_Protocol_Reply?, Error?)->()) { let timeoutTask = DispatchWorkItem { [weak self] in guard let strongSelf = self else { return } @@ -750,13 +750,13 @@ fileprivate extension CentrifugeClient { completion(nil, CentrifugeError.timeout) } self.syncQueue.asyncAfter(deadline: .now() + self.config.timeout, execute: timeoutTask) - + self.opCallbacks[id] = { [weak self] rep in guard let strongSelf = self else { return } timeoutTask.cancel() - + strongSelf.opCallbacks[id] = nil - + if let err = rep.error { completion(nil, err) } else { @@ -764,7 +764,7 @@ fileprivate extension CentrifugeClient { } } } - + private func waitForConnect(completion: @escaping (Error?)->()) { if self.state == .disconnected { completion(CentrifugeError.clientDisconnected) @@ -774,24 +774,24 @@ fileprivate extension CentrifugeClient { completion(nil) return } - + // OK, let's wait. - + let uid = UUID().uuidString - + let timeoutTask = DispatchWorkItem { [weak self] in guard let strongSelf = self else { return } strongSelf.connectCallbacks[uid] = nil completion(CentrifugeError.timeout) } self.syncQueue.asyncAfter(deadline: .now() + self.config.timeout, execute: timeoutTask) - + self.connectCallbacks[uid] = { error in timeoutTask.cancel() completion(error) } } - + private func scheduleReconnect() { assertIsOnQueue(syncQueue) @@ -813,7 +813,7 @@ fileprivate extension CentrifugeClient { self.log.debug("schedule reconnect in \(delay) seconds") self.syncQueue.asyncAfter(deadline: .now() + delay, execute: self.reconnectTask!) } - + private func handlePub(channel: String, pub: Centrifugal_Centrifuge_Protocol_Publication) { subscriptionsLock.lock() let subs = self.subscriptions.filter({ $0.channel == channel }) @@ -846,7 +846,7 @@ fileprivate extension CentrifugeClient { } sub.delegate?.onPublication(sub, event) } - + private func handleJoin(channel: String, join: Centrifugal_Centrifuge_Protocol_Join) { subscriptionsLock.lock() let subs = self.subscriptions.filter({ $0.channel == channel }) @@ -862,7 +862,7 @@ fileprivate extension CentrifugeClient { subscriptionsLock.unlock() sub.delegate?.onJoin(sub, CentrifugeJoinEvent(client: join.info.client, user: join.info.user, connInfo: join.info.connInfo, chanInfo: join.info.chanInfo)) } - + private func handleLeave(channel: String, leave: Centrifugal_Centrifuge_Protocol_Leave) { subscriptionsLock.lock() let subs = self.subscriptions.filter({ $0.channel == channel }) @@ -878,7 +878,7 @@ fileprivate extension CentrifugeClient { subscriptionsLock.unlock() sub.delegate?.onLeave(sub, CentrifugeLeaveEvent(client: leave.info.client, user: leave.info.user, connInfo: leave.info.connInfo, chanInfo: leave.info.chanInfo)) } - + private func handleUnsubscribe(channel: String, unsubscribe: Centrifugal_Centrifuge_Protocol_Unsubscribe) { subscriptionsLock.lock() let subs = self.subscriptions.filter({ $0.channel == channel }) @@ -893,7 +893,7 @@ fileprivate extension CentrifugeClient { } let sub = subs[0] subscriptionsLock.unlock() - + if (unsubscribe.code < 2500) { sub.processUnsubscribe(sendUnsubscribe: false, code: unsubscribe.code, reason: unsubscribe.reason) } else { @@ -901,23 +901,23 @@ fileprivate extension CentrifugeClient { sub.resubscribeIfNecessary() } } - + private func handleSubscribe(channel: String, sub: Centrifugal_Centrifuge_Protocol_Subscribe) { self.serverSubs[channel] = ServerSubscription(recoverable: sub.recoverable, offset: sub.offset, epoch: sub.epoch) let event = CentrifugeServerSubscribedEvent(channel: channel, wasRecovering: false, recovered: false, positioned: sub.positioned, recoverable: sub.recoverable, streamPosition: sub.positioned || sub.recoverable ? StreamPosition(offset: sub.offset, epoch: sub.epoch): nil, data: sub.data) self.delegate?.onSubscribed(self, event) } - + private func handleMessage(message: Centrifugal_Centrifuge_Protocol_Message) { self.delegate?.onMessage(self, CentrifugeMessageEvent(data: message.data)) } - + private func handleDisconnect(disconnect: Centrifugal_Centrifuge_Protocol_Disconnect) { let code = disconnect.code let reconnect = code < 3500 || code >= 5000 || (code >= 4000 && code < 4500) self.processDisconnect(code: code, reason: disconnect.reason, reconnect: reconnect) } - + private func handlePing() { guard self.state == .connected else { return } self.stopWaitPing() @@ -926,7 +926,7 @@ fileprivate extension CentrifugeClient { try? self.sendCommandAsync(command: Centrifugal_Centrifuge_Protocol_Command()) } } - + private func handlePush(push: Centrifugal_Centrifuge_Protocol_Push) { let channel = push.channel if push.hasPub { @@ -952,7 +952,7 @@ fileprivate extension CentrifugeClient { self.handleDisconnect(disconnect: disconnect) } } - + private func handleData(data: Data) { do { let replies = try CentrifugeSerializer.deserializeCommands(data: data) @@ -993,8 +993,8 @@ fileprivate extension CentrifugeClient { private func startConnectionRefresh(ttl: UInt32) { let refreshTask = DispatchWorkItem { [weak self] in guard let strongSelf = self else { return } - let updateData = strongSelf.data == nil && strongSelf.config.dataGetter != nil - let updateToken = strongSelf.token.isEmpty && strongSelf.config.tokenGetter != nil + let updateData = strongSelf.data != nil && strongSelf.config.dataGetter != nil + let updateToken = !strongSelf.token.isEmpty && strongSelf.config.tokenGetter != nil if updateData { strongSelf.config.dataGetter!.getConnectionData(CentrifugeConnectionTokenEvent()) { [weak self] result in guard let strongSelf = self else { return } @@ -1193,11 +1193,11 @@ fileprivate extension CentrifugeClient { } }) } - + private func sendUnsubscribe(channel: String, completion: @escaping (Centrifugal_Centrifuge_Protocol_UnsubscribeResult?, Error?)->()) { var req = Centrifugal_Centrifuge_Protocol_UnsubscribeRequest() req.channel = channel - + var command = Centrifugal_Centrifuge_Protocol_Command() command.id = self.nextCommandId() command.unsubscribe = req @@ -1216,7 +1216,7 @@ fileprivate extension CentrifugeClient { } }) } - + private func sendSubscribe(channel: String, token: String, data: Data?, recover: Bool, streamPosition: StreamPosition, positioned: Bool, recoverable: Bool, joinLeave: Bool, completion: @escaping (Centrifugal_Centrifuge_Protocol_SubscribeResult?, Error?)->()) { var req = Centrifugal_Centrifuge_Protocol_SubscribeRequest() req.channel = channel @@ -1252,12 +1252,12 @@ fileprivate extension CentrifugeClient { } }) } - + private func sendPublish(channel: String, data: Data, completion: @escaping (Centrifugal_Centrifuge_Protocol_PublishResult?, Error?)->()) { var req = Centrifugal_Centrifuge_Protocol_PublishRequest() req.channel = channel req.data = data - + var command = Centrifugal_Centrifuge_Protocol_Command() command.id = self.nextCommandId() command.publish = req @@ -1276,7 +1276,7 @@ fileprivate extension CentrifugeClient { } }) } - + private func sendHistory(channel: String, limit: Int32 = 0, since: CentrifugeStreamPosition?, reverse: Bool = false, completion: @escaping (Result)->()) { var req = Centrifugal_Centrifuge_Protocol_HistoryRequest() req.channel = channel @@ -1315,15 +1315,15 @@ fileprivate extension CentrifugeClient { } }) } - + private func sendPresence(channel: String, completion: @escaping (Result)->()) { var req = Centrifugal_Centrifuge_Protocol_PresenceRequest() req.channel = channel - + var command = Centrifugal_Centrifuge_Protocol_Command() command.id = self.nextCommandId() command.presence = req - + self.sendCommand(command: command, completion: { [weak self] reply, error in guard self != nil else { return } if let err = error { @@ -1344,15 +1344,15 @@ fileprivate extension CentrifugeClient { } }) } - + private func sendPresenceStats(channel: String, completion: @escaping (Result)->()) { var req = Centrifugal_Centrifuge_Protocol_PresenceStatsRequest() req.channel = channel - + var command = Centrifugal_Centrifuge_Protocol_Command() command.id = self.nextCommandId() command.presenceStats = req - + self.sendCommand(command: command, completion: { [weak self] reply, error in guard self != nil else { return } if let err = error { @@ -1370,16 +1370,16 @@ fileprivate extension CentrifugeClient { } }) } - + private func sendRPC(method: String, data: Data, completion: @escaping (Centrifugal_Centrifuge_Protocol_RPCResult?, Error?)->()) { var req = Centrifugal_Centrifuge_Protocol_RPCRequest() req.data = data req.method = method - + var command = Centrifugal_Centrifuge_Protocol_Command() command.id = self.nextCommandId() command.rpc = req - + self.sendCommand(command: command, completion: { [weak self] reply, error in guard self != nil else { return } if let err = error { @@ -1396,7 +1396,7 @@ fileprivate extension CentrifugeClient { } }) } - + private func sendSend(data: Data, completion: @escaping (Error?)->()) { var req = Centrifugal_Centrifuge_Protocol_SendRequest() req.data = data @@ -1409,7 +1409,7 @@ fileprivate extension CentrifugeClient { completion(error) } } - + private func failUnauthorized() -> Void { self.processDisconnect(code: disconnectedCodeUnauthorized, reason: "unauthorized", reconnect: false) }