Skip to content

Commit

Permalink
Subscribe & Presence Event Engine
Browse files Browse the repository at this point in the history
* Providing full backward compatibility with old subscription loop
* Always using PubNubError for any kind of errors
  • Loading branch information
jguz-pubnub committed Jan 14, 2024
1 parent db3208a commit f420485
Show file tree
Hide file tree
Showing 34 changed files with 1,997 additions and 1,600 deletions.
20 changes: 17 additions & 3 deletions Examples/Sources/MasterDetailTableViewController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,28 @@ class MasterDetailTableViewController: UITableViewController {
print("The signal is \(signal.payload) and was sent by \(signal.publisher ?? "")")
case let .connectionStatusChanged(connectionChange):
switch connectionChange {
case .connecting:
print("Status connecting...")
case .connected:
print("Status connected!")
case .connectionError:
print("Error while attempting to initialize connection")
case .reconnecting:
print("Status reconnecting...")
case .disconnected:
print("Status disconnected")
case .disconnectedUnexpectedly:
print("Disconnected unexpectedly")
print("Status disconnected unexpectedly!")
case .connectionError:
print("Cannot establish initial conection to the remote system")
}
case let .subscriptionChanged(subscribeChange):
switch subscribeChange {
case let .subscribed(channels, groups):
print("\(channels) and \(groups) were added to subscription")
case let .responseHeader(channels, groups, previous, next):
print("\(channels) and \(groups) recevied a response at \(previous?.timetoken ?? 0)")
print("\(next?.timetoken ?? 0) will be used as the new timetoken")
case let .unsubscribed(channels, groups):
print("\(channels) and \(groups) were removed from subscription")
}
case let .presenceChanged(presenceChange):
print("The channel \(presenceChange.channel) has an updated occupancy of \(presenceChange.occupancy)")
Expand Down
8 changes: 4 additions & 4 deletions PubNub.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@
3D389FE72B35AF4A006928E7 /* EmitStatusEffect.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FCB2B35AF4A006928E7 /* EmitStatusEffect.swift */; };
3D389FE82B35AF4A006928E7 /* SubscribeEffects.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FCC2B35AF4A006928E7 /* SubscribeEffects.swift */; };
3D389FE92B35AF4A006928E7 /* SubscribeEffectFactory.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FCD2B35AF4A006928E7 /* SubscribeEffectFactory.swift */; };
3D389FEA2B35AF4A006928E7 /* SubscribeError.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FCF2B35AF4A006928E7 /* SubscribeError.swift */; };
3D389FEB2B35AF4A006928E7 /* SubscribeInput.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FD02B35AF4A006928E7 /* SubscribeInput.swift */; };
3D389FEC2B35AF4A006928E7 /* SubscribeRequest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FD12B35AF4A006928E7 /* SubscribeRequest.swift */; };
3D389FED2B35AF4A006928E7 /* Subscribe.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D389FD22B35AF4A006928E7 /* Subscribe.swift */; };
Expand Down Expand Up @@ -425,6 +424,7 @@
3D38A02D2B35B087006928E7 /* SubscriptionSessionStrategy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D38A0292B35B087006928E7 /* SubscriptionSessionStrategy.swift */; };
3D38A02E2B35B087006928E7 /* LegacySubscriptionSessionStrategy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D38A02A2B35B087006928E7 /* LegacySubscriptionSessionStrategy.swift */; };
3D38A0302B35B208006928E7 /* subscription_handshake_success.json in Resources */ = {isa = PBXBuildFile; fileRef = 3D38A02F2B35B208006928E7 /* subscription_handshake_success.json */; };
3D4ED42F2B519FC500FE58C7 /* SubscriptionSessionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D4ED42E2B519FC500FE58C7 /* SubscriptionSessionTests.swift */; };
3D6265D72ABCA79100FDD5E6 /* CryptorUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D6265D62ABCA79100FDD5E6 /* CryptorUtils.swift */; };
3D758DBF2AAA1C49005D2B36 /* CryptoModule.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D758DBE2AAA1C49005D2B36 /* CryptoModule.swift */; };
3D758DC82AB06A12005D2B36 /* CryptoInputStream.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D758DC62AB06A12005D2B36 /* CryptoInputStream.swift */; };
Expand Down Expand Up @@ -969,7 +969,6 @@
3D389FCB2B35AF4A006928E7 /* EmitStatusEffect.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EmitStatusEffect.swift; sourceTree = "<group>"; };
3D389FCC2B35AF4A006928E7 /* SubscribeEffects.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeEffects.swift; sourceTree = "<group>"; };
3D389FCD2B35AF4A006928E7 /* SubscribeEffectFactory.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeEffectFactory.swift; sourceTree = "<group>"; };
3D389FCF2B35AF4A006928E7 /* SubscribeError.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeError.swift; sourceTree = "<group>"; };
3D389FD02B35AF4A006928E7 /* SubscribeInput.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeInput.swift; sourceTree = "<group>"; };
3D389FD12B35AF4A006928E7 /* SubscribeRequest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeRequest.swift; sourceTree = "<group>"; };
3D389FD22B35AF4A006928E7 /* Subscribe.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Subscribe.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1007,6 +1006,7 @@
3D38A0292B35B087006928E7 /* SubscriptionSessionStrategy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionSessionStrategy.swift; sourceTree = "<group>"; };
3D38A02A2B35B087006928E7 /* LegacySubscriptionSessionStrategy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = LegacySubscriptionSessionStrategy.swift; sourceTree = "<group>"; };
3D38A02F2B35B208006928E7 /* subscription_handshake_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = subscription_handshake_success.json; sourceTree = "<group>"; };
3D4ED42E2B519FC500FE58C7 /* SubscriptionSessionTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionSessionTests.swift; sourceTree = "<group>"; };
3D6265D62ABCA79100FDD5E6 /* CryptorUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CryptorUtils.swift; sourceTree = "<group>"; };
3D758DBE2AAA1C49005D2B36 /* CryptoModule.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CryptoModule.swift; sourceTree = "<group>"; };
3D758DC62AB06A12005D2B36 /* CryptoInputStream.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CryptoInputStream.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1241,6 +1241,7 @@
35458BA1230CB32F0085B502 /* Subscription */ = {
isa = PBXGroup;
children = (
3D4ED42E2B519FC500FE58C7 /* SubscriptionSessionTests.swift */,
35458BA2230CB3570085B502 /* SubscribeSessionFactoryTests.swift */,
);
path = Subscription;
Expand Down Expand Up @@ -2056,7 +2057,6 @@
3D389FCE2B35AF4A006928E7 /* Helpers */ = {
isa = PBXGroup;
children = (
3D389FCF2B35AF4A006928E7 /* SubscribeError.swift */,
3D389FD02B35AF4A006928E7 /* SubscribeInput.swift */,
3D389FD12B35AF4A006928E7 /* SubscribeRequest.swift */,
);
Expand Down Expand Up @@ -3395,7 +3395,6 @@
35B6FBAF22F226F4005EE490 /* NSNumber+PubNub.swift in Sources */,
3D38A02E2B35B087006928E7 /* LegacySubscriptionSessionStrategy.swift in Sources */,
357024BF283C07C900567EE8 /* Objects+PubNub.swift in Sources */,
3D389FEA2B35AF4A006928E7 /* SubscribeError.swift in Sources */,
35B0ACE3252BE36D00537A18 /* File+PubNub.swift in Sources */,
3D758DD52AB48A6A005D2B36 /* CryptorHeader.swift in Sources */,
35CF549C248ABE8B0099FE81 /* PubNubObjectMetadataPatcher.swift in Sources */,
Expand Down Expand Up @@ -3511,6 +3510,7 @@
35CDFEC022E7B48000F3B9F2 /* ImportTestResource.swift in Sources */,
3D38A00C2B35AF6A006928E7 /* SubscribeInputTests.swift in Sources */,
3D38A0132B35AF6B006928E7 /* HeartbeatEffectTests.swift in Sources */,
3D4ED42F2B519FC500FE58C7 /* SubscriptionSessionTests.swift in Sources */,
35403F8A253617A8004B978E /* XMLCodingTests.swift in Sources */,
3557CDF8237F4611004BBACC /* MessageActionsRouterTests.swift in Sources */,
35CDFEAD22E7655700F3B9F2 /* URL+PubNubTests.swift in Sources */,
Expand Down
26 changes: 18 additions & 8 deletions Sources/PubNub/EventEngine/Core/EffectHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,30 @@ protocol DelayedEffectHandler: AnyObject, EffectHandler {
var workItem: DispatchWorkItem? { get set }

func delayInterval() -> TimeInterval?
func onEarlyExit(notify completionBlock: @escaping ([Event]) -> Void)
func onEmptyInterval(notify completionBlock: @escaping ([Event]) -> Void)
func onDelayExpired(notify completionBlock: @escaping ([Event]) -> Void)
}

extension DelayedEffectHandler {
func performTask(completionBlock: @escaping ([Event]) -> Void) {
guard let delay = delayInterval() else {
onEarlyExit(notify: completionBlock); return
// MARK: - TimerEffect

class TimerEffect: EffectHandler {
private let interval: TimeInterval
private var workItem: DispatchWorkItem?

init?(interval: TimeInterval?) {
if let interval = interval {
self.interval = interval
} else {
return nil
}
let workItem = DispatchWorkItem() { [weak self] in
self?.onDelayExpired(notify: completionBlock)
}

func performTask(completionBlock: @escaping ([Void]) -> Void) {
let workItem = DispatchWorkItem() {
completionBlock([])
}
DispatchQueue.global(qos: .default).asyncAfter(
deadline: .now() + delay,
deadline: .now() + interval,
execute: workItem
)
self.workItem = workItem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,70 +10,39 @@

import Foundation

class DelayedHeartbeatEffect: DelayedEffectHandler {
typealias Event = Presence.Event

class DelayedHeartbeatEffect: EffectHandler {
private let request: PresenceHeartbeatRequest
private let configuration: SubscriptionConfiguration
private let retryAttempt: Int
private let reason: PubNubError

var workItem: DispatchWorkItem?
private let timerEffect: TimerEffect?

init(
request: PresenceHeartbeatRequest,
retryAttempt: Int,
reason: PubNubError,
configuration: SubscriptionConfiguration
reason: PubNubError
) {
self.request = request
self.retryAttempt = retryAttempt
self.reason = reason
self.configuration = configuration
self.timerEffect = TimerEffect(interval: request.reconnectionDelay(dueTo: reason, retryAttempt: retryAttempt))
}

func delayInterval() -> TimeInterval? {
guard let automaticRetry = configuration.automaticRetry else {
return nil
}
guard automaticRetry[.presence] != nil else {
return nil
}
guard automaticRetry.retryLimit > retryAttempt else {
return nil
}
guard let underlyingError = reason.underlying else {
return automaticRetry.policy.delay(for: retryAttempt)
}
guard let urlResponse = reason.affected.findFirst(by: PubNubError.AffectedValue.response) else {
return nil
func performTask(completionBlock: @escaping ([Presence.Event]) -> Void) {
guard let timerEffect = timerEffect else {
completionBlock([.heartbeatGiveUp(error: reason)]); return
}

let shouldRetry = automaticRetry.shouldRetry(
response: urlResponse,
error: underlyingError
)

return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil
}

func onEarlyExit(notify completionBlock: @escaping ([Presence.Event]) -> Void) {
completionBlock([.heartbeatGiveUp(error: reason)])
}

func onDelayExpired(notify completionBlock: @escaping ([Presence.Event]) -> Void) {
request.execute() { result in
switch result {
case .success(_):
completionBlock([.heartbeatSuccess])
case .failure(let error):
completionBlock([.heartbeatFailed(error: error)])
timerEffect.performTask { [weak self] _ in
self?.request.execute() { result in
switch result {
case .success(_):
completionBlock([.heartbeatSuccess])
case .failure(let error):
completionBlock([.heartbeatFailed(error: error)])
}
}
}
}

func cancelTask() {
workItem?.cancel()
timerEffect?.cancelTask()
request.cancel()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ class PresenceEffectFactory: EffectHandlerFactory {
sessionResponseQueue: sessionResponseQueue
),
retryAttempt: retryAttempt,
reason: reason,
configuration: dependencies.value.configuration
reason: reason
)
case .leave(let channels, let groups):
return LeaveEffect(
Expand Down
32 changes: 15 additions & 17 deletions Sources/PubNub/EventEngine/Presence/Effects/WaitEffect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,27 @@

import Foundation

class WaitEffect: DelayedEffectHandler {
typealias Event = Presence.Event

private let configuration: SubscriptionConfiguration
var workItem: DispatchWorkItem?
class WaitEffect: EffectHandler {
private let timerEffect: TimerEffect?

init(configuration: SubscriptionConfiguration) {
self.configuration = configuration
}

func delayInterval() -> TimeInterval? {
configuration.heartbeatInterval > 0 ? TimeInterval(configuration.heartbeatInterval) : nil
}

func onEarlyExit(notify completionBlock: @escaping ([Presence.Event]) -> Void) {
completionBlock([])
if configuration.heartbeatInterval > 0 {
self.timerEffect = TimerEffect(interval: TimeInterval(configuration.heartbeatInterval))
} else {
self.timerEffect = nil
}
}

func onDelayExpired(notify completionBlock: @escaping ([Presence.Event]) -> Void) {
completionBlock([.timesUp])
func performTask(completionBlock: @escaping ([Presence.Event]) -> Void) {
guard let timerEffect = timerEffect else {
completionBlock([]); return
}
timerEffect.performTask(completionBlock: { _ in
completionBlock([.timesUp])
})
}

func cancelTask() {
workItem?.cancel()
timerEffect?.cancelTask()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,29 @@ class PresenceHeartbeatRequest {
func cancel() {
request?.cancel(PubNubError(.clientCancelled))
}

func reconnectionDelay(dueTo error: PubNubError, retryAttempt: Int) -> TimeInterval? {
guard let automaticRetry = configuration.automaticRetry else {
return nil
}
guard automaticRetry[.presence] != nil else {
return nil
}
guard automaticRetry.retryLimit > retryAttempt else {
return nil
}
guard let underlyingError = error.underlying else {
return automaticRetry.policy.delay(for: retryAttempt)
}
guard let urlResponse = error.affected.findFirst(by: PubNubError.AffectedValue.response) else {
return nil
}

let shouldRetry = automaticRetry.shouldRetry(
response: urlResponse,
error: underlyingError
)

return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct EmitStatusEffect: EffectHandler {
func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) {
if let error = statusChange.error {
listeners.forEach {
$0.emit(subscribe: .errorReceived(error.underlying))
$0.emit(subscribe: .errorReceived(error))
}
}
listeners.forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
timetoken: 0,
session: session,
sessionResponseQueue: sessionResponseQueue
)
), listeners: dependencies.value.listeners
)
case .handshakeReconnect(let channels, let groups, let retryAttempt, let reason):
return HandshakeReconnectEffect(
Expand All @@ -55,7 +55,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
timetoken: 0,
session: session,
sessionResponseQueue: sessionResponseQueue
),
), listeners: dependencies.value.listeners,
error: reason,
retryAttempt: retryAttempt
)
Expand All @@ -70,7 +70,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
region: cursor.region,
session: session,
sessionResponseQueue: sessionResponseQueue
)
), listeners: dependencies.value.listeners
)
case .receiveReconnect(let channels, let groups, let cursor, let retryAttempt, let reason):
return ReceiveReconnectEffect(
Expand All @@ -83,7 +83,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
region: cursor.region,
session: session,
sessionResponseQueue: sessionResponseQueue
),
), listeners: dependencies.value.listeners,
error: reason,
retryAttempt: retryAttempt
)
Expand Down
Loading

0 comments on commit f420485

Please sign in to comment.