Skip to content

Commit

Permalink
Changes from feat/listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
jguz-pubnub committed Aug 20, 2024
1 parent f1dd87e commit 50fe576
Show file tree
Hide file tree
Showing 19 changed files with 402 additions and 338 deletions.
6 changes: 2 additions & 4 deletions Examples/Sources/DetailTableViewController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,10 @@ class DetailTableViewController: UITableViewController {
print("The signal is \(signal.payload) and was sent by \(signal.publisher ?? "")")
case let .connectionStatusChanged(connectionChange):
switch connectionChange {
case .connecting:
print("Status connecting...")
case .connected:
print("Status connected!")
case .reconnecting:
print("Status reconnecting...")
case .subscriptionChanged:
print("Subscription changed")
case .disconnected:
print("Status disconnected")
case .disconnectedUnexpectedly:
Expand Down
2 changes: 2 additions & 0 deletions PubNub.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@
3DA24A412C2AAB23005B959B /* PubNubObjC+ChannelGroups.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DA24A402C2AAB23005B959B /* PubNubObjC+ChannelGroups.swift */; };
3DA24A432C2AAB54005B959B /* PubNubObjC+AppContext.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DA24A422C2AAB54005B959B /* PubNubObjC+AppContext.swift */; };
3DA24A452C2AABC0005B959B /* PubNubObjC+Files.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DA24A442C2AABC0005B959B /* PubNubObjC+Files.swift */; };
3DA0C7D02BFE59AC000FFE6C /* SubscriptionListenersContainer.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DA0C7CF2BFE59AC000FFE6C /* SubscriptionListenersContainer.swift */; };
3DACC7F72AB88F8E00210B14 /* Data+CommonCrypto.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DACC7F62AB88F8E00210B14 /* Data+CommonCrypto.swift */; };
3DB2C4872C0F4B250060B8CF /* PubNubStatusListenerObjC.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB2C4862C0F4B250060B8CF /* PubNubStatusListenerObjC.swift */; };
3DB9255C2B7A2B89001B7E90 /* SubscriptionStreamTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB925592B7A2B89001B7E90 /* SubscriptionStreamTests.swift */; };
Expand Down Expand Up @@ -1072,6 +1073,7 @@
3DA24A402C2AAB23005B959B /* PubNubObjC+ChannelGroups.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "PubNubObjC+ChannelGroups.swift"; sourceTree = "<group>"; };
3DA24A422C2AAB54005B959B /* PubNubObjC+AppContext.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "PubNubObjC+AppContext.swift"; sourceTree = "<group>"; };
3DA24A442C2AABC0005B959B /* PubNubObjC+Files.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "PubNubObjC+Files.swift"; sourceTree = "<group>"; };
3DA0C7CF2BFE59AC000FFE6C /* SubscriptionListenersContainer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubscriptionListenersContainer.swift; sourceTree = "<group>"; };
3DACC7F62AB88F8E00210B14 /* Data+CommonCrypto.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Data+CommonCrypto.swift"; sourceTree = "<group>"; };
3DB2C4862C0F4B250060B8CF /* PubNubStatusListenerObjC.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PubNubStatusListenerObjC.swift; sourceTree = "<group>"; };
3DB925592B7A2B89001B7E90 /* SubscriptionStreamTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionStreamTests.swift; sourceTree = "<group>"; };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ class MessageCache {
struct EmitMessagesEffect: EffectHandler {
let messages: [SubscribeMessagePayload]
let cursor: SubscribeCursor
let listeners: [BaseSubscriptionListener]
let listeners: WeakSet<BaseSubscriptionListener>
let messageCache: MessageCache

func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) {
// Attempt to detect missed messages due to queue overflow
if messages.count >= 100 {
listeners.forEach {
$0.emit(subscribe: .errorReceived(
$0?.emit(subscribe: .errorReceived(
PubNubError(
.messageCountExceededMaximum,
router: nil,
Expand All @@ -68,7 +68,7 @@ struct EmitMessagesEffect: EffectHandler {
}

listeners.forEach {
$0.emit(batch: filteredMessages)
$0?.emit(batch: filteredMessages)
}

completionBlock([])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import Foundation

struct EmitStatusEffect: EffectHandler {
let statusChange: Subscribe.ConnectionStatusChange
let listeners: [BaseSubscriptionListener]
let listeners: WeakSet<BaseSubscriptionListener>

func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) {
if let error = statusChange.error {
listeners.forEach {
$0.emit(subscribe: .errorReceived(error))
$0?.emit(subscribe: .errorReceived(error))
}
}
listeners.forEach {
$0.emit(subscribe: .connectionChanged(statusChange.newStatus))
$0?.emit(subscribe: .connectionChanged(statusChange.newStatus))
}
completionBlock([])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Foundation
class HandshakeEffect: EffectHandler {
private let subscribeEffect: SubscribeEffect

init(request: SubscribeRequest, listeners: [BaseSubscriptionListener]) {
init(request: SubscribeRequest, listeners: WeakSet<BaseSubscriptionListener>) {
self.subscribeEffect = SubscribeEffect(
request: request,
listeners: listeners,
Expand All @@ -25,7 +25,6 @@ class HandshakeEffect: EffectHandler {
}

func performTask(completionBlock: @escaping ([Subscribe.Event]) -> Void) {
subscribeEffect.listeners.forEach { $0.emit(subscribe: .connectionChanged(.connecting)) }
subscribeEffect.performTask(completionBlock: completionBlock)
}

Expand All @@ -43,7 +42,7 @@ class HandshakeEffect: EffectHandler {
class ReceivingEffect: EffectHandler {
private let subscribeEffect: SubscribeEffect

init(request: SubscribeRequest, listeners: [BaseSubscriptionListener]) {
init(request: SubscribeRequest, listeners: WeakSet<BaseSubscriptionListener>) {
self.subscribeEffect = SubscribeEffect(
request: request,
listeners: listeners,
Expand All @@ -69,13 +68,13 @@ class ReceivingEffect: EffectHandler {

private class SubscribeEffect: EffectHandler {
let request: SubscribeRequest
let listeners: [BaseSubscriptionListener]
let listeners: WeakSet<BaseSubscriptionListener>
let onResponseReceived: (SubscribeResponse) -> Subscribe.Event
let onErrorReceived: (PubNubError) -> Subscribe.Event

init(
request: SubscribeRequest,
listeners: [BaseSubscriptionListener],
listeners: WeakSet<BaseSubscriptionListener>,
onResponseReceived: @escaping ((SubscribeResponse) -> Subscribe.Event),
onErrorReceived: @escaping ((PubNubError) -> Subscribe.Event)
) {
Expand All @@ -91,7 +90,7 @@ private class SubscribeEffect: EffectHandler {
switch $0 {
case .success(let response):
selfRef.listeners.forEach {
$0.emit(subscribe: .responseReceived(
$0?.emit(subscribe: .responseReceived(
SubscribeResponseHeader(
channels: selfRef.request.channels.map { PubNubChannel(channel: $0) },
groups: selfRef.request.groups.map { PubNubChannel(channel: $0) },
Expand Down
11 changes: 7 additions & 4 deletions Sources/PubNub/EventEngine/Subscribe/Subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ extension Subscribe {
struct HandshakingState: SubscribeState {
let input: SubscribeInput
let cursor: SubscribeCursor
let connectionStatus = ConnectionStatus.connecting
let connectionStatus = ConnectionStatus.disconnected
}

struct HandshakeStoppedState: SubscribeState {
Expand All @@ -61,7 +61,7 @@ extension Subscribe {
struct ReceivingState: SubscribeState {
let input: SubscribeInput
let cursor: SubscribeCursor
let connectionStatus = ConnectionStatus.connected
let connectionStatus: ConnectionStatus
}

struct ReceiveStoppedState: SubscribeState {
Expand Down Expand Up @@ -119,9 +119,12 @@ extension Subscribe {
extension Subscribe {
struct Dependencies {
let configuration: PubNubConfiguration
let listeners: [BaseSubscriptionListener]
let listeners: WeakSet<BaseSubscriptionListener>

init(configuration: PubNubConfiguration, listeners: [BaseSubscriptionListener] = []) {
init(
configuration: PubNubConfiguration,
listeners: WeakSet<BaseSubscriptionListener> = WeakSet<BaseSubscriptionListener>([])
) {
self.configuration = configuration
self.listeners = listeners
}
Expand Down
110 changes: 76 additions & 34 deletions Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SubscribeTransition: TransitionProtocol {
case .subscriptionRestored:
return true
case .unsubscribeAll:
return true
return !(state is Subscribe.UnsubscribedState)
case .disconnect:
return !(
state is Subscribe.HandshakeStoppedState || state is Subscribe.ReceiveStoppedState ||
Expand Down Expand Up @@ -140,25 +140,64 @@ fileprivate extension SubscribeTransition {

if newInput.isEmpty {
return setUnsubscribedState(from: state)
} else {
switch state {
case is Subscribe.HandshakingState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor))
case is Subscribe.HandshakeStoppedState:
return TransitionResult(state: Subscribe.HandshakeStoppedState(input: newInput, cursor: cursor))
case is Subscribe.HandshakeFailedState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor))
case is Subscribe.ReceivingState:
return TransitionResult(state: Subscribe.ReceivingState(input: newInput, cursor: cursor))
case is Subscribe.ReceiveStoppedState:
return TransitionResult(state: Subscribe.ReceiveStoppedState(input: newInput, cursor: cursor))
case is Subscribe.ReceiveFailedState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor))
case is Subscribe.UnsubscribedState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: cursor))
default:
return TransitionResult(state: state)
}
}

let invocations: [EffectInvocation<Invocation>] = state is Subscribe.ReceivingState ? [
.regular(.emitStatus(change: Subscribe.ConnectionStatusChange(
oldStatus: state.connectionStatus,
newStatus: .subscriptionChanged(
channels: newInput.subscribedChannelNames,
groups: newInput.subscribedGroupNames
),
error: nil
)))
] : []

switch state {
case is Subscribe.HandshakingState:
return TransitionResult(
state: Subscribe.HandshakingState(input: newInput, cursor: cursor),
invocations: invocations
)
case is Subscribe.HandshakeStoppedState:
return TransitionResult(
state: Subscribe.HandshakeStoppedState(input: newInput, cursor: cursor),
invocations: invocations
)
case is Subscribe.HandshakeFailedState:
return TransitionResult(
state: Subscribe.HandshakingState(input: newInput, cursor: cursor),
invocations: invocations
)
case is Subscribe.ReceivingState:
let newStatus: ConnectionStatus = .subscriptionChanged(
channels: newInput.subscribedChannelNames,
groups: newInput.subscribedGroupNames
)
return TransitionResult(
state: Subscribe.ReceivingState(input: newInput, cursor: cursor, connectionStatus: newStatus),
invocations: invocations
)
case is Subscribe.ReceiveStoppedState:
return TransitionResult(
state: Subscribe.ReceiveStoppedState(input: newInput, cursor: cursor),
invocations: invocations
)
case is Subscribe.ReceiveFailedState:
return TransitionResult(
state: Subscribe.HandshakingState(input: newInput, cursor: cursor),
invocations: invocations
)
case is Subscribe.UnsubscribedState:
return TransitionResult(
state: Subscribe.HandshakingState(input: newInput, cursor: cursor),
invocations: invocations
)
default:
return TransitionResult(
state: state,
invocations: invocations
)
}
}
}
Expand Down Expand Up @@ -202,27 +241,30 @@ fileprivate extension SubscribeTransition {
messages: [SubscribeMessagePayload] = []
) -> TransitionResult<State, Invocation> {
let emitMessagesInvocation = EffectInvocation.managed(
Subscribe.Invocation.emitMessages(events: messages, forCursor: cursor)
)
let emitStatusInvocation = EffectInvocation.regular(
Subscribe.Invocation.emitStatus(change: Subscribe.ConnectionStatusChange(
oldStatus: state.connectionStatus,
newStatus: .connected,
error: nil
))
Subscribe.Invocation.emitMessages(
events: messages,
forCursor: cursor
)
)

if state is Subscribe.HandshakingState {
return TransitionResult(
state: Subscribe.ReceivingState(input: state.input, cursor: cursor),
invocations: [messages.isEmpty ? nil : emitMessagesInvocation, emitStatusInvocation].compactMap { $0 }
let emitStatusInvocation = EffectInvocation.regular(
Subscribe.Invocation.emitStatus(change: Subscribe.ConnectionStatusChange(
oldStatus: state.connectionStatus,
newStatus: .connected,
error: nil
))
)
} else {
return TransitionResult(
state: Subscribe.ReceivingState(input: state.input, cursor: cursor),
invocations: [messages.isEmpty ? nil : emitMessagesInvocation].compactMap { $0 }
state: Subscribe.ReceivingState(input: state.input, cursor: cursor, connectionStatus: .connected),
invocations: [messages.isEmpty ? nil : emitMessagesInvocation, emitStatusInvocation].compactMap { $0 }
)
}

return TransitionResult(
state: Subscribe.ReceivingState(input: state.input, cursor: cursor, connectionStatus: state.connectionStatus),
invocations: [messages.isEmpty ? nil : emitMessagesInvocation].compactMap { $0 }
)
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/PubNub/Events/New/EventListenerInterface.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class StatusListener: StatusListenerInterface {
public let queue: DispatchQueue
public var onConnectionStateChange: ((ConnectionStatus) -> Void)?

init(
public init(
uuid: UUID = UUID(),
queue: DispatchQueue = .main,
onConnectionStateChange: @escaping ((ConnectionStatus) -> Void)
Expand Down Expand Up @@ -80,7 +80,7 @@ public class EventListener: EventListenerInterface {
public var onFileEvent: ((PubNubFileChangeEvent) -> Void)?
public var onAppContext: ((PubNubAppContextEvent) -> Void)?

init(
public init(
queue: DispatchQueue = .main,
uuid: UUID = UUID(),
onEvent: ((PubNubEvent) -> Void)? = nil,
Expand Down
1 change: 0 additions & 1 deletion Sources/PubNub/Events/New/Subscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ extension Subscription: SubscribeCapable {
let channels = subscriptionType == .channel ? [self] : []
let channelGroups = subscriptionType == .channelGroup ? [self] : []

pubnub.registerAdapter(adapter)
pubnub.internalSubscribe(with: channels, and: channelGroups, at: timetoken)
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/PubNub/Events/New/SubscriptionSet.swift
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ extension SubscriptionSet: SubscribeCapable {
return
}
pubnub.registerAdapter(adapter)
currentSubscriptions.forEach { pubnub.registerAdapter($0.adapter) }

let channels = currentSubscriptions.filter {
$0.subscriptionType == .channel
Expand All @@ -200,6 +199,7 @@ extension SubscriptionSet: SubscribeCapable {
guard let pubnub = currentSubscriptions.first?.pubnub, !isDisposed else {
return
}
pubnub.subscription.remove(adapter)
pubnub.internalUnsubscribe(
from: currentSubscriptions.filter { $0.subscriptionType == .channel },
and: currentSubscriptions.filter { $0.subscriptionType == .channelGroup },
Expand Down
Loading

0 comments on commit 50fe576

Please sign in to comment.