Skip to content

Commit

Permalink
Merge pull request #830 from appwrite/fix-apple-multiple-subscriptions
Browse files Browse the repository at this point in the history
Fix Apple realtime with multiple subscriptions
  • Loading branch information
abnegate authored May 17, 2024
2 parents 7a10919 + 4644ea2 commit 830a46c
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 105 deletions.
2 changes: 1 addition & 1 deletion templates/apple/Package.swift.twig
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let package = Package(
),
],
dependencies: [
.package(url: "https://github.com/swift-server/async-http-client.git", from: "1.9.0"),
.package(url: "https://github.com/swift-server/async-http-client.git", from: "1.17.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.32.0"),
],
targets: [
Expand Down
10 changes: 7 additions & 3 deletions templates/swift/Sources/Models/RealtimeModels.swift.twig
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import Foundation

public class RealtimeSubscription {
public var close: () -> Void
private var close: () async throws -> Void

init(close: @escaping () -> Void) {
init(close: @escaping () async throws-> Void) {
self.close = close
}

public func close() async throws {
try await self.close()
}
}

public class RealtimeCallback {
Expand All @@ -14,7 +18,7 @@ public class RealtimeCallback {

init(
for channels: Set<String>,
and callback: @escaping (RealtimeResponseEvent) -> Void
with callback: @escaping (RealtimeResponseEvent) -> Void
) {
self.channels = channels
self.callback = callback
Expand Down
100 changes: 55 additions & 45 deletions templates/swift/Sources/Services/Realtime.swift.twig
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,23 @@ open class Realtime : Service {

private let TYPE_ERROR = "error"
private let TYPE_EVENT = "event"
private let DEBOUNCE_MILLIS = 1
private let DEBOUNCE_NANOS = 1_000_000

private var socketClient: WebSocketClient? = nil
private var activeChannels = Set<String>()
private var activeSubscriptions = [Int: RealtimeCallback]()

let connectSync = DispatchQueue(label: "ConnectSync")
let callbackSync = DispatchQueue(label: "CallbackSync")

private var subCallDepth = 0
private var reconnectAttempts = 0
private var subscriptionsCounter = 0
private var reconnect = true

private func createSocket() {
private func createSocket() async throws {
guard activeChannels.count > 0 else {
reconnect = false
closeSocket()
try await closeSocket()
return
}

Expand All @@ -38,17 +37,31 @@ open class Realtime : Service {

if (socketClient != nil) {
reconnect = false
closeSocket()
} else {
socketClient = WebSocketClient(url, tlsEnabled: !client.selfSigned, delegate: self)!
try await closeSocket()
}

try! socketClient?.connect()
socketClient = WebSocketClient(
url,
tlsEnabled: !client.selfSigned,
delegate: self
)

try await socketClient?.connect()
}

private func closeSocket() {
socketClient?.close()
//socket?.close(RealtimeCode.POLICY_VIOLATION.value, null)
private func closeSocket() async throws {
guard let client = socketClient,
let group = client.threadGroup else {
return
}

if (client.isConnected) {
let promise = group.any().makePromise(of: Void.self)
client.close(promise: promise)
try await promise.futureResult.get()
}

try await group.shutdownGracefully()
}

private func getTimeout() -> Int {
Expand All @@ -63,8 +76,8 @@ open class Realtime : Service {
public func subscribe(
channel: String,
callback: @escaping (RealtimeResponseEvent) -> Void
) -> RealtimeSubscription {
return subscribe(
) async throws -> RealtimeSubscription {
return try await subscribe(
channels: [channel],
payloadType: String.self,
callback: callback
Expand All @@ -74,8 +87,8 @@ open class Realtime : Service {
public func subscribe(
channels: Set<String>,
callback: @escaping (RealtimeResponseEvent) -> Void
) -> RealtimeSubscription {
return subscribe(
) async throws -> RealtimeSubscription {
return try await subscribe(
channels: channels,
payloadType: String.self,
callback: callback
Expand All @@ -86,8 +99,8 @@ open class Realtime : Service {
channel: String,
payloadType: T.Type,
callback: @escaping (RealtimeResponseEvent) -> Void
) -> RealtimeSubscription {
return subscribe(
) async throws -> RealtimeSubscription {
return try await subscribe(
channels: [channel],
payloadType: T.self,
callback: callback
Expand All @@ -98,36 +111,38 @@ open class Realtime : Service {
channels: Set<String>,
payloadType: T.Type,
callback: @escaping (RealtimeResponseEvent) -> Void
) -> RealtimeSubscription {
) async throws -> RealtimeSubscription {
subscriptionsCounter += 1
let counter = subscriptionsCounter

let count = subscriptionsCounter

channels.forEach {
activeChannels.insert($0)
}

activeSubscriptions[counter] = RealtimeCallback(
activeSubscriptions[count] = RealtimeCallback(
for: Set(channels),
and: callback
with: callback
)

connectSync.sync {
subCallDepth+=1
}

DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(DEBOUNCE_MILLIS)) {
if (self.subCallDepth == 1) {
self.createSocket()
}
self.connectSync.sync {
self.subCallDepth-=1
}
try await Task.sleep(nanoseconds: UInt64(DEBOUNCE_NANOS))

if self.subCallDepth == 1 {
try await self.createSocket()
}

connectSync.sync {
self.subCallDepth -= 1
}

return RealtimeSubscription {
self.activeSubscriptions[counter] = nil
self.activeSubscriptions[count] = nil
self.cleanUp(channels: channels)
self.createSocket()
try await self.createSocket()
}
}

Expand Down Expand Up @@ -163,7 +178,7 @@ extension Realtime: WebSocketClientDelegate {
}
}

public func onClose(channel: Channel, data: Data) {
public func onClose(channel: Channel, data: Data) async throws {
if (!reconnect) {
reconnect = true
return
Expand All @@ -173,10 +188,11 @@ extension Realtime: WebSocketClientDelegate {

print("Realtime disconnected. Re-connecting in \(timeout / 1000) seconds.")

DispatchQueue.main.asyncAfter(deadline: .now() + .milliseconds(timeout)) {
self.reconnectAttempts += 1
self.createSocket()
}
try await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000))

self.reconnectAttempts += 1

try await self.createSocket()
}

public func onError(error: Swift.Error?, status: HTTPResponseStatus?) {
Expand All @@ -188,16 +204,10 @@ extension Realtime: WebSocketClientDelegate {
}

func handleResponseEvent(from json: [String: Any]) {
guard let data = json["data"] as? [String: Any] else {
return
}
guard let channels = data["channels"] as? Array<String> else {
return
}
guard let events = data["events"] as? Array<String> else {
return
}
guard let payload = data["payload"] as? [String: Any] else {
guard let data = json["data"] as? [String: Any],
let channels = data["channels"] as? [String],
let events = data["events"] as? [String],
let payload = data["payload"] as? [String: Any] else {
return
}
guard channels.contains(where: { channel in
Expand Down
Loading

0 comments on commit 830a46c

Please sign in to comment.