Skip to content

Commit

Permalink
finagle/finagle-core: Introduce SLOStatsFilter for recording SLO viol…
Browse files Browse the repository at this point in the history
…ations

Problem

Currently, server-side stats expose success rate and latency metrics, but there is no
way to know how often a target latency is being violated.

Solution

Introduce SLOStatsFilter (still in development, not yet added to the default server stack)
that can be configured with a target latency. This filter records both latency violations and
response failure violations to provide a picture of the server's SLO compliance.

Differential Revision: https://phabricator.twitter.biz/D1191333
  • Loading branch information
jcrossley authored and jenkins committed Dec 27, 2024
1 parent 8b6572d commit 7954703
Show file tree
Hide file tree
Showing 2 changed files with 317 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.twitter.finagle.filter

import com.twitter.finagle.FailureFlags
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.param
import com.twitter.finagle.Service
import com.twitter.finagle.ServiceFactory
import com.twitter.finagle.SimpleFilter
import com.twitter.finagle.Stack
import com.twitter.finagle.Stackable
import com.twitter.finagle.service.ReqRep
import com.twitter.finagle.service.ResponseClass
import com.twitter.finagle.service.ResponseClassifier
import com.twitter.util.Duration
import com.twitter.util.Future
import com.twitter.util.Stopwatch
import com.twitter.util.Throw
import com.twitter.util.Try

