Skip to content

Commit

Permalink
Add threshold before Response compression use thread pool (#5)
Browse files Browse the repository at this point in the history
* Add threshold before using threadpool

* swift format

* Remove loop in testMultipleCompressResponse

* reduce buffer size in test
  • Loading branch information
adam-fowler authored May 24, 2021
1 parent 3135018 commit f71058e
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 63 deletions.
33 changes: 27 additions & 6 deletions Sources/HummingbirdCompression/Application+Compression.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,43 @@
import Hummingbird

extension HBApplication {
/// Indicate where the response compression tasks should be executed
public enum RequestCompressionExecutionPreference {
/// run decompression tasks on the EventLoop
case onEventLoop
/// run decompression tasks on application thread pool
case onThreadPool
}

/// Add Channel Handler for decompressing request that have Content-Encoding header set to gzip or deflate
/// - Parameter limit: Indicate the memory limit of how much to decompress to
public func addRequestDecompression(useThreadPool: Bool, limit: HTTPDecompressionLimit) {
public func addRequestDecompression(execute: RequestCompressionExecutionPreference, limit: HTTPDecompressionLimit) {
precondition(
self.configuration.enableHttpPipelining || useThreadPool == false,
self.configuration.enableHttpPipelining || execute == .onEventLoop,
"Request decompression on the thread pool requires HTTP pipelining assist to be enabled"
)
self.server.addRequestDecompression(limit: limit, threadPool: useThreadPool ? self.threadPool : nil)
self.server.addRequestDecompression(limit: limit, threadPool: execute == .onThreadPool ? self.threadPool : nil)
}

/// Indicate where the response compression tasks should be executed
public enum ResponseCompressionExecutionPreference: Equatable {
/// run all compression tasks on the EventLoop
case onEventLoop
/// run compression tasks that are larger than `threshold` bytes on application thread pool
case onThreadPool(threshold: Int)
}

/// Add Channel Handler for compressing responses where accept-encoding header indicates the client will accept compressed data
public func addResponseCompression(useThreadPool: Bool) {
public func addResponseCompression(execute: ResponseCompressionExecutionPreference) {
precondition(
self.configuration.enableHttpPipelining || useThreadPool == false,
self.configuration.enableHttpPipelining || execute == .onEventLoop,
"Response compression on the thread pool requires HTTP pipelining assist to be enabled"
)
self.server.addResponseCompression(threadPool: useThreadPool ? self.threadPool : nil)
switch execute {
case .onEventLoop:
self.server.addResponseCompression(threadPool: nil)
case .onThreadPool(let threshold):
self.server.addResponseCompression(threadPool: self.threadPool, threadPoolThreshold: threshold)
}
}
}
4 changes: 2 additions & 2 deletions Sources/HummingbirdCompression/HTTPServer+Compression.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ extension HBHTTPServer {
}

/// Add Channel Handler for compressing responses where accept-encoding header indicates the client will accept compressed data
@discardableResult public func addResponseCompression(threadPool: NIOThreadPool?) -> HBHTTPServer {
@discardableResult public func addResponseCompression(threadPool: NIOThreadPool?, threadPoolThreshold: Int = 0) -> HBHTTPServer {
return self.addChannelHandler(
HTTPResponseCompressHandler(threadPool: threadPool)
HTTPResponseCompressHandler(threadPool: threadPool, threadPoolThreshold: threadPoolThreshold)
)
}
}
86 changes: 45 additions & 41 deletions Sources/HummingbirdCompression/ResponseCompression.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@ class HTTPResponseCompressHandler: ChannelDuplexHandler, RemovableChannelHandler

enum State {
case head(HTTPResponseHead)
case body(NIOCompressor?)
case body(NIOCompressor, Bool)
case uncompressedBody
case idle
}

var acceptQueue: CircularBuffer<[Substring]>
let compressorWindow: ByteBuffer
var state: State
let threadPool: NIOThreadPool?
let threadPoolThreshold: Int
var pendingPromise: EventLoopPromise<Void>?
var queue: TaskQueue<Void>!

init(threadPool: NIOThreadPool?, windowSize: Int = 32 * 1024) {
init(threadPool: NIOThreadPool?, threadPoolThreshold: Int, windowSize: Int = 32 * 1024) {
self.state = .idle
self.threadPool = threadPool
self.threadPoolThreshold = threadPoolThreshold
self.acceptQueue = .init(initialCapacity: 4)
self.compressorWindow = ByteBufferAllocator().buffer(capacity: windowSize)
}
Expand Down Expand Up @@ -86,86 +89,87 @@ class HTTPResponseCompressHandler: ChannelDuplexHandler, RemovableChannelHandler
compressor.window = self.compressorWindow
do {
try compressor.startStream()

let useThreadPool = part.readableBytes > self.threadPoolThreshold
// edit header, removing content-length and adding content-encoding
head.headers.replaceOrAdd(name: "content-encoding", value: compression.name)
head.headers.remove(name: "content-length")
context.write(wrapOutboundOut(.head(head)), promise: nil)
if let threadPool = self.threadPool {
if useThreadPool, let threadPool = self.threadPool {
self.state = .body(compressor, true)
self.queue.submitTask {
threadPool.runIfActive(eventLoop: context.eventLoop) {
self.writeBuffer(context: context, part: part, compressor: compressor, promise: nil)
}
}
} else {
self.state = .body(compressor, false)
self.writeBuffer(context: context, part: part, compressor: compressor, promise: nil)
}
self.state = .body(compressor)
} catch {
// if compressor failed to start stream then output uncompressed data
self.state = .body(nil)
self.state = .uncompressedBody
context.write(wrapOutboundOut(.head(head)), promise: nil)
context.write(data, promise: nil)
}
} else {
self.state = .body(nil)
self.state = .uncompressedBody
context.write(wrapOutboundOut(.head(head)), promise: nil)
context.write(data, promise: nil)
}

case (.body(let part), .body(let compressor)):
if let compressor = compressor {
if let threadPool = self.threadPool {
self.queue.submitTask {
threadPool.runIfActive(eventLoop: context.eventLoop) {
self.writeBuffer(context: context, part: part, compressor: compressor, promise: nil)
}
case (.body(let part), .body(let compressor, let useThreadPool)):
if useThreadPool, let threadPool = self.threadPool {
self.queue.submitTask {
threadPool.runIfActive(eventLoop: context.eventLoop) {
self.writeBuffer(context: context, part: part, compressor: compressor, promise: nil)
}
} else {
self.writeBuffer(context: context, part: part, compressor: compressor, promise: nil)
}
} else {
context.write(data, promise: nil)
self.writeBuffer(context: context, part: part, compressor: compressor, promise: nil)
}

case (.body, .uncompressedBody):
context.write(data, promise: nil)

case (.end, .head(let head)):
self.state = .idle
context.write(wrapOutboundOut(.head(head)), promise: nil)
context.write(data, promise: self.pendingPromise)
self.pendingPromise = nil

case (.end, .body(let compressor)):
case (.end, .body(let compressor, let useThreadPool)):
self.state = .idle
if let compressor = compressor {
let pendingPromise = self.pendingPromise
if let threadPool = self.threadPool {
self.queue.submitTask {
threadPool.runIfActive(eventLoop: context.eventLoop) {
do {
self.finalizeStream(context: context, compressor: compressor, promise: nil)
try compressor.finishStream()
} catch {
self.pendingPromise?.fail(error)
}
context.eventLoop.execute {
context.writeAndFlush(data, promise: pendingPromise)
}
let pendingPromise = self.pendingPromise
if useThreadPool, let threadPool = self.threadPool {
self.queue.submitTask {
threadPool.runIfActive(eventLoop: context.eventLoop) {
do {
self.finalizeStream(context: context, compressor: compressor, promise: nil)
try compressor.finishStream()
} catch {
self.pendingPromise?.fail(error)
}
context.eventLoop.execute {
context.writeAndFlush(data, promise: pendingPromise)
}
}
} else {
do {
self.finalizeStream(context: context, compressor: compressor, promise: nil)
try compressor.finishStream()
} catch {
self.pendingPromise?.fail(error)
}
context.writeAndFlush(data, promise: pendingPromise)
}
} else {
context.write(data, promise: self.pendingPromise)
do {
self.finalizeStream(context: context, compressor: compressor, promise: nil)
try compressor.finishStream()
} catch {
self.pendingPromise?.fail(error)
}
context.writeAndFlush(data, promise: pendingPromise)
}
self.pendingPromise = nil

case (.end, .uncompressedBody):
self.state = .idle
context.write(data, promise: self.pendingPromise)
self.pendingPromise = nil

default:
assertionFailure("Shouldn't get here")
context.close(promise: nil)
Expand Down
29 changes: 15 additions & 14 deletions Tests/HummingbirdCompressionTests/CompressionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addResponseCompression(useThreadPool: true)
app.addResponseCompression(execute: .onThreadPool(threshold: 32000))
try app.XCTStart()
defer { app.XCTStop() }

let testBuffer = self.randomBuffer(size: 261_335)
let testBuffer = self.randomBuffer(size: Int.random(in: 64000...261_335))
app.XCTExecute(uri: "/echo", method: .GET, headers: ["accept-encoding": "gzip"], body: testBuffer) { response in
var body = response.body
let uncompressed = try body?.decompress(with: .gzip)
Expand All @@ -52,7 +52,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addResponseCompression(useThreadPool: false)
app.addResponseCompression(execute: .onEventLoop)
try app.XCTStart()
defer { app.XCTStop() }

Expand All @@ -70,11 +70,12 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addResponseCompression(useThreadPool: true)
app.addResponseCompression(execute: .onThreadPool(threshold: 64000))
app.middleware.add(HBLogRequestsMiddleware(.info))
try app.XCTStart()
defer { app.XCTStop() }

let buffers = (0..<32).map { _ in self.randomBuffer(size: Int.random(in: 16...512_000)) }
let buffers = (0..<32).map { _ in self.randomBuffer(size: Int.random(in: 16...256_000)) }
let futures: [EventLoopFuture<Void>] = buffers.map { buffer in
if Bool.random() == true {
return app.xct.execute(uri: "/echo", method: .GET, headers: ["accept-encoding": "gzip"], body: buffer).flatMapThrowing { response in
Expand All @@ -97,7 +98,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addResponseCompression(useThreadPool: false)
app.addResponseCompression(execute: .onEventLoop)
try app.XCTStart()
defer { app.XCTStop() }

Expand All @@ -124,7 +125,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addRequestDecompression(useThreadPool: true, limit: .none)
app.addRequestDecompression(execute: .onThreadPool, limit: .none)
try app.XCTStart()
defer { app.XCTStop() }

Expand All @@ -142,7 +143,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addRequestDecompression(useThreadPool: false, limit: .none)
app.addRequestDecompression(execute: .onEventLoop, limit: .none)
try app.XCTStart()
defer { app.XCTStop() }

Expand All @@ -160,7 +161,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addRequestDecompression(useThreadPool: true, limit: .none)
app.addRequestDecompression(execute: .onThreadPool, limit: .none)
try app.start()
defer { app.stop() }

Expand Down Expand Up @@ -196,7 +197,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addRequestDecompression(useThreadPool: true, limit: .none)
app.addRequestDecompression(execute: .onThreadPool, limit: .none)
try app.XCTStart()
defer { app.XCTStop() }

Expand All @@ -222,7 +223,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addRequestDecompression(useThreadPool: false, limit: .none)
app.addRequestDecompression(execute: .onEventLoop, limit: .none)
try app.XCTStart()
defer { app.XCTStop() }

Expand All @@ -248,7 +249,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addRequestDecompression(useThreadPool: true, limit: .none)
app.addRequestDecompression(execute: .onThreadPool, limit: .none)
try app.XCTStart()
defer { app.XCTStop() }

Expand All @@ -264,7 +265,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addRequestDecompression(useThreadPool: true, limit: .size(50000))
app.addRequestDecompression(execute: .onThreadPool, limit: .size(50000))
try app.XCTStart()
defer { app.XCTStop() }

Expand All @@ -282,7 +283,7 @@ class HummingBirdCompressionTests: XCTestCase {
let body: HBResponseBody = request.body.buffer.map { .byteBuffer($0) } ?? .empty
return .init(status: .ok, headers: [:], body: body)
}
app.addRequestDecompression(useThreadPool: true, limit: .ratio(3))
app.addRequestDecompression(execute: .onThreadPool, limit: .ratio(3))
try app.XCTStart()
defer { app.XCTStop() }

Expand Down

0 comments on commit f71058e

Please sign in to comment.