Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(retry): moving retry logic away from EventEngine
Browse files Browse the repository at this point in the history
jguz-pubnub committed Apr 8, 2024
1 parent f744f43 commit d24c0f6
Showing 22 changed files with 85 additions and 1,616 deletions.
72 changes: 53 additions & 19 deletions PubNub.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -41,19 +41,6 @@ class PresenceEffectFactory: EffectHandlerFactory {
sessionResponseQueue: sessionResponseQueue
)
)
case let .delayedHeartbeat(channels, groups, retryAttempt, reason):
return DelayedHeartbeatEffect(
request: PresenceHeartbeatRequest(
channels: channels,
groups: groups,
channelStates: presenceStateContainer.getStates(forChannels: channels),
configuration: dependencies.value.configuration,
session: session,
sessionResponseQueue: sessionResponseQueue
),
retryAttempt: retryAttempt,
reason: reason
)
case let .leave(channels, groups):
return LeaveEffect(
request: PresenceLeaveRequest(
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ class PresenceHeartbeatRequest {
)
request = session.request(
with: PresenceRouter(endpoint, configuration: configuration),
requestOperator: nil
requestOperator: configuration.automaticRetry?.retryOperator(for: .presence)
)
request?.validate().response(on: sessionResponseQueue, decoder: GenericServiceResponseDecoder()) { result in
switch result {
@@ -60,29 +60,4 @@ class PresenceHeartbeatRequest {
func cancel() {
request?.cancel(PubNubError(.clientCancelled))
}

func reconnectionDelay(dueTo error: PubNubError, retryAttempt: Int) -> TimeInterval? {
guard let automaticRetry = configuration.automaticRetry else {
return nil
}
guard automaticRetry.retryOperator(for: .presence) != nil else {
return nil
}
guard automaticRetry.retryLimit > retryAttempt else {
return nil
}
guard let underlyingError = error.underlying else {
return automaticRetry.policy.delay(for: retryAttempt)
}
guard let urlResponse = error.affected.findFirst(by: PubNubError.AffectedValue.response) else {
return nil
}

let shouldRetry = automaticRetry.shouldRetry(
response: urlResponse,
error: underlyingError
)

return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil
}
}
13 changes: 0 additions & 13 deletions Sources/PubNub/EventEngine/Presence/Presence.swift
Original file line number Diff line number Diff line change
@@ -42,12 +42,6 @@ extension Presence {
let input: PresenceInput
}

struct HeartbeatReconnecting: PresenceState {
let input: PresenceInput
let retryAttempt: Int
let error: PubNubError
}

struct HeartbeatFailed: PresenceState {
let input: PresenceInput
let error: PubNubError
@@ -74,7 +68,6 @@ extension Presence {
case timesUp
case heartbeatSuccess
case heartbeatFailed(error: PubNubError)
case heartbeatGiveUp(error: PubNubError)
}
}

@@ -90,20 +83,16 @@ extension Presence {
enum Invocation: AnyEffectInvocation {
case heartbeat(channels: [String], groups: [String])
case leave(channels: [String], groups: [String])
case delayedHeartbeat(channels: [String], groups: [String], retryAttempt: Int, error: PubNubError)
case wait

// swiftlint:disable:next nesting
enum Cancellable: AnyCancellableInvocation {
case wait
case delayedHeartbeat

var id: String {
switch self {
case .wait:
return "Presence.ScheduleNextHeartbeat"
case .delayedHeartbeat:
return "Presence.HeartbeatReconnect"
}
}
}
@@ -114,8 +103,6 @@ extension Presence {
return "Presence.Heartbeat"
case .wait:
return Cancellable.wait.id
case .delayedHeartbeat:
return Cancellable.delayedHeartbeat.id
case .leave:
return "Presence.Leave"
}
34 changes: 4 additions & 30 deletions Sources/PubNub/EventEngine/Presence/PresenceTransition.swift
Original file line number Diff line number Diff line change
@@ -28,11 +28,9 @@ class PresenceTransition: TransitionProtocol {
case .left:
return !(state is Presence.HeartbeatInactive)
case .heartbeatSuccess:
return state is Presence.Heartbeating || state is Presence.HeartbeatReconnecting
return state is Presence.Heartbeating
case .heartbeatFailed:
return state is Presence.Heartbeating || state is Presence.HeartbeatReconnecting
case .heartbeatGiveUp:
return state is Presence.HeartbeatReconnecting
return state is Presence.Heartbeating
case .timesUp:
return state is Presence.HeartbeatCooldown
case .leftAll:
@@ -51,11 +49,6 @@ class PresenceTransition: TransitionProtocol {
channels: state.channels,
groups: state.input.groups
))]
case let state as Presence.HeartbeatReconnecting:
return [.managed(.delayedHeartbeat(
channels: state.channels, groups: state.groups,
retryAttempt: state.retryAttempt, error: state.error
))]
case is Presence.HeartbeatCooldown:
return [.managed(.wait)]
default:
@@ -67,8 +60,6 @@ class PresenceTransition: TransitionProtocol {
switch state {
case is Presence.HeartbeatCooldown:
return [.cancel(.wait)]
case is Presence.HeartbeatReconnecting:
return [.cancel(.delayedHeartbeat)]
default:
return []
}
@@ -85,9 +76,7 @@ class PresenceTransition: TransitionProtocol {
case .heartbeatSuccess:
results = heartbeatSuccessTransition(from: state)
case let .heartbeatFailed(error):
results = heartbeatReconnectingTransition(from: state, dueTo: error)
case let .heartbeatGiveUp(error):
results = heartbeatReconnectingGiveUpTransition(from: state, dueTo: error)
results = heartbeatFailedTransition(from: state, dueTo: error)
case .timesUp:
results = heartbeatingTransition(from: state)
case .leftAll:
@@ -156,22 +145,7 @@ fileprivate extension PresenceTransition {
}

fileprivate extension PresenceTransition {
func heartbeatReconnectingTransition(
from state: State,
dueTo error: PubNubError
) -> TransitionResult<State, Invocation> {
return TransitionResult(
state: Presence.HeartbeatReconnecting(
input: state.input,
retryAttempt: ((state as? Presence.HeartbeatReconnecting)?.retryAttempt ?? -1) + 1,
error: error
)
)
}
}

fileprivate extension PresenceTransition {
func heartbeatReconnectingGiveUpTransition(
func heartbeatFailedTransition(
from state: State,
dueTo error: PubNubError
) -> TransitionResult<State, Invocation> {
Original file line number Diff line number Diff line change
@@ -45,20 +45,6 @@ class SubscribeEffectFactory: EffectHandlerFactory {
sessionResponseQueue: sessionResponseQueue
), listeners: dependencies.value.listeners
)
case let .handshakeReconnect(channels, groups, retryAttempt, reason):
return HandshakeReconnectEffect(
request: SubscribeRequest(
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
channelStates: presenceStateContainer.getStates(forChannels: channels),
timetoken: 0,
session: session,
sessionResponseQueue: sessionResponseQueue
), listeners: dependencies.value.listeners,
error: reason,
retryAttempt: retryAttempt
)
case let .receiveMessages(channels, groups, cursor):
return ReceivingEffect(
request: SubscribeRequest(
@@ -72,21 +58,6 @@ class SubscribeEffectFactory: EffectHandlerFactory {
sessionResponseQueue: sessionResponseQueue
), listeners: dependencies.value.listeners
)
case let .receiveReconnect(channels, groups, cursor, retryAttempt, reason):
return ReceiveReconnectEffect(
request: SubscribeRequest(
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
channelStates: [:],
timetoken: cursor.timetoken,
region: cursor.region,
session: session,
sessionResponseQueue: sessionResponseQueue
), listeners: dependencies.value.listeners,
error: reason,
retryAttempt: retryAttempt
)
case let .emitMessages(messages, cursor):
return EmitMessagesEffect(
messages: messages,
Original file line number Diff line number Diff line change
@@ -65,105 +65,6 @@ class ReceivingEffect: EffectHandler {
}
}

// MARK: - HandshakeReconnectEffect

class HandshakeReconnectEffect: EffectHandler {
private let subscribeEffect: SubscribeEffect
private let timerEffect: TimerEffect?
private let error: PubNubError

init(
request: SubscribeRequest,
listeners: [BaseSubscriptionListener],
error: PubNubError,
retryAttempt: Int
) {
self.timerEffect = TimerEffect(interval: request.reconnectionDelay(
dueTo: error,
retryAttempt: retryAttempt
))
self.subscribeEffect = SubscribeEffect(
request: request,
listeners: listeners,
onResponseReceived: { .handshakeReconnectSuccess(cursor: $0.cursor) },
onErrorReceived: { .handshakeReconnectFailure(error: $0) }
)
self.error = error
}

func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) {
guard let timerEffect = timerEffect else {
completionBlock([.handshakeReconnectGiveUp(error: error)]); return
}
timerEffect.performTask { [weak self] _ in
self?.subscribeEffect.performTask(completionBlock: completionBlock)
}
}

func cancelTask() {
timerEffect?.cancelTask()
subscribeEffect.cancelTask()
}

deinit {
cancelTask()
}
}

// MARK: - ReceiveReconnectEffect

class ReceiveReconnectEffect: EffectHandler {
private let subscribeEffect: SubscribeEffect
private let timerEffect: TimerEffect?
private let error: PubNubError

init(
request: SubscribeRequest,
listeners: [BaseSubscriptionListener],
error: PubNubError,
retryAttempt: Int
) {
self.timerEffect = TimerEffect(interval: request.reconnectionDelay(
dueTo: error,
retryAttempt: retryAttempt
))
self.subscribeEffect = SubscribeEffect(
request: request,
listeners: listeners,
onResponseReceived: { .receiveReconnectSuccess(cursor: $0.cursor, messages: $0.messages) },
onErrorReceived: { .receiveReconnectFailure(error: $0) }
)
self.error = error
}

func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) {
subscribeEffect.listeners.forEach {
$0.emit(subscribe: .connectionChanged(.reconnecting))
}
guard let timerEffect = timerEffect else {
completionBlock([.receiveReconnectGiveUp(error: error)]); return
}
subscribeEffect.request.onAuthChallengeReceived = { [weak self] in
// Delay time for server to process connection after TLS handshake
DispatchQueue.global(qos: .default).asyncAfter(deadline: DispatchTime.now() + 0.05) {
self?.subscribeEffect.listeners.forEach { $0.emit(subscribe: .connectionChanged(.connected)) }
}
}
timerEffect.performTask { [weak self] _ in
self?.subscribeEffect.performTask(completionBlock: completionBlock)
}
}

func cancelTask() {
timerEffect?.cancelTask()
subscribeEffect.cancelTask()
}

deinit {
cancelTask()
}
}

// MARK: - SubscribeEffect

private class SubscribeEffect: EffectHandler {
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ class SubscribeRequest {
private let session: SessionReplaceable
private let sessionResponseQueue: DispatchQueue
private let channelStates: [String: JSONCodable]

private var request: RequestReplaceable?

var retryLimit: UInt { configuration.automaticRetry?.retryLimit ?? 0 }
@@ -52,29 +51,6 @@ class SubscribeRequest {
}
}

func reconnectionDelay(dueTo error: PubNubError, retryAttempt: Int) -> TimeInterval? {
guard let automaticRetry = configuration.automaticRetry else {
return nil
}
guard automaticRetry.retryOperator(for: .subscribe) != nil else {
return nil
}
guard automaticRetry.retryLimit > retryAttempt else {
return nil
}
guard let underlyingError = error.underlying else {
return automaticRetry.policy.delay(for: retryAttempt)
}
guard let urlResponse = error.affected.findFirst(by: PubNubError.AffectedValue.response) else {
return nil
}
let shouldRetry = automaticRetry.shouldRetry(
response: urlResponse,
error: underlyingError
)
return shouldRetry ? automaticRetry.policy.delay(for: retryAttempt) : nil
}

func execute(onCompletion: @escaping (Result<SubscribeResponse, PubNubError>) -> Void) {
let router = SubscribeRouter(
.subscribe(
@@ -90,7 +66,7 @@ class SubscribeRequest {
)
request = session.request(
with: router,
requestOperator: nil
requestOperator: configuration.automaticRetry?.retryOperator(for: .subscribe)
)
request?.validate().response(
on: sessionResponseQueue,
34 changes: 0 additions & 34 deletions Sources/PubNub/EventEngine/Subscribe/Subscribe.swift
Original file line number Diff line number Diff line change
@@ -44,14 +44,6 @@ extension Subscribe {
let connectionStatus = ConnectionStatus.disconnected
}

struct HandshakeReconnectingState: SubscribeState {
let input: SubscribeInput
let cursor: SubscribeCursor
let retryAttempt: Int
let reason: PubNubError
let connectionStatus = ConnectionStatus.connecting
}

struct HandshakeFailedState: SubscribeState {
let input: SubscribeInput
let cursor: SubscribeCursor
@@ -72,14 +64,6 @@ extension Subscribe {
let connectionStatus = ConnectionStatus.connected
}

struct ReceiveReconnectingState: SubscribeState {
let input: SubscribeInput
let cursor: SubscribeCursor
let retryAttempt: Int
let reason: PubNubError
let connectionStatus = ConnectionStatus.connected
}

struct ReceiveStoppedState: SubscribeState {
let input: SubscribeInput
let cursor: SubscribeCursor
@@ -116,14 +100,8 @@ extension Subscribe {
case subscriptionRestored(channels: [String], groups: [String], cursor: SubscribeCursor)
case handshakeSuccess(cursor: SubscribeCursor)
case handshakeFailure(error: PubNubError)
case handshakeReconnectSuccess(cursor: SubscribeCursor)
case handshakeReconnectFailure(error: PubNubError)
case handshakeReconnectGiveUp(error: PubNubError)
case receiveSuccess(cursor: SubscribeCursor, messages: [SubscribeMessagePayload])
case receiveFailure(error: PubNubError)
case receiveReconnectSuccess(cursor: SubscribeCursor, messages: [SubscribeMessagePayload])
case receiveReconnectFailure(error: PubNubError)
case receiveReconnectGiveUp(error: PubNubError)
case disconnect
case reconnect(cursor: SubscribeCursor?)
case unsubscribeAll
@@ -155,29 +133,21 @@ extension Subscribe {
extension Subscribe {
enum Invocation: AnyEffectInvocation {
case handshakeRequest(channels: [String], groups: [String])
case handshakeReconnect(channels: [String], groups: [String], retryAttempt: Int, reason: PubNubError)
case receiveMessages(channels: [String], groups: [String], cursor: SubscribeCursor)
case receiveReconnect(channels: [String], groups: [String], cursor: SubscribeCursor, retryAttempt: Int, reason: PubNubError)
case emitStatus(change: Subscribe.ConnectionStatusChange)
case emitMessages(events: [SubscribeMessagePayload], forCursor: SubscribeCursor)

// swiftlint:disable:next nesting
enum Cancellable: AnyCancellableInvocation {
case handshakeRequest
case handshakeReconnect
case receiveMessages
case receiveReconnect

var id: String {
switch self {
case .handshakeRequest:
return "Subscribe.HandshakeRequest"
case .handshakeReconnect:
return "Subscribe.HandshakeReconnect"
case .receiveMessages:
return "Subscribe.ReceiveMessages"
case .receiveReconnect:
return "Subscribe.ReceiveReconnect"
}
}
}
@@ -186,12 +156,8 @@ extension Subscribe {
switch self {
case .handshakeRequest:
return Cancellable.handshakeRequest.id
case .handshakeReconnect:
return Cancellable.handshakeReconnect.id
case .receiveMessages:
return Cancellable.receiveMessages.id
case .receiveReconnect:
return Cancellable.receiveReconnect.id
case .emitMessages:
return "Subscribe.EmitMessages"
case .emitStatus:
99 changes: 1 addition & 98 deletions Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift
Original file line number Diff line number Diff line change
@@ -15,29 +15,16 @@ class SubscribeTransition: TransitionProtocol {
typealias Event = Subscribe.Event
typealias Invocation = Subscribe.Invocation

// swiftlint:disable:next cyclomatic_complexity
func canTransition(from state: State, dueTo event: Event) -> Bool {
switch event {
case .handshakeSuccess:
return state is Subscribe.HandshakingState
case .handshakeFailure:
return state is Subscribe.HandshakingState
case .handshakeReconnectSuccess:
return state is Subscribe.HandshakeReconnectingState
case .handshakeReconnectFailure:
return state is Subscribe.HandshakeReconnectingState
case .handshakeReconnectGiveUp:
return state is Subscribe.HandshakeReconnectingState
case .receiveSuccess:
return state is Subscribe.ReceivingState
case .receiveFailure:
return state is Subscribe.ReceivingState
case .receiveReconnectSuccess:
return state is Subscribe.ReceiveReconnectingState
case .receiveReconnectFailure:
return state is Subscribe.ReceiveReconnectingState
case .receiveReconnectGiveUp:
return state is Subscribe.ReceiveReconnectingState
case .subscriptionChanged:
return true
case .subscriptionRestored:
@@ -62,12 +49,8 @@ class SubscribeTransition: TransitionProtocol {
switch state {
case is Subscribe.HandshakingState:
return [.cancel(.handshakeRequest)]
case is Subscribe.HandshakeReconnectingState:
return [.cancel(.handshakeReconnect)]
case is Subscribe.ReceivingState:
return [.cancel(.receiveMessages)]
case is Subscribe.ReceiveReconnectingState:
return [.cancel(.receiveReconnect)]
default:
return []
}
@@ -84,17 +67,6 @@ class SubscribeTransition: TransitionProtocol {
)
)
]
case let state as Subscribe.HandshakeReconnectingState:
return [
.managed(
.handshakeReconnect(
channels: state.input.allSubscribedChannelNames,
groups: state.input.allSubscribedGroupNames,
retryAttempt: state.retryAttempt,
reason: state.reason
)
)
]
case let state as Subscribe.ReceivingState:
return [
.managed(
@@ -105,47 +77,22 @@ class SubscribeTransition: TransitionProtocol {
)
)
]
case let state as Subscribe.ReceiveReconnectingState:
return [
.managed(
.receiveReconnect(
channels: state.input.allSubscribedChannelNames,
groups: state.input.allSubscribedGroupNames,
cursor: state.cursor,
retryAttempt: state.retryAttempt,
reason: state.reason
)
)
]
default:
return []
}
}

// swiftlint:disable:next cyclomatic_complexity
func transition(from state: State, event: Event) -> TransitionResult<State, Invocation> {
var results: TransitionResult<State, Invocation>

switch event {
case let .handshakeSuccess(cursor):
results = setReceivingState(from: state, cursor: resolveCursor(for: state, new: cursor))
case let .handshakeFailure(error):
results = setHandshakeReconnectingState(from: state, error: error)
case let .handshakeReconnectSuccess(cursor):
results = setReceivingState(from: state, cursor: resolveCursor(for: state, new: cursor))
case let .handshakeReconnectFailure(error):
results = setHandshakeReconnectingState(from: state, error: error)
case let .handshakeReconnectGiveUp(error):
results = setHandshakeFailedState(from: state, error: error)
case let .receiveSuccess(cursor, messages):
results = setReceivingState(from: state, cursor: cursor, messages: messages)
case .receiveFailure(let error):
results = setReceiveReconnectingState(from: state, error: error)
case let .receiveReconnectSuccess(cursor, messages):
results = setReceivingState(from: state, cursor: cursor, messages: messages)
case let .receiveReconnectFailure(error):
results = setReceiveReconnectingState(from: state, error: error)
case let .receiveReconnectGiveUp(error):
results = setReceiveFailedState(from: state, error: error)
case let .subscriptionChanged(channels, groups):
results = onSubscriptionAltered(from: state, channels: channels, groups: groups, cursor: state.cursor)
@@ -180,7 +127,6 @@ class SubscribeTransition: TransitionProtocol {
}

fileprivate extension SubscribeTransition {
// swiftlint:disable:next cyclomatic_complexity
func onSubscriptionAltered(
from state: State,
channels: [String],
@@ -198,16 +144,12 @@ fileprivate extension SubscribeTransition {
switch state {
case is Subscribe.HandshakingState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor))
case is Subscribe.HandshakeReconnectingState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor))
case is Subscribe.HandshakeStoppedState:
return TransitionResult(state: Subscribe.HandshakeStoppedState(input: newInput, cursor: cursor))
case is Subscribe.HandshakeFailedState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor))
case is Subscribe.ReceivingState:
return TransitionResult(state: Subscribe.ReceivingState(input: newInput, cursor: cursor))
case is Subscribe.ReceiveReconnectingState:
return TransitionResult(state: Subscribe.ReceivingState(input: newInput, cursor: cursor))
case is Subscribe.ReceiveStoppedState:
return TransitionResult(state: Subscribe.ReceiveStoppedState(input: newInput, cursor: cursor))
case is Subscribe.ReceiveFailedState:
@@ -232,22 +174,6 @@ fileprivate extension SubscribeTransition {
}
}

fileprivate extension SubscribeTransition {
func setHandshakeReconnectingState(
from state: State,
error: PubNubError
) -> TransitionResult<State, Invocation> {
return TransitionResult<State, Invocation>(
state: Subscribe.HandshakeReconnectingState(
input: state.input,
cursor: state.cursor,
retryAttempt: ((state as? Subscribe.HandshakeReconnectingState)?.retryAttempt ?? -1) + 1,
reason: error
)
)
}
}

fileprivate extension SubscribeTransition {
func setHandshakeFailedState(
from state: State,
@@ -286,7 +212,7 @@ fileprivate extension SubscribeTransition {
))
)

if state is Subscribe.HandshakingState || state is Subscribe.HandshakeReconnectingState {
if state is Subscribe.HandshakingState {
return TransitionResult(
state: Subscribe.ReceivingState(input: state.input, cursor: cursor),
invocations: [messages.isEmpty ? nil : emitMessagesInvocation, emitStatusInvocation].compactMap { $0 }
@@ -300,30 +226,11 @@ fileprivate extension SubscribeTransition {
}
}

fileprivate extension SubscribeTransition {
func setReceiveReconnectingState(
from state: State,
error: PubNubError
) -> TransitionResult<State, Invocation> {
return TransitionResult(
state: Subscribe.ReceiveReconnectingState(
input: state.input,
cursor: state.cursor,
retryAttempt: ((state as? Subscribe.ReceiveReconnectingState)?.retryAttempt ?? -1) + 1,
reason: error
)
)
}
}

fileprivate extension SubscribeTransition {
func setReceiveFailedState(
from state: State,
error: PubNubError
) -> TransitionResult<State, Invocation> {
guard let state = state as? Subscribe.ReceiveReconnectingState else {
return TransitionResult(state: state)
}
return TransitionResult(
state: Subscribe.ReceiveFailedState(
input: state.input,
@@ -361,12 +268,8 @@ fileprivate extension SubscribeTransition {
switch state {
case is Subscribe.HandshakingState:
return handshakeStoppedTransition
case is Subscribe.HandshakeReconnectingState:
return handshakeStoppedTransition
case is Subscribe.ReceivingState:
return receiveStoppedTransition
case is Subscribe.ReceiveReconnectingState:
return receiveStoppedTransition
default:
return TransitionResult(state: state)
}
1 change: 1 addition & 0 deletions Sources/PubNub/Networking/Request/Request.swift
Original file line number Diff line number Diff line change
@@ -289,6 +289,7 @@ final class Request {
delegate.retryResult(for: self, dueTo: error, andPrevious: previousError) { retryResult in
switch retryResult {
case let .success(retryAfter):
self.atomicState.lockedWrite { $0.responesData = nil }
delegate.retryRequest(self, withDelay: retryAfter)
case let .failure(error):
self.finish(error: PubNubError.retry(error, router: self.router))
18 changes: 10 additions & 8 deletions Sources/PubNub/Subscription/SubscriptionSession.swift
Original file line number Diff line number Diff line change
@@ -156,19 +156,21 @@ class SubscriptionSession: EventEmitter, StatusEmitter {
and groups: [String] = [],
presenceOnly: Bool = false
) {
let channelNamesToUnsubscribe = channels.flatMap {
presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName]
}
let groupNamesToUnsubscribe = groups.flatMap {
presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName]
}
internalUnsubscribe(
from: channels.map { Subscription(queue: queue, entity: channel($0)) },
and: groups.map { Subscription(queue: queue, entity: channelGroup($0)) },
from: globalChannelSubscriptions.compactMap { channelNamesToUnsubscribe.contains($0.key) ? $0.value : nil },
and: globalGroupSubscriptions.compactMap { groupNamesToUnsubscribe.contains($0.key) ? $0.value : nil },
presenceOnly: presenceOnly
)
channels.flatMap {
presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName]
}.forEach {
channelNamesToUnsubscribe.forEach {
globalChannelSubscriptions.removeValue(forKey: $0)
}
groups.flatMap {
presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName]
}.forEach {
groupNamesToUnsubscribe.forEach {
globalGroupSubscriptions.removeValue(forKey: $0)
}
}
Original file line number Diff line number Diff line change
@@ -20,8 +20,6 @@ extension Presence.Invocation: ContractTestIdentifiable {
return "HEARTBEAT"
case .leave:
return "LEAVE"
case .delayedHeartbeat:
return "DELAYED_HEARTBEAT"
case .wait:
return "WAIT"
}
@@ -33,8 +31,6 @@ extension Presence.Invocation.Cancellable: ContractTestIdentifiable {
switch self {
case .wait:
return "CANCEL_WAIT"
case .delayedHeartbeat:
return "CANCEL_DELAYED_HEARTBEAT"
}
}
}
@@ -58,8 +54,6 @@ extension Presence.Event: ContractTestIdentifiable {
return "HEARTBEAT_SUCCESS"
case .heartbeatFailed:
return "HEARTBEAT_FAILURE"
case .heartbeatGiveUp:
return "HEARTBEAT_GIVEUP"
}
}
}
@@ -86,7 +80,6 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep

override func createPubNubClient() -> PubNub {
let container = DependencyContainer(configuration: self.configuration)
let key = PresenceEventEngineDependencyKey.self

self.dispatcherDecorator = DispatcherDecorator(wrappedInstance: EffectDispatcher(
factory: PresenceEffectFactory(
Original file line number Diff line number Diff line change
@@ -18,12 +18,8 @@ extension Subscribe.Invocation: ContractTestIdentifiable {
switch self {
case .handshakeRequest:
return "HANDSHAKE"
case .handshakeReconnect:
return "HANDSHAKE_RECONNECT"
case .receiveMessages:
return "RECEIVE_MESSAGES"
case .receiveReconnect:
return "RECEIVE_RECONNECT"
case .emitMessages:
return "EMIT_MESSAGES"
case .emitStatus:
@@ -37,12 +33,8 @@ extension Subscribe.Invocation.Cancellable: ContractTestIdentifiable {
switch self {
case .handshakeRequest:
return "CANCEL_HANDSHAKE"
case .handshakeReconnect:
return "CANCEL_HANDSHAKE_RECONNECT"
case .receiveMessages:
return "CANCEL_RECEIVE_MESSAGES"
case .receiveReconnect:
return "CANCEL_RECEIVE_RECONNECT"
}
}
}
@@ -54,22 +46,10 @@ extension Subscribe.Event: ContractTestIdentifiable {
return "HANDSHAKE_SUCCESS"
case .handshakeFailure:
return "HANDSHAKE_FAILURE"
case .handshakeReconnectSuccess:
return "HANDSHAKE_RECONNECT_SUCCESS"
case .handshakeReconnectFailure:
return "HANDSHAKE_RECONNECT_FAILURE"
case .handshakeReconnectGiveUp:
return "HANDSHAKE_RECONNECT_GIVEUP"
case .receiveSuccess:
return "RECEIVE_SUCCESS"
case .receiveFailure:
return "RECEIVE_FAILURE"
case .receiveReconnectSuccess:
return "RECEIVE_RECONNECT_SUCCESS"
case .receiveReconnectFailure:
return "RECEIVE_RECONNECT_FAILURE"
case .receiveReconnectGiveUp:
return "RECEIVE_RECONNECT_GIVEUP"
case .subscriptionChanged:
return "SUBSCRIPTION_CHANGED"
case .subscriptionRestored:
@@ -115,7 +95,6 @@ class PubNubSubscribeEngineContractTestsSteps: PubNubEventEngineContractTestsSte

override func createPubNubClient() -> PubNub {
let container = DependencyContainer(configuration: self.configuration)
let key = SubscribeEventEngineDependencyKey.self

self.dispatcherDecorator = DispatcherDecorator(wrappedInstance: EffectDispatcher(
factory: SubscribeEffectFactory(
@@ -174,8 +153,8 @@ class PubNubSubscribeEngineContractTestsSteps: PubNubEventEngineContractTestsSte
self.subscribeSynchronously(self.client, to: ["test"])
}

When("I subscribe with timetoken 42") { _, _ in
self.subscribeSynchronously(self.client, to: ["test"], timetoken: 42)
When("I subscribe with timetoken 12345678901234567") { _, _ in
self.subscribeSynchronously(self.client, to: ["test"], timetoken: 12345678901234567)
}

Then("I receive an error in my subscribe response") { _, _ in

This file was deleted.

197 changes: 2 additions & 195 deletions Tests/PubNubTests/EventEngine/Presence/PresenceTransitionTests.swift
Original file line number Diff line number Diff line change
@@ -20,8 +20,6 @@ extension Presence.Invocation: Equatable {
return lC.sorted(by: <) == rC.sorted(by: <) && lG.sorted(by: <) == rG.sorted(by: <)
case let (.leave(lC, lG), .leave(rC, rG)):
return lC.sorted(by: <) == rC.sorted(by: <) && lG.sorted(by: <) == rG.sorted(by: <)
case let (.delayedHeartbeat(lC, lG, lAtt, lErr),.delayedHeartbeat(rC, rG, rAtt, rErr)):
return lC.sorted(by: <) == rC.sorted(by: <) && lG.sorted(by: <) == rG.sorted(by: <) && lAtt == rAtt && lErr == rErr
case (.wait, .wait):
return true
default:
@@ -39,8 +37,6 @@ extension Presence.Event: Equatable {
return lC.sorted(by: <) == rC.sorted(by: <) && lG.sorted(by: <) == rG.sorted(by: <)
case let (.heartbeatFailed(lError), .heartbeatFailed(rError)):
return lError == rError
case let (.heartbeatGiveUp(lError), .heartbeatGiveUp(rError)):
return lError == rError
case (.leftAll, .leftAll):
return true
case (.reconnect, .reconnect):
@@ -148,27 +144,6 @@ class PresenceTransitionTests: XCTestCase {
XCTAssertTrue(results.invocations.isEmpty)
}

func testPresence_JoinedEventForReconnectingState() {
let input = PresenceInput(
channels: ["c1", "c2"],
groups: ["g1", "g2"]
)
let results = transition.transition(
from: Presence.HeartbeatReconnecting(input: input, retryAttempt: 1, error: PubNubError(.unknown)),
event: .joined(channels: ["c3"], groups: ["g3"])
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.cancel(.delayedHeartbeat),
.regular(.heartbeat(channels: ["c1", "c2", "c3"], groups: ["g1", "g2", "g3"]))
]
let expectedState = Presence.Heartbeating(
input: PresenceInput(channels: ["c1", "c2", "c3"], groups: ["g1", "g2", "g3"])
)

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

func testPresence_JoinedEventForCooldownState() {
let input = PresenceInput(
channels: ["c1", "c2"],
@@ -230,28 +205,6 @@ class PresenceTransitionTests: XCTestCase {
XCTAssertTrue(results.invocations.isEmpty)
}

func testPresence_LeftEventForReconnectingState() {
let input = PresenceInput(
channels: ["c1", "c2", "c3"],
groups: ["g1", "g2", "g3"]
)
let results = transition.transition(
from: Presence.HeartbeatReconnecting(input: input, retryAttempt: 1, error: PubNubError(.unknown)),
event: .left(channels: ["c3"], groups: ["g3"])
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.cancel(.delayedHeartbeat),
.regular(.leave(channels: ["c3"], groups: ["g3"])),
.regular(.heartbeat(channels: ["c1", "c2"], groups: ["g1", "g2"]))
]
let expectedState = Presence.Heartbeating(
input: PresenceInput(channels: ["c1", "c2"], groups: ["g1", "g2"])
)

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

func testPresence_LeftEventForCooldownState() {
let input = PresenceInput(
channels: ["c1", "c2", "c3"],
@@ -360,29 +313,6 @@ class PresenceTransitionTests: XCTestCase {
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

func testPresence_LeftAllForReconnectingState() {
let input = PresenceInput(
channels: ["c1", "c2"],
groups: ["g1", "g2"]
)
let results = transition.transition(
from: Presence.HeartbeatReconnecting(
input: input,
retryAttempt: 1,
error: PubNubError(.unknown)
),
event: .leftAll
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.cancel(.delayedHeartbeat),
.regular(.leave(channels: ["c1", "c2"], groups: ["g1", "g2"]))
]
let expectedState = Presence.HeartbeatInactive()

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

func testPresence_LeftAllWithSuppressLeaveEventsSetInConfig() {
let input = PresenceInput(
channels: ["c1", "c2", "c3"],
@@ -444,31 +374,6 @@ class PresenceTransitionTests: XCTestCase {
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

func testPresence_ReconnectForHeartbeatingState() {
let input = PresenceInput(
channels: ["c1", "c2"],
groups: ["g1", "g2"]
)

let results = transition.transition(
from: Presence.Heartbeating(input: input),
event: .heartbeatFailed(error: PubNubError(.unknown))
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.managed(.delayedHeartbeat(
channels: ["c1", "c2"], groups: ["g1", "g2"],
retryAttempt: 0, error: PubNubError(.unknown)
))
]
let expectedState = Presence.HeartbeatReconnecting(
input: input,
retryAttempt: 0, error: PubNubError(.unknown)
)

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

// MARK: - Disconnect

func testPresence_DisconnectForHeartbeatingState() {
@@ -508,25 +413,6 @@ class PresenceTransitionTests: XCTestCase {
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

func testPresence_DisconnectForHeartbeatReconnectingState() {
let input = PresenceInput(
channels: ["c1", "c2"],
groups: ["g1", "g2"]
)
let results = transition.transition(
from: Presence.HeartbeatReconnecting(input: input, retryAttempt: 1, error: PubNubError(.unknown)),
event: .disconnect
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.cancel(.delayedHeartbeat),
.regular(.leave(channels: ["c1", "c2"], groups: ["g1", "g2"]))
]
let expectedState = Presence.HeartbeatStopped(input: input)

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

// MARK: - Heartbeat Success

func testPresence_HeartbeatSuccessForHeartbeatingState() {
@@ -547,29 +433,6 @@ class PresenceTransitionTests: XCTestCase {
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

func testPresence_HeartbeatSuccessForReconnectingState() {
let input = PresenceInput(
channels: ["c1", "c2"],
groups: ["g1", "g2"]
)
let results = transition.transition(
from: Presence.HeartbeatReconnecting(
input: input,
retryAttempt: 1,
error: PubNubError(.unknown)
),
event: .heartbeatSuccess
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.cancel(.delayedHeartbeat),
.managed(.wait)
]
let expectedState = Presence.HeartbeatCooldown(input: input)

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

// MARK: - Heartbeat Failed

func testPresence_HeartbeatFailedForHeartbeatingState() {
@@ -581,69 +444,13 @@ class PresenceTransitionTests: XCTestCase {
from: Presence.Heartbeating(input: input),
event: .heartbeatFailed(error: PubNubError(.unknown))
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.managed(.delayedHeartbeat(
channels: ["c1", "c2"], groups: ["g1", "g2"],
retryAttempt: 0, error: PubNubError(.unknown)
))
]
let expectedState = Presence.HeartbeatReconnecting(
input: input,
retryAttempt: 0,
error: PubNubError(.unknown)
)

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

func testPresence_HeartbeatFailedForReconnectingState() {
let input = PresenceInput(
channels: ["c1", "c2"],
groups: ["g1", "g2"]
)
let results = transition.transition(
from: Presence.HeartbeatReconnecting(input: input, retryAttempt: 1, error: PubNubError(.unknown)),
event: .heartbeatFailed(error: PubNubError(.badServerResponse))
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.cancel(.delayedHeartbeat),
.managed(.delayedHeartbeat(
channels: ["c1", "c2"], groups: ["g1", "g2"],
retryAttempt: 2, error: PubNubError(.badServerResponse)
))
]
let expectedState = Presence.HeartbeatReconnecting(
input: input,
retryAttempt: 2,
error: PubNubError(.badServerResponse)
)

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
}

// MARK: - Heartbeat Give Up

func testPresence_HeartbeatGiveUpForReconnectingState() {
let input = PresenceInput(
channels: ["c1", "c2"],
groups: ["g1", "g2"]
)
let results = transition.transition(
from: Presence.HeartbeatReconnecting(input: input, retryAttempt: 1, error: PubNubError(.unknown)),
event: .heartbeatGiveUp(error: PubNubError(.badServerResponse))
)
let expectedInvocations: [EffectInvocation<Presence.Invocation>] = [
.cancel(.delayedHeartbeat),
]
let expectedState = Presence.HeartbeatFailed(
input: input,
error: PubNubError(.badServerResponse)
error: PubNubError(.unknown)
)

XCTAssertTrue(results.state.isEqual(to: expectedState))
XCTAssertTrue(results.invocations.elementsEqual(expectedInvocations))
XCTAssertTrue(results.invocations.isEmpty)
}

// MARK: - Times Up
285 changes: 0 additions & 285 deletions Tests/PubNubTests/EventEngine/Subscribe/SubscribeEffectsTests.swift
Original file line number Diff line number Diff line change
@@ -179,291 +179,6 @@ extension SubscribeEffectsTests {
}
}

// MARK: - HandshakeReconnecting

extension SubscribeEffectsTests {
func test_HandshakeReconnectingSuccess() {
mockResponse(subscribeResponse: SubscribeResponse(
cursor: SubscribeCursor(timetoken: 12345, region: 1),
messages: []
))

let delayRange = 2.0...3.0
let expectation = XCTestExpectation(description: "Effect Completion")
expectation.expectedFulfillmentCount = 1
expectation.assertForOverFulfill = true

let testedInvocation: Subscribe.Invocation = .handshakeReconnect(
channels: ["channel1", "channel1-pnpres", "channel2"],
groups: ["g1", "g2", "g2-pnpres"],
retryAttempt: 1,
reason: PubNubError(
URLError(.badServerResponse).pubnubReason!,
underlying: URLError(.badServerResponse),
affected: [.response(HTTPURLResponse(statusCode: 500)!)]
)
)
let expectedOutput: Subscribe.Event = .handshakeReconnectSuccess(
cursor: SubscribeCursor(timetoken: 12345, region: 1)
)
let effect = factory.effect(
for: testedInvocation,
with: EventEngineDependencies(value: Subscribe.Dependencies(
configuration: configWithLinearPolicy(delayRange.lowerBound)
))
)

effect.performTask {
XCTAssertEqual([expectedOutput], $0)
expectation.fulfill()
}
wait(for: [expectation], timeout: 2 * delayRange.upperBound)
}

func test_HandshakeReconnectingFailed() {
mockResponse(
errorIfAny: URLError(.cannotFindHost),
httpResponse: HTTPURLResponse(statusCode: 404)!
)

let delayRange = 2.0...3.0
let expectation = XCTestExpectation(description: "Effect Completion")
expectation.expectedFulfillmentCount = 1
expectation.assertForOverFulfill = true

let testedInvocation: Subscribe.Invocation = .handshakeReconnect(
channels: ["channel1", "channel1-pnpres", "channel2"],
groups: ["g1", "g2", "g2-pnpres"],
retryAttempt: 1,
reason: PubNubError(
URLError(.badServerResponse).pubnubReason!,
underlying: URLError(.badServerResponse),
affected: [.response(HTTPURLResponse(statusCode: 500)!)]
)
)
let expectedOutput: Subscribe.Event = .handshakeReconnectFailure(
error: PubNubError(.nameResolutionFailure, underlying: URLError(.cannotFindHost))
)
let effect = factory.effect(
for: testedInvocation,
with: EventEngineDependencies(value: Subscribe.Dependencies(
configuration: configWithLinearPolicy(delayRange.lowerBound)
))
)

effect.performTask {
XCTAssertEqual([expectedOutput], $0)
expectation.fulfill()
}
wait(for: [expectation], timeout: 2 * delayRange.upperBound)
}

func test_HandshakeReconnectGiveUp() {
let delayRange = 2.0...3.0
let expectation = XCTestExpectation(description: "Effect Completion")
expectation.expectedFulfillmentCount = 1
expectation.assertForOverFulfill = true

let testedInvocation: Subscribe.Invocation = .handshakeReconnect(
channels: ["channel1", "channel1-pnpres", "channel2"],
groups: ["g1", "g2", "g2-pnpres"],
retryAttempt: 3,
reason: PubNubError(URLError(.badServerResponse).pubnubReason!, underlying: URLError(.badServerResponse))
)
let expectedOutput: Subscribe.Event = .handshakeReconnectGiveUp(
error: PubNubError(.badServerResponse)
)
let effect = factory.effect(
for: testedInvocation,
with: EventEngineDependencies(value: Subscribe.Dependencies(
configuration: configWithLinearPolicy(delayRange.lowerBound)
))
)

effect.performTask {
XCTAssertEqual([expectedOutput], $0)
expectation.fulfill()
}
wait(for: [expectation], timeout: 1.0)
}

func test_HandshakeReconnectIsDelayed() {
mockResponse(subscribeResponse: SubscribeResponse(
cursor: SubscribeCursor(timetoken: 12345, region: 1),
messages: []
))

let expectation = XCTestExpectation(description: "Effect Completion")
expectation.expectedFulfillmentCount = 1
expectation.assertForOverFulfill = true

let testedInvocation: Subscribe.Invocation = .handshakeReconnect(
channels: ["channel1", "channel1-pnpres", "channel2"],
groups: ["g1", "g2", "g2-pnpres"],
retryAttempt: 3,
reason: PubNubError(
URLError(.badServerResponse).pubnubReason!,
underlying: URLError(.badServerResponse),
affected: [.response(HTTPURLResponse(statusCode: 500)!)]
)
)

let delayRange = 2.0...3.0
let startDate = Date()
let depValue = Subscribe.Dependencies(configuration: configWithLinearPolicy(delayRange.lowerBound))
let effect = factory.effect(for: testedInvocation, with: EventEngineDependencies(value: depValue))

effect.performTask { _ in
XCTAssertTrue(Int(Date().timeIntervalSince(startDate)) <= Int(delayRange.upperBound))
expectation.fulfill()
}
wait(for: [expectation], timeout: 2 * delayRange.upperBound)
}
}

// MARK: - ReceiveReconnecting

extension SubscribeEffectsTests {
func test_ReceiveReconnectingSuccess() {
mockResponse(subscribeResponse: SubscribeResponse(
cursor: SubscribeCursor(timetoken: 12345, region: 1),
messages: [firstMessage, secondMessage]
))

let expectation = XCTestExpectation(description: "Effect Completion")
expectation.expectedFulfillmentCount = 1
expectation.assertForOverFulfill = true

let testedInvocation: Subscribe.Invocation = .receiveReconnect(
channels: ["channel1", "channel1-pnpres", "channel2"],
groups: ["g1", "g2", "g2-pnpres"],
cursor: SubscribeCursor(timetoken: 1111, region: 1),
retryAttempt: 1,
reason: PubNubError(
URLError(.badServerResponse).pubnubReason!,
underlying: URLError(.badServerResponse),
affected: [.response(HTTPURLResponse(statusCode: 500)!)]
)
)
let expectedOutput: Subscribe.Event = .receiveReconnectSuccess(
cursor: SubscribeCursor(timetoken: 12345, region: 1),
messages: [firstMessage, secondMessage]
)

let delayRange = 2.0...3.0
let depValue = Subscribe.Dependencies(configuration: configWithLinearPolicy(delayRange.lowerBound))
let effect = factory.effect(for: testedInvocation, with: EventEngineDependencies(value: depValue))

effect.performTask {
XCTAssertEqual([expectedOutput], $0)
expectation.fulfill()
}
wait(for: [expectation], timeout: 2 * delayRange.upperBound)
}

func test_ReceiveReconnectingFailure() {
let expectation = XCTestExpectation(description: "Effect Completion")
expectation.expectedFulfillmentCount = 1
expectation.assertForOverFulfill = true

mockResponse(
errorIfAny: URLError(.cannotFindHost),
httpResponse: HTTPURLResponse(statusCode: 404)!
)
let testedInvocation: Subscribe.Invocation = .receiveReconnect(
channels: ["channel1", "channel1-pnpres", "channel2"],
groups: ["g1", "g2", "g2-pnpres"],
cursor: SubscribeCursor(timetoken: 1111, region: 1),
retryAttempt: 1,
reason: PubNubError(
URLError(.badServerResponse).pubnubReason!,
underlying: URLError(.badServerResponse),
affected: [.response(HTTPURLResponse(statusCode: 500)!)]
)
)

let expectedOutput: Subscribe.Event = .receiveReconnectFailure(error: PubNubError(.nameResolutionFailure))
let delayRange = 2.0...3.0
let depValue = Subscribe.Dependencies(configuration: configWithLinearPolicy(delayRange.lowerBound))
let effect = factory.effect(for: testedInvocation, with: EventEngineDependencies(value: depValue))

effect.performTask {
XCTAssertEqual([expectedOutput], $0)
expectation.fulfill()
}
wait(for: [expectation], timeout: 2 * delayRange.upperBound)
}

func test_ReceiveReconnectGiveUp() {
let expectation = XCTestExpectation(description: "Effect Completion")
expectation.expectedFulfillmentCount = 1
expectation.assertForOverFulfill = true

mockResponse(
errorIfAny: URLError(.cannotFindHost),
httpResponse: HTTPURLResponse(statusCode: 404)!
)

let testedInvocation: Subscribe.Invocation = .receiveReconnect(
channels: ["channel1", "channel1-pnpres", "channel2"],
groups: ["g1", "g2", "g2-pnpres"],
cursor: SubscribeCursor(timetoken: 1111, region: 1),
retryAttempt: 3,
reason: PubNubError(
URLError(.badServerResponse).pubnubReason!,
underlying: URLError(.badServerResponse)
)
)
let expectedOutput: Subscribe.Event = .receiveReconnectGiveUp(
error: PubNubError(.badServerResponse)
)

let delayRange = 2.0...3.0
let depValue = Subscribe.Dependencies(configuration: configWithLinearPolicy(delayRange.lowerBound))
let effect = factory.effect(for: testedInvocation, with: EventEngineDependencies(value: depValue))

effect.performTask {
XCTAssertEqual([expectedOutput], $0)
expectation.fulfill()
}
wait(for: [expectation], timeout: 1.0)
}

func test_ReceiveReconnectingIsDelayed() {
let expectation = XCTestExpectation(description: "Effect Completion")
expectation.expectedFulfillmentCount = 1
expectation.assertForOverFulfill = true

mockResponse(subscribeResponse: SubscribeResponse(
cursor: SubscribeCursor(timetoken: 12345, region: 1),
messages: [firstMessage, secondMessage]
))

let testedInvocation: Subscribe.Invocation = .receiveReconnect(
channels: ["channel1", "channel1-pnpres", "channel2"],
groups: ["g1", "g2", "g2-pnpres"],
cursor: SubscribeCursor(timetoken: 1111, region: 1),
retryAttempt: 1,
reason: PubNubError(
URLError(.badServerResponse).pubnubReason!,
underlying: URLError(.badServerResponse),
affected: [.response(HTTPURLResponse(statusCode: 500)!)]
)
)

let delayRange = 2.0...3.0
let startDate = Date()
let depValue = Subscribe.Dependencies(configuration: configWithLinearPolicy(delayRange.lowerBound))
let effect = factory.effect(for: testedInvocation, with: EventEngineDependencies(value: depValue))

effect.performTask { _ in
XCTAssertTrue(Int(Date().timeIntervalSince(startDate)) <= Int(delayRange.upperBound))
expectation.fulfill()
}
wait(for: [expectation], timeout: 2 * delayRange.upperBound)
}
}

// MARK: - Helpers

private extension SubscribeEffectsTests {

This file was deleted.

470 changes: 8 additions & 462 deletions Tests/PubNubTests/EventEngine/Subscribe/SubscribeTransitionTests.swift

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -469,7 +469,7 @@ extension SubscribeRouterTests {
case let .channelMetadataSet(changeset):
XCTAssertEqual(try? changeset.apply(to: baseChannel).transcode(), patchedChannel)
objectExpect.fulfill()
case let .subscriptionChanged(change):
case .subscriptionChanged:
break
default:
XCTFail("Incorrect Event Received")
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ class SubscribeSessionFactoryTests: XCTestCase {
)

let dependencyContainer = DependencyContainer(configuration: config)
let nextDependencyContainer = DependencyContainer(configuration: config)
let nextDependencyContainer = DependencyContainer(configuration: newConfig)
let first = dependencyContainer.subscriptionSession
let third = nextDependencyContainer.subscriptionSession

0 comments on commit d24c0f6

Please sign in to comment.