#956 Flow control fixes #957

Open · wants to merge 5 commits into master
63 changes: 44 additions & 19 deletions conf/carbon.conf.example
@@ -472,10 +472,33 @@ DESTINATIONS = 127.0.0.1:2004
 
 # This is the maximum number of datapoints that can be queued up
 # for a single destination. Once this limit is hit, we will
-# stop accepting new data if USE_FLOW_CONTROL is True, otherwise
-# we will drop any subsequently received datapoints.
+# stop accepting new data if USE_FLOW_CONTROL is True. In-flight and
+# internally-generated datapoints will still be processed, and data
+# will still be dropped if MAX_QUEUE_SIZE_HARD_PCT * MAX_QUEUE_SIZE
+# is hit. If USE_FLOW_CONTROL is False, metrics are immediately dropped
+# once MAX_QUEUE_SIZE is reached, and MAX_QUEUE_SIZE_HARD_PCT is unused.
 MAX_QUEUE_SIZE = 10000
 
+# This is the fraction of MAX_QUEUE_SIZE a queue must drain to before it
+# accepts new messages. For a larger site with very large queues it makes
+# sense to tune this to leave room for incoming stats. For example, with an
+# average flow of 100k stats/minute and a MAX_QUEUE_SIZE of 3,000,000, a
+# watermark of 0.95 lets stats start flowing once the queue has drained to
+# 95% of the max, leaving space for the next minute's worth of stats even
+# before the relay incrementally clears more of the queue.
+QUEUE_LOW_WATERMARK_PCT = 0.8
+
+# This is the multiple of MAX_QUEUE_SIZE beyond which data is dropped even
+# with USE_FLOW_CONTROL enabled. While incoming data is paused, in-flight
+# data is still processed, which can push a queue slightly over the max.
+MAX_QUEUE_SIZE_HARD_PCT = 1.25
+
+# Set this to False to drop datapoints when any send queue (sending datapoints
+# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
+# default) then sockets over which metrics are received will temporarily stop accepting
+# data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE.
+USE_FLOW_CONTROL = True
+
 # This defines the maximum "message size" between carbon daemons. If
 # your queue is large, setting this to a lower number will cause the
 # relay to forward smaller discrete chunks of stats, which may prevent
@@ -492,26 +515,11 @@ MAX_DATAPOINTS_PER_MESSAGE = 500
 # If this is blank carbon-relay runs as the user that invokes it
 # USER =
 
-# This is the percentage that the queue must be empty before it will accept
-# more messages. For a larger site, if the queue is very large it makes sense
-# to tune this to allow for incoming stats. So if you have an average
-# flow of 100k stats/minute, and a MAX_QUEUE_SIZE of 3,000,000, it makes sense
-# to allow stats to start flowing when you've cleared the queue to 95% since
-# you should have space to accommodate the next minute's worth of stats
-# even before the relay incrementally clears more of the queue
-QUEUE_LOW_WATERMARK_PCT = 0.8
-
 # To allow for batch efficiency from the pickle protocol and to benefit from
 # other batching advantages, all writes are deferred by putting them into a queue,
 # and then the queue is flushed and sent a small fraction of a second later.
 TIME_TO_DEFER_SENDING = 0.0001
 
-# Set this to False to drop datapoints when any send queue (sending datapoints
-# to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
-# default) then sockets over which metrics are received will temporarily stop accepting
-# data until the send queues fall below QUEUE_LOW_WATERMARK_PCT * MAX_QUEUE_SIZE.
-USE_FLOW_CONTROL = True
-
 # If enabled this setting is used to timeout metric client connection if no
 # metrics have been sent in specified time in seconds
 #METRIC_CLIENT_IDLE_TIMEOUT = None
@@ -619,10 +627,27 @@ REPLICATION_FACTOR = 1
 
 # This is the maximum number of datapoints that can be queued up
 # for a single destination. Once this limit is hit, we will
-# stop accepting new data if USE_FLOW_CONTROL is True, otherwise
-# we will drop any subsequently received datapoints.
+# stop accepting new data if USE_FLOW_CONTROL is True. In-flight and
+# internally-generated datapoints will still be processed, and data
+# will still be dropped if MAX_QUEUE_SIZE_HARD_PCT * MAX_QUEUE_SIZE
+# is hit. If USE_FLOW_CONTROL is False, metrics are immediately dropped
+# once MAX_QUEUE_SIZE is reached, and MAX_QUEUE_SIZE_HARD_PCT is unused.
 MAX_QUEUE_SIZE = 10000
 
+# This is the fraction of MAX_QUEUE_SIZE a queue must drain to before it
+# accepts new messages. For a larger site with very large queues it makes
+# sense to tune this to leave room for incoming stats. For example, with an
+# average flow of 100k stats/minute and a MAX_QUEUE_SIZE of 3,000,000, a
+# watermark of 0.95 lets stats start flowing once the queue has drained to
+# 95% of the max, leaving space for the next minute's worth of stats even
+# before the relay incrementally clears more of the queue.
+QUEUE_LOW_WATERMARK_PCT = 0.8
+
+# This is the multiple of MAX_QUEUE_SIZE beyond which data is dropped even
+# with USE_FLOW_CONTROL enabled. While incoming data is paused, in-flight
+# data is still processed, which can push a queue slightly over the max.
+MAX_QUEUE_SIZE_HARD_PCT = 1.25
+
 # Set this to False to drop datapoints when any send queue (sending datapoints
 # to a downstream carbon daemon) hits MAX_QUEUE_SIZE. If this is True (the
 # default) then sockets over which metrics are received will temporarily stop accepting
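Note: the three relay-side settings above combine into a simple state machine per send queue. The following is a standalone sketch using the defaults shown, not code from this PR; the function and the hysteresis flag are illustrative only:

MAX_QUEUE_SIZE = 10000           # soft limit: pause receiving here
QUEUE_LOW_WATERMARK_PCT = 0.8    # resume below 10000 * 0.8 = 8000
MAX_QUEUE_SIZE_HARD_PCT = 1.25   # drop above 10000 * 1.25 = 12500

def queue_state(size, was_paused):
    """Classify a send queue; was_paused models the flow-control hysteresis."""
    if size >= MAX_QUEUE_SIZE * MAX_QUEUE_SIZE_HARD_PCT:
        return 'dropping'   # hard limit exceeded: new datapoints are lost
    if size >= MAX_QUEUE_SIZE:
        return 'paused'     # soft limit: receivers stop accepting new data
    if size < MAX_QUEUE_SIZE * QUEUE_LOW_WATERMARK_PCT:
        return 'accepting'  # drained past the low watermark: receivers resume
    return 'paused' if was_paused else 'accepting'  # hysteresis band

assert queue_state(12600, True) == 'dropping'
assert queue_state(10500, True) == 'paused'
assert queue_state(9000, True) == 'paused'    # still draining toward 8000
assert queue_state(7900, True) == 'accepting'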
13 changes: 12 additions & 1 deletion lib/carbon/cache.py
@@ -208,6 +208,13 @@ def watermarks(self):
 
   @property
   def is_full(self):
+    if settings.CACHE_SIZE_HARD_MAX == float('inf'):
+      return False
+    else:
+      return self.size >= settings.CACHE_SIZE_HARD_MAX
+
+  @property
+  def is_nearly_full(self):
     if settings.MAX_CACHE_SIZE == float('inf'):
       return False
     else:
@@ -252,8 +259,12 @@ def store(self, metric, datapoint):
     # Not a duplicate, hence process if cache is not full
     if self.is_full:
       log.msg("MetricCache is full: self.size=%d" % self.size)
-      events.cacheFull()
+      events.cacheOverflow()
    else:
+      if self.is_nearly_full:
+        # This will disable reading when flow control is enabled
+        log.msg("MetricCache is nearly full: self.size=%d" % self.size)
+        events.cacheFull()
       if not self[metric]:
         self.new_metrics.append(metric)
       self.size += 1
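Note: the cache now distinguishes "nearly full" (soft, pause) from "full" (hard, drop). A minimal sketch of the resulting store() behaviour, assuming flow control is on, that is_nearly_full compares against MAX_CACHE_SIZE as the truncated hunk suggests, and an illustrative MAX_CACHE_SIZE of 1,000,000 (so CACHE_SIZE_HARD_MAX is 1,050,000):

MAX_CACHE_SIZE = 1000000       # illustrative value, not a carbon default
CACHE_SIZE_HARD_MAX = 1050000  # MAX_CACHE_SIZE * 1.05 under flow control

def store_outcome(cache_size):
    """Mirrors the branch order in store() above."""
    if cache_size >= CACHE_SIZE_HARD_MAX:
        # is_full: cacheOverflow fires and the datapoint is dropped.
        return 'dropped'
    if cache_size >= MAX_CACHE_SIZE:
        # is_nearly_full: cacheFull fires, pausing receivers, but the
        # datapoint itself is still stored.
        return 'stored, receivers paused'
    return 'stored'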
9 changes: 8 additions & 1 deletion lib/carbon/client.py
@@ -35,6 +35,10 @@
 
 
 SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * settings.QUEUE_LOW_WATERMARK_PCT
+if settings.USE_FLOW_CONTROL:
+  SEND_QUEUE_HARD_MAX = settings.MAX_QUEUE_SIZE * settings.MAX_QUEUE_SIZE_HARD_PCT
+else:
+  SEND_QUEUE_HARD_MAX = settings.MAX_QUEUE_SIZE
 
 
 class CarbonClientProtocol(object):
@@ -350,7 +354,10 @@ def sendDatapoint(self, metric, datapoint):
     if self.queueSize >= settings.MAX_QUEUE_SIZE:
       if not self.queueFull.called:
         self.queueFull.callback(self.queueSize)
-      instrumentation.increment(self.fullQueueDrops)
+      if self.queueSize < SEND_QUEUE_HARD_MAX:
+        self.enqueue(metric, datapoint)
+      else:
+        instrumentation.increment(self.fullQueueDrops)
     else:
       self.enqueue(metric, datapoint)
 
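Note: with this change, a queue past MAX_QUEUE_SIZE still fires the queueFull callback (pausing receivers when flow control is on), but in-flight datapoints keep being enqueued until SEND_QUEUE_HARD_MAX. A self-contained sketch of that decision, with the queueFull signalling omitted for brevity:

MAX_QUEUE_SIZE = 10000
SEND_QUEUE_HARD_MAX = 12500  # MAX_QUEUE_SIZE * MAX_QUEUE_SIZE_HARD_PCT

def send_datapoint(queue, datapoint):
    """Returns True if the datapoint was queued, False if dropped."""
    if len(queue) < MAX_QUEUE_SIZE:
        queue.append(datapoint)  # normal path
        return True
    if len(queue) < SEND_QUEUE_HARD_MAX:
        queue.append(datapoint)  # soft-full: keep in-flight data
        return True
    return False                 # hard-full: counted as a fullQueueDrop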
7 changes: 6 additions & 1 deletion lib/carbon/conf.py
@@ -68,8 +68,9 @@
   MAX_DATAPOINTS_PER_MESSAGE=500,
   MAX_AGGREGATION_INTERVALS=5,
   FORWARD_ALL=True,
-  MAX_QUEUE_SIZE=1000,
+  MAX_QUEUE_SIZE=10000,
   QUEUE_LOW_WATERMARK_PCT=0.8,
+  MAX_QUEUE_SIZE_HARD_PCT=1.25,
   TIME_TO_DEFER_SENDING=0.0001,
   ENABLE_AMQP=False,
   AMQP_METRIC_NAME_IN_BODY=False,
@@ -296,6 +297,10 @@ def cleanpath(path):
     state.database = database_class(settings)
 
     settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
+    if settings.USE_FLOW_CONTROL:
+      settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05
+    else:
+      settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE
 
     if "action" not in self:
       self["action"] = "start"
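Note: the cache thresholds are derived rather than configured: a fixed 0.95 low watermark and, with flow control on, a fixed 5% overflow allowance above MAX_CACHE_SIZE. Worked through with an illustrative value (MAX_CACHE_SIZE chosen only for the arithmetic):

MAX_CACHE_SIZE = 2000000
CACHE_SIZE_LOW_WATERMARK = MAX_CACHE_SIZE * 0.95  # 1900000: resume receivers
CACHE_SIZE_HARD_MAX = MAX_CACHE_SIZE * 1.05       # 2100000: drop datapoints
# With USE_FLOW_CONTROL = False the hard max collapses to MAX_CACHE_SIZE,
# so datapoints are dropped as soon as the cache is full.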
3 changes: 2 additions & 1 deletion lib/carbon/events.py
@@ -22,6 +22,7 @@ def __call__(self, *args, **kwargs):
 
 metricReceived = Event('metricReceived')
 metricGenerated = Event('metricGenerated')
+cacheOverflow = Event('cacheOverflow')
 cacheFull = Event('cacheFull')
 cacheSpaceAvailable = Event('cacheSpaceAvailable')
 pauseReceivingMetrics = Event('pauseReceivingMetrics')
@@ -32,7 +33,7 @@ def __call__(self, *args, **kwargs):
   lambda metric, datapoint: state.instrumentation.increment('metricsReceived'))
 
 
-cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow'))
+cacheOverflow.addHandler(lambda: state.instrumentation.increment('cache.overflow'))
 cacheFull.addHandler(lambda: setattr(state, 'cacheTooFull', True))
 cacheSpaceAvailable.addHandler(lambda: setattr(state, 'cacheTooFull', False))
 
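Note: Event here is a small synchronous dispatcher, so the change above only moves the 'cache.overflow' counter from the cacheFull handler list to the new cacheOverflow event. A rough sketch of the pattern (an approximation, not carbon's exact implementation):

class Event(object):
    def __init__(self, name):
        self.name = name
        self.handlers = []

    def addHandler(self, handler):
        self.handlers.append(handler)

    def __call__(self, *args, **kwargs):
        # Fire synchronously, in registration order.
        for handler in self.handlers:
            handler(*args, **kwargs)

cacheOverflow = Event('cacheOverflow')
fired = []
cacheOverflow.addHandler(lambda: fired.append('cache.overflow'))
cacheOverflow()  # invokes every registered handler
assert fired == ['cache.overflow']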
12 changes: 12 additions & 0 deletions lib/carbon/service.py
@@ -170,13 +170,21 @@ def setupAggregatorProcessor(root_service, settings):
       "aggregation processor: file does not exist {0}".format(aggregation_rules_path))
   RuleManager.read_from(aggregation_rules_path)
 
+  if settings.USE_FLOW_CONTROL:
+    events.cacheFull.addHandler(events.pauseReceivingMetrics)
+    events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)
+
 
 def setupRewriterProcessor(root_service, settings):
   from carbon.rewrite import RewriteRuleManager
 
   rewrite_rules_path = settings["rewrite-rules"]
   RewriteRuleManager.read_from(rewrite_rules_path)
 
+  if settings.USE_FLOW_CONTROL:
+    events.cacheFull.addHandler(events.pauseReceivingMetrics)
+    events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)
+
 
 def setupRelayProcessor(root_service, settings):
   from carbon.routers import DatapointRouter
@@ -191,6 +199,10 @@ def setupRelayProcessor(root_service, settings):
   for destination in util.parseDestinations(settings.DESTINATIONS):
     state.client_manager.startClient(destination)
 
+  if settings.USE_FLOW_CONTROL:
+    events.cacheFull.addHandler(events.pauseReceivingMetrics)
+    events.cacheSpaceAvailable.addHandler(events.resumeReceivingMetrics)
+
 
 def setupWriterProcessor(root_service, settings):
   from carbon import cache  # NOQA Register CacheFeedingProcessor
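Note: with these handlers registered, the aggregator, rewriter, and relay daemons all react to cache pressure the same way the writer does. The intended event flow, summarized from the changes above (illustrative, not code from this PR):

# MetricCache.store() sees is_nearly_full      -> events.cacheFull()
#   -> events.pauseReceivingMetrics()          (receivers stop reading sockets)
# cache drains below CACHE_SIZE_LOW_WATERMARK  -> events.cacheSpaceAvailable()
#   -> events.resumeReceivingMetrics()         (receivers start reading again)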
4 changes: 4 additions & 0 deletions lib/carbon/tests/benchmark_cache.py
@@ -4,6 +4,10 @@
   NaiveStrategy, MaxStrategy, RandomStrategy, SortedStrategy, \
   TimeSortedStrategy, BucketMaxStrategy
 
+from carbon.conf import settings
+
+settings.CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
+settings.CACHE_SIZE_HARD_MAX = settings.MAX_CACHE_SIZE * 1.05
 
 metric_cache = _MetricCache(DrainStrategy)
 count = 0
30 changes: 29 additions & 1 deletion lib/carbon/tests/test_cache.py
@@ -11,6 +11,7 @@ class MetricCacheTest(TestCase):
   def setUp(self):
     settings = {
       'MAX_CACHE_SIZE': float('inf'),
+      'CACHE_SIZE_HARD_MAX': float('inf'),
       'CACHE_SIZE_LOW_WATERMARK': float('inf')
     }
     self._settings_patch = patch.dict('carbon.conf.settings', settings)
@@ -67,6 +68,13 @@ def test_store_checks_fullness(self):
   def test_store_on_full_triggers_events(self):
     is_full_mock = PropertyMock(return_value=True)
     with patch.object(_MetricCache, 'is_full', is_full_mock):
       with patch('carbon.cache.events') as events_mock:
         self.metric_cache.store('foo', (123456, 1.0))
-        events_mock.cacheFull.assert_called_with()
+        events_mock.cacheOverflow.assert_called_with()
+
+  def test_store_on_nearly_full_triggers_events(self):
+    is_nearly_full_mock = PropertyMock(return_value=True)
+    with patch.object(_MetricCache, 'is_nearly_full', is_nearly_full_mock):
+      with patch('carbon.cache.events') as events_mock:
+        self.metric_cache.store('foo', (123456, 1.0))
+        events_mock.cacheFull.assert_called_with()
@@ -150,7 +158,7 @@ def test_is_full_short_circuits_on_inf(self):
       size_mock.assert_not_called()
 
   def test_is_full(self):
-    self._settings_patch.values['MAX_CACHE_SIZE'] = 2.0
+    self._settings_patch.values['CACHE_SIZE_HARD_MAX'] = 2.0
     self._settings_patch.start()
     with patch('carbon.cache.events'):
       self.assertFalse(self.metric_cache.is_full)
@@ -178,8 +186,18 @@ def test_counts_multiple_datapoints(self):
 
 class DrainStrategyTest(TestCase):
   def setUp(self):
+    settings = {
+      'MAX_CACHE_SIZE': float('inf'),
+      'CACHE_SIZE_HARD_MAX': float('inf'),
+      'CACHE_SIZE_LOW_WATERMARK': float('inf')
+    }
+    self._settings_patch = patch.dict('carbon.conf.settings', settings)
+    self._settings_patch.start()
     self.metric_cache = _MetricCache()
 
+  def tearDown(self):
+    self._settings_patch.stop()
+
   def test_bucketmax_strategy(self):
     bucketmax_strategy = BucketMaxStrategy(self.metric_cache)
     self.metric_cache.strategy = bucketmax_strategy
@@ -303,8 +321,18 @@ def test_time_sorted_strategy_min_lag(self):
 
 class RandomStrategyTest(TestCase):
   def setUp(self):
+    settings = {
+      'MAX_CACHE_SIZE': float('inf'),
+      'CACHE_SIZE_HARD_MAX': float('inf'),
+      'CACHE_SIZE_LOW_WATERMARK': float('inf')
+    }
+    self._settings_patch = patch.dict('carbon.conf.settings', settings)
+    self._settings_patch.start()
     self.metric_cache = _MetricCache()
 
+  def tearDown(self):
+    self._settings_patch.stop()
+
   def test_random_strategy(self):
     self.metric_cache.store('foo', (123456, 1.0))
     self.metric_cache.store('bar', (123457, 2.0))