diff --git a/finagle-core/src/main/scala/com/twitter/finagle/filter/SLOStatsFilter.scala b/finagle-core/src/main/scala/com/twitter/finagle/filter/SLOStatsFilter.scala
new file mode 100644
index 0000000000..29cf5096f3
--- /dev/null
+++ b/finagle-core/src/main/scala/com/twitter/finagle/filter/SLOStatsFilter.scala
@@ -0,0 +1,147 @@
+package com.twitter.finagle.filter
+
+import com.twitter.finagle.FailureFlags
+import com.twitter.finagle.stats.StatsReceiver
+import com.twitter.finagle.param
+import com.twitter.finagle.Service
+import com.twitter.finagle.ServiceFactory
+import com.twitter.finagle.SimpleFilter
+import com.twitter.finagle.Stack
+import com.twitter.finagle.Stackable
+import com.twitter.finagle.service.ReqRep
+import com.twitter.finagle.service.ResponseClass
+import com.twitter.finagle.service.ResponseClassifier
+import com.twitter.util.Duration
+import com.twitter.util.Future
+import com.twitter.util.Stopwatch
+import com.twitter.util.Throw
+import com.twitter.util.Try
+
+/**
+ * Stack configuration for [[SLOStatsFilter]]: its role, its `Stack.Param`, and the module that
+ * installs the filter.
+ */
+private[twitter] object SLOStatsFilter {
+  val role: Stack.Role = Stack.Role("SLOStats")
+
+  /** Whether SLO accounting is enabled and, if so, with which latency target. */
+  sealed trait Param {
+    def mk(): (Param, Stack.Param[Param]) = (this, Param.param)
+  }
+
+  object Param {
+
+    /** Enables SLO accounting; `latency` is the target handed to the filter. */
+    case class Configured(
+      latency: Duration)
+        extends Param
+
+    /** Makes the module return the next factory unchanged. */
+    case object Disabled extends Param
+
+    // `Disabled` is the default, so the filter is only installed when explicitly configured.
+    implicit val param: Stack.Param[SLOStatsFilter.Param] = Stack.Param(Disabled)
+  }
+
+  val Disabled: Param = Param.Disabled
+
+  /** Builds the param that enables SLO accounting with the given latency target. */
+  def configured(latency: Duration): Param =
+    Param.Configured(latency)
+
+  /**
+   * A stack module that, when configured, puts an [[SLOStatsFilter]] in front of `next`, with its
+   * stats under the "slo" scope of the stack's stats receiver. When disabled, `next` is returned
+   * unchanged.
+   */
+  def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
+    new Stack.Module3[param.Stats, param.ResponseClassifier, Param, ServiceFactory[Req, Rep]] {
+      val role: Stack.Role = SLOStatsFilter.role
+      val description: String =
+        "Record number of SLO violations of underlying service"
+      override def make(
+        _stats: param.Stats,
+        _responseClassifier: param.ResponseClassifier,
+        params: Param,
+        next: ServiceFactory[Req, Rep]
+      ): ServiceFactory[Req, Rep] = {
+        params match {
+          case Param.Disabled => next
+          case Param.Configured(latency) =>
+            val param.Stats(statsReceiver) = _stats
+            val param.ResponseClassifier(responseClassifier) = _responseClassifier
+            new SLOStatsFilter(
+              statsReceiver.scope("slo"),
+              latency.inNanoseconds,
+              responseClassifier).andThen(next)
+        }
+      }
+    }
+}
+
+/**
+ * A [[com.twitter.finagle.Filter]] that records the number of SLO violations from the underlying
+ * service. A request is classified as violating the SLO if any of the following occur:
+ *  - The response returns more than `latencyNanos` nanoseconds after the request was issued
+ *  - The response is classified as a failure according to the ResponseClassifier (but is not
+ *    flagged as interrupted)
+ *
+ * Responses flagged as ignorable are skipped entirely: they never count as a violation, whatever
+ * their latency. A request that is both too slow and failed increments the "latency" and
+ * "failures" counters once each, but the "total" counter only once.
+ *
+ * @param statsReceiver receiver under which the "violations" counters are created
+ * @param latencyNanos latency target, in nanoseconds
+ * @param responseClassifier classifies responses; anything it is not defined for falls back to
+ *                           `ResponseClassifier.Default`
+ * @param nowNanos source of the current time in nanoseconds; tests substitute their own clock
+ */
+private[finagle] class SLOStatsFilter[Req, Rep](
+  statsReceiver: StatsReceiver,
+  latencyNanos: Long,
+  responseClassifier: ResponseClassifier,
+  nowNanos: () => Long = Stopwatch.systemNanos)
+    extends SimpleFilter[Req, Rep] {
+
+  private[this] val violationsScope = statsReceiver.scope("violations")
+  private[this] val violationsTotalCounter = violationsScope.counter("total")
+  private[this] val violationsFailuresCounter = violationsScope.counter("failures")
+  private[this] val violationsLatencyCounter = violationsScope.counter("latency")
+
+  def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
+    val start = nowNanos()
+    service(request).respond { response =>
+      if (!isIgnorable(response)) {
+        val tooSlow = nowNanos() - start > latencyNanos
+        if (tooSlow) violationsLatencyCounter.incr()
+
+        val failed = isFailure(request, response)
+        if (failed) violationsFailuresCounter.incr()
+
+        // A request that is both too slow and failed still counts as a single violation.
+        if (tooSlow || failed) violationsTotalCounter.incr()
+      }
+    }
+  }
+
+  /** True when the classifier marks the response as failed and it is not flagged interrupted. */
+  private[this] def isFailure(request: Req, response: Try[Rep]): Boolean = {
+    responseClassifier
+      .applyOrElse(ReqRep(request, response), ResponseClassifier.Default) match {
+      case ResponseClass.Failed(_) if !isInterrupted(response) => true
+      case _ => false
+    }
+  }
+
+  private[this] def isIgnorable(response: Try[Rep]): Boolean =
+    hasFlag(response, FailureFlags.Ignorable)
+
+  private[this] def isInterrupted(response: Try[Rep]): Boolean =
+    hasFlag(response, FailureFlags.Interrupted)
+
+  /** True when the response is a `Throw` of a [[FailureFlags]] exception carrying `flag`. */
+  private[this] def hasFlag(response: Try[Rep], flag: Long): Boolean = response match {
+    case Throw(f: FailureFlags[_]) => f.isFlagged(flag)
+    case _ => false
+  }
+}
diff --git a/finagle-core/src/test/scala/com/twitter/finagle/filter/SLOStatsFilterTest.scala b/finagle-core/src/test/scala/com/twitter/finagle/filter/SLOStatsFilterTest.scala
new file mode 100644
index 0000000000..5153a0d4e3
--- /dev/null
+++ b/finagle-core/src/test/scala/com/twitter/finagle/filter/SLOStatsFilterTest.scala
@@ -0,0 +1,171 @@
+package com.twitter.finagle.filter
+
+import com.twitter.conversions.DurationOps._
+import com.twitter.finagle.Failure
+import com.twitter.finagle.FailureFlags
+import com.twitter.finagle.Service
+import com.twitter.finagle.ServiceFactory
+import com.twitter.finagle.Stack
+import com.twitter.finagle.param.Stats
+import com.twitter.finagle.service.ReqRep
+import com.twitter.finagle.service.ResponseClass
+import com.twitter.finagle.service.ResponseClassifier
+import com.twitter.finagle.service.ResponseClassifier.named
+import com.twitter.finagle.stats.InMemoryStatsReceiver
+import com.twitter.finagle.stats.StatsReceiver
+import com.twitter.util.Await
+import com.twitter.util.Future
+import com.twitter.util.MockTimer
+import com.twitter.util.Return
+import com.twitter.util.Time
+import org.scalatest.funsuite.AnyFunSuite
+
+class SLOStatsFilterTest extends AnyFunSuite {
+
+  // Latency target used by every filter in this suite.
+  private[this] val latency = 5.seconds
+
+  // Bounds every Await so that a regression fails the test instead of hanging the build.
+  private[this] val awaitTimeout = 5.seconds
+
+  // Classifies every successful response as a failure, to trigger failure violations on demand.
+  private[this] val successIsFailure: ResponseClassifier = named("SuccessIsFailure") {
+    case ReqRep(_, Return(_)) => ResponseClass.NonRetryableFailure
+  }
+
+  private[this] val unitSvc: Service[Unit, Unit] = Service.mk { _ =>
+    Future.Unit
+  }
+
+  // Responds successfully, but only once `timer` has ticked past the latency target.
+  private[this] def slowSvc(timer: MockTimer): Service[Unit, Unit] = Service.mk { _ =>
+    Future.Unit.delayed(latency + 1.second)(timer)
+  }
+
+  // Always fails with a `Failure` carrying the given flags.
+  private[this] def failingSvc(flags: Long): Service[Unit, Unit] = Service.mk { _ =>
+    Future.exception(new Failure("boom!", flags = flags))
+  }
+
+  private[this] def mkService(
+    statsReceiver: StatsReceiver,
+    params: Stack.Params,
+    underlying: Service[Unit, Unit] = unitSvc
+  ): Service[Unit, Unit] = {
+    val factory = SLOStatsFilter.module
+      .toStack(Stack.leaf(Stack.Role("test"), ServiceFactory.const(underlying)))
+      .make(params + Stats(statsReceiver))
+    Await.result(factory(), awaitTimeout)
+  }
+
+  // Builds the filter directly, driven by `Time.now` so that tests can control elapsed time.
+  private[this] def mkFilter(
+    statsReceiver: StatsReceiver,
+    responseClassifier: ResponseClassifier = ResponseClassifier.Default
+  ): SLOStatsFilter[Unit, Unit] =
+    new SLOStatsFilter[Unit, Unit](
+      statsReceiver = statsReceiver,
+      latencyNanos = latency.inNanoseconds,
+      responseClassifier = responseClassifier,
+      nowNanos = () => Time.now.inNanoseconds
+    )
+
+  test("Does not create the filter if not configured") {
+    val statsReceiver = new InMemoryStatsReceiver
+    val service = mkService(statsReceiver, Stack.Params.empty)
+    Await.result(service(()), awaitTimeout)
+
+    assert(!statsReceiver.counters.contains(Seq("slo", "violations", "total")))
+  }
+
+  test("Does not create the filter if disabled") {
+    val statsReceiver = new InMemoryStatsReceiver
+    val service = mkService(statsReceiver, Stack.Params.empty + SLOStatsFilter.Disabled)
+    Await.result(service(()), awaitTimeout)
+
+    assert(!statsReceiver.counters.contains(Seq("slo", "violations", "total")))
+  }
+
+  test("Creates the filter if configured") {
+    val statsReceiver = new InMemoryStatsReceiver
+    val service =
+      mkService(statsReceiver, Stack.Params.empty + SLOStatsFilter.configured(latency))
+    Await.result(service(()), awaitTimeout)
+
+    assert(statsReceiver.counters.contains(Seq("slo", "violations", "total")))
+  }
+
+  test("Records latency violation if latency is violated") {
+    val statsReceiver = new InMemoryStatsReceiver
+    Time.withCurrentTimeFrozen { tc =>
+      val timer = new MockTimer
+      val filter = mkFilter(statsReceiver)
+
+      val res = filter((), slowSvc(timer))
+
+      tc.advance(latency + 1.second)
+      timer.tick()
+
+      Await.result(res, awaitTimeout)
+
+      assert(statsReceiver.counters(Seq("violations", "total")) == 1)
+      assert(statsReceiver.counters(Seq("violations", "latency")) == 1)
+      assert(statsReceiver.counters(Seq("violations", "failures")) == 0)
+    }
+  }
+
+  test("Records failure violation if response is a failure according to classifier") {
+    val statsReceiver = new InMemoryStatsReceiver
+    val filter = mkFilter(statsReceiver, successIsFailure)
+
+    Await.result(filter((), unitSvc), awaitTimeout)
+
+    assert(statsReceiver.counters(Seq("violations", "total")) == 1)
+    assert(statsReceiver.counters(Seq("violations", "latency")) == 0)
+    assert(statsReceiver.counters(Seq("violations", "failures")) == 1)
+  }
+
+  test(
+    "Records latency and failure violation if violated, does not double-count total violations") {
+    val statsReceiver = new InMemoryStatsReceiver
+    Time.withCurrentTimeFrozen { tc =>
+      val timer = new MockTimer
+      val filter = mkFilter(statsReceiver, successIsFailure)
+
+      val res = filter((), slowSvc(timer))
+
+      tc.advance(latency + 1.second)
+      timer.tick()
+
+      Await.result(res, awaitTimeout)
+
+      assert(statsReceiver.counters(Seq("violations", "total")) == 1)
+      assert(statsReceiver.counters(Seq("violations", "latency")) == 1)
+      assert(statsReceiver.counters(Seq("violations", "failures")) == 1)
+    }
+  }
+
+  test("Does not record violation if response is ignorable failure") {
+    val statsReceiver = new InMemoryStatsReceiver
+    val filter = mkFilter(statsReceiver, successIsFailure)
+    val service = failingSvc(FailureFlags.Ignorable)
+
+    intercept[Failure](Await.result(filter((), service), awaitTimeout))
+
+    assert(statsReceiver.counters(Seq("violations", "total")) == 0)
+    assert(statsReceiver.counters(Seq("violations", "latency")) == 0)
+    assert(statsReceiver.counters(Seq("violations", "failures")) == 0)
+  }
+
+  test("Does not record violation if response is interrupted failure") {
+    val statsReceiver = new InMemoryStatsReceiver
+    val filter = mkFilter(statsReceiver)
+    val service = failingSvc(FailureFlags.Interrupted)
+
+    intercept[Failure](Await.result(filter((), service), awaitTimeout))
+
+    assert(statsReceiver.counters(Seq("violations", "total")) == 0)
+    assert(statsReceiver.counters(Seq("violations", "latency")) == 0)
+    assert(statsReceiver.counters(Seq("violations", "failures")) == 0)
+  }
+}