
Time decay of t-digest #127

Open · ajwerner opened this issue May 8, 2019 · 11 comments

Comments

@ajwerner
Contributor

ajwerner commented May 8, 2019

Hey @tdunning,

This is a re-posting of the now-closed #55. I'm interested in re-opening the exploration into how to properly decay a t-digest. The t-digest is attractive due to its high precision and relatively compact serialization with no range configuration. It is a perfect fit for approximate quantile estimation of large data sets. It seems to me that it could additionally be a good data structure for tracking activity on a server.

The common data structure people reach for in the context of server monitoring is the HDR histogram. The especially nice thing about the histogram in this setting is that it is robust to a non-uniform sampling rate. A common monitoring architecture today (using something like https://prometheus.io/) periodically collects a snapshot of values from servers, where the collecting server chooses the timestamp. Histograms can report cumulative bucket counts, and the previous timestamp's counts can then be subtracted from the current ones to get a view of the data between the two.

The t-digest doesn't offer this convenient subtraction mechanism, making it more difficult to use in this sort of setting. One solution people sometimes offer is to use collection as a trigger to reset the t-digest. This is problematic in the face of more than one collector (as is recommended for HA Prometheus).

An alternative to only reporting the cumulative distribution would be to sample a distribution which represents the previous trailing period. I've experimented with this a bit and it seems roughly reasonable, though with some unexpected behavior when few new points are recorded and the count decays away.

The approach in the blog post linked from #55 feels high in terms of overhead, and it is not completely obvious to me how to map that idea onto the t-digest. Have you given this topic more thought? Do you have a hunch about good approaches?

@tdunning
Owner

tdunning commented May 9, 2019 via email

@ajwerner
Contributor Author

ajwerner commented May 9, 2019

If events don't have event times or you don't want to respect them then you
have to depend on processing time. This is bad juju because it means that
the results of a program depend on when it is run, but it can be done. In
that case, the different HA collectors will have slightly discrepant
results.

I tend to agree with this point of view. The metrics world roughly falls into two camps, push-based and pull-based. Ideally the pull-based world would have agreed on some amount of server-side buffering of recent data where the client dictates the timestamp. That at least would have made HA easier to implement without the complications discussed. Unfortunately Prometheus seems to be winning, and in that world the collector assigns the timestamp. The Prometheus push gateway is even worse, as there the Prometheus server assigns the timestamp based on when the collector reads from the push gateway. Alas, event times in the Prometheus metrics world aren't really an option.

As such, we can build a history of digests for relatively small periods and
occasionally combine them into larger intervals.

This adds memory overhead for each of the finer-granularity time windows.

@ajwerner
Contributor Author

ajwerner commented May 9, 2019

For some more context, I'd love to hack soon on adding first-class support for t-digests into Prometheus (prometheus/prometheus#2682). Prometheus today allows you to report histogram buckets where each bucket is a timeseries, and then provides query-language support to aggregate them. These histograms often have hundreds of buckets and give pretty low precision within a pre-configured range. t-digests could use a similar pattern: instead of a timeseries per bucket, you have two timeseries per centroid, keyed by the index of the centroid, so the total number of timeseries is 2*delta. The high-index timeseries will be NaN most of the time, but Prometheus does a good job with compression (I need to verify that it compresses contiguous NaNs well, but that's what I have in mind).

I'm worried about the aliasing and loss due to ticking at a fixed rate that's on the order of the collection rate. I'm also worried about the overhead (primarily memory overhead) of keeping multiple t-digests at a finer granularity, merging them at read time, and rolling them off the end of the ring buffer. I suppose this could be improved by eliminating the write buffer of the merging digest for "sealed" t-digests, which would make the approach somewhat more attractive.

Something I've been playing with lately is quantizing time into some tick unit, say 1 second, and then using EWMA techniques on the count. Imagine we have some value alpha < 1: I add points with a weight of alpha, and then every tick I merge the centroids and decay their counts by a factor of (1 - alpha).

The formula for choosing the decay factor is driven by the tick rate and a user-provided timescale (say 5 minutes).

averageTickAge = timescale / (2 * tick)
alpha = 2 / (averageTickAge + 1)

This approach seems to get close to what I want. In particular, the middle of the distribution tracks the trailing distribution on roughly the timescale provided. The problem, as far as I can tell, is that the decayed distribution seems to flatten out and become fatter-tailed over time. I suspect that adding a decay threshold to filter out centroids which have been sufficiently decayed will help, and I'm going to experiment with that. Perhaps that value should just be alpha^(2 * averageTickAge).
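
To make that concrete, here's a minimal sketch of the tick/decay loop. The decayingDigest type, the placeholder threshold, and the helper names are hypothetical (nothing from this library), and the re-merge step that keeps the centroid list within the size bound is elided:

package main

import "fmt"

type centroid struct {
	mean   float64
	weight float64
}

type decayingDigest struct {
	alpha     float64
	threshold float64 // centroids whose weight decays below this are dropped
	centroids []centroid
}

func newDecayingDigest(timescaleSecs, tickSecs float64) *decayingDigest {
	averageTickAge := timescaleSecs / (2 * tickSecs)
	alpha := 2 / (averageTickAge + 1)
	return &decayingDigest{
		alpha:     alpha,
		threshold: 1e-6, // placeholder; perhaps alpha^(2*averageTickAge) as suggested above
	}
}

// Add records a point with weight alpha, as described above.
func (d *decayingDigest) Add(x float64) {
	d.centroids = append(d.centroids, centroid{mean: x, weight: d.alpha})
}

// Tick runs once per tick unit: decay every weight by (1 - alpha) and drop
// centroids that have decayed below the threshold. A real implementation
// would also re-merge the centroid list down to the t-digest size bound here.
func (d *decayingDigest) Tick() {
	kept := d.centroids[:0]
	for _, c := range d.centroids {
		c.weight *= 1 - d.alpha
		if c.weight >= d.threshold {
			kept = append(kept, c)
		}
	}
	d.centroids = kept
}

func main() {
	d := newDecayingDigest(300, 1) // 5 minute timescale, 1 second tick
	fmt.Printf("alpha = %.5f\n", d.alpha) // averageTickAge = 150, alpha = 2/151 ≈ 0.01325
	d.Add(42)
	d.Tick()
	fmt.Printf("weight after one tick = %.6f\n", d.centroids[0].weight)
}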

Does this basic approach of quantizing time and using a traditional EWMA over weights seem reasonable? If so, then there's the question of how sound the actual details of this approach are.

@tdunning
Owner

Restarting.

Yes. This approach to decay sounds pretty reasonable, but you will have very little idea about accuracy as things begin to decay. I suppose that you can still say that the invariants hold as much as they ever did, but I don't think you can say much more than that.

Is the cost of windowed digests all that high? If you have compression = 100, then sealed digests will only have about 100 floats to store, or fewer if there are not very many samples. That's on the order of a kilobyte or so. How many of these do you think you will need to keep in memory? Even keeping millions isn't that big of a deal.
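
For concreteness, a rough sketch of such a windowed scheme, with hypothetical types rather than anything from this library; the ring size and helper names are illustrative:

package main

import (
	"fmt"
	"time"
)

type centroid struct{ mean, weight float64 }

type window struct {
	start     time.Time
	centroids []centroid // a sealed digest with compression = 100 is roughly a kilobyte
}

type windowedDigest struct {
	ring    []window // fixed number of sealed windows
	head    int      // slot that the next sealed window will overwrite
	current window   // window currently accepting points
}

func newWindowedDigest(numWindows int) *windowedDigest {
	return &windowedDigest{ring: make([]window, numWindows)}
}

// Add records a point into the open window.
func (w *windowedDigest) Add(x float64) {
	w.current.centroids = append(w.current.centroids, centroid{x, 1})
}

// Roll seals the open window into the ring, evicting the oldest window.
func (w *windowedDigest) Roll(now time.Time) {
	w.ring[w.head] = w.current
	w.head = (w.head + 1) % len(w.ring)
	w.current = window{start: now}
}

// Snapshot merges every window into one flat centroid list; a real
// implementation would merge into a digest and re-compress.
func (w *windowedDigest) Snapshot() []centroid {
	var out []centroid
	for _, win := range w.ring {
		out = append(out, win.centroids...)
	}
	return append(out, w.current.centroids...)
}

func main() {
	w := newWindowedDigest(300) // 300 one-second windows ≈ a 5 minute history, a few hundred KB
	w.Add(42)
	w.Roll(time.Now())
	fmt.Println("centroids in snapshot:", len(w.Snapshot()))
}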

@tdunning
Owner

On the topic of integration with Prometheus, I think that sounds like a great idea.

How can I help?

@ajwerner
Contributor Author

I don't have any answers, but maybe these thoughts can whet your appetite for the problem (or discourage it, who knows).

The first step to first-class support in Prometheus will be to figure out how to structure the data. The basic unit of the Prometheus data model, as I like to think about it, is a time series. A time series is a set of timestamp keys and floating-point values, and it is identified by a set of labels (importantly including the special __name__ label). Whether these time series are gauges, counters, or rates is pretty well abstracted from the storage layer. In practice this means the storage layer has two data structures: an index from sets of labels to timeseries IDs, and a columnar representation of each timeseries as (timestamp, value) pairs. It uses delta encoding and the like to keep the timeseries storage compact. Another very important (albeit somewhat sad) truth about the Prometheus ecosystem is that you don't get event timestamps, only collection timestamps.
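
As a rough mental model only (hypothetical types, not Prometheus's actual TSDB structures), I think of the storage side like this:

package main

import "fmt"

// Labels identify a series; the special __name__ label carries the metric name.
type Labels map[string]string

// Sample is one (collection timestamp, float value) pair.
type Sample struct {
	TimestampMs int64
	Value       float64
}

type SeriesID uint64

type Storage struct {
	index  map[string]SeriesID   // canonical label-set encoding -> series ID
	series map[SeriesID][]Sample // columnar samples; the real storage delta-encodes these
}

func main() {
	s := Storage{
		index:  map[string]SeriesID{`{__name__="up",job="node"}`: 1},
		series: map[SeriesID][]Sample{1: {{TimestampMs: 1557360000000, Value: 1}}},
	}
	fmt.Println(len(s.index), len(s.series))
}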

To make this more explicit, here is the representation of a histogram:

raft_process_applycommitted_latency_bucket{store="1",le="14847"} 1
raft_process_applycommitted_latency_bucket{store="1",le="18431"} 2
raft_process_applycommitted_latency_bucket{store="1",le="19455"} 4
raft_process_applycommitted_latency_bucket{store="1",le="21503"} 10
raft_process_applycommitted_latency_bucket{store="1",le="22527"} 16
raft_process_applycommitted_latency_bucket{store="1",le="23551"} 21
raft_process_applycommitted_latency_bucket{store="1",le="24575"} 25
raft_process_applycommitted_latency_bucket{store="1",le="25599"} 30
raft_process_applycommitted_latency_bucket{store="1",le="26623"} 35
raft_process_applycommitted_latency_bucket{store="1",le="27647"} 41
raft_process_applycommitted_latency_bucket{store="1",le="28671"} 48
raft_process_applycommitted_latency_bucket{store="1",le="29695"} 51
raft_process_applycommitted_latency_bucket{store="1",le="30719"} 55
raft_process_applycommitted_latency_bucket{store="1",le="31743"} 62
raft_process_applycommitted_latency_bucket{store="1",le="32767"} 70
raft_process_applycommitted_latency_bucket{store="1",le="34815"} 84
raft_process_applycommitted_latency_bucket{store="1",le="36863"} 99
raft_process_applycommitted_latency_bucket{store="1",le="38911"} 110
raft_process_applycommitted_latency_bucket{store="1",le="40959"} 120
raft_process_applycommitted_latency_bucket{store="1",le="43007"} 134
raft_process_applycommitted_latency_bucket{store="1",le="45055"} 144
raft_process_applycommitted_latency_bucket{store="1",le="47103"} 149
raft_process_applycommitted_latency_bucket{store="1",le="49151"} 153
raft_process_applycommitted_latency_bucket{store="1",le="51199"} 160
raft_process_applycommitted_latency_bucket{store="1",le="55295"} 166
raft_process_applycommitted_latency_bucket{store="1",le="57343"} 168
raft_process_applycommitted_latency_bucket{store="1",le="59391"} 172
raft_process_applycommitted_latency_bucket{store="1",le="61439"} 175
raft_process_applycommitted_latency_bucket{store="1",le="63487"} 180
raft_process_applycommitted_latency_bucket{store="1",le="65535"} 182
raft_process_applycommitted_latency_bucket{store="1",le="69631"} 185
raft_process_applycommitted_latency_bucket{store="1",le="73727"} 193
raft_process_applycommitted_latency_bucket{store="1",le="77823"} 199
raft_process_applycommitted_latency_bucket{store="1",le="81919"} 208
raft_process_applycommitted_latency_bucket{store="1",le="86015"} 217
raft_process_applycommitted_latency_bucket{store="1",le="90111"} 227
raft_process_applycommitted_latency_bucket{store="1",le="94207"} 232
raft_process_applycommitted_latency_bucket{store="1",le="98303"} 243
raft_process_applycommitted_latency_bucket{store="1",le="102399"} 248
raft_process_applycommitted_latency_bucket{store="1",le="106495"} 251
raft_process_applycommitted_latency_bucket{store="1",le="110591"} 254
raft_process_applycommitted_latency_bucket{store="1",le="114687"} 258
raft_process_applycommitted_latency_bucket{store="1",le="118783"} 261
raft_process_applycommitted_latency_bucket{store="1",le="122879"} 265
raft_process_applycommitted_latency_bucket{store="1",le="126975"} 267
raft_process_applycommitted_latency_bucket{store="1",le="139263"} 268
raft_process_applycommitted_latency_bucket{store="1",le="147455"} 272
raft_process_applycommitted_latency_bucket{store="1",le="155647"} 274
raft_process_applycommitted_latency_bucket{store="1",le="163839"} 276
raft_process_applycommitted_latency_bucket{store="1",le="180223"} 278
raft_process_applycommitted_latency_bucket{store="1",le="196607"} 281
raft_process_applycommitted_latency_bucket{store="1",le="524287"} 282
raft_process_applycommitted_latency_bucket{store="1",le="1.179647e+06"} 283
raft_process_applycommitted_latency_bucket{store="1",le="1.245183e+06"} 284
raft_process_applycommitted_latency_bucket{store="1",le="1.835007e+06"} 285
raft_process_applycommitted_latency_bucket{store="1",le="5.767167e+06"} 286
raft_process_applycommitted_latency_bucket{store="1",le="+Inf"} 286
raft_process_applycommitted_latency_sum{store="1"} 2.7738338e+07
raft_process_applycommitted_latency_count{store="1"} 286

It might be (in fact, is) the case that this histogram is configured with even larger buckets, but because they have no values, they are omitted. The le values indicate the cumulative number of events (since process start) whose value is less than or equal to the bucket boundary. When calculating what happened in some time interval, a query grabs all of the buckets and subtracts the counts at the beginning of the interval from those at the end, after accounting for counter restarts.
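
Roughly, that interval computation looks like the sketch below; the function and the sample numbers are illustrative, not how Prometheus actually implements it:

package main

import "fmt"

// bucketDelta returns end minus start per le bucket. If any cumulative count
// went down, the process restarted, so the end-of-interval counts are used as-is.
func bucketDelta(start, end map[string]float64) map[string]float64 {
	restarted := false
	for le, s := range start {
		if end[le] < s {
			restarted = true
			break
		}
	}
	out := make(map[string]float64, len(end))
	for le, e := range end {
		if restarted {
			out[le] = e
		} else {
			out[le] = e - start[le]
		}
	}
	return out
}

func main() {
	start := map[string]float64{"14847": 1, "18431": 2, "+Inf": 2}
	end := map[string]float64{"14847": 1, "18431": 2, "+Inf": 286}
	fmt.Println(bucketDelta(start, end)) // events per cumulative bucket during the interval
}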

A significant barrier to the windowing approach is that Prometheus has no mechanism to control retention per time series (prometheus/prometheus#1381). This makes windowing schemes feel less tractable.

I want to explore windowing schemes more but haven't had much time. I started to think through them one day here: https://github.com/ajwerner/tdigest/blob/9165f388a6b2c88703b9630c9d15e59d2d212d7e/windowed/windowed.go#L12-L120

@tdunning
Owner

OK. That's interesting. Sad as well.

Things like HDR histograms have constant bin boundaries, so this idea of having a different time series for each bin (or for each cumulative sum) is easy to hack into a simple time series database. I have done the same sort of thing with OpenTSDB and accumulated similar karmic debt.

The centroids in a t-digest wander around, however. That makes this much harder.

One possibility is to not use an additive measure, if Prometheus allows that. Does it?

@ajwerner
Contributor Author

One possibility is to not use an additive measure, if Prometheus allows that. Does it?

There's nothing at all that binds us to using an additive measure; that's an assumption of the data type encoding. One could imagine doing something like the encoding below. That ends up being pretty good because highly compressed histograms will have mostly zero values. Sure, the values in the low buckets won't compress well, but that seems fine; we aren't trying to do optimal storage here, just something that works. We can also leverage meta buckets to indicate things like start time and window sizes. I think the encoding below could get you a pretty long way. At query time you pull all of the buckets and merge where appropriate (a sketch of that merge follows the example). It might be that there are multiple readings corresponding to the same open window, and that seems fine, at least if you could compact them away. In a more advanced time series database that allowed per-series retention and compaction, it really would be totally fine. Alas.

tdigest{bucket="1",type="count"} 1
tdigest{bucket="1",type="val"} 18431
tdigest{bucket="2",type="count"} 2
tdigest{bucket="2",type="val"} 22527
tdigest{type="start"} <timestamp>
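
And a rough sketch of the query-time merge mentioned above, pairing the per-bucket val and count series from one scrape back into centroids; the types and helper are illustrative only:

package main

import (
	"fmt"
	"sort"
)

type centroid struct{ mean, count float64 }

// rebuildCentroids pairs up the per-bucket "val" and "count" samples.
// Buckets with a zero count (the mostly-empty high indexes) are skipped.
func rebuildCentroids(vals, counts map[int]float64) []centroid {
	var out []centroid
	for bucket, count := range counts {
		if count == 0 {
			continue
		}
		out = append(out, centroid{mean: vals[bucket], count: count})
	}
	// t-digest centroids are kept in mean order.
	sort.Slice(out, func(i, j int) bool { return out[i].mean < out[j].mean })
	return out
}

func main() {
	vals := map[int]float64{1: 18431, 2: 22527}
	counts := map[int]float64{1: 1, 2: 2}
	fmt.Println(rebuildCentroids(vals, counts))
}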

@ajwerner
Contributor Author

When I said that you don't get an event time, I meant that event time is collection time. At query time you do, of course, get a timestamp.

@ajwerner
Contributor Author

ajwerner commented Jan 19, 2021

Things like HDR histograms have constant bin boundaries, so this idea of having a different time series for each bin (or for each cumulative sum) is easy to hack into a simple time series database. I have done the same sort of thing with OpenTSDB and accumulated similar karmic debt.

That's true with a constant configuration. It's also sort of terrible. In the system I currently work on, we hard-coded all of the buckets a long time ago, ending up with quite high precision in the nanosecond range and only second-level precision in the second range. It was a bad choice for the system, and we can never change it now. With the centroid index scheme proposed above, one could change the compression factor and everything would adjust nicely.

@tdunning
Owner

tdunning commented Jan 19, 2021 via email
