From 3de51674198eb9d3238718e25a98588c1e1e5d3c Mon Sep 17 00:00:00 2001 From: RuiyangSun Date: Sun, 21 Apr 2024 17:58:02 +0800 Subject: [PATCH 1/3] chore(BidrectionalSyncStream): too long line --- Sources/SyncStream/BidirectionalSyncStream.swift | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Sources/SyncStream/BidirectionalSyncStream.swift b/Sources/SyncStream/BidirectionalSyncStream.swift index 00011cf..98a3755 100644 --- a/Sources/SyncStream/BidirectionalSyncStream.swift +++ b/Sources/SyncStream/BidirectionalSyncStream.swift @@ -247,7 +247,10 @@ public extension BidirectionalSyncStream { _: YieldT.Type = YieldT.self, _: SendT.Type = SendT.self, _: ReturnT.Type = ReturnT.self - ) -> (stream: BidirectionalSyncStream, continuation: BidirectionalSyncStream.Continuation) { + ) -> ( + stream: BidirectionalSyncStream, + continuation: BidirectionalSyncStream.Continuation + ) { let stream = BidirectionalSyncStream { _ in } let continuation = stream.continuation return (stream, continuation) From 1545f36e7cfd64abec3622b7879bdbbaf947b9ca Mon Sep 17 00:00:00 2001 From: RuiyangSun Date: Sun, 21 Apr 2024 18:01:43 +0800 Subject: [PATCH 2/3] feat: add `BidirectionalAsyncStream` --- Sources/SyncStream/AsyncSemphore.swift | 106 ++++++++ .../SyncStream/BidirectionalAsyncStream.swift | 242 ++++++++++++++++++ 2 files changed, 348 insertions(+) create mode 100644 Sources/SyncStream/AsyncSemphore.swift create mode 100644 Sources/SyncStream/BidirectionalAsyncStream.swift diff --git a/Sources/SyncStream/AsyncSemphore.swift b/Sources/SyncStream/AsyncSemphore.swift new file mode 100644 index 0000000..87cb610 --- /dev/null +++ b/Sources/SyncStream/AsyncSemphore.swift @@ -0,0 +1,106 @@ +// Copyright 2024-2024 Ruiyang Sun. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Dispatch +import Foundation + +@available(macOS 10.15, *) +internal actor AsyncSemphore { + // MARK: Lifecycle + + deinit {} + + internal init(value: Int) { + self.value = value + } + + // MARK: Internal + + internal func wait() async { + value -= 1 + if value < 0 { + _ = await withCheckedContinuation { continuation in + let workItem = DispatchWorkItem { continuation.resume() } + self.worksAndIDs.append((workItem, UUID())) + } + } + } + + internal func wait(timeout: DispatchTime) async -> DispatchTimeoutResult { + await withCheckedContinuation { continuation in + value -= 1 + if value >= 0 { + continuation.resume(returning: .success) + return + } + + let id = UUID() + let workItem = DispatchWorkItem { continuation.resume(returning: .success) } + self.worksAndIDs.append((workItem, id)) + + queue.asyncAfter(deadline: timeout) { + Task { + if await self.removeWork(withID: id) { + continuation.resume(returning: .timedOut) + } + } + } + } + } + + internal func wait(wallTimeout: DispatchWallTime) async -> DispatchTimeoutResult { + await withCheckedContinuation { continuation in + value -= 1 + if value >= 0 { + continuation.resume(returning: .success) + return + } + + let id = UUID() + let workItem = DispatchWorkItem { continuation.resume(returning: .success) } + self.worksAndIDs.append((workItem, id)) + + queue.asyncAfter(wallDeadline: wallTimeout) { + Task { + if await self.removeWork(withID: id) { + continuation.resume(returning: .timedOut) + } + } + } + } + } + + internal func signal() async { + value += 1 + if let work = worksAndIDs.first { + worksAndIDs.removeFirst() + queue.sync(execute: work.work) + } + } + + // MARK: Private + + private var value: Int + private var queue = DispatchQueue(label: "com.AsyncDispatchSemphore.\(UUID().uuidString)") + private var worksAndIDs = [(work: DispatchWorkItem, id: UUID)]() + + private func removeWork(withID id: UUID) async -> Bool { + if let index = worksAndIDs.firstIndex(where: { $0.id == id }) { + worksAndIDs.remove(at: index) + value += 1 + return true + } + return false + } +} diff --git a/Sources/SyncStream/BidirectionalAsyncStream.swift b/Sources/SyncStream/BidirectionalAsyncStream.swift new file mode 100644 index 0000000..b2f68dd --- /dev/null +++ b/Sources/SyncStream/BidirectionalAsyncStream.swift @@ -0,0 +1,242 @@ +// Copyright 2024-2024 Ruiyang Sun. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Dispatch +import Foundation + +// MARK: - BidirectionalAsyncStream + +/// A mechanism inspired by Python's generator to allow for bidirectional communication between two +/// parties. One party can yield a value and the other party can send a value back. +@available(macOS 10.15, *) +public class BidirectionalAsyncStream { + // MARK: Lifecycle + + /// Creates a new `BidirectionalSyncStream`. + /// + /// - Parameters: + /// - YieldT: The type of the value to yield. + /// - SendT: The type of the value to send. + /// - ReturnT: The type of the value to return. + /// - build: A async closure that takes a `Continuation` and returns `Void`. + public init( + _: YieldT.Type = YieldT.self, + _: SendT.Type = SendT.self, + _: ReturnT.Type = ReturnT.self, + _ build: @escaping (Continuation) async -> Void + ) { + self.build = build + continuation = Continuation() + } + + deinit {} + + // MARK: Public + + /// Advances the stream to the next value. In this stream, it is used to + /// start the stream. + /// + /// - Returns: The next value in the stream. + /// - Throws: `StopIteration` if the stream has finished. + /// - Throws: `WrongStreamUse` if invalid interaction with the stream is detected. + public func next() async throws -> YieldT { + if case let .finished(value) = finished { + throw StopIteration(value: value) + } + if started { + throw WrongStreamUse( + message: "The BidirectionalSyncStream has already started, " + + "Use send() instead of next() to continue the stream." + ) + } + await start() + + await continuation.yieldSemaphore.wait() + switch continuation.state { + case let .yielded(value): + continuation.state = .waitingForSend + return value + + case let .finished(value): + finished = .finished(value) + throw StopIteration(value: value) + + default: + throw WrongStreamUse(message: "yield or return must be called in the continuation closure") + } + } + + /// Sends a value to the stream, and returns the next value. + /// + /// - Parameters: + /// - element: The value to send. + /// + /// - Returns: The next value in the stream. + /// + /// - Throws: `StopIteration` if the stream has finished. + /// - Throws: `WrongStreamUse` if invalid interaction with the stream is detected. + /// + /// - Note: This method can only be called after calling `next()`. + public func send(_ element: SendT) async throws -> YieldT { + guard started else { + throw WrongStreamUse( + message: "The BidirectionalSyncStream has not started yet, " + + "Use next() to start the stream." + ) + } + + if case let .finished(value) = finished { + throw StopIteration(value: value) + } + + continuation.sendValue = element + continuation.state = .sended(element) + await continuation.sendSemaphore.signal() + await continuation.yieldSemaphore.wait() + switch continuation.state { + case let .yielded(value): + continuation.state = .waitingForSend + return value + + case let .finished(value): + finished = .finished(value) + throw StopIteration(value: value) + + default: + throw WrongStreamUse(message: "yield or return must be called in the continuation closure") + } + } + + // MARK: Internal + + internal enum State { + case idle + case yielded(YieldT) + case waitingForSend + case sended(SendT) + case finished(ReturnT) + } + + // MARK: Private + + private var started = false + private var finished: State = .idle + private var build: (Continuation) async -> Void + private var continuation: Continuation + private var queue = DispatchQueue(label: "com.BidirectionalAsyncStream.\(UUID().uuidString)") + + private func start() async { + started = true + Task { await build(continuation) } + } +} + +// MARK: BidirectionalAsyncStream.Continuation + +@available(macOS 10.15, *) +public extension BidirectionalAsyncStream { + /// A continuation of the `BidirectionalAsyncStream`. + /// It is used to communicate between the two parties. + class Continuation { + // MARK: Lifecycle + + deinit {} + + // MARK: Public + + /// Yields a value to the stream and waits for a value to be sent back. + /// + /// - Parameters: + /// - element: The value to yield. + /// + /// - Returns: The value sent back. + @discardableResult + public func yield(_ element: YieldT) async -> SendT { + if finished { + fatalError("The stream has finished. Cannot yield any more.") + } + + state = .yielded(element) + await yieldSemaphore.signal() + await sendSemaphore.wait() + return sendValue! + } + + /// Returns a value to the stream and finishes the stream. + /// This is the last call in the stream. + public func `return`(_ element: ReturnT) async { + if finished { + fatalError("The stream has finished. Cannot return any more.") + } + + finished = true + state = .finished(element) + await yieldSemaphore.signal() + } + + // MARK: Internal + + internal var state: State = .idle + internal var yieldSemaphore = AsyncSemphore(value: 0) + internal var sendSemaphore = AsyncSemphore(value: 0) + internal var sendValue: SendT? + + // MARK: Private + + private var finished = false + } +} + +@available(macOS 10.15, *) +public extension BidirectionalAsyncStream { + /// Converts the stream to a `SyncStream`. + /// + /// Only works when the `SendT` type is `NoneType`, and the `YieldT` type is the same as the `ReturnT` type. + func toAsyncStream() async -> AsyncStream where SendT.Type == NoneType.Type, YieldT.Type == ReturnT.Type { + AsyncStream { continuation in + Task { + do { + let value = try await self.next() + continuation.yield(value) + while true { + let value = try await self.send(NoneType()) + continuation.yield(value) + } + } catch { + if let value = (error as? StopIteration)?.value { + continuation.yield(value) + } + continuation.finish() + } + } + } + } + + /// Constructs an Bidrectional asynchronous stream from the Element Type + /// + /// - Returns: A tuple containing the stream and its continuation. The continuation + /// should be passed to the producer while the stream should be passed to the consumer. + static func makeStream( + _: YieldT.Type = YieldT.self, + _: SendT.Type = SendT.self, + _: ReturnT.Type = ReturnT.self + ) -> ( + stream: BidirectionalAsyncStream, + continuation: BidirectionalAsyncStream.Continuation + ) { + let stream = BidirectionalAsyncStream { _ in } + let continuation = stream.continuation + return (stream, continuation) + } +} From b67defd573731703541dc61f6a141bf2b18414e6 Mon Sep 17 00:00:00 2001 From: RuiyangSun Date: Sun, 21 Apr 2024 18:02:09 +0800 Subject: [PATCH 3/3] test: add tests for `BidirectionalAsyncStream` --- Tests/SyncStreamTests/AsyncSemphore.swift | 112 +++++++++++++++ .../BidirectionalAsyncStreamTests.swift | 136 ++++++++++++++++++ 2 files changed, 248 insertions(+) create mode 100644 Tests/SyncStreamTests/AsyncSemphore.swift create mode 100644 Tests/SyncStreamTests/BidirectionalAsyncStreamTests.swift diff --git a/Tests/SyncStreamTests/AsyncSemphore.swift b/Tests/SyncStreamTests/AsyncSemphore.swift new file mode 100644 index 0000000..b6851d0 --- /dev/null +++ b/Tests/SyncStreamTests/AsyncSemphore.swift @@ -0,0 +1,112 @@ +// Copyright 2024-2024 Ruiyang Sun. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +@testable import SyncStream +import XCTest + +@available(macOS 10.15, *) +final class AsyncDispatchSemaphoreTests: XCTestCase { + func testSemaphoreInitialization() async { + let semaphore = AsyncSemphore(value: 1) + await semaphore.signal() // Increase the semaphore to ensure it's initialized correctly. + await semaphore.wait() // This should pass immediately if the semaphore was initialized with value 1. + } + + func testSemaphoreWaitAndSignal() async { + let semaphore = AsyncSemphore(value: 0) + + let expectation = XCTestExpectation(description: "Semaphore signal") + + Task { + await semaphore.wait() + expectation.fulfill() + } + + await semaphore.signal() // This should fulfill the expectation by allowing the wait to complete. + + await fulfillment(of: [expectation], timeout: 2.0) + } + + func testSemaphoreWaitWithTimeoutSuccess() async { + let semaphore = AsyncSemphore(value: 1) + + let start = Date() + let result = await semaphore.wait(timeout: .now() + 0.5) + XCTAssertEqual(result, .success) + let end = Date() + XCTAssertTrue(end.timeIntervalSince(start) <= 0.5) + } + + func testSemaphoreWaitWithTimeoutBySignal() async { + let semaphore = AsyncSemphore(value: 0) + + let expectation = XCTestExpectation(description: "Semaphore signal") + + Task { + let result = await semaphore.wait(timeout: .now() + 3.0) + XCTAssertEqual(result, .success) + expectation.fulfill() + } + + await semaphore.signal() // This should fulfill the expectation by allowing the wait to complete. + + await fulfillment(of: [expectation], timeout: 2.0) + } + + func testSemaphoreWaitWithTimeoutFailure() async { + let semaphore = AsyncSemphore(value: 0) + + let start = Date() + let result = await semaphore.wait(timeout: .now() + 0.5) + XCTAssertEqual(result, .timedOut) + let end = Date() + XCTAssertTrue(end.timeIntervalSince(start) >= 0.5) + } + + func testSemphoreWaitWithWallTimeoutBySignal() async { + let semaphore = AsyncSemphore(value: 0) + + let expectation = XCTestExpectation(description: "Semaphore signal") + + Task { + let result = await semaphore.wait(wallTimeout: .now() + 3.0) + XCTAssertEqual(result, .success) + expectation.fulfill() + } + + await semaphore.signal() // This should fulfill the expectation by allowing the wait to complete. + + await fulfillment(of: [expectation], timeout: 2.0) + } + + func testSemaphoreWaitWithWallTimeoutSuccess() async { + let semaphore = AsyncSemphore(value: 1) + + let start = Date() + let result = await semaphore.wait(wallTimeout: .now() + 0.5) + XCTAssertEqual(result, .success) + let end = Date() + XCTAssertTrue(end.timeIntervalSince(start) <= 0.5) + } + + func testSemaphoreWaitWithWallTimeoutFailure() async { + let semaphore = AsyncSemphore(value: 0) + + let start = Date() + let result = await semaphore.wait(wallTimeout: .now() + 0.5) + XCTAssertEqual(result, .timedOut) + let end = Date() + XCTAssertTrue(end.timeIntervalSince(start) >= 0.5) + } +} diff --git a/Tests/SyncStreamTests/BidirectionalAsyncStreamTests.swift b/Tests/SyncStreamTests/BidirectionalAsyncStreamTests.swift new file mode 100644 index 0000000..c753a0c --- /dev/null +++ b/Tests/SyncStreamTests/BidirectionalAsyncStreamTests.swift @@ -0,0 +1,136 @@ +// Copyright 2024-2024 Ruiyang Sun. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +@testable import SyncStream +import XCTest + +final class BidirectionalAsyncStreamTests: XCTestCase { + func testBidirectionalAsyncStreamYieldAndSend() async throws { + let expectation = expectation(description: "BidirectionalSyncStream yields and sends values correctly") + let stream = BidirectionalAsyncStream { continuation in + let first = await continuation.yield(1) + XCTAssertEqual(first, 2) + let second = await continuation.yield(3) + XCTAssertEqual(second, 4) + await continuation.return(NoneType()) + } + + do { + var receivedYields = [Int]() + let firstYield = try await stream.next() + receivedYields.append(firstYield) + let secondYield = try await stream.send(2) + receivedYields.append(secondYield) + _ = try await stream.send(4) + XCTAssertEqual(receivedYields, [1, 3]) + } catch { + XCTAssertNotNil(error as? StopIteration) + expectation.fulfill() + } + + await fulfillment(of: [expectation], timeout: 5) + } + + func testBidirectionalSyncStreamStopIteration() async { + let stream = BidirectionalAsyncStream { continuation in + await continuation.return(NoneType()) + } + + let expectation = expectation(description: "BidirectionalSyncStream return correctly") + do { + _ = try await stream.next() + } catch { + XCTAssertNotNil(error as? StopIteration) + XCTAssertNotNil((error as? StopIteration)?.value) + expectation.fulfill() + } + await fulfillment(of: [expectation], timeout: 5) + } + + func testBidirectionalSyncStreamWrongUse() async { + let stream = BidirectionalAsyncStream { continuation in + await continuation.yield(1) + } + + let expectation = expectation(description: "BidirectionalSyncStream throws WrongStreamUse") + do { + _ = try await stream.send(1) + } catch { + XCTAssertTrue(error is WrongStreamUse) + expectation.fulfill() + } + await fulfillment(of: [expectation], timeout: 5) + } + + func testNextafterStarted() async { + let stream = BidirectionalAsyncStream { continuation in + await continuation.yield(1) + } + + _ = try? await stream.next() + let expectation = expectation(description: "BidirectionalSyncStream throws WrongStreamUse") + do { + _ = try await stream.next() + } catch { + XCTAssertTrue(error is WrongStreamUse) + expectation.fulfill() + } + await fulfillment(of: [expectation], timeout: 5) + } + + func testCalledAfterFinished() async { + let stream = BidirectionalAsyncStream { continuation in + await continuation.return(NoneType()) + } + + _ = try? await stream.next() + let expectation1 = expectation(description: "BidirectionalSyncStream should return") + do { + _ = try await stream.next() + } catch { + XCTAssertTrue(error is StopIteration) + expectation1.fulfill() + } + let expectation2 = expectation(description: "BidirectionalSyncStream should return") + do { + _ = try await stream.next() + } catch { + XCTAssertTrue(error is StopIteration) + expectation2.fulfill() + } + let expectation3 = expectation(description: "BidirectionalSyncStream should return") + do { + _ = try await stream.send(1) + } catch { + XCTAssertTrue(error is StopIteration) + expectation3.fulfill() + } + + await fulfillment(of: [expectation1, expectation2, expectation3], timeout: 5) + } + + func testToSyncStream() async { + let bidStream = BidirectionalAsyncStream { continuation in + await continuation.yield(1) + await continuation.yield(2) + await continuation.return(3) + } + let stream = await bidStream.toAsyncStream() + var idx = 1 + for try await element in stream { + XCTAssertEqual(element, idx) + idx += 1 + } + } +}