Skip to content

Commit

Permalink
finagle-core: Add low priority offload executor
Browse files Browse the repository at this point in the history
Problem
In latency-critical applications, there may be tasks that are not
essential for serving a request, such as publishing statistics. However, these
tasks can be numerous and even demanding in terms of computational resources.
They can saturate the entire offload pool queue and cause delays in processing
user requests.

Solution
A solution to this problem could be a low-critical pool where such
tasks can be placed. It is a separate pool with its own queue and dedicated
processing threads. This change also introduces primitives for client code to
allow them to publish low-priority tasks to this pool.

Differential Revision: https://phabricator.twitter.biz/D1189064
  • Loading branch information
Ivan Gorbachev authored and jenkins committed Jan 14, 2025
1 parent c8582af commit 10fcab3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import java.util.concurrent.TimeUnit
private[twitter] class DefaultThreadPoolExecutor(
poolSize: Int,
maxQueueLen: Int,
stats: StatsReceiver)
extends ThreadPoolExecutor(
stats: StatsReceiver,
name: String = "finagle/offload",
) extends ThreadPoolExecutor(
poolSize /*corePoolSize*/,
poolSize /*maximumPoolSize*/,
0L /*keepAliveTime*/,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable](maxQueueLen) /*workQueue*/,
new NamedPoolThreadFactory("finagle/offload", makeDaemons = true) /*threadFactory*/,
new NamedPoolThreadFactory(name, makeDaemons = true) /*threadFactory*/,
new RunsOnNettyThread(stats.counter("not_offloaded_tasks")))

private object DefaultThreadPoolExecutor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.util.DefaultTimer
import com.twitter.util.Duration
import com.twitter.util.ExecutorServiceFuturePool
import com.twitter.util.Future
import com.twitter.util.FuturePool
import com.twitter.util.Local
import java.util.concurrent.ExecutorService

/**
Expand All @@ -29,6 +31,19 @@ final class OffloadFuturePool(executor: ExecutorService, stats: StatsReceiver)
val hasAdmissionControl: Boolean = admissionControl.isDefined
}

final class PriorityOffloadFuturePool(
normalPriorityPool: FuturePool,
lowPriorityFuturePool: FuturePool)
extends FuturePool {
override def apply[T](f: => T): Future[T] = {
if (OffloadFuturePool.lowPriorityLocal().isEmpty) {
normalPriorityPool.apply(f)
} else {
lowPriorityFuturePool.apply(f)
}
}
}

object OffloadFuturePool {

/**
Expand All @@ -47,18 +62,40 @@ object OffloadFuturePool {
numWorkers.get.orElse(if (auto()) Some(com.twitter.jvm.numProcs().ceil.toInt) else None)
val maxQueueLen = maxQueueLength()

workers.map { threads =>
val normalPriorityPool = workers.map { threads =>
val stats = FinagleStatsReceiver.scope("offload_pool")
val pool = new OffloadFuturePool(OffloadThreadPool(threads, maxQueueLen, stats), stats)
createPool(OffloadThreadPool(threads, maxQueueLen, stats), stats)
}

val lowPriorityPool = lowPriorityNumWorkers.get.map { threads =>
val stats = FinagleStatsReceiver.scope("low_priority_offload_pool")
createPool(
new DefaultThreadPoolExecutor(threads, maxQueueLen, stats, "finagle/low-priority-offload"),
stats,
)
}

// Start sampling the offload delay if the interval isn't Duration.Top.
if (statsSampleInterval().isFinite && statsSampleInterval() > Duration.Zero) {
val sampleStats = new SampleQueueStats(pool, stats, DefaultTimer)
sampleStats()
normalPriorityPool.map { normalPool =>
lowPriorityPool match {
case Some(lowPriorityPool) => new PriorityOffloadFuturePool(normalPool, lowPriorityPool)
case None => normalPool
}
}
}

pool
private[this] def createPool(
executor: ExecutorService,
stats: StatsReceiver
): FuturePool = {
val pool = new OffloadFuturePool(executor, stats)

// Start sampling the offload delay if the interval isn't Duration.Top.
if (statsSampleInterval().isFinite && statsSampleInterval() > Duration.Zero) {
val sampleStats = new SampleQueueStats(pool, stats, DefaultTimer)
sampleStats()
}

pool
}

/**
Expand All @@ -71,4 +108,10 @@ object OffloadFuturePool {
case Some(pool) => pool
case None => FuturePool.unboundedPool
}

val lowPriorityLocal = new Local[Unit]

def withLowPriorityOffloads[T](fn: => T): T = {
lowPriorityLocal.let(())(fn)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.twitter.finagle.offload

import com.twitter.app.GlobalFlag

object lowPriorityNumWorkers
extends GlobalFlag[Int](
"""Experimental flag. Enables the low priority offload pool using a thread pool with the specified number of threads.
| When this flag is greater that zero, the execution of low priority tasks happen in an isolated pool.
|""".stripMargin)

0 comments on commit 10fcab3

Please sign in to comment.