Skip to content

Commit

Permalink
Make _NIOFileSystem strict concurrency compatible (#3098)
Browse files Browse the repository at this point in the history
Motivation:

We're continuing out Strict Concurrency journey, making sure users of
NIO can write data-race-free code.

Modifications:

- Added some missing Sendable annotations in NIOAsyncSequenceProducer
- Made BufferedStream unconditionally Sendable, and required its Element
type to also be Sendable.

    The prior constraint wasn't actually correct. We always
    behaved as though the element types were Sendable, by passing
    them into continuations. This cleans things up.
- Made AnyAsyncSequence Sendable, which it needs to be.
- Made BufferedOrAnyStream Sendable, which it needs to be.
- Made DirectoryEntries explicitly Sendable, which it was.
- Made DirectoryEntries.Batched explicitly Sendable.

Result:

Better concurrency-safety
  • Loading branch information
Lukasa authored Feb 3, 2025
1 parent 2f1c780 commit 08b3b4f
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ let package = Package(
path: "Sources/NIOFileSystem",
exclude: includePrivacyManifest ? [] : ["PrivacyInfo.xcprivacy"],
resources: includePrivacyManifest ? [.copy("PrivacyInfo.xcprivacy")] : [],
swiftSettings: [
swiftSettings: strictConcurrencySettings + [
.define("ENABLE_MOCKING", .when(configuration: .debug))
]
),
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public struct NIOAsyncSequenceProducer<
/// to yield new elements to the sequence.
/// 2. The ``sequence`` which is the actual `AsyncSequence` and
/// should be passed to the consumer.
public struct NewSequence {
public struct NewSequence: Sendable {
/// The source of the ``NIOAsyncSequenceProducer`` used to yield and finish.
public let source: Source
/// The actual sequence which should be passed to the consumer.
Expand Down Expand Up @@ -268,7 +268,7 @@ extension NIOAsyncSequenceProducer {
}

/// The result of a call to ``NIOAsyncSequenceProducer/Source/yield(_:)``.
public enum YieldResult: Hashable {
public enum YieldResult: Hashable, Sendable {
/// Indicates that the caller should produce more elements for now. The delegate's ``NIOAsyncSequenceProducerDelegate/produceMore()``
/// will **NOT** get called, since the demand was already signalled through this ``NIOAsyncSequenceProducer/Source/YieldResult``.
case produceMore
Expand Down
10 changes: 6 additions & 4 deletions Sources/NIOFileSystem/DirectoryEntries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import SystemPackage

/// An `AsyncSequence` of entries in a directory.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct DirectoryEntries: AsyncSequence {
public struct DirectoryEntries: AsyncSequence, Sendable {
public typealias AsyncIterator = DirectoryIterator
public typealias Element = DirectoryEntry

Expand All @@ -35,7 +35,8 @@ public struct DirectoryEntries: AsyncSequence {

/// Creates a ``DirectoryEntries`` sequence by wrapping an `AsyncSequence` of _batches_ of
/// directory entries.
public init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Batched.Element {
@preconcurrency
public init<S: AsyncSequence & Sendable>(wrapping sequence: S) where S.Element == Batched.Element {
self.batchedSequence = Batched(wrapping: sequence)
}

Expand Down Expand Up @@ -85,15 +86,16 @@ extension DirectoryEntries {
/// The ``Batched`` sequence uses `Array<DirectoryEntry>` as its element type rather
/// than `DirectoryEntry`. This can enable better performance by reducing the number of
/// executor hops at the cost of ease-of-use.
public struct Batched: AsyncSequence {
public struct Batched: AsyncSequence, Sendable {
public typealias AsyncIterator = BatchedIterator
public typealias Element = [DirectoryEntry]

private let stream: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>

/// Creates a ``DirectoryEntries/Batched`` sequence by wrapping an `AsyncSequence`
/// of directory entry batches.
public init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Element {
@preconcurrency
public init<S: AsyncSequence & Sendable>(wrapping sequence: S) where S.Element == Element {
self.stream = BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>(wrapping: sequence)
}

Expand Down
10 changes: 5 additions & 5 deletions Sources/NIOFileSystem/Internal/BufferedOrAnyStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import NIOCore

/// Wraps a ``NIOThrowingAsyncSequenceProducer<Element>`` or ``AnyAsyncSequence<Element>``.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal enum BufferedOrAnyStream<Element, Delegate: NIOAsyncSequenceProducerDelegate> {
internal enum BufferedOrAnyStream<Element: Sendable, Delegate: NIOAsyncSequenceProducerDelegate>: Sendable {
typealias AsyncSequenceProducer = NIOThrowingAsyncSequenceProducer<
Element, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, Delegate
>
Expand All @@ -28,7 +28,7 @@ internal enum BufferedOrAnyStream<Element, Delegate: NIOAsyncSequenceProducerDel
self = .nioThrowingAsyncSequenceProducer(stream)
}

internal init<S: AsyncSequence>(wrapping stream: S) where S.Element == Element {
internal init<S: AsyncSequence & Sendable>(wrapping stream: S) where S.Element == Element {
self = .anyAsyncSequence(AnyAsyncSequence(wrapping: stream))
}

Expand Down Expand Up @@ -69,10 +69,10 @@ internal enum BufferedOrAnyStream<Element, Delegate: NIOAsyncSequenceProducerDel
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal struct AnyAsyncSequence<Element>: AsyncSequence {
private let _makeAsyncIterator: () -> AsyncIterator
internal struct AnyAsyncSequence<Element>: AsyncSequence, Sendable {
private let _makeAsyncIterator: @Sendable () -> AsyncIterator

internal init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Element {
internal init<S: AsyncSequence & Sendable>(wrapping sequence: S) where S.Element == Element {
self._makeAsyncIterator = {
AsyncIterator(wrapping: sequence.makeAsyncIterator())
}
Expand Down
12 changes: 6 additions & 6 deletions Sources/NIOFileSystem/Internal/BufferedStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ import NIOConcurrencyHelpers
/// }
///
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal struct BufferedStream<Element> {
internal struct BufferedStream<Element: Sendable> {
final class _Backing: Sendable {
let storage: _BackPressuredStorage

Expand Down Expand Up @@ -199,7 +199,7 @@ extension BufferedStream: AsyncSequence {
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension BufferedStream: Sendable where Element: Sendable {}
extension BufferedStream: Sendable {}

internal struct _ManagedCriticalState<State>: @unchecked Sendable {
let lock: NIOLockedValueBox<State>
Expand Down Expand Up @@ -303,7 +303,7 @@ extension BufferedStream {
/// - Parameter sequence: The elements to write to the asynchronous stream.
/// - Returns: The result that indicates if more elements should be produced at this time.
internal func write<S>(contentsOf sequence: S) throws -> WriteResult
where Element == S.Element, S: Sequence {
where Element == S.Element, S: Sequence, Element: Sendable {
try self._backing.storage.write(contentsOf: sequence)
}

Expand Down Expand Up @@ -363,7 +363,7 @@ extension BufferedStream {
internal func write<S>(
contentsOf sequence: S,
onProduceMore: @escaping @Sendable (Result<Void, Error>) -> Void
) where Element == S.Element, S: Sequence {
) where Element == S.Element, S: Sequence, Element: Sendable {
do {
let writeResult = try self.write(contentsOf: sequence)

Expand Down Expand Up @@ -407,7 +407,7 @@ extension BufferedStream {
/// - Parameters:
/// - sequence: The elements to write to the asynchronous stream.
internal func write<S>(contentsOf sequence: S) async throws
where Element == S.Element, S: Sequence {
where Element == S.Element, S: Sequence, Element: Sendable {
let writeResult = try { try self.write(contentsOf: sequence) }()

switch writeResult {
Expand Down Expand Up @@ -458,7 +458,7 @@ extension BufferedStream {
/// - Parameters:
/// - sequence: The elements to write to the asynchronous stream.
internal func write<S>(contentsOf sequence: S) async throws
where Element == S.Element, S: AsyncSequence {
where Element == S.Element, S: AsyncSequence, Element: Sendable {
for try await element in sequence {
try await self.write(contentsOf: CollectionOfOne(element))
}
Expand Down

0 comments on commit 08b3b4f

Please sign in to comment.