Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a new Listeners API #153

Merged
merged 12 commits into from
Feb 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,21 @@ struct SubscribeInput: Equatable {
}

func removing(
channels: [PubNubChannel],
mainChannels: [PubNubChannel],
presenceChannelsOnly: [PubNubChannel],
groups: [PubNubChannel],
mainGroups: [PubNubChannel],
presenceGroupsOnly: [PubNubChannel]
parfeon marked this conversation as resolved.
Show resolved Hide resolved
) -> SubscribeInput.RemovingResult {
// Gets a copy of current channels and channel groups
var currentChannels = channelEntries
var currentGroups = groupEntries

let removedChannels = channels.compactMap {
let removedChannels = mainChannels.compactMap {
currentChannels.removeValue(forKey: $0.id)
} + presenceChannelsOnly.compactMap {
currentChannels.unsubscribePresence($0.id)
}
let removedGroups = groups.compactMap {
let removedGroups = mainGroups.compactMap {
currentGroups.removeValue(forKey: $0.id)
} + presenceGroupsOnly.compactMap {
currentGroups.unsubscribePresence($0.id)
Expand Down
18 changes: 16 additions & 2 deletions Sources/PubNub/EventEngine/Subscribe/Subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,14 @@ extension Subscribe {
let input: SubscribeInput
let cursor: SubscribeCursor
let error: PubNubError
let connectionStatus = ConnectionStatus.connectionError
let connectionStatus: ConnectionStatus

init(input: SubscribeInput, cursor: SubscribeCursor, error: PubNubError) {
self.input = input
self.cursor = cursor
self.error = error
self.connectionStatus = .connectionError(error)
}
}

struct ReceivingState: SubscribeState {
Expand All @@ -83,7 +90,14 @@ extension Subscribe {
let input: SubscribeInput
let cursor: SubscribeCursor
let error: PubNubError
let connectionStatus = ConnectionStatus.disconnectedUnexpectedly
let connectionStatus: ConnectionStatus

init(input: SubscribeInput, cursor: SubscribeCursor, error: PubNubError) {
self.input = input
self.cursor = cursor
self.error = error
self.connectionStatus = .disconnectedUnexpectedly(error)
}
}

struct UnsubscribedState: SubscribeState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ fileprivate extension SubscribeTransition {
), invocations: [
.regular(.emitStatus(change: Subscribe.ConnectionStatusChange(
oldStatus: state.connectionStatus,
newStatus: .connectionError,
newStatus: .connectionError(error),
error: error
)))
]
Expand Down Expand Up @@ -329,7 +329,7 @@ fileprivate extension SubscribeTransition {
), invocations: [
.regular(.emitStatus(change: Subscribe.ConnectionStatusChange(
oldStatus: state.connectionStatus,
newStatus: .disconnectedUnexpectedly,
newStatus: .disconnectedUnexpectedly(error),
error: error
)))
]
Expand Down
1 change: 1 addition & 0 deletions Sources/PubNub/Events/New/Entities/EntityCreator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public extension EntityCreator {
// and channel groups.
protocol SubscribeReceiver: AnyObject {
func registerAdapter(_ adapter: BaseSubscriptionListenerAdapter)
func hasRegisteredAdapter(with uuid: UUID) -> Bool

func internalSubscribe(
with channels: [Subscription],
Expand Down
8 changes: 4 additions & 4 deletions Sources/PubNub/Events/New/Entities/EntitySubscribable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ public class ChannelGroupRepresentation: Subscribable {

/// Represents user metadata that can be subscribed to and unsubscribed from using the PubNub service.
public class UserMetadataRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channel, receiver: receiver)
init(id: String, receiver: SubscribeReceiver) {
super.init(name: id, subscriptionType: .channel, receiver: receiver)
}
}

// MARK: - PubNubChannelMetadataRepresentation

/// Represents channel metadata that can be subscribed to and unsubscribed from using the PubNub service.
public class ChannelMetadataRepresentation: Subscribable {
init(name: String, receiver: SubscribeReceiver) {
super.init(name: name, subscriptionType: .channel, receiver: receiver)
init(id: String, receiver: SubscribeReceiver) {
super.init(name: id, subscriptionType: .channel, receiver: receiver)
}
}
2 changes: 0 additions & 2 deletions Sources/PubNub/Events/New/EventEmitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import Foundation
public protocol StatusEmitter: AnyObject {
/// A closure to be called when the connection status changes.
var onConnectionStateChange: ((ConnectionStatus) -> Void)? { get set }
/// A closure to be called when a subscription error occurs.
var onSubscribeError: ((PubNubError) -> Void)? { get set }
}

// MARK: - EventEmitter
Expand Down
36 changes: 18 additions & 18 deletions Sources/PubNub/Events/New/Subscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,15 @@ public final class Subscription: EventEmitter, SubscriptionDisposable {
/// Use this method to create a new instance with the same configuration as the current `Subscription`.
/// The clone is a separate instance that can be used independently.
public func clone() -> Subscription {
Subscription(
let clonedSubscription = Subscription(
queue: queue,
entity: entity,
options: options
)
if receiver?.hasRegisteredAdapter(with: uuid) ?? false {
receiver?.registerAdapter(clonedSubscription.adapter)
}
return clonedSubscription
}

/// Disposes the current `Subscription`, ending the subscription.
Expand Down Expand Up @@ -165,35 +169,31 @@ extension Subscription: SubscribeMessagesReceiver {

func event(from payload: SubscribeMessagePayload) -> PubNubEvent? {
let isNewerOrEqualToTimetoken = payload.publishTimetoken.timetoken >= timetoken ?? 0
let receivedFromCurrentEntity: Bool
let isMatchingEntity: Bool

if subscriptionType == .channel {
receivedFromCurrentEntity = entity.name.matches(string: payload.channel)
isMatchingEntity = isMatchingEntityName(entity.name, string: payload.channel)
} else if subscriptionType == .channelGroup {
receivedFromCurrentEntity = entity.name.matches(string: payload.subscription ?? payload.channel)
isMatchingEntity = isMatchingEntityName(entity.name, string: payload.subscription ?? payload.channel)
} else {
receivedFromCurrentEntity = true
isMatchingEntity = true
}

if receivedFromCurrentEntity && isNewerOrEqualToTimetoken {
if isMatchingEntity && isNewerOrEqualToTimetoken {
let event = payload.asPubNubEvent()
return options.filterCriteriaSatisfied(event: event) ? event : nil
} else {
return nil
}
}
}

// MARK: - Helper String extension

fileprivate extension String {
func matches(string: String) -> Bool {
guard hasSuffix(".*") else {
return self == string

fileprivate func isMatchingEntityName(_ entityName: String, string: String) -> Bool {
guard entityName.hasSuffix(".*") else {
return entityName == string
}
let pattern = "^" + self + "$"
let predicate = NSPredicate(format: "SELF MATCHES %@", pattern)

return predicate.evaluate(with: string)
if let firstIndex = entityName.lastIndex(of: "."), let secondIndex = string.lastIndex(of: ".") {
return entityName.prefix(upTo: firstIndex) == string.prefix(upTo: secondIndex)
}
Comment on lines +194 to +196
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the "original" NSPredicate basically matched the same what is done on the line #192?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it should. It compares two String up to the last occurrence of ., assuming both contain the . character. Let's suppose these scenarios:

  1. The message came from channel.item.x and the underlying entity's subscription is channel.item.*:
    • According to the line 195, I will compare channel.item with channel.item
  2. The message came from a.b.c and the underlying entity's subscription is channel.item.*
    • I will compare a.b with channel.item 🔴

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I finally got it.
I was totally fine with extension for String - it looked better.

I probably would use payload.subscription ?? payload.channel to match both channel and channelGroup subscription type. If a subscription has been created for wildcard subscription, then the subscription field will hold the same name (if an event for one of the channels from it) and will be no need for pattern match (just plain ==). The only thing which may require some attention is presence channels / channel group names.

return false
}
}
12 changes: 8 additions & 4 deletions Sources/PubNub/Events/New/SubscriptionSet.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class SubscriptionSet: EventEmitter, SubscriptionDisposable {
/// - options: Additional subscription options
public init(
queue: DispatchQueue = .main,
entities: any Collection<Subscribable>,
entities: any Collection<Subscribable> = [],
options: SubscriptionOptions = SubscriptionOptions.empty()
) {
self.queue = queue
Expand All @@ -71,7 +71,7 @@ public final class SubscriptionSet: EventEmitter, SubscriptionDisposable {
/// - options: Additional subscription options
public init(
queue: DispatchQueue = .main,
subscriptions: any Collection<Subscription>,
subscriptions: any Collection<Subscription> = [],
options: SubscriptionOptions = SubscriptionOptions.empty()
) {
self.queue = queue
Expand Down Expand Up @@ -120,11 +120,15 @@ public final class SubscriptionSet: EventEmitter, SubscriptionDisposable {
/// Use this method to create a new instance with the same configuration as the current `SubscriptionSet`.
/// The clone is a separate instance that can be used independently.
public func clone() -> SubscriptionSet {
SubscriptionSet(
let clonedSubscriptionSet = SubscriptionSet(
queue: queue,
entities: currentSubscriptions.map { $0.entity },
subscriptions: currentSubscriptions.map { $0.clone() },
options: options
)
if let receiver = currentSubscriptions.first?.receiver, receiver.hasRegisteredAdapter(with: uuid) {
receiver.registerAdapter(clonedSubscriptionSet.adapter)
}
return clonedSubscriptionSet
}

/// Disposes of the current instance of `SubscriptionSet`, ending all associated subscriptions.
Expand Down
9 changes: 4 additions & 5 deletions Sources/PubNub/PubNub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ extension PubNub: SubscribeReceiver {
subscription.registerAdapter(adapter)
}

func hasRegisteredAdapter(with uuid: UUID) -> Bool {
subscription.hasRegisteredAdapter(with: uuid)
}

func internalSubscribe(
with channels: [Subscription],
and groups: [Subscription],
Expand Down Expand Up @@ -1514,9 +1518,4 @@ extension PubNub: StatusEmitter {
get { subscription.onConnectionStateChange }
set { subscription.onConnectionStateChange = newValue }
}

public var onSubscribeError: ((PubNubError) -> Void)? {
get { subscription.onSubscribeError }
set { subscription.onSubscribeError = newValue }
}
}
4 changes: 2 additions & 2 deletions Sources/PubNub/Subscription/ConnectionStatus.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ public enum ConnectionStatus: Equatable {
@available(*, deprecated, message: "This case will be removed in future versions")
case reconnecting
/// Unexpected disconnect from a remote system
case disconnectedUnexpectedly
case disconnectedUnexpectedly(PubNubError)
/// Unable to establish initial connection. Applies if `enableEventEngine` in `PubNubConfiguration` is true.
case connectionError
case connectionError(PubNubError)

/// If the connection is connected or attempting to connect
public var isActive: Bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,17 @@ class EventEngineSubscriptionSessionStrategy: SubscriptionSessionStrategy {
}

func unsubscribeFrom(
channels: [PubNubChannel],
mainChannels: [PubNubChannel],
presenceChannelsOnly: [PubNubChannel],
groups: [PubNubChannel],
mainGroups: [PubNubChannel],
presenceGroupsOnly: [PubNubChannel]
) {
// Retrieve the current list of subscribed channels and channel groups
let currentChannelsAndGroups = subscribeEngine.state.input
// Provides the outcome after updating the list of channels and channel groups
let removingResult = currentChannelsAndGroups.removing(
channels: channels,presenceChannelsOnly: presenceChannelsOnly,
groups: groups, presenceGroupsOnly: presenceGroupsOnly
mainChannels: mainChannels, presenceChannelsOnly: presenceChannelsOnly,
mainGroups: mainGroups, presenceGroupsOnly: presenceGroupsOnly
)

// Exits if there are no differences for channels or channel groups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy {
// Repeat the request
self?.performSubscribeLoop(at: cursor)
} else {
self?.connectionStatus = .disconnectedUnexpectedly
self?.connectionStatus = .disconnectedUnexpectedly(
error.pubNubError ?? PubNubError(.unknown, underlying: error)
)
}
}
}
Expand All @@ -288,19 +290,19 @@ class LegacySubscriptionSessionStrategy: SubscriptionSessionStrategy {
// MARK: - Unsubscribe

func unsubscribeFrom(
channels: [PubNubChannel],
mainChannels: [PubNubChannel],
presenceChannelsOnly: [PubNubChannel],
groups: [PubNubChannel],
mainGroups: [PubNubChannel],
presenceGroupsOnly: [PubNubChannel]
) {
let subscribeChange = internalState.lockedWrite { state -> SubscriptionChangeEvent in
.unsubscribed(
channels: channels.compactMap {
channels: mainChannels.compactMap {
state.channels.removeValue(forKey: $0.id)
} + presenceChannelsOnly.compactMap {
state.channels.unsubscribePresence($0.id)
},
groups: groups.compactMap {
groups: mainGroups.compactMap {
state.groups.removeValue(forKey: $0.id)
} + presenceGroupsOnly.compactMap {
state.groups.unsubscribePresence($0.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ protocol SubscriptionSessionStrategy: AnyObject {
at cursor: SubscribeCursor?
)
func unsubscribeFrom(
channels: [PubNubChannel],
mainChannels: [PubNubChannel],
presenceChannelsOnly: [PubNubChannel],
groups: [PubNubChannel],
mainGroups: [PubNubChannel],
parfeon marked this conversation as resolved.
Show resolved Hide resolved
presenceGroupsOnly: [PubNubChannel]
)

Expand Down
22 changes: 11 additions & 11 deletions Sources/PubNub/Subscription/SubscriptionSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class SubscriptionSession: EventEmitter, StatusEmitter {

/// `StatusEmitter` conformance
public var onConnectionStateChange: ((ConnectionStatus) -> Void)?
public var onSubscribeError: ((PubNubError) -> Void)?

var previousTokenResponse: SubscribeCursor? {
strategy.previousTokenResponse
Expand Down Expand Up @@ -70,11 +69,8 @@ public class SubscriptionSession: EventEmitter, StatusEmitter {
// Detects status changes and forwards events to the current instance
// representing the Subscribe loop's status emitter
statusListener.didReceiveStatus = { [weak self] statusChange in
switch statusChange {
case .success(let newStatus):
if case .success(let newStatus) = statusChange {
self?.onConnectionStateChange?(newStatus)
case .failure(let error):
self?.onSubscribeError?(error)
}
}
return statusListener
Expand Down Expand Up @@ -211,6 +207,10 @@ public class SubscriptionSession: EventEmitter, StatusEmitter {
// MARK: - SubscribeIntentReceiver

extension SubscriptionSession: SubscribeReceiver {
func hasRegisteredAdapter(with uuid: UUID) -> Bool {
strategy.listeners.contains { $0?.uuid == uuid }
}

// Registers a subscription adapter to translate events from a legacy listener
// into the new Listeners API.
//
Expand All @@ -229,7 +229,7 @@ extension SubscriptionSession: SubscribeReceiver {
// Maps the raw channel/channel group array to collections of `PubNubChannel` that should be unsubscribed to.
private typealias UnsubscribeRetrievalRes = (
presenceOnlyItems: [PubNubChannel],
items: [PubNubChannel]
mainItems: [PubNubChannel]
)

// Composes final PubNubChannel lists the user should subscribe to
Expand Down Expand Up @@ -312,9 +312,9 @@ extension SubscriptionSession: SubscribeReceiver {
remove(channelGroupSubscription.adapter)
}
strategy.unsubscribeFrom(
channels: extractingChannelsRes.items,
mainChannels: extractingChannelsRes.mainItems,
presenceChannelsOnly: extractingChannelsRes.presenceOnlyItems,
groups: extractingGroupsRes.items,
mainGroups: extractingGroupsRes.mainItems,
presenceGroupsOnly: extractingGroupsRes.presenceOnlyItems
)
}
Expand Down Expand Up @@ -365,7 +365,7 @@ extension SubscriptionSession: SubscribeReceiver {

return UnsubscribeRetrievalRes(
presenceOnlyItems: presenceItems,
items: channels
mainItems: channels
)
}
}
Expand Down Expand Up @@ -394,11 +394,11 @@ extension SubscriptionSession: EntityCreator {
}

public func userMetadata(_ name: String) -> UserMetadataRepresentation {
UserMetadataRepresentation(name: name, receiver: self)
UserMetadataRepresentation(id: name, receiver: self)
}

public func channelMetadata(_ name: String) -> ChannelMetadataRepresentation {
ChannelMetadataRepresentation(name: name, receiver: self)
ChannelMetadataRepresentation(id: name, receiver: self)
}
}

Expand Down
Loading
Loading