private[twitter] object SLOStatsFilter {
val role = Stack.Role("SLOStats")

sealed trait Param {
def mk(): (Param, Stack.Param[Param]) = (this, Param.param)
}

object Param {
case class Configured(
latency: Duration)
extends Param

case object Disabled extends Param

implicit val param: Stack.Param[SLOStatsFilter.Param] = Stack.Param(Disabled)
}

val Disabled: Param = Param.Disabled

def configured(latency: Duration): Param = {
Param.Configured(latency)
}

def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module3[param.Stats, param.ResponseClassifier, Param, ServiceFactory[Req, Rep]] {
val role = SLOStatsFilter.role
val description =
"Record number of SLO violations of underlying service"
override def make(
_stats: param.Stats,
_responseClassifier: param.ResponseClassifier,
params: Param,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {
params match {
case Param.Disabled => next
case Param.Configured(latency) =>
val param.Stats(statsReceiver) = _stats
val param.ResponseClassifier(responseClassifier) = _responseClassifier
new SLOStatsFilter(
statsReceiver.scope("slo"),
latency.inNanoseconds,
responseClassifier).andThen(next)
}
}
}
}

/**
* A [[com.twitter.finagle.Filter]] that records the number of slo violations from the underlying
* service. A request is classified as violating the slo if any of the following occur:
* - The response returns after `latency` duration has elapsed
* - The response is classified as a failure according to the ResponseClassifier (but is not
* ignorable or interrupted)
*/
private[finagle] class SLOStatsFilter[Req, Rep](
statsReceiver: StatsReceiver,
latencyNanos: Long,
responseClassifier: ResponseClassifier,
nowNanos: () => Long = Stopwatch.systemNanos)
extends SimpleFilter[Req, Rep] {

private[this] val violationsScope = statsReceiver.scope("violations")
private[this] val violationsTotalCounter = violationsScope.counter("total")
private[this] val violationsFailuresCounter = violationsScope.counter("failures")
private[this] val violationsLatencyCounter = violationsScope.counter("latency")

def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
val start = nowNanos()
service(request).respond { response =>
if (!isIgnorable(response)) {
var violated = false
if (nowNanos() - start > latencyNanos) {
violated = true
violationsLatencyCounter.incr()
}

if (isFailure(request, response)) {
violated = true
violationsFailuresCounter.incr()
}

if (violated) {
violationsTotalCounter.incr()
}
}
}
}

private[this] def isFailure(request: Req, response: Try[Rep]): Boolean = {
responseClassifier
.applyOrElse(ReqRep(request, response), ResponseClassifier.Default) match {
case ResponseClass.Failed(_) if !isInterrupted(response) => true
case _ => false
}
}

private[this] def isIgnorable(response: Try[Rep]): Boolean = response match {
case Throw(f: FailureFlags[_]) => f.isFlagged(FailureFlags.Ignorable)
case _ => false
}

private[this] def isInterrupted(response: Try[Rep]): Boolean = response match {
case Throw(f: FailureFlags[_]) => f.isFlagged(FailureFlags.Interrupted)
case _ => false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package com.twitter.finagle.filter

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.Failure
import com.twitter.finagle.FailureFlags
import com.twitter.finagle.Service
import com.twitter.finagle.ServiceFactory
import com.twitter.finagle.Stack
import com.twitter.finagle.param.Stats
import com.twitter.finagle.service.ReqRep
import com.twitter.finagle.service.ResponseClass
import com.twitter.finagle.service.ResponseClassifier
import com.twitter.finagle.service.ResponseClassifier.named
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.util.Await
import com.twitter.util.Future
import com.twitter.util.MockTimer
import com.twitter.util.Return
import com.twitter.util.Time
import org.scalatest.funsuite.AnyFunSuite

class SLOStatsFilterTest extends AnyFunSuite {

private[this] val unitSvc: Service[Unit, Unit] = Service.mk { _ =>
Future.Unit
}

private[this] def mkService(
statsReceiver: StatsReceiver,
params: Stack.Params,
underlying: Service[Unit, Unit] = unitSvc
): Service[Unit, Unit] = {
val factory = SLOStatsFilter.module
.toStack(Stack.leaf(Stack.Role("test"), ServiceFactory.const(underlying)))
.make(params + Stats(statsReceiver))
Await.result(factory())
}

test("Does not create the filter if not configured") {
val statsReceiver = new InMemoryStatsReceiver
val service = mkService(statsReceiver, Stack.Params.empty)
Await.result(service(()))
assert(!statsReceiver.counters.contains(Seq("slo", "violations", "total")))
}

test("Does not create the filter if disabled") {
val statsReceiver = new InMemoryStatsReceiver
val service = mkService(statsReceiver, Stack.Params.empty + SLOStatsFilter.Disabled)
Await.result(service(()))

assert(!statsReceiver.counters.contains(Seq("slo", "violations", "total")))
}

test("Creates the filter if configured") {
val statsReceiver = new InMemoryStatsReceiver
val service =
mkService(statsReceiver, Stack.Params.empty + SLOStatsFilter.configured(5.seconds))
Await.result(service(()))

assert(statsReceiver.counters.contains(Seq("slo", "violations", "total")))
}

test("Records latency violation if latency is violated") {
val latency = 5.seconds
val statsReceiver = new InMemoryStatsReceiver
Time.withCurrentTimeFrozen { tc =>
val timer = new MockTimer
val filter = new SLOStatsFilter[Unit, Unit](
statsReceiver = statsReceiver,
latencyNanos = latency.inNanoseconds,
responseClassifier = ResponseClassifier.Default,
nowNanos = () => Time.now.inNanoseconds
)

val service: Service[Unit, Unit] = Service.mk { _ =>
Future.Unit.delayed(latency + 1.second)(timer)
}

val res = filter((), service)

tc.advance(latency + 1.second)
timer.tick()

Await.result(res)

assert(statsReceiver.counters(Seq("violations", "total")) == 1)
assert(statsReceiver.counters(Seq("violations", "latency")) == 1)
assert(statsReceiver.counters(Seq("violations", "failures")) == 0)
}
}

test("Records failure violation if response is a failure according to classifier") {
val latency = 5.seconds
val statsReceiver = new InMemoryStatsReceiver
val responseClassifier = named("SuccessIsFailure") {
case ReqRep(_, Return(_)) => ResponseClass.NonRetryableFailure
}

val filter = new SLOStatsFilter[Unit, Unit](
statsReceiver = statsReceiver,
latencyNanos = latency.inNanoseconds,
responseClassifier = responseClassifier,
nowNanos = () => Time.now.inNanoseconds
)

val service: Service[Unit, Unit] = Service.mk { _ =>
Future.Done
}

Await.result(filter((), service))

assert(statsReceiver.counters(Seq("violations", "total")) == 1)
assert(statsReceiver.counters(Seq("violations", "latency")) == 0)
assert(statsReceiver.counters(Seq("violations", "failures")) == 1)
}

test(
"Records latency and failure violation if violated, does not double-count total violations") {
val latency = 5.seconds
val statsReceiver = new InMemoryStatsReceiver
Time.withCurrentTimeFrozen { tc =>
val timer = new MockTimer
val responseClassifier = named("SuccessIsFailure") {
case ReqRep(_, Return(_)) => ResponseClass.NonRetryableFailure
}
val filter = new SLOStatsFilter[Unit, Unit](
statsReceiver = statsReceiver,
latencyNanos = latency.inNanoseconds,
responseClassifier = responseClassifier,
nowNanos = () => Time.now.inNanoseconds
)

val service: Service[Unit, Unit] = Service.mk { _ =>
Future.Unit.delayed(latency + 1.second)(timer)
}

val res = filter((), service)

tc.advance(latency + 1.second)
timer.tick()

Await.result(res)

assert(statsReceiver.counters(Seq("violations", "total")) == 1)
assert(statsReceiver.counters(Seq("violations", "latency")) == 1)
assert(statsReceiver.counters(Seq("violations", "failures")) == 1)
}
}

test("Does not record violation if response is ignorable failure") {
val latency = 5.seconds
val statsReceiver = new InMemoryStatsReceiver
val responseClassifier = named("SuccessIsFailure") {
case ReqRep(_, Return(_)) => ResponseClass.NonRetryableFailure
}

val filter = new SLOStatsFilter[Unit, Unit](
statsReceiver = statsReceiver,
latencyNanos = latency.inNanoseconds,
responseClassifier = responseClassifier,
nowNanos = () => Time.now.inNanoseconds
)

val service: Service[Unit, Unit] = Service.mk { _ =>
Future.exception(new Failure("boom!", flags = FailureFlags.Ignorable))
}

intercept[Exception](Await.result(filter((), service)))
assert(statsReceiver.counters(Seq("violations", "total")) == 0)
}

test("Does not record violation if response is interrupted failure") {
val latency = 5.seconds
val statsReceiver = new InMemoryStatsReceiver

val filter = new SLOStatsFilter[Unit, Unit](
statsReceiver = statsReceiver,
latencyNanos = latency.inNanoseconds,
responseClassifier = ResponseClassifier.Default,
nowNanos = () => Time.now.inNanoseconds
)

val service: Service[Unit, Unit] = Service.mk { _ =>
Future.exception(new Failure("boom!", flags = FailureFlags.Interrupted))
}

intercept[Exception](Await.result(filter((), service)))
assert(statsReceiver.counters(Seq("violations", "total")) == 0)
}
}

0 comments on commit 7954703

Please sign in to comment.