From 786f06c0e3accc0c3902189454b146e7af9916ca Mon Sep 17 00:00:00 2001 From: Anton Ivanov Date: Mon, 30 Dec 2024 12:20:07 +0000 Subject: [PATCH] [home-mixer] don't offload scoredCandidateFeaturesCache scoredCandidateFeaturesCache accounts for 60% of offload tasks issued by client OffloadFilter. These tasks are very short (p999 is 50us), nobody waits for their reply (it's a side effect) and therefore can be safely executed by a Netty thread without context switch. JIRA Issues: STOR-8861 Differential Revision: https://phabricator.twitter.biz/D1190370 --- .../finagle/filter/OffloadFilter.scala | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/filter/OffloadFilter.scala b/finagle-core/src/main/scala/com/twitter/finagle/filter/OffloadFilter.scala index d14f51b5be..4f97d4b52c 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/filter/OffloadFilter.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/filter/OffloadFilter.scala @@ -65,6 +65,19 @@ object OffloadFilter { private[finagle] def server[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] = new ServerModule[Req, Rep] + private[this] val offloadsDisabledLocal = new Local[Unit] + + /** + * Disables offloads for the enclosed scope. + * The scope must return Unit, + * this ensures no callbacks can be added to the result. + */ + def withOffloadsDisabled(f: => Unit): Unit = { + offloadsDisabledLocal.let(()) { + f + } + } + final class Client[Req, Rep](pool: FuturePool, statsReceiver: StatsReceiver) extends SimpleFilter[Req, Rep] { @@ -98,21 +111,26 @@ object OffloadFilter { // You would be surprised but this can happen. Same simulations report that we lose a race in // about 1 in 1 000 000 of cases this way (0.0001%). val response = service(request) - val shifted = Promise.interrupts[Rep](response) - response.respond { t => - pool { - val startNs = System.nanoTime() - shifted.update(t) - applyTimeNs.add(System.nanoTime() - startNs) - } + if (offloadsDisabledLocal().isDefined) { + response + + } else { + val shifted = Promise.interrupts[Rep](response) + response.respond { t => + pool { + val startNs = System.nanoTime() + shifted.update(t) + applyTimeNs.add(System.nanoTime() - startNs) + } - val tracing = Trace() - if (tracing.isActivelyTracing) { - tracing.recordBinary(ClientAnnotationKey, pool.poolSize) + val tracing = Trace() + if (tracing.isActivelyTracing) { + tracing.recordBinary(ClientAnnotationKey, pool.poolSize) + } } - } - shifted + shifted + } } }