Skip to content

Commit

Permalink
Merge branch 'main' into ff-use-mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
sebsto authored Oct 8, 2024
2 parents b299131 + 4295e51 commit 3eda467
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 45 deletions.
3 changes: 1 addition & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ let package = Package(
.byName(name: "AWSLambdaRuntimeCore"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOFoundationCompat", package: "swift-nio"),
],
swiftSettings: [.swiftLanguageMode(.v5)]
]
),
.target(
name: "AWSLambdaRuntimeCore",
Expand Down
15 changes: 8 additions & 7 deletions Sources/AWSLambdaRuntime/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ extension LambdaCodableAdapter {

extension LambdaRuntime {
/// Initialize an instance with a `LambdaHandler` defined in the form of a closure **with a non-`Void` return type**.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic `Output` into a `ByteBuffer`. `JSONEncoder()` used as default.
/// - Parameter decoder: The decoder object that will be used to decode the incoming `ByteBuffer` event into the generic `Event` type. `JSONDecoder()` used as default.
/// - Parameters:
/// - decoder: The decoder object that will be used to decode the incoming `ByteBuffer` event into the generic `Event` type. `JSONDecoder()` used as default.
/// - encoder: The encoder object that will be used to encode the generic `Output` into a `ByteBuffer`. `JSONEncoder()` used as default.
/// - body: The handler in the form of a closure.
public convenience init<Event: Decodable, Output>(
body: @escaping (Event, LambdaContext) async throws -> Output,
decoder: JSONDecoder = JSONDecoder(),
encoder: JSONEncoder = JSONEncoder(),
decoder: JSONDecoder = JSONDecoder()
body: sending @escaping (Event, LambdaContext) async throws -> Output
)
where
Handler == LambdaCodableAdapter<
Expand All @@ -97,8 +98,8 @@ extension LambdaRuntime {
/// - Parameter body: The handler in the form of a closure.
/// - Parameter decoder: The decoder object that will be used to decode the incoming `ByteBuffer` event into the generic `Event` type. `JSONDecoder()` used as default.
public convenience init<Event: Decodable>(
body: @escaping (Event, LambdaContext) async throws -> Void,
decoder: JSONDecoder = JSONDecoder()
decoder: JSONDecoder = JSONDecoder(),
body: sending @escaping (Event, LambdaContext) async throws -> Void
)
where
Handler == LambdaCodableAdapter<
Expand Down
2 changes: 1 addition & 1 deletion Sources/AWSLambdaRuntimeCore/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public enum Lambda {
}

/// The default EventLoop the Lambda is scheduled on.
public static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
public static let defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
}

// MARK: - Public API
Expand Down
4 changes: 2 additions & 2 deletions Sources/AWSLambdaRuntimeCore/LambdaHandlers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ extension LambdaRuntime {
>(
encoder: Encoder,
decoder: Decoder,
body: @escaping (Event, LambdaContext) async throws -> Output
body: sending @escaping (Event, LambdaContext) async throws -> Output
)
where
Handler == LambdaCodableAdapter<
Expand All @@ -220,7 +220,7 @@ extension LambdaRuntime {
/// - body: The handler in the form of a closure.
public convenience init<Event: Decodable, Decoder: LambdaEventDecoder>(
decoder: Decoder,
body: @escaping (Event, LambdaContext) async throws -> Void
body: sending @escaping (Event, LambdaContext) async throws -> Void
)
where
Handler == LambdaCodableAdapter<
Expand Down
7 changes: 4 additions & 3 deletions Sources/AWSLambdaRuntimeCore/LambdaRequestID.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,16 @@ struct LambdaRequestID {
}

/// thread safe secure random number generator.
private static var generator = SystemRandomNumberGenerator()
private static func generateRandom() -> Self {
var generator = SystemRandomNumberGenerator()

var _uuid: uuid_t = LambdaRequestID.null
// https://tools.ietf.org/html/rfc4122#page-14
// o Set all the other bits to randomly (or pseudo-randomly) chosen
// values.
withUnsafeMutableBytes(of: &_uuid) { ptr in
ptr.storeBytes(of: Self.generator.next(), toByteOffset: 0, as: UInt64.self)
ptr.storeBytes(of: Self.generator.next(), toByteOffset: 8, as: UInt64.self)
ptr.storeBytes(of: generator.next(), toByteOffset: 0, as: UInt64.self)
ptr.storeBytes(of: generator.next(), toByteOffset: 8, as: UInt64.self)
}

// o Set the four most significant bits (bits 12 through 15) of the
Expand Down
73 changes: 43 additions & 30 deletions Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func withMockServer<Result>(
_ body: (_ port: Int) async throws -> Result
) async throws -> Result {
let eventLoopGroup = NIOSingletons.posixEventLoopGroup
let server = MockLambdaServer(behavior: behaviour, port: port, keepAlive: keepAlive)
let port = try await server.start().get()
let server = MockLambdaServer(behavior: behaviour, port: port, keepAlive: keepAlive, eventLoopGroup: eventLoopGroup)
let port = try await server.start()

let result: Swift.Result<Result, any Error>
do {
Expand All @@ -37,13 +37,13 @@ func withMockServer<Result>(
result = .failure(error)
}

try? await server.stop().get()
try? await server.stop()
return try result.get()
}

final class MockLambdaServer {
final class MockLambdaServer<Behavior: LambdaServerBehavior> {
private let logger = Logger(label: "MockLambdaServer")
private let behavior: LambdaServerBehavior
private let behavior: Behavior
private let host: String
private let port: Int
private let keepAlive: Bool
Expand All @@ -52,7 +52,13 @@ final class MockLambdaServer {
private var channel: Channel?
private var shutdown = false

init(behavior: LambdaServerBehavior, host: String = "127.0.0.1", port: Int = 7000, keepAlive: Bool = true) {
init(
behavior: Behavior,
host: String = "127.0.0.1",
port: Int = 7000,
keepAlive: Bool = true,
eventLoopGroup: MultiThreadedEventLoopGroup
) {
self.group = NIOSingletons.posixEventLoopGroup
self.behavior = behavior
self.host = host
Expand All @@ -64,39 +70,41 @@ final class MockLambdaServer {
assert(shutdown)
}

func start() -> EventLoopFuture<Int> {
let bootstrap = ServerBootstrap(group: group)
fileprivate func start() async throws -> Int {
let logger = self.logger
let keepAlive = self.keepAlive
let behavior = self.behavior

let channel = try await ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
do {
try channel.pipeline.syncOperations.configureHTTPServerPipeline(withErrorHandling: true)
try channel.pipeline.syncOperations.addHandler(
HTTPHandler(logger: self.logger, keepAlive: self.keepAlive, behavior: self.behavior)
HTTPHandler(logger: logger, keepAlive: keepAlive, behavior: behavior)
)
return channel.eventLoop.makeSucceededVoidFuture()
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
}
return bootstrap.bind(host: self.host, port: self.port).flatMap { channel in
self.channel = channel
guard let localAddress = channel.localAddress else {
return channel.eventLoop.makeFailedFuture(ServerError.cantBind)
}
self.logger.info("\(self) started and listening on \(localAddress)")
return channel.eventLoop.makeSucceededFuture(localAddress.port!)
.bind(host: self.host, port: self.port)
.get()

self.channel = channel
guard let localAddress = channel.localAddress else {
throw ServerError.cantBind
}
self.logger.info("\(self) started and listening on \(localAddress)")
return localAddress.port!
}

func stop() -> EventLoopFuture<Void> {
fileprivate func stop() async throws {
self.logger.info("stopping \(self)")
guard let channel = self.channel else {
return self.group.next().makeFailedFuture(ServerError.notReady)
}
return channel.close().always { _ in
self.shutdown = true
self.logger.info("\(self) stopped")
}
let channel = self.channel!
try? await channel.close().get()
self.shutdown = true
self.logger.info("\(self) stopped")
}
}

Expand Down Expand Up @@ -221,32 +229,37 @@ final class HTTPHandler: ChannelInboundHandler {
}
let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: status, headers: headers)

let logger = self.logger
context.write(wrapOutboundOut(.head(head))).whenFailure { error in
self.logger.error("\(self) write error \(error)")
logger.error("write error \(error)")
}

if let b = body {
var buffer = context.channel.allocator.buffer(capacity: b.utf8.count)
buffer.writeString(b)
context.write(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenFailure { error in
self.logger.error("\(self) write error \(error)")
logger.error("write error \(error)")
}
}

let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)

let keepAlive = self.keepAlive
context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in
if case .failure(let error) = result {
self.logger.error("\(self) write error \(error)")
logger.error("write error \(error)")
}
if !self.keepAlive {
if !keepAlive {
let context = loopBoundContext.value
context.close().whenFailure { error in
self.logger.error("\(self) close error \(error)")
logger.error("close error \(error)")
}
}
}
}
}

protocol LambdaServerBehavior {
protocol LambdaServerBehavior: Sendable {
func getInvocation() -> GetInvocationResult
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError>
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError>
Expand Down

0 comments on commit 3eda467

Please sign in to comment.