Skip to content

Commit

Permalink
Added strategies to handle subscription loop
Browse files Browse the repository at this point in the history
  • Loading branch information
jguz-pubnub committed Oct 25, 2023
1 parent 9fc77ba commit 04ba22d
Show file tree
Hide file tree
Showing 9 changed files with 942 additions and 130 deletions.
24 changes: 24 additions & 0 deletions PubNub.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,10 @@
3D9B29FA2A65609000C988C9 /* EmitMessagesTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D9B29F42A65609000C988C9 /* EmitMessagesTests.swift */; };
3D9B29FB2A65609000C988C9 /* SubscribeRequestTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D9B29F52A65609000C988C9 /* SubscribeRequestTests.swift */; };
3D9B29FD2A6560FB00C988C9 /* PresenceTransitionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D9B29FC2A6560FB00C988C9 /* PresenceTransitionTests.swift */; };
3DAAF5502AE28B3E00D3761A /* LegacySubscriptionSessionStrategy+Presence.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DAAF54E2AE28B3E00D3761A /* LegacySubscriptionSessionStrategy+Presence.swift */; };
3DAAF5512AE28B3E00D3761A /* LegacySubscriptionSessionStrategy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DAAF54F2AE28B3E00D3761A /* LegacySubscriptionSessionStrategy.swift */; };
3DAAF5532AE28B5F00D3761A /* EventEngineSubscriptionSessionStrategy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DAAF5522AE28B5F00D3761A /* EventEngineSubscriptionSessionStrategy.swift */; };
3DAAF5552AE28B6C00D3761A /* SubscriptionSessionStrategy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DAAF5542AE28B6C00D3761A /* SubscriptionSessionStrategy.swift */; };
3DACA65B2AD706FD00FCBF43 /* PubNubEventEngineContractTestSteps.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DACA65A2AD706FD00FCBF43 /* PubNubEventEngineContractTestSteps.swift */; };
3DACA65C2AD706FD00FCBF43 /* PubNubEventEngineContractTestSteps.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DACA65A2AD706FD00FCBF43 /* PubNubEventEngineContractTestSteps.swift */; };
3DACA65E2AD70E9500FCBF43 /* PubNubSubscribeEngineContractTestsSteps.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DACA65D2AD70E9500FCBF43 /* PubNubSubscribeEngineContractTestsSteps.swift */; };
Expand Down Expand Up @@ -964,6 +968,10 @@
3D9B29F42A65609000C988C9 /* EmitMessagesTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EmitMessagesTests.swift; sourceTree = "<group>"; };
3D9B29F52A65609000C988C9 /* SubscribeRequestTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeRequestTests.swift; sourceTree = "<group>"; };
3D9B29FC2A6560FB00C988C9 /* PresenceTransitionTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PresenceTransitionTests.swift; sourceTree = "<group>"; };
3DAAF54E2AE28B3E00D3761A /* LegacySubscriptionSessionStrategy+Presence.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "LegacySubscriptionSessionStrategy+Presence.swift"; sourceTree = "<group>"; };
3DAAF54F2AE28B3E00D3761A /* LegacySubscriptionSessionStrategy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = LegacySubscriptionSessionStrategy.swift; sourceTree = "<group>"; };
3DAAF5522AE28B5F00D3761A /* EventEngineSubscriptionSessionStrategy.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EventEngineSubscriptionSessionStrategy.swift; sourceTree = "<group>"; };
3DAAF5542AE28B6C00D3761A /* SubscriptionSessionStrategy.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubscriptionSessionStrategy.swift; sourceTree = "<group>"; };
3DACA65A2AD706FD00FCBF43 /* PubNubEventEngineContractTestSteps.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubEventEngineContractTestSteps.swift; sourceTree = "<group>"; };
3DACA65D2AD70E9500FCBF43 /* PubNubSubscribeEngineContractTestsSteps.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PubNubSubscribeEngineContractTestsSteps.swift; sourceTree = "<group>"; };
3DACA6602AD70EAD00FCBF43 /* PubNubPresenceEngineContractTestSteps.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubPresenceEngineContractTestSteps.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1653,6 +1661,7 @@
35A66A8522F8DB2E00AC67A9 /* Subscription */ = {
isa = PBXGroup;
children = (
3DAAF54D2AE28B3E00D3761A /* Strategy */,
35A66A8D22F911DB00AC67A9 /* SubscribeSessionFactory.swift */,
35A66A7422F861BA00AC67A9 /* SubscriptionSession.swift */,
35C829DB23147AC000F59D3C /* SubscriptionState.swift */,
Expand Down Expand Up @@ -2048,6 +2057,17 @@
path = Subscribe;
sourceTree = "<group>";
};
3DAAF54D2AE28B3E00D3761A /* Strategy */ = {
isa = PBXGroup;
children = (
3DAAF5542AE28B6C00D3761A /* SubscriptionSessionStrategy.swift */,
3DAAF54E2AE28B3E00D3761A /* LegacySubscriptionSessionStrategy+Presence.swift */,
3DAAF54F2AE28B3E00D3761A /* LegacySubscriptionSessionStrategy.swift */,
3DAAF5522AE28B5F00D3761A /* EventEngineSubscriptionSessionStrategy.swift */,
);
path = Strategy;
sourceTree = "<group>";
};
3DACA6552AD706E400FCBF43 /* EventEngine */ = {
isa = PBXGroup;
children = (
Expand Down Expand Up @@ -3252,6 +3272,7 @@
35481BF6252275B5004E07B5 /* PubNubFile.swift in Sources */,
35CDA4CC2510031E00218137 /* XMLDecoder.swift in Sources */,
3DE748812A1FA426009B0809 /* EmitMessagesEffect.swift in Sources */,
3DAAF5532AE28B5F00D3761A /* EventEngineSubscriptionSessionStrategy.swift in Sources */,
35D0615F2304830600FDB2F9 /* GenericServicePayloadResponse.swift in Sources */,
35A66A8E22F911DB00AC67A9 /* SubscribeSessionFactory.swift in Sources */,
35D8D4C522EB4600001B07D9 /* AnyJSON.swift in Sources */,
Expand All @@ -3270,6 +3291,7 @@
3534D4E222C56533008E89FA /* TimeRouter.swift in Sources */,
35CDFEAF22E7664D00F3B9F2 /* URLQueryItem+PubNub.swift in Sources */,
35304F8A22FE5425006A02CA /* Validated.swift in Sources */,
3DAAF5502AE28B3E00D3761A /* LegacySubscriptionSessionStrategy+Presence.swift in Sources */,
35EE358822E247B200E3F081 /* URLSessionConfiguration+PubNub.swift in Sources */,
3DE748822A1FA426009B0809 /* EmitStatusEffect.swift in Sources */,
35A6C7A822FBCC8B00E97CC5 /* PushRouter.swift in Sources */,
Expand All @@ -3296,6 +3318,7 @@
3DE748862A1FA426009B0809 /* Subscribe.swift in Sources */,
35B6FBAF22F226F4005EE490 /* NSNumber+PubNub.swift in Sources */,
357024BF283C07C900567EE8 /* Objects+PubNub.swift in Sources */,
3DAAF5552AE28B6C00D3761A /* SubscriptionSessionStrategy.swift in Sources */,
35B0ACE3252BE36D00537A18 /* File+PubNub.swift in Sources */,
35CF549C248ABE8B0099FE81 /* PubNubObjectMetadataPatcher.swift in Sources */,
35C829DC23147AC000F59D3C /* SubscriptionState.swift in Sources */,
Expand Down Expand Up @@ -3362,6 +3385,7 @@
3DE748872A1FA426009B0809 /* SubscribeTransition.swift in Sources */,
3D8773682A613A58004A2953 /* PresenceTransition.swift in Sources */,
3559978C230A02B7000BCFD1 /* PubNubLogger.swift in Sources */,
3DAAF5512AE28B3E00D3761A /* LegacySubscriptionSessionStrategy.swift in Sources */,
35AE6A3224FD6CEE00BBFA37 /* FileManagementRouter.swift in Sources */,
3D93EC342A41A92D0056C26D /* SubscribeRequest.swift in Sources */,
35089A0B22E56F1F002BCC94 /* Constants.swift in Sources */,
Expand Down
2 changes: 2 additions & 0 deletions Sources/PubNub/PubNubConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ public struct PubNubConfiguration: Hashable {
public var useInstanceId: Bool
/// Whether a request identifier should be included on outgoing requests
public var useRequestId: Bool
/// A flag describing whether to enable new strategy for handling subscription loop
public var enableEventEngine: Bool = false
/// Reconnection policy which will be used if/when a request fails
public var automaticRetry: AutomaticRetry?
/// URLSessionConfiguration used for URLSession network events
Expand Down
27 changes: 27 additions & 0 deletions Sources/PubNub/Subscription/ConnectionStatus.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,31 @@ public enum ConnectionStatus: Equatable {
return false
}
}

func canTransition(to state: ConnectionStatus) -> Bool {
switch (self, state) {
case (.connecting, .reconnecting):
return false
case (.connecting, _):
return true
case (.connected, .connecting):
return false
case (.connected, _):
return true
case (.reconnecting, .connecting):
return false
case (.reconnecting, _):
return true
case (.disconnected, .connecting):
return true
case (.disconnected, _):
return false
case (.disconnectedUnexpectedly, .connecting):
return true
case (.disconnectedUnexpectedly, _):
return false
case (.connectionError, _):
return false
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
//
// EventEngineSubscriptionSessionStrategy.swift
//
// PubNub Real-time Cloud-Hosted Push API and Push Notification Client Frameworks
// Copyright © 2019 PubNub Inc.
// http://www.pubnub.com/
// http://www.pubnub.com/terms
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//

import Foundation

class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy {
let uuid = UUID()

var privateListeners: WeakSet<ListenerType> = WeakSet([])
var configuration: SubscriptionConfiguration
var subscribeEngine: SubscribeEngine
var presenceEngine: PresenceEngine
var previousTokenResponse: SubscribeCursor?

internal init(
configuration: SubscriptionConfiguration,
subscribeEngine: SubscribeEngine,
presenceEngine: PresenceEngine
) {
self.subscribeEngine = subscribeEngine
self.configuration = configuration
self.presenceEngine = presenceEngine
self.listenForStateUpdates()
}

var subscribedChannels: [String] {
subscribeEngine.state.input.subscribedChannels
}

var subscribedChannelGroups: [String] {
subscribeEngine.state.input.subscribedGroups
}

var subscriptionCount: Int {
subscribeEngine.state.input.totalSubscribedCount
}

var connectionStatus: ConnectionStatus {
subscribeEngine.state.connectionStatus
}

deinit {
PubNub.log.debug("SubscriptionSession Destroyed")
// Poke the session factory to clean up nil values
SubscribeSessionFactory.shared.sessionDestroyed()
}

private func listenForStateUpdates() {
subscribeEngine.onStateUpdated = { [weak self] state in
if state.hasTimetoken {
self?.previousTokenResponse = state.cursor
}
}
}

private func updateSubscribeEngineInput() {
subscribeEngine.customInput = EventEngineCustomInput(
value: Subscribe.EngineInput(
configuration: configuration,
listeners: privateListeners.allObjects
)
)
}

private func sendSubscribeEvent(event: Subscribe.Event) {
updateSubscribeEngineInput()
subscribeEngine.send(event: event)
}

private func updatePresenceEngineInput() {
presenceEngine.customInput = EventEngineCustomInput(
value: Presence.EngineInput(
configuration: configuration
)
)
}

private func sendPresenceEvent(event: Presence.Event) {
updatePresenceEngineInput()
presenceEngine.send(event: event)
}

// MARK: - Subscription Loop

func subscribe(
to channels: [String],
and groups: [String] = [],
at cursor: SubscribeCursor? = nil,
withPresence: Bool = false
) {
let newInput = subscribeEngine.state.input + SubscribeInput(
channels: channels.map { PubNubChannel(id: $0, withPresence: withPresence) },
groups: groups.map { PubNubChannel(id: $0, withPresence: withPresence) }
)
if let cursor = cursor, cursor.timetoken != 0 {
sendSubscribeEvent(event: .subscriptionRestored(
channels: newInput.allSubscribedChannels,
groups: newInput.allSubscribedGroups,
cursor: cursor
))
} else {
sendSubscribeEvent(event: .subscriptionChanged(
channels: newInput.allSubscribedChannels,
groups: newInput.allSubscribedGroups
))
}
sendPresenceEvent(event: .joined(
channels: newInput.presenceSubscribedChannels,
groups: newInput.presenceSubscribedGroups
))
}

func reconnect(at cursor: SubscribeCursor? = nil) {
let input = subscribeEngine.state.input
let channels = input.allSubscribedChannels
let groups = input.allSubscribedGroups

if let cursor = cursor {
sendSubscribeEvent(event: .subscriptionRestored(channels: channels, groups: groups, cursor: cursor))
} else {
sendSubscribeEvent(event: .reconnect)
}
}

func disconnect() {
sendSubscribeEvent(event: .disconnect)
sendPresenceEvent(event: .disconnect)
}

// MARK: - Unsubscribe

func unsubscribe(from channels: [String], and groups: [String] = [], presenceOnly: Bool = false) {
let newInput = subscribeEngine.state.input - (
channels: channels.map { presenceOnly ? $0.presenceChannelName : $0 },
groups: groups.map { presenceOnly ? $0.presenceChannelName : $0 }
)
sendSubscribeEvent(event: .subscriptionChanged(
channels: newInput.allSubscribedChannels,
groups: newInput.allSubscribedGroups
))
sendPresenceEvent(event: .left(
channels: channels,
groups: groups
))
}

func unsubscribeAll() {
sendSubscribeEvent(event: .unsubscribeAll)
sendPresenceEvent(event: .leftAll)
}
}

extension EventEngineSubscriptionSessionStrategy: EventStreamEmitter {
typealias ListenerType = BaseSubscriptionListener

var listeners: [ListenerType] {
privateListeners.allObjects
}

func add(_ listener: ListenerType) {
// Ensure that we cancel the previously attached token
listener.token?.cancel()
// Add new token to the listener
listener.token = ListenerToken { [weak self, weak listener] in
if let listener = listener {
self?.privateListeners.remove(listener)
self?.updateSubscribeEngineInput()
}
}
privateListeners.update(listener)
updateSubscribeEngineInput()
}

func notify(listeners closure: (ListenerType) -> Void) {
listeners.forEach { closure($0) }
}
}
Loading

0 comments on commit 04ba22d

Please sign in to comment.