Skip to content

Commit

Permalink
Presence & Subscribe Event Engine
Browse files Browse the repository at this point in the history
* Implement contract tests
* Ensures that Subscribe EE always take 'region' parameter received from the response
* Correct handling of heartbeatInterval=0 and suppressLeaveEvents=true
  • Loading branch information
jguz-pubnub committed Nov 21, 2023
1 parent 9b7c1f5 commit 46cd8c9
Show file tree
Hide file tree
Showing 11 changed files with 516 additions and 204 deletions.
15 changes: 12 additions & 3 deletions Sources/PubNub/EventEngine/Presence/PresenceTransition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@ class PresenceTransition: TransitionProtocol {
typealias Event = Presence.Event
typealias Invocation = Presence.Invocation

private let configuration: PubNubConfiguration

init(configuration: PubNubConfiguration) {
self.configuration = configuration
}

func canTransition(from state: State, dueTo event: Event) -> Bool {
switch event {
case .joined(_,_):
return true
return configuration.heartbeatInterval > 0
case .left(_,_):
return !(state is Presence.HeartbeatInactive)
return !(state is Presence.HeartbeatInactive) && !configuration.supressLeaveEvents
case .heartbeatSuccess:
return state is Presence.Heartbeating || state is Presence.HeartbeatReconnecting
case .heartbeatFailed(_):
Expand All @@ -62,7 +68,10 @@ class PresenceTransition: TransitionProtocol {
case is Presence.HeartbeatCooldown:
return [.managed(.wait)]
case let state as Presence.HeartbeatReconnecting:
return [.managed(.delayedHeartbeat(channels: state.channels, groups: state.groups, retryAttempt: state.retryAttempt, error: state.error))]
return [.managed(.delayedHeartbeat(
channels: state.channels, groups: state.groups,
retryAttempt: state.retryAttempt, error: state.error
))]
default:
return []
}
Expand Down
62 changes: 18 additions & 44 deletions Sources/PubNub/EventEngine/Subscribe/SubscribeTransition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ class SubscribeTransition: TransitionProtocol {

switch event {
case .handshakeSuccess(let cursor):
results = setReceivingState(from: state, cursor: state.hasTimetoken ? state.cursor : cursor)
results = setReceivingState(from: state, cursor: resolveCursor(for: state, new: cursor))
case .handshakeFailure(let error):
results = setHandshakeReconnectingState(from: state, error: error)
case .handshakeReconnectSuccess(let cursor):
results = setReceivingState(from: state, cursor: state.hasTimetoken ? state.cursor : cursor)
results = setReceivingState(from: state, cursor: resolveCursor(for: state, new: cursor))
case .handshakeReconnectFailure(let error):
results = setHandshakeReconnectingState(from: state, error: error)
case .handshakeReconnectGiveUp(let error):
Expand All @@ -162,9 +162,9 @@ class SubscribeTransition: TransitionProtocol {
case .receiveReconnectGiveUp(let error):
results = setReceiveFailedState(from: state, error: error)
case .subscriptionChanged(let channels, let groups):
results = onSubscriptionChanged(from: state, channels: channels, groups: groups)
results = onSubscriptionAltered(from: state, channels: channels, groups: groups, cursor: state.cursor)
case .subscriptionRestored(let channels, let groups, let cursor):
results = onSubscriptionRestored(from: state, channels: channels, groups: groups, cursor: cursor)
results = onSubscriptionAltered(from: state, channels: channels, groups: groups, cursor: cursor)
case .disconnect:
results = setStoppedState(from: state)
case .unsubscribeAll:
Expand All @@ -178,57 +178,31 @@ class SubscribeTransition: TransitionProtocol {
invocations: onExit(from: state) + results.invocations + onEntry(to: results.state)
)
}
}

fileprivate extension SubscribeTransition {
func onSubscriptionChanged(
from state: State,
channels: [String],
groups: [String]
) -> TransitionResult<State, Invocation> {
let newInput = SubscribeInput(
channels: channels.map { PubNubChannel(id: $0, withPresence: $0.isPresenceChannelName) },
groups: groups.map { PubNubChannel(id: $0, withPresence: $0.isPresenceChannelName) }
)
if newInput.isEmpty {
return setUnsubscribedState(from: state)
} else {
switch state {
case is Subscribe.HandshakingState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: state.cursor))
case is Subscribe.HandshakeReconnectingState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: state.cursor))
case is Subscribe.HandshakeStoppedState:
return TransitionResult(state: Subscribe.HandshakeStoppedState(input: newInput, cursor: state.cursor))
case is Subscribe.HandshakeFailedState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: state.cursor))
case is Subscribe.ReceivingState:
return TransitionResult(state: Subscribe.ReceivingState(input: newInput, cursor: state.cursor))
case is Subscribe.ReceiveReconnectingState:
return TransitionResult(state: Subscribe.ReceivingState(input: newInput, cursor: state.cursor))
case is Subscribe.ReceiveStoppedState:
return TransitionResult(state: Subscribe.ReceiveStoppedState(input: newInput, cursor: state.cursor))
case is Subscribe.ReceiveFailedState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: state.cursor))
case is Subscribe.UnsubscribedState:
return TransitionResult(state: Subscribe.HandshakingState(input: newInput, cursor: state.cursor))
default:
return TransitionResult(state: state)
}

