From 50fe576ec690236cc74c87c20d7ca20bcb3a68de Mon Sep 17 00:00:00 2001 From: jguz-pubnub Date: Tue, 20 Aug 2024 12:25:42 +0200 Subject: [PATCH] Changes from feat/listeners --- .../Sources/DetailTableViewController.swift | 6 +- PubNub.xcodeproj/project.pbxproj | 2 + .../Effects/EmitMessagesEffect.swift | 6 +- .../Subscribe/Effects/EmitStatusEffect.swift | 6 +- .../Subscribe/Effects/SubscribeEffects.swift | 11 +- .../EventEngine/Subscribe/Subscribe.swift | 11 +- .../Subscribe/SubscribeTransition.swift | 110 ++++-- .../Events/New/EventListenerInterface.swift | 4 +- Sources/PubNub/Events/New/Subscription.swift | 1 - .../PubNub/Events/New/SubscriptionSet.swift | 2 +- .../Subscription/ConnectionStatus.swift | 54 +-- ...entEngineSubscriptionSessionStrategy.swift | 2 +- .../LegacySubscriptionSessionStrategy.swift | 16 +- .../Subscription/SubscriptionSession.swift | 31 +- .../Subscribe/EmitMessagesTests.swift | 34 +- .../Subscribe/EmitStatusTests.swift | 28 +- .../Subscribe/SubscribeTransitionTests.swift | 344 ++++++++++-------- .../SubscriptionIntegrationTests.swift | 50 ++- .../Routers/SubscribeRouterTests.swift | 22 +- 19 files changed, 402 insertions(+), 338 deletions(-) diff --git a/Examples/Sources/DetailTableViewController.swift b/Examples/Sources/DetailTableViewController.swift index 94682640..4b6125a3 100644 --- a/Examples/Sources/DetailTableViewController.swift +++ b/Examples/Sources/DetailTableViewController.swift @@ -249,12 +249,10 @@ class DetailTableViewController: UITableViewController { print("The signal is \(signal.payload) and was sent by \(signal.publisher ?? "")") case let .connectionStatusChanged(connectionChange): switch connectionChange { - case .connecting: - print("Status connecting...") case .connected: print("Status connected!") - case .reconnecting: - print("Status reconnecting...") + case .subscriptionChanged: + print("Subscription changed") case .disconnected: print("Status disconnected") case .disconnectedUnexpectedly: diff --git a/PubNub.xcodeproj/project.pbxproj b/PubNub.xcodeproj/project.pbxproj index c6e9265e..b69b119d 100644 --- a/PubNub.xcodeproj/project.pbxproj +++ b/PubNub.xcodeproj/project.pbxproj @@ -451,6 +451,7 @@ 3DA24A412C2AAB23005B959B /* PubNubObjC+ChannelGroups.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DA24A402C2AAB23005B959B /* PubNubObjC+ChannelGroups.swift */; }; 3DA24A432C2AAB54005B959B /* PubNubObjC+AppContext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DA24A422C2AAB54005B959B /* PubNubObjC+AppContext.swift */; }; 3DA24A452C2AABC0005B959B /* PubNubObjC+Files.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DA24A442C2AABC0005B959B /* PubNubObjC+Files.swift */; }; + 3DA0C7D02BFE59AC000FFE6C /* SubscriptionListenersContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DA0C7CF2BFE59AC000FFE6C /* SubscriptionListenersContainer.swift */; }; 3DACC7F72AB88F8E00210B14 /* Data+CommonCrypto.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DACC7F62AB88F8E00210B14 /* Data+CommonCrypto.swift */; }; 3DB2C4872C0F4B250060B8CF /* PubNubStatusListenerObjC.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB2C4862C0F4B250060B8CF /* PubNubStatusListenerObjC.swift */; }; 3DB9255C2B7A2B89001B7E90 /* SubscriptionStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925592B7A2B89001B7E90 /* SubscriptionStreamTests.swift */; }; @@ -1072,6 +1073,7 @@ 3DA24A402C2AAB23005B959B /* PubNubObjC+ChannelGroups.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "PubNubObjC+ChannelGroups.swift"; sourceTree = ""; }; 3DA24A422C2AAB54005B959B /* PubNubObjC+AppContext.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "PubNubObjC+AppContext.swift"; sourceTree = ""; }; 3DA24A442C2AABC0005B959B /* PubNubObjC+Files.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "PubNubObjC+Files.swift"; sourceTree = ""; }; + 3DA0C7CF2BFE59AC000FFE6C /* SubscriptionListenersContainer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubscriptionListenersContainer.swift; sourceTree = ""; }; 3DACC7F62AB88F8E00210B14 /* Data+CommonCrypto.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Data+CommonCrypto.swift"; sourceTree = ""; }; 3DB2C4862C0F4B250060B8CF /* PubNubStatusListenerObjC.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubStatusListenerObjC.swift; sourceTree = ""; }; 3DB925592B7A2B89001B7E90 /* SubscriptionStreamTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionStreamTests.swift; sourceTree = ""; }; diff --git a/Sources/PubNub/EventEngine/Subscribe/Effects/EmitMessagesEffect.swift b/Sources/PubNub/EventEngine/Subscribe/Effects/EmitMessagesEffect.swift index a64a1faa..0a8246f8 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Effects/EmitMessagesEffect.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Effects/EmitMessagesEffect.swift @@ -37,14 +37,14 @@ class MessageCache { struct EmitMessagesEffect: EffectHandler { let messages: [SubscribeMessagePayload] let cursor: SubscribeCursor - let listeners: [BaseSubscriptionListener] + let listeners: WeakSet let messageCache: MessageCache func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { // Attempt to detect missed messages due to queue overflow if messages.count >= 100 { listeners.forEach { - $0.emit(subscribe: .errorReceived( + $0?.emit(subscribe: .errorReceived( PubNubError( .messageCountExceededMaximum, router: nil, @@ -68,7 +68,7 @@ struct EmitMessagesEffect: EffectHandler { } listeners.forEach { - $0.emit(batch: filteredMessages) + $0?.emit(batch: filteredMessages) } completionBlock([]) diff --git a/Sources/PubNub/EventEngine/Subscribe/Effects/EmitStatusEffect.swift b/Sources/PubNub/EventEngine/Subscribe/Effects/EmitStatusEffect.swift index bcb2df5d..92fd10e1 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Effects/EmitStatusEffect.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Effects/EmitStatusEffect.swift @@ -11,16 +11,16 @@ import Foundation struct EmitStatusEffect: EffectHandler { let statusChange: Subscribe.ConnectionStatusChange - let listeners: [BaseSubscriptionListener] + let listeners: WeakSet func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { if let error = statusChange.error { listeners.forEach { - $0.emit(subscribe: .errorReceived(error)) + $0?.emit(subscribe: .errorReceived(error)) } } listeners.forEach { - $0.emit(subscribe: .connectionChanged(statusChange.newStatus)) + $0?.emit(subscribe: .connectionChanged(statusChange.newStatus)) } completionBlock([]) } diff --git a/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffects.swift b/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffects.swift index 02035957..049c65cb 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffects.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Effects/SubscribeEffects.swift @@ -15,7 +15,7 @@ import Foundation class HandshakeEffect: EffectHandler { private let subscribeEffect: SubscribeEffect - init(request: SubscribeRequest, listeners: [BaseSubscriptionListener]) { + init(request: SubscribeRequest, listeners: WeakSet) { self.subscribeEffect = SubscribeEffect( request: request, listeners: listeners, @@ -25,7 +25,6 @@ class HandshakeEffect: EffectHandler { } func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) { - subscribeEffect.listeners.forEach { $0.emit(subscribe: .connectionChanged(.connecting)) } subscribeEffect.performTask(completionBlock: completionBlock) } @@ -43,7 +42,7 @@ class HandshakeEffect: EffectHandler { class ReceivingEffect: EffectHandler { private let subscribeEffect: SubscribeEffect - init(request: SubscribeRequest, listeners: [BaseSubscriptionListener]) { + init(request: SubscribeRequest, listeners: WeakSet) { self.subscribeEffect = SubscribeEffect( request: request, listeners: listeners, @@ -69,13 +68,13 @@ class ReceivingEffect: EffectHandler { private class SubscribeEffect: EffectHandler { let request: SubscribeRequest - let listeners: [BaseSubscriptionListener] + let listeners: WeakSet let onResponseReceived: (SubscribeResponse) -> Subscribe.Event let onErrorReceived: (PubNubError) -> Subscribe.Event init( request: SubscribeRequest, - listeners: [BaseSubscriptionListener], + listeners: WeakSet, onResponseReceived: @escaping ((SubscribeResponse) -> Subscribe.Event), onErrorReceived: @escaping ((PubNubError) -> Subscribe.Event) ) { @@ -91,7 +90,7 @@ private class SubscribeEffect: EffectHandler { switch $0 { case .success(let response): selfRef.listeners.forEach { - $0.emit(subscribe: .responseReceived( + $0?.emit(subscribe: .responseReceived( SubscribeResponseHeader( channels: selfRef.request.channels.map { PubNubChannel(channel: $0) }, groups: selfRef.request.groups.map { PubNubChannel(channel: $0) }, diff --git a/Sources/PubNub/EventEngine/Subscribe/Subscribe.swift b/Sources/PubNub/EventEngine/Subscribe/Subscribe.swift index e3063dd5..e9b7d64f 100644 --- a/Sources/PubNub/EventEngine/Subscribe/Subscribe.swift +++ b/Sources/PubNub/EventEngine/Subscribe/Subscribe.swift @@ -35,7 +35,7 @@ extension Subscribe { struct HandshakingState: SubscribeState { let input: SubscribeInput let cursor: SubscribeCursor - let connectionStatus = ConnectionStatus.connecting + let connectionStatus = ConnectionStatus.disconnected } struct HandshakeStoppedState: SubscribeState { @@ -61,7 +61,7 @@ extension Subscribe { struct ReceivingState: SubscribeState { let input: SubscribeInput let cursor: SubscribeCursor - let connectionStatus = ConnectionStatus.connected + let connectionStatus: ConnectionStatus } struct ReceiveStoppedState: SubscribeState { @@ -119,9 +119,12 @@ extension Subscribe { extension Subscribe { struct Dependencies { let configuration: PubNubConfiguration - let listeners: [BaseSubscriptionListener] + let listeners: WeakSet - init(configuration: PubNubConfiguration, listeners: [BaseSubscriptionListener] = []) { + init( + configuration: PubNubConfiguration, + listeners: WeakSet = WeakSet([]) + ) { self.configuration = configuration self.listeners = listeners } diff --git a/Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift b/Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift index ef076916..e57e5726 100644 --- a/Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift +++ b/Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift @@ -30,7 +30,7 @@ class SubscribeTransition: TransitionProtocol { case .subscriptionRestored: return true case .unsubscribeAll: - return true + return !(state is Subscribe.UnsubscribedState) case .disconnect: return !( state is Subscribe.HandshakeStoppedState || state is Subscribe.ReceiveStoppedState || @@ -140,25 +140,64 @@ fileprivate extension SubscribeTransition { if newInput.isEmpty { return setUnsubscribedState(from: state) - } else { - switch state { - case is Subscribe.HandshakingState: - return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor)) - case is Subscribe.HandshakeStoppedState: - return TransitionResult(state: Subscribe.HandshakeStoppedState(input: newInput, cursor: cursor)) - case is Subscribe.HandshakeFailedState: - return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor)) - case is Subscribe.ReceivingState: - return TransitionResult(state: Subscribe.ReceivingState(input: newInput, cursor: cursor)) - case is Subscribe.ReceiveStoppedState: - return TransitionResult(state: Subscribe.ReceiveStoppedState(input: newInput, cursor: cursor)) - case is Subscribe.ReceiveFailedState: - return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor)) - case is Subscribe.UnsubscribedState: - return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor)) - default: - return TransitionResult(state: state) - } + } + + let invocations: [EffectInvocation] = state is Subscribe.ReceivingState ? [ + .regular(.emitStatus(change: Subscribe.ConnectionStatusChange( + oldStatus: state.connectionStatus, + newStatus: .subscriptionChanged( + channels: newInput.subscribedChannelNames, + groups: newInput.subscribedGroupNames + ), + error: nil + ))) + ] : [] + + switch state { + case is Subscribe.HandshakingState: + return TransitionResult( + state: Subscribe.HandshakingState(input: newInput, cursor: cursor), + invocations: invocations + ) + case is Subscribe.HandshakeStoppedState: + return TransitionResult( + state: Subscribe.HandshakeStoppedState(input: newInput, cursor: cursor), + invocations: invocations + ) + case is Subscribe.HandshakeFailedState: + return TransitionResult( + state: Subscribe.HandshakingState(input: newInput, cursor: cursor), + invocations: invocations + ) + case is Subscribe.ReceivingState: + let newStatus: ConnectionStatus = .subscriptionChanged( + channels: newInput.subscribedChannelNames, + groups: newInput.subscribedGroupNames + ) + return TransitionResult( + state: Subscribe.ReceivingState(input: newInput, cursor: cursor, connectionStatus: newStatus), + invocations: invocations + ) + case is Subscribe.ReceiveStoppedState: + return TransitionResult( + state: Subscribe.ReceiveStoppedState(input: newInput, cursor: cursor), + invocations: invocations + ) + case is Subscribe.ReceiveFailedState: + return TransitionResult( + state: Subscribe.HandshakingState(input: newInput, cursor: cursor), + invocations: invocations + ) + case is Subscribe.UnsubscribedState: + return TransitionResult( + state: Subscribe.HandshakingState(input: newInput, cursor: cursor), + invocations: invocations + ) + default: + return TransitionResult( + state: state, + invocations: invocations + ) } } } @@ -202,27 +241,30 @@ fileprivate extension SubscribeTransition { messages: [SubscribeMessagePayload] = [] ) -> TransitionResult { let emitMessagesInvocation = EffectInvocation.managed( - Subscribe.Invocation.emitMessages(events: messages, forCursor: cursor) - ) - let emitStatusInvocation = EffectInvocation.regular( - Subscribe.Invocation.emitStatus(change: Subscribe.ConnectionStatusChange( - oldStatus: state.connectionStatus, - newStatus: .connected, - error: nil - )) + Subscribe.Invocation.emitMessages( + events: messages, + forCursor: cursor + ) ) if state is Subscribe.HandshakingState { - return TransitionResult( - state: Subscribe.ReceivingState(input: state.input, cursor: cursor), - invocations: [messages.isEmpty ? nil : emitMessagesInvocation, emitStatusInvocation].compactMap { $0 } + let emitStatusInvocation = EffectInvocation.regular( + Subscribe.Invocation.emitStatus(change: Subscribe.ConnectionStatusChange( + oldStatus: state.connectionStatus, + newStatus: .connected, + error: nil + )) ) - } else { return TransitionResult( - state: Subscribe.ReceivingState(input: state.input, cursor: cursor), - invocations: [messages.isEmpty ? nil : emitMessagesInvocation].compactMap { $0 } + state: Subscribe.ReceivingState(input: state.input, cursor: cursor, connectionStatus: .connected), + invocations: [messages.isEmpty ? nil : emitMessagesInvocation, emitStatusInvocation].compactMap { $0 } ) } + + return TransitionResult( + state: Subscribe.ReceivingState(input: state.input, cursor: cursor, connectionStatus: state.connectionStatus), + invocations: [messages.isEmpty ? nil : emitMessagesInvocation].compactMap { $0 } + ) } } diff --git a/Sources/PubNub/Events/New/EventListenerInterface.swift b/Sources/PubNub/Events/New/EventListenerInterface.swift index acb314e7..4fc56e2d 100644 --- a/Sources/PubNub/Events/New/EventListenerInterface.swift +++ b/Sources/PubNub/Events/New/EventListenerInterface.swift @@ -28,7 +28,7 @@ public class StatusListener: StatusListenerInterface { public let queue: DispatchQueue public var onConnectionStateChange: ((ConnectionStatus) -> Void)? - init( + public init( uuid: UUID = UUID(), queue: DispatchQueue = .main, onConnectionStateChange: @escaping ((ConnectionStatus) -> Void) @@ -80,7 +80,7 @@ public class EventListener: EventListenerInterface { public var onFileEvent: ((PubNubFileChangeEvent) -> Void)? public var onAppContext: ((PubNubAppContextEvent) -> Void)? - init( + public init( queue: DispatchQueue = .main, uuid: UUID = UUID(), onEvent: ((PubNubEvent) -> Void)? = nil, diff --git a/Sources/PubNub/Events/New/Subscription.swift b/Sources/PubNub/Events/New/Subscription.swift index 878649c7..885bc3e2 100644 --- a/Sources/PubNub/Events/New/Subscription.swift +++ b/Sources/PubNub/Events/New/Subscription.swift @@ -142,7 +142,6 @@ extension Subscription: SubscribeCapable { let channels = subscriptionType == .channel ? [self] : [] let channelGroups = subscriptionType == .channelGroup ? [self] : [] - pubnub.registerAdapter(adapter) pubnub.internalSubscribe(with: channels, and: channelGroups, at: timetoken) } diff --git a/Sources/PubNub/Events/New/SubscriptionSet.swift b/Sources/PubNub/Events/New/SubscriptionSet.swift index 749f37fe..36d2dd20 100644 --- a/Sources/PubNub/Events/New/SubscriptionSet.swift +++ b/Sources/PubNub/Events/New/SubscriptionSet.swift @@ -173,7 +173,6 @@ extension SubscriptionSet: SubscribeCapable { return } pubnub.registerAdapter(adapter) - currentSubscriptions.forEach { pubnub.registerAdapter($0.adapter) } let channels = currentSubscriptions.filter { $0.subscriptionType == .channel @@ -200,6 +199,7 @@ extension SubscriptionSet: SubscribeCapable { guard let pubnub = currentSubscriptions.first?.pubnub, !isDisposed else { return } + pubnub.subscription.remove(adapter) pubnub.internalUnsubscribe( from: currentSubscriptions.filter { $0.subscriptionType == .channel }, and: currentSubscriptions.filter { $0.subscriptionType == .channelGroup }, diff --git a/Sources/PubNub/Subscription/ConnectionStatus.swift b/Sources/PubNub/Subscription/ConnectionStatus.swift index 4ddb5d56..55f43b29 100644 --- a/Sources/PubNub/Subscription/ConnectionStatus.swift +++ b/Sources/PubNub/Subscription/ConnectionStatus.swift @@ -12,25 +12,22 @@ import Foundation /// Status of a connection to a remote system public enum ConnectionStatus: Equatable { - /// Attempting to connect to a remote system - @available(*, deprecated, message: "This case will be removed in future versions") - case connecting /// Successfully connected to a remote system case connected /// Explicit disconnect from a remote system case disconnected - /// Attempting to reconnect to a remote system - @available(*, deprecated, message: "This case will be removed in future versions") - case reconnecting /// Unexpected disconnect from a remote system case disconnectedUnexpectedly(PubNubError) /// Unable to establish initial connection. Applies if `enableEventEngine` in `PubNubConfiguration` is true. case connectionError(PubNubError) + /// SDK subscribed with a new mix of channels (fired every time the channel/channel group mix changed) + /// since the initial connection + case subscriptionChanged(channels: [String], groups: [String]) /// If the connection is connected or attempting to connect public var isActive: Bool { switch self { - case .connecting, .connected, .reconnecting: + case .connected, .subscriptionChanged: return true default: return false @@ -41,37 +38,52 @@ public enum ConnectionStatus: Equatable { public var isConnected: Bool { if case .connected = self { return true + } else if case .subscriptionChanged = self { + return true } else { return false } } + public static func == (lhs: ConnectionStatus, rhs: ConnectionStatus) -> Bool { + switch (lhs, rhs) { + case (.connected, .connected): + return true + case (.disconnected, .disconnected): + return true + case let (.disconnectedUnexpectedly(lhsError), .disconnectedUnexpectedly(rhsError)): + return lhsError == rhsError + case let (.connectionError(lhsError), .connectionError(rhsError)): + return lhsError == rhsError + case let (.subscriptionChanged(lhsChannels, lhsGroups), .subscriptionChanged(rhsChannels, rhsGroups)): + return Set(lhsChannels) == Set(rhsChannels) && Set(lhsGroups) == Set(rhsGroups) + default: + return false + } + } + // swiftlint:disable:next cyclomatic_complexity func canTransition(to state: ConnectionStatus) -> Bool { switch (self, state) { - case (.connecting, .connected): - return true - case (.connecting, .disconnected): + case (.disconnected, .connected): return true - case (.connecting, .disconnectedUnexpectedly): + case (.disconnected, .connectionError): return true - case (.connecting, .connectionError): + case (.disconnected, .disconnectedUnexpectedly): return true - case (.connected, .disconnected): - return true - case (.reconnecting, .connected): + case (.disconnectedUnexpectedly, .connected): return true - case (.reconnecting, .disconnected): + case (.disconnectedUnexpectedly, .disconnected): return true - case (.reconnecting, .disconnectedUnexpectedly): + case (.connected, .subscriptionChanged): return true - case (.reconnecting, .connectionError): + case (.connected, .disconnected): return true - case (.disconnected, .connecting): + case (.connected, .disconnectedUnexpectedly): return true - case (.disconnectedUnexpectedly, .connecting): + case (.subscriptionChanged, .disconnectedUnexpectedly): return true - case (.connectionError, .connecting): + case (.subscriptionChanged, .disconnected): return true default: return false diff --git a/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift b/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift index abe084d5..89ccd1dc 100644 --- a/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift +++ b/Sources/PubNub/Subscription/Strategy/EventEngineSubscriptionSessionStrategy.swift @@ -76,7 +76,7 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy { subscribeEngine.dependencies = EventEngineDependencies( value: Subscribe.Dependencies( configuration: configuration, - listeners: listeners.allObjects + listeners: listeners ) ) } diff --git a/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift b/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift index a33384f0..a55b86b2 100644 --- a/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift +++ b/Sources/PubNub/Subscription/Strategy/LegacySubscriptionSessionStrategy.swift @@ -73,7 +73,6 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { presenceSession: SessionReplaceable ) { self.configuration = configuration - var mutableSession = subscribeSession filterExpression = configuration.filterExpression nonSubscribeSession = presenceSession @@ -82,21 +81,9 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { sessionStream = SessionListener(queue: responseQueue) // Add listener to session + var mutableSession = subscribeSession mutableSession.sessionStream = sessionStream longPollingSession = mutableSession - - sessionStream.didRetryRequest = { [weak self] _ in - self?.connectionStatus = .reconnecting - } - - sessionStream.sessionDidReceiveChallenge = { [weak self] _, _ in - if self?.connectionStatus == .reconnecting { - // Delay time for server to process connection after TLS handshake - DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 0.05) { - self?.connectionStatus = .connected - } - } - } } deinit { @@ -130,7 +117,6 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy { /// - parameter timetoken: The timetoken to subscribe with func reconnect(at cursor: SubscribeCursor? = nil) { if !connectionStatus.isActive { - connectionStatus = .connecting // Start subscribe loop performSubscribeLoop(at: cursor) // Start presence heartbeat diff --git a/Sources/PubNub/Subscription/SubscriptionSession.swift b/Sources/PubNub/Subscription/SubscriptionSession.swift index d174f162..69852b01 100644 --- a/Sources/PubNub/Subscription/SubscriptionSession.swift +++ b/Sources/PubNub/Subscription/SubscriptionSession.swift @@ -116,13 +116,13 @@ class SubscriptionSession: EventListenerInterface, StatusListenerInterface { withPresence: Bool = false, using pubnub: PubNub ) { - let channelSubscriptions = channels.compactMap { + let channelSubscriptions = Set(channels).compactMap { pubnub.channel($0).subscription( queue: queue, options: withPresence ? ReceivePresenceEvents() : SubscriptionOptions.empty() ) } - let channelGroupSubscriptions = groups.compactMap { + let channelGroupSubscriptions = Set(groups).compactMap { pubnub.channelGroup($0).subscription( queue: queue, options: withPresence ? ReceivePresenceEvents() : SubscriptionOptions.empty() @@ -299,7 +299,8 @@ extension SubscriptionSession { ) } - // Returns an array of subscriptions that subscribe to at least one name in common with the given Subscription + // Returns an array of subscriptions, excluding the given subscription and the global listener, + // that subscribe to at least one name in common with the given subscription func matchingSubscriptions(for subscription: Subscription, presenceOnly: Bool) -> [SubscribeMessagesReceiver] { let allSubscriptions = strategy.listeners.compactMap { $0 as? BaseSubscriptionListenerAdapter @@ -307,15 +308,14 @@ extension SubscriptionSession { let namesToFind = subscription.subscriptionNames.filter { presenceOnly ? $0.isPresenceChannelName : true } - - return allSubscriptions.filter { - $0.uuid != subscription.uuid && $0.uuid != globalEventsListener.uuid - }.compactMap { - $0.receiver - }.filter { - ($0.subscriptionTopology[subscription.subscriptionType] ?? [String]()).contains { - namesToFind.contains($0) + return allSubscriptions.compactMap { + if $0.uuid != subscription.uuid && $0.uuid != globalEventsListener.uuid { + return $0.receiver + } else { + return nil } + }.filter { + !(Set($0.subscriptionTopology[subscription.subscriptionType] ?? []).isDisjoint(with: namesToFind)) } } @@ -342,14 +342,7 @@ extension SubscriptionSession { } let channels = presenceItemsOnly ? [] : Set(subscriptions.filter { - matchingSubscriptions( - for: $0, - presenceOnly: false - ).isEmpty && - matchingSubscriptions( - for: $0, - presenceOnly: true - ).isEmpty + matchingSubscriptions(for: $0, presenceOnly: false).isEmpty && matchingSubscriptions(for: $0, presenceOnly: true).isEmpty }.flatMap { $0.subscriptionNames }).symmetricDifference(presenceItems.map { diff --git a/Tests/PubNubTests/EventEngine/Subscribe/EmitMessagesTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/EmitMessagesTests.swift index 9ff29869..324a3e30 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/EmitMessagesTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/EmitMessagesTests.swift @@ -54,7 +54,7 @@ class EmitMessagesTests: XCTestCase { let effect = EmitMessagesEffect( messages: messages, cursor: SubscribeCursor(timetoken: 12345, region: 11), - listeners: listeners, + listeners: WeakSet(listeners), messageCache: MessageCache() ) @@ -77,15 +77,16 @@ class EmitMessagesTests: XCTestCase { expectation.assertForOverFulfill = true expectation.expectedFulfillmentCount = listeners.count + let generatedMessages = (1...100).map { + generateMessage( + with: .message, + payload: AnyJSON("Hello, it's message number \($0)") + ) + } let effect = EmitMessagesEffect( - messages: (1...100).map { - generateMessage( - with: .message, - payload: AnyJSON("Hello, it's message number \($0)") - ) - }, + messages: generatedMessages, cursor: SubscribeCursor(timetoken: 12345, region: 11), - listeners: listeners, + listeners: WeakSet(listeners), messageCache: MessageCache() ) @@ -110,15 +111,16 @@ class EmitMessagesTests: XCTestCase { expectation.assertForOverFulfill = true expectation.expectedFulfillmentCount = listeners.count + let generatedMessages = (1...50).map { _ in + generateMessage( + with: .message, + payload: AnyJSON("Hello, it's a message") + ) + } let effect = EmitMessagesEffect( - messages: (1...50).map { _ in - generateMessage( - with: .message, - payload: AnyJSON("Hello, it's a message") - ) - }, + messages: generatedMessages, cursor: SubscribeCursor(timetoken: 12345, region: 11), - listeners: listeners, + listeners: WeakSet(listeners), messageCache: MessageCache() ) @@ -156,7 +158,7 @@ class EmitMessagesTests: XCTestCase { let effect = EmitMessagesEffect( messages: newMessages, cursor: SubscribeCursor(timetoken: 12345, region: 11), - listeners: listeners, + listeners: WeakSet(listeners), messageCache: cache ) diff --git a/Tests/PubNubTests/EventEngine/Subscribe/EmitStatusTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/EmitStatusTests.swift index 07b4b0c5..0ffc60d0 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/EmitStatusTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/EmitStatusTests.swift @@ -39,14 +39,16 @@ class EmitStatusTests: XCTestCase { expectation.expectedFulfillmentCount = listeners.count expectation.assertForOverFulfill = true + let testedStatusChange = Subscribe.ConnectionStatusChange( + oldStatus: .disconnected, + newStatus: .connected, + error: nil + ) let effect = EmitStatusEffect( - statusChange: Subscribe.ConnectionStatusChange( - oldStatus: .disconnected, - newStatus: .connected, - error: nil - ), - listeners: listeners + statusChange: testedStatusChange, + listeners: WeakSet(listeners) ) + listeners.forEach { $0.onEmitSubscribeEventCalled = { event in if case let .connectionChanged(status) = event { @@ -74,14 +76,16 @@ class EmitStatusTests: XCTestCase { errorExpectation.expectedFulfillmentCount = listeners.count errorExpectation.assertForOverFulfill = true + let testedStatusChange = Subscribe.ConnectionStatusChange( + oldStatus: .disconnected, + newStatus: .connected, + error: PubNubError(.unknown) + ) let effect = EmitStatusEffect( - statusChange: Subscribe.ConnectionStatusChange( - oldStatus: .disconnected, - newStatus: .connected, - error: PubNubError(.unknown) - ), - listeners: listeners + statusChange: testedStatusChange, + listeners: WeakSet(listeners) ) + listeners.forEach { $0.onEmitSubscribeEventCalled = { event in if case let .connectionChanged(status) = event { diff --git a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift index 86b5e664..79452f35 100644 --- a/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift +++ b/Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift @@ -77,25 +77,25 @@ class SubscribeTransitionTests: XCTestCase { groups: ["g1", "g1-pnpres", "g2", "g2", "g2-pnpres", "g3"] ) ) - let expectedInvocations: [EffectInvocation] = [ - .managed(.handshakeRequest( - channels: ["c1", "c1-pnpres", "c2"], - groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"] - )) - ] - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: false) ] - let expectedState = Subscribe.HandshakingState(input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor(timetoken: 0)!) + let expectedInvocations: [EffectInvocation] = [ + .managed(.handshakeRequest( + channels: ["c1", "c1-pnpres", "c2"], + groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"] + )) + ] + let expectedState = Subscribe.HandshakingState( + input: SubscribeInput(channels: expChannels, groups: expGroups), + cursor: SubscribeCursor(timetoken: 0)! + ) XCTAssertTrue(results.state.isEqual(to: expectedState)) XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations)) @@ -113,25 +113,26 @@ class SubscribeTransitionTests: XCTestCase { groups: ["g1", "g1-pnpres", "g2", "g2", "g2-pnpres", "g3"] ) ) - let expectedInvocations: [EffectInvocation] = [ - .managed(.handshakeRequest( - channels: ["c1", "c1-pnpres", "c2"], - groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"] - )) - ] - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: false) ] - let expectedState = Subscribe.HandshakingState(input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor(timetoken: 0, region: 0)) + let expectedInvocations: [EffectInvocation] = [ + .managed(.handshakeRequest( + channels: ["c1", "c1-pnpres", "c2"], + groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"] + )) + ] + + let expectedState = Subscribe.HandshakingState( + input: SubscribeInput(channels: expChannels, groups: expGroups), + cursor: SubscribeCursor(timetoken: 0, region: 0) + ) XCTAssertTrue(results.state.isEqual(to: expectedState)) XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations)) @@ -154,10 +155,11 @@ class SubscribeTransitionTests: XCTestCase { PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: false) ] - let expectedState = Subscribe.HandshakeStoppedState(input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor(timetoken: 0, region: 0)) + + let expectedState = Subscribe.HandshakeStoppedState( + input: SubscribeInput(channels: expectedChannels,groups: expectedGroups), + cursor: SubscribeCursor(timetoken: 0, region: 0) + ) XCTAssertTrue(results.state.isEqual(to: expectedState)) XCTAssertTrue(results.invocations.isEmpty) @@ -171,13 +173,6 @@ class SubscribeTransitionTests: XCTestCase { groups: ["g1", "g1-pnpres", "g2", "g2", "g2-pnpres", "g3"] ) ) - let expectedInvocations: [EffectInvocation] = [ - .cancel(.handshakeRequest), - .managed(.handshakeRequest( - channels: ["c1", "c1-pnpres", "c2"], - groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"] - )) - ] let expectedChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: false) @@ -187,48 +182,74 @@ class SubscribeTransitionTests: XCTestCase { PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: false) ] - let expectedState = Subscribe.HandshakingState(input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor(timetoken: 0, region: 0)) + let expectedNewStatus: ConnectionStatus = .subscriptionChanged( + channels: expectedChannels.map { $0.id }, + groups: expectedGroups.map { $0.id } + ) + let expectedInvocations: [EffectInvocation] = [ + .cancel(.handshakeRequest), + .managed(.handshakeRequest( + channels: ["c1", "c1-pnpres", "c2"], + groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"] + )) + ] + + let expectedState = Subscribe.HandshakingState( + input: SubscribeInput(channels: expectedChannels, groups: expectedGroups), + cursor: SubscribeCursor(timetoken: 0, region: 0) + ) XCTAssertTrue(results.state.isEqual(to: expectedState)) XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations)) } func test_SubscriptionChangedForReceivingState() throws { + let status: ConnectionStatus = .subscriptionChanged( + channels: input.subscribedChannelNames, + groups: input.subscribedGroupNames + ) let results = transition.transition( - from: Subscribe.ReceivingState(input: input, cursor: SubscribeCursor(timetoken: 5001000, region: 22)), + from: Subscribe.ReceivingState( + input: input, cursor: SubscribeCursor(timetoken: 5001000, region: 22), + connectionStatus: status + ), event: .subscriptionChanged( channels: ["c1", "c1", "c1-pnpres", "c2"], groups: ["g1", "g1-pnpres", "g2", "g2", "g2-pnpres", "g3"] ) ) + let expChannels = [ + PubNubChannel(id: "c1", withPresence: true), + PubNubChannel(id: "c2", withPresence: false) + ] + let expGroups = [ + PubNubChannel(id: "g1", withPresence: true), + PubNubChannel(id: "g2", withPresence: true), + PubNubChannel(id: "g3", withPresence: false) + ] + + let expectedNewStatus: ConnectionStatus = .subscriptionChanged( + channels: expChannels.map { $0.id }, + groups: expGroups.map { $0.id } + ) let expectedInvocations: [EffectInvocation] = [ .cancel(.receiveMessages), + .regular(.emitStatus(change: Subscribe.ConnectionStatusChange( + oldStatus: status, + newStatus: expectedNewStatus, + error: nil + ))), .managed(.receiveMessages( channels: ["c1", "c1-pnpres", "c2"], groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"], cursor: SubscribeCursor(timetoken: 5001000, region: 22) )) ] - let expectedChannels = [ - PubNubChannel(id: "c1", withPresence: true), - PubNubChannel(id: "c2", withPresence: false) - ] - let expectedGroups = [ - PubNubChannel(id: "g1", withPresence: true), - PubNubChannel(id: "g2", withPresence: true), - PubNubChannel(id: "g3", withPresence: false) - ] + let expectedState = Subscribe.ReceivingState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor( - timetoken: 5001000, - region: 22 - ) + input: SubscribeInput(channels: expChannels, groups: expGroups), + cursor: SubscribeCursor(timetoken: 5001000, region: 22), + connectionStatus: .subscriptionChanged(channels: expChannels.map { $0.id }, groups: expGroups.map { $0.id }) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -247,27 +268,24 @@ class SubscribeTransitionTests: XCTestCase { groups: ["g1", "g1-pnpres", "g2", "g2", "g2-pnpres", "g3"] ) ) - - let expectedInvocations: [EffectInvocation] = [ - .managed(.handshakeRequest( - channels: ["c1", "c1-pnpres", "c2"], - groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"] - )) - ] - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: false) ] + let expectedInvocations: [EffectInvocation] = [ + .managed(.handshakeRequest( + channels: ["c1", "c1-pnpres", "c2"], + groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3"] + )) + ] let expectedState = Subscribe.HandshakingState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor(timetoken: 500100900, region: 11) + input: SubscribeInput(channels: expChannels, groups: expGroups), + cursor: SubscribeCursor(timetoken: 500100900, region: 11) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -282,23 +300,18 @@ class SubscribeTransitionTests: XCTestCase { groups: ["g1", "g1-pnpres", "g2", "g2", "g2-pnpres", "g3"] ) ) - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: false) ] let expectedState = Subscribe.ReceiveStoppedState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor( - timetoken: 500100900, - region: 11 - ) + input: SubscribeInput(channels: expChannels, groups: expGroups), + cursor: SubscribeCursor(timetoken: 500100900, region: 11) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -308,40 +321,58 @@ class SubscribeTransitionTests: XCTestCase { // MARK: - Subscription Restored func test_SubscriptionRestoredForReceivingState() throws { + let status: ConnectionStatus = .subscriptionChanged( + channels: input.subscribedChannelNames, + groups: input.subscribedGroupNames + ) let results = transition.transition( - from: Subscribe.ReceivingState(input: input, cursor: SubscribeCursor(timetoken: 1500100900, region: 41)), + from: Subscribe.ReceivingState( + input: input, cursor: SubscribeCursor(timetoken: 1500100900, region: 41), + connectionStatus: status + ), event: .subscriptionRestored( channels: ["c1", "c1-pnpres", "c2", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4", "g4"], cursor: SubscribeCursor(timetoken: 100, region: 55) ) ) - let expectedInvocations: [EffectInvocation] = [ - .cancel(.receiveMessages), - .managed(.receiveMessages( - channels: ["c1", "c1-pnpres", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], - groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4"], - cursor: SubscribeCursor(timetoken: 100, region: 55) - )) - ] - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: true), PubNubChannel(id: "c3", withPresence: true), PubNubChannel(id: "c4", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: true), PubNubChannel(id: "g4", withPresence: false) ] + let expectedNewStatus: ConnectionStatus = .subscriptionChanged( + channels: expChannels.map { $0.id }, + groups: expGroups.map { $0.id } + ) + let expectedInvocations: [EffectInvocation] = [ + .cancel(.receiveMessages), + .regular(.emitStatus(change: Subscribe.ConnectionStatusChange( + oldStatus: status, + newStatus: expectedNewStatus, + error: nil + ))), + .managed(.receiveMessages( + channels: ["c1", "c1-pnpres", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], + groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4"], + cursor: SubscribeCursor(timetoken: 100, region: 55) + )) + ] + let expectedState = Subscribe.ReceivingState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), - cursor: SubscribeCursor(timetoken: 100, region: 55) + input: SubscribeInput(channels: expChannels, groups: expGroups), + cursor: SubscribeCursor(timetoken: 100, region: 55), + connectionStatus: .subscriptionChanged( + channels: expChannels.map { $0.id }, + groups: expGroups.map { $0.id } + ) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -361,29 +392,28 @@ class SubscribeTransitionTests: XCTestCase { cursor: SubscribeCursor(timetoken: 100, region: 55) ) ) - let expectedInvocations: [EffectInvocation] = [ - .managed(.handshakeRequest( - channels: ["c1", "c1-pnpres", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], - groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4"] - )) - ] - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: true), PubNubChannel(id: "c3", withPresence: true), PubNubChannel(id: "c4", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: true), PubNubChannel(id: "g4", withPresence: false) ] + + let expectedInvocations: [EffectInvocation] = [ + .managed(.handshakeRequest( + channels: ["c1", "c1-pnpres", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], + groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4"] + )) + ] let expectedState = Subscribe.HandshakingState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor(timetoken: 100, region: 55) + input: SubscribeInput(channels: expChannels, groups: expGroups), + cursor: SubscribeCursor(timetoken: 100, region: 55) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -402,23 +432,20 @@ class SubscribeTransitionTests: XCTestCase { cursor: SubscribeCursor(timetoken: 100, region: 55) ) ) - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: true), PubNubChannel(id: "c3", withPresence: true), PubNubChannel(id: "c4", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: true), PubNubChannel(id: "g4", withPresence: false) ] let expectedState = Subscribe.ReceiveStoppedState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), + input: SubscribeInput(channels: expChannels, groups: expGroups), cursor: SubscribeCursor(timetoken: 100, region: 55) ) @@ -435,30 +462,27 @@ class SubscribeTransitionTests: XCTestCase { cursor: SubscribeCursor(timetoken: 100, region: 55) ) ) - let expectedInvocations: [EffectInvocation] = [ - .cancel(.handshakeRequest), - .managed(.handshakeRequest( - channels: ["c1", "c1-pnpres", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], - groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4"] - )) - ] - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: true), PubNubChannel(id: "c3", withPresence: true), PubNubChannel(id: "c4", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: true), PubNubChannel(id: "g4", withPresence: false) ] + let expectedInvocations: [EffectInvocation] = [ + .cancel(.handshakeRequest), + .managed(.handshakeRequest( + channels: ["c1", "c1-pnpres", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], + groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4"] + )) + ] let expectedState = Subscribe.HandshakingState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), + input: SubscribeInput(channels: expChannels, groups: expGroups), cursor: SubscribeCursor(timetoken: 100, region: 55) ) @@ -479,29 +503,28 @@ class SubscribeTransitionTests: XCTestCase { cursor: SubscribeCursor(timetoken: 100, region: 55) ) ) - let expectedInvocations: [EffectInvocation] = [ - .managed(.handshakeRequest( - channels: ["c1", "c1-pnpres", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], - groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4"] - )) - ] - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: true), PubNubChannel(id: "c3", withPresence: true), PubNubChannel(id: "c4", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: true), PubNubChannel(id: "g4", withPresence: false) ] + let expectedInvocations: [EffectInvocation] = [ + .managed(.handshakeRequest( + channels: ["c1", "c1-pnpres", "c2", "c2-pnpres", "c3", "c3-pnpres", "c4"], + groups: ["g1", "g1-pnpres", "g2", "g2-pnpres", "g3", "g3-pnpres", "g4"] + )) + ] + let expectedState = Subscribe.HandshakingState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), cursor: SubscribeCursor(timetoken: 100, region: 55) + input: SubscribeInput(channels: expChannels, groups: expGroups), + cursor: SubscribeCursor(timetoken: 100, region: 55) ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -517,23 +540,20 @@ class SubscribeTransitionTests: XCTestCase { cursor: SubscribeCursor(timetoken: 100, region: 55) ) ) - let expectedChannels = [ + let expChannels = [ PubNubChannel(id: "c1", withPresence: true), PubNubChannel(id: "c2", withPresence: true), PubNubChannel(id: "c3", withPresence: true), PubNubChannel(id: "c4", withPresence: false) ] - let expectedGroups = [ + let expGroups = [ PubNubChannel(id: "g1", withPresence: true), PubNubChannel(id: "g2", withPresence: true), PubNubChannel(id: "g3", withPresence: true), PubNubChannel(id: "g4", withPresence: false) ] let expectedState = Subscribe.HandshakeStoppedState( - input: SubscribeInput( - channels: expectedChannels, - groups: expectedGroups - ), + input: SubscribeInput(channels: expChannels, groups: expGroups), cursor: SubscribeCursor(timetoken: 100, region: 55) ) @@ -555,7 +575,7 @@ class SubscribeTransitionTests: XCTestCase { let expectedInvocations: [EffectInvocation] = [ .cancel(.handshakeRequest), .regular(.emitStatus(change: Subscribe.ConnectionStatusChange( - oldStatus: .connecting, + oldStatus: .disconnected, newStatus: .connected, error: nil ))), @@ -566,7 +586,8 @@ class SubscribeTransitionTests: XCTestCase { ] let expectedState = Subscribe.ReceivingState( input: input, - cursor: SubscribeCursor(timetoken: 1500100900, region: 41) + cursor: SubscribeCursor(timetoken: 1500100900, region: 41), + connectionStatus: .connected ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -583,7 +604,7 @@ class SubscribeTransitionTests: XCTestCase { let expectedInvocations: [EffectInvocation] = [ .cancel(.handshakeRequest), .regular(.emitStatus(change: Subscribe.ConnectionStatusChange( - oldStatus: .connecting, + oldStatus: .disconnected, newStatus: .connectionError(PubNubError(.unknown)), error: PubNubError(.unknown)) )) @@ -604,7 +625,8 @@ class SubscribeTransitionTests: XCTestCase { let results = transition.transition( from: Subscribe.ReceivingState( input: input, - cursor: SubscribeCursor(timetoken: 18001000, region: 123) + cursor: SubscribeCursor(timetoken: 18001000, region: 123), + connectionStatus: .connected ), event: .receiveSuccess( cursor: SubscribeCursor(timetoken: 18002000, region: 123), @@ -625,7 +647,8 @@ class SubscribeTransitionTests: XCTestCase { ] let expectedState = Subscribe.ReceivingState( input: input, - cursor: SubscribeCursor(timetoken: 18002000, region: 123) + cursor: SubscribeCursor(timetoken: 18002000, region: 123), + connectionStatus: .connected ) XCTAssertTrue(results.state.isEqual(to: expectedState)) @@ -638,7 +661,8 @@ class SubscribeTransitionTests: XCTestCase { let results = transition.transition( from: Subscribe.ReceivingState( input: input, - cursor: SubscribeCursor(timetoken: 100500900, region: 11) + cursor: SubscribeCursor(timetoken: 100500900, region: 11), + connectionStatus: .connected ), event: .receiveFailure(error: PubNubError(.unknown)) ) @@ -762,7 +786,7 @@ class SubscribeTransitionTests: XCTestCase { let expectedInvocations: [EffectInvocation] = [ .cancel(.handshakeRequest), .regular(.emitStatus(change: Subscribe.ConnectionStatusChange( - oldStatus: .connecting, + oldStatus: .disconnected, newStatus: .disconnected, error: nil ))) @@ -780,7 +804,8 @@ class SubscribeTransitionTests: XCTestCase { let results = transition.transition( from: Subscribe.ReceivingState( input: input, - cursor: SubscribeCursor(timetoken: 123, region: 456) + cursor: SubscribeCursor(timetoken: 123, region: 456), + connectionStatus: .connected ), event: .disconnect ) @@ -811,14 +836,13 @@ class SubscribeTransitionTests: XCTestCase { let expectedInvocations: [EffectInvocation] = [ .cancel(.handshakeRequest), .regular(.emitStatus(change: Subscribe.ConnectionStatusChange( - oldStatus: .connecting, + oldStatus: .disconnected, newStatus: .disconnected, error: nil ))) ] - let expectedState = Subscribe.UnsubscribedState() - XCTAssertTrue(results.state.isEqual(to: expectedState)) + XCTAssertTrue(results.state.isEqual(to: Subscribe.UnsubscribedState())) XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations)) } @@ -865,7 +889,8 @@ class SubscribeTransitionTests: XCTestCase { let results = transition.transition( from: Subscribe.ReceivingState( input: input, - cursor: SubscribeCursor(timetoken: 123, region: 456) + cursor: SubscribeCursor(timetoken: 123, region: 456), + connectionStatus: .connected ), event: .unsubscribeAll ) @@ -877,9 +902,8 @@ class SubscribeTransitionTests: XCTestCase { error: nil ))) ] - let expectedState = Subscribe.UnsubscribedState() - XCTAssertTrue(results.state.isEqual(to: expectedState)) + XCTAssertTrue(results.state.isEqual(to: Subscribe.UnsubscribedState())) XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations)) } @@ -899,9 +923,8 @@ class SubscribeTransitionTests: XCTestCase { error: nil ))) ] - let expectedState = Subscribe.UnsubscribedState() - XCTAssertTrue(results.state.isEqual(to: expectedState)) + XCTAssertTrue(results.state.isEqual(to: Subscribe.UnsubscribedState())) XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations)) } @@ -920,9 +943,8 @@ class SubscribeTransitionTests: XCTestCase { error: nil ))) ] - let expectedState = Subscribe.UnsubscribedState() - XCTAssertTrue(results.state.isEqual(to: expectedState)) + XCTAssertTrue(results.state.isEqual(to: Subscribe.UnsubscribedState())) XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations)) } } diff --git a/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift b/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift index 9892df70..0085a0ee 100644 --- a/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift +++ b/Tests/PubNubTests/Integration/SubscriptionIntegrationTests.swift @@ -32,8 +32,6 @@ class SubscriptionIntegrationTests: XCTestCase { for config in [configuration, eeConfiguration] { XCTContext.runActivity(named: "Testing configuration with enableEventEngine=\(config.enableEventEngine)") { _ in let subscribeExpect = expectation(description: "Subscribe Expectation") - let connectingExpect = expectation(description: "Connecting Expectation") - let disconnectedExpect = expectation(description: "Disconnected Expectation") disconnectedExpect.assertForOverFulfill = true disconnectedExpect.expectedFulfillmentCount = 1 @@ -46,8 +44,6 @@ class SubscriptionIntegrationTests: XCTestCase { switch event { case let .connectionStatusChanged(status): switch status { - case .connecting: - connectingExpect.fulfill() case .disconnectedUnexpectedly: disconnectedExpect.fulfill() case .connectionError: @@ -66,7 +62,7 @@ class SubscriptionIntegrationTests: XCTestCase { pubnub.subscribe(to: [testChannel]) defer { pubnub.disconnect() } - wait(for: [subscribeExpect, connectingExpect, disconnectedExpect], timeout: 10.0) + wait(for: [subscribeExpect, disconnectedExpect], timeout: 10.0) } } } @@ -82,7 +78,8 @@ class SubscriptionIntegrationTests: XCTestCase { let configWithEventEngineEnabled = PubNubConfiguration( publishKey: configurationFromBundle.publishKey, subscribeKey: configurationFromBundle.subscribeKey, - userId: configurationFromBundle.userId + userId: configurationFromBundle.userId, + enableEventEngine: true ) for config in [configurationFromBundle, configWithEventEngineEnabled] { @@ -163,7 +160,8 @@ class SubscriptionIntegrationTests: XCTestCase { let configWithEventEngineEnabled = PubNubConfiguration( publishKey: configurationFromBundle.publishKey, subscribeKey: configurationFromBundle.subscribeKey, - userId: configurationFromBundle.userId + userId: configurationFromBundle.userId, + enableEventEngine: true ) for config in [configurationFromBundle, configWithEventEngineEnabled] { @@ -208,7 +206,6 @@ class SubscriptionIntegrationTests: XCTestCase { firstSubscription = nil secondSubscription = nil pubnub.unsubscribe(from: [self.testChannel]) - subscriptionSet?.unsubscribe() subscriptionSet = nil default: break @@ -242,7 +239,8 @@ class SubscriptionIntegrationTests: XCTestCase { let configWithEventEngineEnabled = PubNubConfiguration( publishKey: configurationFromBundle.publishKey, subscribeKey: configurationFromBundle.subscribeKey, - userId: configurationFromBundle.userId + userId: configurationFromBundle.userId, + enableEventEngine: true ) for config in [configurationFromBundle, configWithEventEngineEnabled] { @@ -253,7 +251,7 @@ class SubscriptionIntegrationTests: XCTestCase { let statusExpect = XCTestExpectation(description: "StatusExpect") statusExpect.assertForOverFulfill = true - statusExpect.expectedFulfillmentCount = 3 + statusExpect.expectedFulfillmentCount = 2 let pubnub = PubNub(configuration: config) var statusCounter = 0 @@ -263,13 +261,11 @@ class SubscriptionIntegrationTests: XCTestCase { messageExpect.fulfill() pubnub.unsubscribe(from: [self.testChannel]) } - pubnub.onConnectionStateChange = { [unowned pubnub] change in + pubnub.onConnectionStateChange = { [unowned pubnub, unowned self] change in if statusCounter == 0 { - XCTAssertTrue(change == .connecting) - } else if statusCounter == 1 { XCTAssertTrue(change == .connected) pubnub.publish(channel: self.testChannel, message: "This is a message", completion: nil) - } else if statusCounter == 2 { + } else if statusCounter == 1 { XCTAssertTrue(change == .disconnected) } else { XCTFail("Unexpected condition") @@ -299,7 +295,9 @@ class SubscriptionIntegrationTests: XCTestCase { subscribeKey: PubNubConfiguration(from: testsBundle).subscribeKey, userId: PubNubConfiguration(from: testsBundle).userId )) - let timetoken = Timetoken(Int(Date().timeIntervalSince1970 * 10000000)) + let timetoken = Timetoken( + Int(Date().timeIntervalSince1970 * 10000000) + ) pubnub.publish(channel: testChannel, message: "Message", completion: { [unowned pubnub, unowned self] _ in pubnub.publish(channel: self.testChannel, message: "Second message", completion: { _ in @@ -328,7 +326,7 @@ class SubscriptionIntegrationTests: XCTestCase { func test_SimultaneousSubscriptionsToTheSameChannel() { let expectation = XCTestExpectation(description: "Test Simultaneous Subscriptions") expectation.assertForOverFulfill = true - expectation.expectedFulfillmentCount = 2 + expectation.expectedFulfillmentCount = 1 let pubnub = PubNub(configuration: PubNubConfiguration( publishKey: PubNubConfiguration(from: testsBundle).publishKey, @@ -336,10 +334,10 @@ class SubscriptionIntegrationTests: XCTestCase { userId: PubNubConfiguration(from: testsBundle).userId )) + let channelName = "channel" + pubnub.onConnectionStateChange = { newStatus in switch newStatus { - case .connecting: - expectation.fulfill() case .connected: expectation.fulfill() default: @@ -347,17 +345,17 @@ class SubscriptionIntegrationTests: XCTestCase { } } - pubnub.subscribe(to: ["channel"]) - pubnub.subscribe(to: ["channel"]) + pubnub.subscribe(to: [channelName]) + pubnub.subscribe(to: [channelName]) - XCTAssertEqual(pubnub.subscribedChannels, ["channel"]) + XCTAssertEqual(pubnub.subscribedChannels, [channelName]) wait(for: [expectation], timeout: 5.0) } func test_SimultaneousSubscriptionsToTheSameChannelWithTimetoken() { let expectation = XCTestExpectation(description: "Test Simultaneous Subscriptions With Timetoken") expectation.assertForOverFulfill = true - expectation.expectedFulfillmentCount = 3 + expectation.expectedFulfillmentCount = 1 let pubnub = PubNub(configuration: PubNubConfiguration( publishKey: PubNubConfiguration(from: testsBundle).publishKey, @@ -365,10 +363,10 @@ class SubscriptionIntegrationTests: XCTestCase { userId: PubNubConfiguration(from: testsBundle).userId )) + let channelName = "channel" + pubnub.onConnectionStateChange = { newStatus in switch newStatus { - case .connecting: - expectation.fulfill() case .connected: expectation.fulfill() default: @@ -376,8 +374,8 @@ class SubscriptionIntegrationTests: XCTestCase { } } - pubnub.subscribe(to: ["channel"]) - pubnub.subscribe(to: ["channel"], at: Timetoken(Int(Date().timeIntervalSince1970 * 10000000))) + pubnub.subscribe(to: [channelName]) + pubnub.subscribe(to: [channelName], at: Timetoken(Int(Date().timeIntervalSince1970 * 10000000))) wait(for: [expectation], timeout: 5.0) } diff --git a/Tests/PubNubTests/Networking/Routers/SubscribeRouterTests.swift b/Tests/PubNubTests/Networking/Routers/SubscribeRouterTests.swift index 6baf0dd6..6d319a0a 100644 --- a/Tests/PubNubTests/Networking/Routers/SubscribeRouterTests.swift +++ b/Tests/PubNubTests/Networking/Routers/SubscribeRouterTests.swift @@ -851,16 +851,21 @@ extension SubscribeRouterTests { let mockResponses = ["subscription_handshake_success", "subscription_invalid_json", "cancelled"] let mockResult = mockSubscriptionSession(with: mockResponses, raw: [corruptedData], and: configuration) let errorExpect = XCTestExpectation(description: "Error Event") - let statusExpect = XCTestExpectation(description: "Status Event") let pubnub = PubNub(configuration: configuration) + let statusExpect: XCTestExpectation? = if (configuration.enableEventEngine) { + XCTestExpectation(description: "Status Event") + } else { + nil + } + mockResult.listener.didReceiveSubscription = { [mockResult] event in switch event { case .subscriptionChanged: break case let .connectionStatusChanged(connection): - if connection == .disconnected { - statusExpect.fulfill() + if case .connectionError = connection { + statusExpect?.fulfill() } case let .subscribeError(error): XCTAssertEqual(error.reason, .jsonDataDecodingFailure) @@ -874,7 +879,7 @@ extension SubscribeRouterTests { XCTAssertEqual(mockResult.subscriptionSession.subscribedChannels, [testChannel]) defer { mockResult.listener.cancel() } - wait(for: [errorExpect, statusExpect], timeout: 1.0, enforceOrder: true) + wait(for: [errorExpect, statusExpect].compactMap { $0 }, timeout: 1.0, enforceOrder: true) } } } @@ -936,8 +941,9 @@ extension SubscribeRouterTests { let statusExpect = XCTestExpectation(description: "Status Event") let otherChannel = "OtherChannel" let pubnub = PubNub(configuration: configuration) + let subscriptionSession = mockResult.subscriptionSession - mockResult.listener.didReceiveSubscription = { [weak self, mockResult] event in + mockResult.listener.didReceiveSubscription = { [weak self, weak subscriptionSession] event in switch event { case let .subscriptionChanged(change): switch change { @@ -953,8 +959,8 @@ extension SubscribeRouterTests { case let .connectionStatusChanged(status): switch status { case .connected: - mockResult.subscriptionSession.unsubscribeAll() - XCTAssertEqual(mockResult.subscriptionSession.subscribedChannels, []) + subscriptionSession?.unsubscribeAll() + XCTAssertEqual(subscriptionSession?.subscribedChannels, []) statusExpect.fulfill() case .disconnected: statusExpect.fulfill() @@ -969,8 +975,6 @@ extension SubscribeRouterTests { mockResult.subscriptionSession.subscribe(to: [testChannel, otherChannel], using: pubnub) XCTAssertTrue(mockResult.subscriptionSession.subscribedChannels.contains(testChannel)) XCTAssertTrue(mockResult.subscriptionSession.subscribedChannels.contains(otherChannel)) - mockResult.subscriptionSession.unsubscribeAll() - XCTAssertEqual(mockResult.subscriptionSession.subscribedChannels, []) defer { mockResult.listener.cancel() } wait(for: [statusExpect], timeout: 1.0)