Skip to content

Commit

Permalink
EventEngine
Browse files Browse the repository at this point in the history
* Handling state parameter for heartbeat & subscribe
* Added sendsStateAutomatically flag
* Added tests for generic EventEngine class
  • Loading branch information
jguz-pubnub committed Dec 11, 2023
1 parent e3aecaa commit 59825ee
Show file tree
Hide file tree
Showing 28 changed files with 917 additions and 207 deletions.
4 changes: 4 additions & 0 deletions PubNub.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@
3DB56B652A715F7E00FC35A0 /* HeartbeatEffectTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB56B632A715F1700FC35A0 /* HeartbeatEffectTests.swift */; };
3DCC4DE02A42E93200F4A67A /* subscription_handshake_success.json in Resources */ = {isa = PBXBuildFile; fileRef = 3DCC4DDF2A42E93200F4A67A /* subscription_handshake_success.json */; };
3DD048812A8CDC4F00CE0408 /* WaitEffectTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DD048802A8CDC4F00CE0408 /* WaitEffectTests.swift */; };
3DE73D1F2B221493001B5C1E /* EventEngineTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DE73D1E2B221493001B5C1E /* EventEngineTests.swift */; };
3DE7487C2A1FA426009B0809 /* TransitionProtocol.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DE7486D2A1FA426009B0809 /* TransitionProtocol.swift */; };
3DE7487D2A1FA426009B0809 /* Dispatcher.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DE7486E2A1FA426009B0809 /* Dispatcher.swift */; };
3DE7487E2A1FA426009B0809 /* EffectHandler.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DE7486F2A1FA426009B0809 /* EffectHandler.swift */; };
Expand Down Expand Up @@ -982,6 +983,7 @@
3DB56B632A715F1700FC35A0 /* HeartbeatEffectTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = HeartbeatEffectTests.swift; sourceTree = "<group>"; };
3DCC4DDF2A42E93200F4A67A /* subscription_handshake_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = subscription_handshake_success.json; sourceTree = "<group>"; };
3DD048802A8CDC4F00CE0408 /* WaitEffectTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = WaitEffectTests.swift; sourceTree = "<group>"; };
3DE73D1E2B221493001B5C1E /* EventEngineTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EventEngineTests.swift; sourceTree = "<group>"; };
3DE7486D2A1FA426009B0809 /* TransitionProtocol.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TransitionProtocol.swift; sourceTree = "<group>"; };
3DE7486E2A1FA426009B0809 /* Dispatcher.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Dispatcher.swift; sourceTree = "<group>"; };
3DE7486F2A1FA426009B0809 /* EffectHandler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EffectHandler.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -2147,6 +2149,7 @@
3DE748882A1FA449009B0809 /* EventEngine */ = {
isa = PBXGroup;
children = (
3DE73D1E2B221493001B5C1E /* EventEngineTests.swift */,
3DE748892A1FA449009B0809 /* DispatcherTests.swift */,
3D9B29EE2A65605900C988C9 /* Presence */,
3D9B29EF2A65605900C988C9 /* Subscribe */,
Expand Down Expand Up @@ -3441,6 +3444,7 @@
35CDFEAD22E7655700F3B9F2 /* URL+PubNubTests.swift in Sources */,
35CDFEBA22E77E2B00F3B9F2 /* URLSessionConfiguration+PubNubTests.swift in Sources */,
35FE941F22F0929A0051C455 /* RequestRetrierTests.swift in Sources */,
3DE73D1F2B221493001B5C1E /* EventEngineTests.swift in Sources */,
35580686230F47EA005CDD92 /* RequestIdOperatorTests.swift in Sources */,
3DFA33952A8CEFD7003B595F /* DelayedHeartbeatEffectTests.swift in Sources */,
35CDFEA722E75BE800F3B9F2 /* OperationQueue+PubNubTests.swift in Sources */,
Expand Down
6 changes: 3 additions & 3 deletions Sources/PubNub/EventEngine/Core/EventEngineFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ typealias PresenceDispatcher = Dispatcher<Presence.Invocation, Presence.Event, P
class EventEngineFactory {
func subscribeEngine(
with configuration: PubNubConfiguration,
dispatcher: some SubscribeDispatcher = EffectDispatcher(factory: SubscribeEffectFactory.defaultFactory()),
transition: some SubscribeTransitions = SubscribeTransition()
dispatcher: some SubscribeDispatcher,
transition: some SubscribeTransitions
) -> SubscribeEngine {
EventEngine(
state: Subscribe.UnsubscribedState(),
Expand All @@ -51,7 +51,7 @@ class EventEngineFactory {

func presenceEngine(
with configuration: PubNubConfiguration,
dispatcher: some PresenceDispatcher = EffectDispatcher(factory: PresenceEffectFactory.defaultFactory()),
dispatcher: some PresenceDispatcher,
transition: some PresenceTransitions
) -> PresenceEngine {
EventEngine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,16 @@ import Foundation
class PresenceEffectFactory: EffectHandlerFactory {
private let session: SessionReplaceable
private let sessionResponseQueue: DispatchQueue
private let presenceStateContainer: PresenceStateContainer

init(session: SessionReplaceable, sessionResponseQueue: DispatchQueue = .global(qos: .default)) {
init(
session: SessionReplaceable,
sessionResponseQueue: DispatchQueue = .global(qos: .default),
presenceStateContainer: PresenceStateContainer
) {
self.session = session
self.sessionResponseQueue = sessionResponseQueue
}

static func defaultFactory() -> PresenceEffectFactory {
PresenceEffectFactory(
session: HTTPSession(
configuration: .pubnub,
sessionQueue: DispatchQueue(label: "Presence Response Queue"),
sessionStream: SessionListener()
)
)
self.presenceStateContainer = presenceStateContainer
}

func effect(
Expand All @@ -56,6 +52,7 @@ class PresenceEffectFactory: EffectHandlerFactory {
request: PresenceHeartbeatRequest(
channels: channels,
groups: groups,
channelStates: presenceStateContainer.getStates(forChannels: channels),
configuration: dependencies.value.configuration,
session: session,
sessionResponseQueue: sessionResponseQueue
Expand All @@ -66,6 +63,7 @@ class PresenceEffectFactory: EffectHandlerFactory {
request: PresenceHeartbeatRequest(
channels: channels,
groups: groups,
channelStates: presenceStateContainer.getStates(forChannels: channels),
configuration: dependencies.value.configuration,
session: session,
sessionResponseQueue: sessionResponseQueue
Expand All @@ -88,4 +86,8 @@ class PresenceEffectFactory: EffectHandlerFactory {
return WaitEffect(configuration: dependencies.value.configuration)
}
}

deinit {
session.invalidateAndCancel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,102 @@

import Foundation

// MARK: - PresenceStateContainer

class PresenceStateContainer {
private var channelStates: Atomic<[String: [String: JSONCodableScalar]]> = Atomic([:])
private var channelGroupStates: Atomic<[String: [String: JSONCodableScalar]]> = Atomic([:])

static var shared: PresenceStateContainer = PresenceStateContainer()
private init() {}

func registerState(_ state: [String: JSONCodableScalar], forChannels channels: [String]) {
channelStates.lockedWrite { channelStates in
channels.forEach {
channelStates[$0] = state
}
}
}

func registerState(_ state: [String: JSONCodableScalar], forChannelGroups groups: [String]) {
channelGroupStates.lockedWrite { channelGroupStates in
groups.forEach {
channelGroupStates[$0] = state
}
}
}

func removeState(forChannels channels: [String]) {
channelStates.lockedWrite { channelStates in
channels.map {
channelStates[$0] = nil
}
}
}

func removeState(forGroups groups: [String]) {
channelGroupStates.lockedWrite { channelGroupStates in
groups.map {
channelGroupStates[$0] = nil
}
}
}

func getStates(forChannels channels: [String]) -> [String: [String: JSONCodableScalar]] {
channelStates.lockedRead {
$0.filter {
channels.contains($0.key)
}
}
}

func getStates(forGroups channelGroups: [String]) -> [String: [String: JSONCodableScalar]] {
channelGroupStates.lockedRead {
$0.filter {
channelGroups.contains($0.key)
}
}
}
}

// MARK: - PresenceHeartbeatRequest

class PresenceHeartbeatRequest {
let channels: [String]
let groups: [String]
let configuration: PubNubConfiguration

private let session: SessionReplaceable
private let sessionResponseQueue: DispatchQueue
private let channelStates: [String: [String:JSONCodableScalar]]
private var request: RequestReplaceable?

init(
channels: [String],
groups: [String],
channelStates: [String: [String:JSONCodableScalar]],
configuration: PubNubConfiguration,
session: SessionReplaceable,
sessionResponseQueue: DispatchQueue
) {
self.channels = channels
self.groups = groups
self.channelStates = channelStates
self.configuration = configuration
self.session = session
self.sessionResponseQueue = sessionResponseQueue
}

func execute(completionBlock: @escaping (Result<Void, PubNubError>) -> Void) {
request = session.request(with: PresenceRouter(
.heartbeat(channels: channels, groups: groups, presenceTimeout: configuration.durationUntilTimeout),
configuration: configuration, eventEngineEnabled: true), requestOperator: nil
let endpoint = PresenceRouter.Endpoint.heartbeat(
channels: channels,
groups: groups,
channelStates: channelStates,
presenceTimeout: configuration.durationUntilTimeout
)
request = session.request(
with: PresenceRouter(endpoint, configuration: configuration),
requestOperator: nil
)
request?.validate().response(on: sessionResponseQueue, decoder: GenericServiceResponseDecoder()) { result in
switch result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ class PresenceLeaveRequest {
}

func execute(completionBlock: @escaping (Result<Void, PubNubError>) -> Void) {
request = session.request(with: PresenceRouter(
.leave(channels: channels, groups: groups),
configuration: configuration, eventEngineEnabled: true), requestOperator: nil
let endpoint = PresenceRouter.Endpoint.leave(
channels: channels,
groups: groups
)
request = session.request(
with: PresenceRouter(endpoint, configuration: configuration),
requestOperator: nil
)
request?.validate().response(on: sessionResponseQueue, decoder: GenericServiceResponseDecoder()) { result in
switch result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,18 @@ class SubscribeEffectFactory: EffectHandlerFactory {
private let session: SessionReplaceable
private let sessionResponseQueue: DispatchQueue
private let messageCache: MessageCache

private let presenceStateContainer: PresenceStateContainer

init(
session: SessionReplaceable,
sessionResponseQueue: DispatchQueue = .global(qos: .default),
messageCache: MessageCache = MessageCache()
messageCache: MessageCache = MessageCache(),
presenceStateContainer: PresenceStateContainer
) {
self.session = session
self.sessionResponseQueue = sessionResponseQueue
self.messageCache = messageCache
}

static func defaultFactory() -> SubscribeEffectFactory {
SubscribeEffectFactory(session: HTTPSession(
configuration: URLSessionConfiguration.subscription,
sessionQueue: DispatchQueue(label: "Subscribe Response Queue"),
sessionStream: SessionListener()
))
self.presenceStateContainer = presenceStateContainer
}

func effect(
Expand All @@ -61,6 +56,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
channelStates: presenceStateContainer.getStates(forChannels: channels),
timetoken: 0,
session: session,
sessionResponseQueue: sessionResponseQueue
Expand All @@ -72,6 +68,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
channelStates: presenceStateContainer.getStates(forChannels: channels),
timetoken: 0,
session: session,
sessionResponseQueue: sessionResponseQueue
Expand All @@ -85,6 +82,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
channelStates: [:],
timetoken: cursor.timetoken,
region: cursor.region,
session: session,
Expand All @@ -97,6 +95,7 @@ class SubscribeEffectFactory: EffectHandlerFactory {
configuration: dependencies.value.configuration,
channels: channels,
groups: groups,
channelStates: [:],
timetoken: cursor.timetoken,
region: cursor.region,
session: session,
Expand Down
33 changes: 20 additions & 13 deletions Sources/PubNub/EventEngine/Subscribe/Helpers/SubscribeRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,22 @@ class SubscribeRequest {
let timetoken: Timetoken?
let region: Int?

private let configuration: SubscriptionConfiguration
private let configuration: PubNubConfiguration
private let session: SessionReplaceable
private let sessionResponseQueue: DispatchQueue
private let channelStates: [String: [String: JSONCodableScalar]]

private var request: RequestReplaceable?

var retryLimit: UInt {
configuration.automaticRetry?.retryLimit ?? 0
}

init(
configuration: SubscriptionConfiguration,
configuration: PubNubConfiguration,
channels: [String],
groups: [String],
channelStates: [String: [String: JSONCodableScalar]],
timetoken: Timetoken? = nil,
region: Int? = nil,
session: SessionReplaceable,
Expand All @@ -54,6 +57,7 @@ class SubscribeRequest {
self.configuration = configuration
self.channels = channels
self.groups = groups
self.channelStates = channelStates
self.timetoken = timetoken
self.region = region
self.session = session
Expand All @@ -78,18 +82,21 @@ class SubscribeRequest {
}

func execute(onCompletion: @escaping (Result<SubscribeResponse, SubscribeError>) -> Void) {
let router = SubscribeRouter(
.subscribe(
channels: channels,
groups: groups,
channelStates: channelStates,
timetoken: timetoken,
region: region?.description ?? nil,
heartbeat: configuration.durationUntilTimeout,
filter: configuration.filterExpression
),
configuration: configuration
)
request = session.request(
with: SubscribeRouter(
.subscribe(
channels: channels,
groups: groups,
timetoken: timetoken,
region: region?.description ?? nil,
heartbeat: configuration.durationUntilTimeout,
filter: configuration.filterExpression,
eventEngineEnabled: true
), configuration: configuration
), requestOperator: nil
with: router,
requestOperator: nil
)
request?.validate().response(
on: sessionResponseQueue,
Expand Down
17 changes: 16 additions & 1 deletion Sources/PubNub/Extensions/URLQueryItem+PubNub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,22 @@ public extension Array where Element == URLQueryItem {
internal mutating func appendIfPresent(key: QueryKey, value: String?) {
appendIfPresent(name: key.rawValue, value: value)
}


internal mutating func append(key: QueryKey, value: @autoclosure () -> String?, when condition: Bool) {
if condition {
append(URLQueryItem(name: key.rawValue, value: value()))
}
}

internal mutating func appendIfPresent(key: QueryKey, value: @autoclosure () -> String?, when condition: Bool) {
guard condition else {
return
}
if let value = value() {
append(URLQueryItem(name: key.rawValue, value: value))
}
}

/// Creates a new query item with a csv string value and appends only if the value is not empty
mutating func appendIfNotEmpty(name: String, value: [String]) {
if !value.isEmpty {
Expand Down
4 changes: 4 additions & 0 deletions Sources/PubNub/Networking/HTTPRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public protocol RouterConfiguration {
var useRequestId: Bool { get }
/// Ordered list of key-value pairs which identify various consumers.
var consumerIdentifiers: [String: String] { get }
/// This controls whether to enable a new, experimental implementation of Subscription and Presence handling
var enableEventEngine: Bool { get }
/// When `true` the SDK will resend the last channel state that was set using `PubNub.setPresence`
var sendsStateAutomatically: Bool { get }
}

public extension RouterConfiguration {
Expand Down
Loading

0 comments on commit 59825ee

Please sign in to comment.