private func resolveCursor(
for currentState: State,
new cursor: SubscribeCursor
) -> SubscribeCursor {
if currentState.hasTimetoken {
return SubscribeCursor(
timetoken: currentState.cursor.timetoken,
region: cursor.region
)
}
return cursor
}
}

fileprivate extension SubscribeTransition {
func onSubscriptionRestored(
func onSubscriptionAltered(
from state: State,
channels: [String],
groups: [String],
cursor: SubscribeCursor
) -> TransitionResult<State, Invocation> {
let newInput = SubscribeInput(
channels: channels.map { PubNubChannel(id: $0, withPresence: $0.isPresenceChannelName) },
groups: groups.map { PubNubChannel(id: $0, withPresence: $0.isPresenceChannelName) }
channels: channels.map { PubNubChannel(channel: $0) },
groups: groups.map { PubNubChannel(channel: $0) }
)

if newInput.isEmpty {
Expand Down
2 changes: 1 addition & 1 deletion Sources/PubNub/Networking/Routers/PresenceRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ struct PresenceRouter: HTTPRouter {
case let .heartbeat(channels, _, _):
path = "/v2/presence/sub-key/\(subscribeKey)/channel/\(channels.commaOrCSVString.urlEncodeSlash)/heartbeat"
case let .leave(channels, _):
path = "/v2/presence/sub_key/\(subscribeKey)/channel/\(channels.commaOrCSVString.urlEncodeSlash)/leave"
path = "/v2/presence/sub-key/\(subscribeKey)/channel/\(channels.commaOrCSVString.urlEncodeSlash)/leave"
case let .hereNow(channels, _, _, _):
path = "/v2/presence/sub-key/\(subscribeKey)/channel/\(channels.commaOrCSVString.urlEncodeSlash)"
case .hereNowGlobal:
Expand Down
2 changes: 1 addition & 1 deletion Sources/PubNub/Subscription/SubscribeSessionFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public class SubscribeSessionFactory {
let presenceEngine = EventEngineFactory().presenceEngine(
with: config,
dispatcher: presenceDispatcher,
transition: PresenceTransition()
transition: PresenceTransition(configuration: config)
)
let subscriptionSession = SubscriptionSession(
strategy: EventEngineSubscriptionSessionStrategy(
Expand Down
87 changes: 65 additions & 22 deletions Tests/PubNubContractTest/PubNubContractTestCase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,31 @@ let defaultPublishKey = "demo-36"
@objc public class PubNubContractTestCase: XCTestCase {
public var messageReceivedHandler: ((PubNubMessage, [PubNubMessage]) -> Void)?
public var statusReceivedHandler: ((SubscriptionListener.StatusEvent, [SubscriptionListener.StatusEvent]) -> Void)?
public var presenceChangeReceivedHandler: ((PubNubPresenceChange, [PubNubPresenceChange]) -> Void)?

fileprivate static var _receivedErrorStatuses: [SubscriptionListener.StatusEvent] = []
fileprivate static var _receivedStatuses: [SubscriptionListener.StatusEvent] = []
fileprivate static var _receivedMessages: [PubNubMessage] = []
fileprivate static var _receivedPresenceChanges: [PubNubPresenceChange] = []

fileprivate static var _currentScenario: CCIScenarioDefinition?
fileprivate static var _apiCallResults: [Any] = []
fileprivate lazy var currentConfiguration: PubNubConfiguration = { createConfiguration() }()

fileprivate static var _currentConfiguration = PubNubContractTestCase._defaultConfiguration
fileprivate static var _defaultConfiguration: PubNubConfiguration {
PubNubConfiguration(
publishKey: defaultPublishKey,
subscribeKey: defaultSubscribeKey,
userId: UUID().uuidString,
useSecureConnections: false,
origin: mockServerAddress,
supressLeaveEvents: true
)
}

fileprivate static var currentClient: PubNub?

public var configuration: PubNubConfiguration { currentConfiguration }
public var configuration: PubNubConfiguration { PubNubContractTestCase._currentConfiguration }

public var expectSubscribeFailure: Bool { false }

Expand All @@ -71,6 +87,11 @@ let defaultPublishKey = "demo-36"
get { PubNubContractTestCase._receivedMessages }
set { PubNubContractTestCase._receivedMessages = newValue }
}

public var receivedPresenceChanges: [PubNubPresenceChange] {
get { PubNubContractTestCase._receivedPresenceChanges }
set { PubNubContractTestCase._receivedPresenceChanges = newValue }
}

public var apiCallResults: [Any] {
get { PubNubContractTestCase._apiCallResults }
Expand All @@ -81,23 +102,14 @@ let defaultPublishKey = "demo-36"
if PubNubContractTestCase.currentClient == nil {
PubNubContractTestCase.currentClient = createPubNubClient()
}

return PubNubContractTestCase.currentClient!
}

func replacePubNubConfiguration(with configuration: PubNubConfiguration) {
currentConfiguration = configuration
}

func createConfiguration() -> PubNubConfiguration {
PubNubConfiguration(
publishKey: defaultPublishKey,
subscribeKey: defaultSubscribeKey,
userId: UUID().uuidString,
useSecureConnections: false,
origin: mockServerAddress,
supressLeaveEvents: true
)
if PubNubContractTestCase.currentClient != nil {
preconditionFailure("Cannot replace configuration when PubNub instance was already created")
}
PubNubContractTestCase._currentConfiguration = configuration
}

func createPubNubClient() -> PubNub {
Expand All @@ -118,19 +130,14 @@ let defaultPublishKey = "demo-36"
}

public func handleAfterHook() {
currentConfiguration = PubNubConfiguration(publishKey: defaultPublishKey,
subscribeKey: defaultSubscribeKey,
userId: UUID().uuidString,
useSecureConnections: false,
origin: mockServerAddress,
supressLeaveEvents: true)

PubNubContractTestCase._currentConfiguration = PubNubContractTestCase._defaultConfiguration
PubNubContractTestCase.currentClient?.unsubscribeAll()
PubNubContractTestCase.currentClient = nil

receivedErrorStatuses.removeAll()
receivedStatuses.removeAll()
receivedMessages.removeAll()
receivedPresenceChanges.removeAll()
apiCallResults.removeAll()
}

Expand Down Expand Up @@ -313,6 +320,15 @@ let defaultPublishKey = "demo-36"
handler(message, strongSelf.receivedMessages)
}
}

listener.didReceivePresence = { [weak self] presenceChange in
guard let strongSelf = self else { return }
strongSelf.receivedPresenceChanges.append(presenceChange)

if let handler = strongSelf.presenceChangeReceivedHandler {
handler(presenceChange, strongSelf.receivedPresenceChanges)
}
}

client.add(listener)
client.subscribe(to: channels, and: groups, at: timetoken, withPresence: presence)
Expand All @@ -339,6 +355,33 @@ let defaultPublishKey = "demo-36"
return receivedMessages.count > 0 ? receivedMessages : nil
}
}

// MARK: - Presence

@discardableResult
public func waitForPresenceChanges(_: PubNub, count: Int) -> [PubNubPresenceChange]? {
if receivedPresenceChanges.count < count {
let receivedPresenceChangeExpectation = expectation(description: "Subscribe messages")
receivedPresenceChangeExpectation.assertForOverFulfill = false
presenceChangeReceivedHandler = { _, presenceChanges in
if presenceChanges.count >= count {
receivedPresenceChangeExpectation.fulfill()
}
}

wait(for: [receivedPresenceChangeExpectation], timeout: 10.0)
}

defer {
receivedPresenceChanges.removeAll()
}

if receivedPresenceChanges.count > count {
return Array(receivedPresenceChanges[..<count])
} else {
return receivedPresenceChanges.count > 0 ? receivedPresenceChanges : nil
}
}

// MARK: - Results handling

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
transitionDecorator = nil
super.handleAfterHook()
}

override func handleBeforeHook() {
override func createPubNubClient() -> PubNub {
dispatcherDecorator = DispatcherDecorator(wrappedInstance: EffectDispatcher(
factory: PresenceEffectFactory(
session: HTTPSession(
Expand All @@ -104,19 +104,20 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep
)
))
transitionDecorator = TransitionDecorator(
wrappedInstance: PresenceTransition()
wrappedInstance: PresenceTransition(configuration: configuration)
)
super.handleBeforeHook()
}

override func createPubNubClient() -> PubNub {

let factory = EventEngineFactory()
let subscriptionSession = SubscriptionSession(
strategy: EventEngineSubscriptionSessionStrategy(
configuration: self.configuration,
subscribeEngine: factory.subscribeEngine(
with: self.configuration,
dispatcher: EmptyDispatcher(),
dispatcher: EffectDispatcher(factory: SubscribeEffectFactory(session: HTTPSession(
configuration: URLSessionConfiguration.subscription,
sessionQueue: DispatchQueue(label: "Subscribe Response Queue"),
sessionStream: SessionListener()
))),
transition: SubscribeTransition()
),
presenceEngine: factory.presenceEngine(
Expand All @@ -134,5 +135,65 @@ class PubNubPresenceEngineContractTestsSteps: PubNubEventEngineContractTestsStep

override public func setup() {
startCucumberHookEventsListening()

Given("^the demo keyset with Presence Event Engine enabled$") { args, _ in
self.replacePubNubConfiguration(with: PubNubConfiguration(
publishKey: self.configuration.publishKey,
subscribeKey: self.configuration.subscribeKey,
userId: self.configuration.userId,
useSecureConnections: self.configuration.useSecureConnections,
origin: self.configuration.origin,
enableEventEngine: true
))
}

Given("^heartbeatInterval set to '([0-9]+)', timeout set to '([0-9]+)' and suppressLeaveEvents set to '(.*)'$") { args, _ in
self.replacePubNubConfiguration(with: PubNubConfiguration(
publishKey: self.configuration.publishKey,
subscribeKey: self.configuration.subscribeKey,
userId: self.configuration.userId,
useSecureConnections: self.configuration.useSecureConnections,
origin: self.configuration.origin,
durationUntilTimeout: UInt(args![1])!,
heartbeatInterval: UInt(args![0])!,
supressLeaveEvents: args![2] == "true",
enableEventEngine: self.configuration.enableEventEngine
))
}

When("^I join '(.*)', '(.*)', '(.*)' channels$") { args, _ in
let firstChannel = args?[0] ?? ""
let secondChannel = args?[1] ?? ""
let thirdChannel = args?[2] ?? ""

self.subscribeSynchronously(self.client, to: [firstChannel, secondChannel, thirdChannel], with: true)
}

Then("^I wait for getting Presence joined events$") { args, _ in
XCTAssertNotNil(self.waitForPresenceChanges(self.client, count: 3))
}

Then("^I wait for getting Presence left events$") { args, _ in
XCTAssertNotNil(self.waitForPresenceChanges(self.client, count: 2))
}

Then("^I leave '(.*)' and '(.*)' channels$") { args, _ in
let firstChannel = args?[0] ?? ""
let secondChannel = args?[1] ?? ""

self.client.unsubscribe(from: [firstChannel, secondChannel])
}

Match(["And"], "^The timeout expires$") { _, _ in
self.waitFor(delay: TimeInterval(self.configuration.durationUntilTimeout))
}

Match(["And", "Then"], "^I observe the following Events and Invocations of the Presence EE:$") { args, value in
let recordedEvents = self.transitionDecorator.recordedEvents.map { $0.contractTestIdentifier }
let recordedInvocations = self.dispatcherDecorator.recordedInvocations.map { $0.contractTestIdentifier }

XCTAssertTrue(recordedEvents.elementsEqual(self.extractExpectedResults(from: value).events))
XCTAssertTrue(recordedInvocations.elementsEqual(self.extractExpectedResults(from: value).invocations))
}
}
}
Loading

0 comments on commit 46cd8c9

Please sign in to comment.