Skip to content

Commit

Permalink
Add ChannelPipeline.SynchronousOperations.Position (#3065)
Browse files Browse the repository at this point in the history
Adds a new `ChannelPipeline.SynchronousOperations.Position` type to fill
a hole in the API that prevented non-sendable Channel handlers being
added at a position to the pipeline via the synchronous operations.

---------

Co-authored-by: Cory Benfield <[email protected]>
  • Loading branch information
0xTim and Lukasa authored Feb 4, 2025
1 parent 08b3b4f commit ac79370
Show file tree
Hide file tree
Showing 3 changed files with 486 additions and 27 deletions.
112 changes: 101 additions & 11 deletions Sources/NIOCore/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@ public final class ChannelPipeline: ChannelInvoker {
let future: EventLoopFuture<Void>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeCompletedFuture(self.addHandlerSync(handler, name: name, position: position))
let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
future = self.eventLoop.makeCompletedFuture(
self.addHandlerSync(handler, name: name, position: syncPosition)
)
} else {
future = self.eventLoop.submit {
try self.addHandlerSync(handler, name: name, position: position).get()
let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
try self.addHandlerSync(handler, name: name, position: syncPosition).get()
}
}

Expand All @@ -198,7 +202,7 @@ public final class ChannelPipeline: ChannelInvoker {
fileprivate func addHandlerSync(
_ handler: ChannelHandler,
name: String? = nil,
position: ChannelPipeline.Position = .last
position: ChannelPipeline.SynchronousOperations.Position = .last
) -> Result<Void, Error> {
self.eventLoop.assertInEventLoop()

Expand Down Expand Up @@ -1122,11 +1126,12 @@ extension ChannelPipeline {
_ handlers: [ChannelHandler & Sendable],
position: ChannelPipeline.Position
) -> Result<Void, Error> {
switch position {
let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
switch syncPosition {
case .first, .after:
return self._addHandlersSync(handlers.reversed(), position: position)
return self._addHandlersSync(handlers.reversed(), position: syncPosition)
case .last, .before:
return self._addHandlersSync(handlers, position: position)
return self._addHandlersSync(handlers, position: syncPosition)
}
}

Expand All @@ -1143,7 +1148,7 @@ extension ChannelPipeline {
/// - Returns: A result representing whether the handlers were added or not.
fileprivate func addHandlersSyncNotSendable(
_ handlers: [ChannelHandler],
position: ChannelPipeline.Position
position: ChannelPipeline.SynchronousOperations.Position
) -> Result<Void, Error> {
switch position {
case .first, .after:
Expand All @@ -1162,7 +1167,7 @@ extension ChannelPipeline {
/// - Returns: A result representing whether the handlers were added or not.
private func _addHandlersSync<Handlers: Sequence>(
_ handlers: Handlers,
position: ChannelPipeline.Position
position: ChannelPipeline.SynchronousOperations.Position
) -> Result<Void, Error> where Handlers.Element == ChannelHandler & Sendable {
self.eventLoop.assertInEventLoop()

Expand Down Expand Up @@ -1191,7 +1196,7 @@ extension ChannelPipeline {
/// - Returns: A result representing whether the handlers were added or not.
private func _addHandlersSyncNotSendable<Handlers: Sequence>(
_ handlers: Handlers,
position: ChannelPipeline.Position
position: ChannelPipeline.SynchronousOperations.Position
) -> Result<Void, Error> where Handlers.Element == ChannelHandler {
self.eventLoop.assertInEventLoop()

Expand Down Expand Up @@ -1238,20 +1243,67 @@ extension ChannelPipeline {
public func addHandler(
_ handler: ChannelHandler,
name: String? = nil,
position: ChannelPipeline.Position = .last
position: ChannelPipeline.SynchronousOperations.Position = .last
) throws {
try self._pipeline.addHandlerSync(handler, name: name, position: position).get()
}

/// Add a handler to the pipeline.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
/// - handler: The handler to add.
/// - name: The name to use for the `ChannelHandler` when it's added. If no name is specified the one will be generated.
/// - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
@available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
@_disfavoredOverload
public func addHandler(
_ handler: ChannelHandler,
name: String? = nil,
position: ChannelPipeline.Position = .last
) throws {
let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
try self._pipeline.addHandlerSync(handler, name: name, position: syncPosition).get()
}

/// Add an array of handlers to the pipeline.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
/// - handlers: The handlers to add.
/// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
public func addHandlers(
_ handlers: [ChannelHandler],
position: ChannelPipeline.SynchronousOperations.Position = .last
) throws {
try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get()
}

/// Add an array of handlers to the pipeline.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
/// - handlers: The handlers to add.
/// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
@available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
@_disfavoredOverload
public func addHandlers(
_ handlers: [ChannelHandler],
position: ChannelPipeline.Position = .last
) throws {
let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
try self._pipeline.addHandlersSyncNotSendable(handlers, position: syncPosition).get()
}

/// Add one or more handlers to the pipeline.
///
/// - Important: This *must* be called on the event loop.
/// - Parameters:
/// - handlers: The handlers to add.
/// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
public func addHandlers(
_ handlers: ChannelHandler...,
position: ChannelPipeline.SynchronousOperations.Position = .last
) throws {
try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get()
}
Expand All @@ -1262,11 +1314,14 @@ extension ChannelPipeline {
/// - Parameters:
/// - handlers: The handlers to add.
/// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
@available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead")
@_disfavoredOverload
public func addHandlers(
_ handlers: ChannelHandler...,
position: ChannelPipeline.Position = .last
) throws {
try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get()
let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)
try self._pipeline.addHandlersSyncNotSendable(handlers, position: syncPosition).get()
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
Expand Down Expand Up @@ -1574,6 +1629,41 @@ extension ChannelPipeline {
}
}

extension ChannelPipeline.SynchronousOperations {
/// A `Position` within the `ChannelPipeline`'s `SynchronousOperations` used to insert non-sendable handlers
/// into the `ChannelPipeline` at a certain position.
public enum Position {
/// The first `ChannelHandler` -- the front of the `ChannelPipeline`.
case first

/// The last `ChannelHandler` -- the back of the `ChannelPipeline`.
case last

/// Before the given `ChannelHandler`.
case before(ChannelHandler)

/// After the given `ChannelHandler`.
case after(ChannelHandler)

public init(_ position: ChannelPipeline.Position) {
switch position {
case .first:
self = .first
case .last:
self = .last
case .before(let handler):
self = .before(handler)
case .after(let handler):
self = .after(handler)
}
}
}
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@available(*, unavailable)
extension ChannelPipeline.SynchronousOperations.Position: Sendable {}

/// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation.
final class HeadChannelHandler: _ChannelOutboundHandler, Sendable {

Expand Down
Loading

0 comments on commit ac79370

Please sign in to comment.