From fc8134fce71c626e680994c340eb3df38096c965 Mon Sep 17 00:00:00 2001 From: Tsonglew Date: Wed, 6 Nov 2024 09:29:05 +0800 Subject: [PATCH] Feat sampling service #(357) Signed-off-by: Tsonglew --- CHANGELOG.md | 1 + docs/en/contribution/CodingStyle.md | 4 +- docs/en/setup/Configuration.md | 4 + skywalking/agent/__init__.py | 15 ++-- skywalking/config.py | 6 +- skywalking/sampling/__init__.py | 50 ++++++++++++ skywalking/sampling/sampling_service.py | 101 ++++++++++++++++++++++++ skywalking/trace/context.py | 4 + tests/unit/test_sampling.py | 88 +++++++++++++++++++++ 9 files changed, 263 insertions(+), 10 deletions(-) create mode 100644 skywalking/sampling/__init__.py create mode 100644 skywalking/sampling/sampling_service.py create mode 100644 tests/unit/test_sampling.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 381dc5d1..6943691e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Feature: - Drop support for 3.7 (#356) + - Support sampling rate setup. Provide `SW_SAMPLE_N_PER_3_SECS` environment variable to control it (#357) - Fixes: - Fix: user/password replacement is not allowed for relative URLs (#349) diff --git a/docs/en/contribution/CodingStyle.md b/docs/en/contribution/CodingStyle.md index 1acf5484..467649b3 100644 --- a/docs/en/contribution/CodingStyle.md +++ b/docs/en/contribution/CodingStyle.md @@ -6,7 +6,7 @@ Since Python 3.5 is end of life, we fully utilize the clarity and performance bo Please do not use other styles - `+`, `%` or `.format` unless f-string is absolutely unfeasible in the context, or it is a logger message, which is [optimized](https://docs.python.org/3/howto/logging.html#optimization) for the `%` style -Run `make dev-fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner +Run `make fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner cases leading to a 
semantically different conversion. ### Quotes @@ -23,7 +23,7 @@ foo = f"I'm a string" bar = f"This repo is called 'skywalking-python'" ``` -Run `make dev-fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complaints about it. +Run `make fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complains about it. ## Debug messages Please import the `logger_debug_enabled` variable and wrap your debug messages with a check. diff --git a/docs/en/setup/Configuration.md b/docs/en/setup/Configuration.md index c03e706e..c8c16585 100644 --- a/docs/en/setup/Configuration.md +++ b/docs/en/setup/Configuration.md @@ -97,3 +97,7 @@ export SW_AGENT_YourConfiguration=YourValue | plugin_fastapi_collect_http_params | SW_PLUGIN_FASTAPI_COLLECT_HTTP_PARAMS | | False | This config item controls that whether the FastAPI plugin should collect the parameters of the request. | | plugin_bottle_collect_http_params | SW_PLUGIN_BOTTLE_COLLECT_HTTP_PARAMS | | False | This config item controls that whether the Bottle plugin should collect the parameters of the request. | | plugin_celery_parameters_length | SW_PLUGIN_CELERY_PARAMETERS_LENGTH | | 512 | The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | +### Sampling Configurations +| Configuration | Environment Variable | Type | Default Value | Description | +| :------------ | :------------ | :------------ | :------------ | :------------ | +| sample_n_per_3_secs | SW_SAMPLE_N_PER_3_SECS | | 0 | The maximum number of traces to sample in every 3 seconds, 0 or a negative value turns the limit off (everything is sampled) | diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 70b28e98..b93f620f 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -15,19 +15,16 @@ # limitations under the License. 
# +import asyncio import atexit import functools import os import sys -import asyncio -from queue import Queue, Full -from threading import Thread, Event +from queue import Full, Queue +from threading import Event, Thread from typing import TYPE_CHECKING, Optional -from skywalking import config, plugins -from skywalking import loggings -from skywalking import meter -from skywalking import profile +from skywalking import config, loggings, meter, plugins, profile, sampling from skywalking.agent.protocol import Protocol, ProtocolAsync from skywalking.command import command_service, command_service_async from skywalking.loggings import logger @@ -306,6 +303,8 @@ def start(self) -> None: profile.init() if config.agent_meter_reporter_active: meter.init(force=True) # force re-init after fork() + if config.sample_n_per_3_secs > 0: + sampling.init(force=True) self.__bootstrap() # calls init_threading @@ -517,6 +516,8 @@ async def __start_event_loop_async(self) -> None: if config.agent_meter_reporter_active: # meter.init(force=True) await meter.init_async() + if config.sample_n_per_3_secs > 0: + await sampling.init_async() self.__bootstrap() # gather all coroutines diff --git a/skywalking/config.py b/skywalking/config.py index 37a9381c..62c89567 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -32,8 +32,8 @@ import os import re import uuid -from typing import List, Pattern import warnings +from typing import List, Pattern RE_IGNORE_PATH: Pattern = re.compile('^$') RE_HTTP_IGNORE_METHOD: Pattern = RE_IGNORE_PATH @@ -213,6 +213,10 @@ # The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off plugin_celery_parameters_length: int = int(os.getenv('SW_PLUGIN_CELERY_PARAMETERS_LENGTH', '512')) +# BEGIN: Sampling Configurations +# The number of samples to take in every 3 seconds, 0 turns off +sample_n_per_3_secs: int = int(os.getenv('SW_SAMPLE_N_PER_3_SECS', '0')) + # THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS! 
options = [key for key in globals() if key not in options]  # THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!
 
diff --git a/skywalking/sampling/__init__.py b/skywalking/sampling/__init__.py
new file mode 100644
index 00000000..061c007b
--- /dev/null
+++ b/skywalking/sampling/__init__.py
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import asyncio
+from typing import Optional
+
+
+# Process-wide sampling service instance; stays None until init() or init_async() is called.
+sampling_service = None
+
+
+def init(force: bool = False) -> None:
+    """
+    Create and start the threaded sampling service.
+
+    Does nothing when a service already exists, unless ``force`` is set; in that case a new
+    service replaces it (we are in a fork() and have to re-initialize).
+    """
+    from skywalking.log import logger
+    from skywalking.sampling.sampling_service import SamplingService
+
+    global sampling_service
+    if sampling_service and not force:
+        return
+
+    logger.debug('Initializing sampling service')
+    sampling_service = SamplingService()
+    sampling_service.start()
+
+
+async def init_async(async_event: Optional[asyncio.Event] = None) -> None:
+    """
+    Create the asyncio sampling service and schedule its reset loop as a task.
+
+    :param async_event: optional event, set as soon as the service object exists
+    """
+    from skywalking.sampling.sampling_service import SamplingServiceAsync
+
+    global sampling_service
+
+    sampling_service = SamplingServiceAsync()
+    if async_event is not None:
+        async_event.set()
+    task = asyncio.create_task(sampling_service.start())
+    # Keep a strong reference while the task runs (the event loop only holds a weak one)
+    # and release it again once the task is done.
+    sampling_service.strong_ref_set.add(task)
+    task.add_done_callback(sampling_service.strong_ref_set.discard)
diff --git a/skywalking/sampling/sampling_service.py b/skywalking/sampling/sampling_service.py
new file mode 100644
index 00000000..067c9592
--- /dev/null
+++ b/skywalking/sampling/sampling_service.py
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import asyncio
+import time
+from threading import Lock, Thread
+from typing import Set
+
+from skywalking import config
+# NOTE(review): agent/__init__.py takes its logger from skywalking.loggings;
+# confirm that skywalking.log exposes the same logger.
+from skywalking.log import logger
+
+
+class SamplingServiceBase:
+    """
+    Sampling budget shared by the threaded and the asyncio sampling service.
+
+    ``sampling_factor`` counts the samples taken since the last reset; sampling is allowed
+    while it is below ``config.sample_n_per_3_secs``. Resetting the counter periodically
+    and any synchronization are left to the subclasses.
+    """
+
+    def __init__(self):
+        self.sampling_factor = 0
+
+    @property
+    def reset_sampling_factor_interval(self) -> int:
+        """Seconds to wait between two resets of the counter."""
+        return 3
+
+    @property
+    def can_sampling(self) -> bool:
+        """True while fewer than ``config.sample_n_per_3_secs`` samples have been counted."""
+        return self.sampling_factor < config.sample_n_per_3_secs
+
+    def _try_sampling(self) -> bool:
+        """Count one more sample if the budget allows it and return whether it was counted."""
+        if self.can_sampling:
+            self._incr_sampling_factor()
+            return True
+        logger.debug(
+            '%s try_sampling return false, sampling_factor: %d', self.__class__.__name__, self.sampling_factor
+        )
+        return False
+
+    def _set_sampling_factor(self, val: int) -> None:
+        logger.debug('Set sampling factor to %d', val)
+        self.sampling_factor = val
+
+    def _incr_sampling_factor(self) -> None:
+        self.sampling_factor += 1
+
+
+class SamplingService(Thread, SamplingServiceBase):
+    """Daemon thread that resets the counter periodically; public methods take a lock around counter updates."""
+
+    def __init__(self):
+        Thread.__init__(self, name='SamplingService', daemon=True)
+        SamplingServiceBase.__init__(self)
+        self.lock = Lock()
+
+    def run(self):
+        logger.debug('Started sampling service sample_n_per_3_secs: %d', config.sample_n_per_3_secs)
+        while True:
+            self.reset_sampling_factor()
+            time.sleep(self.reset_sampling_factor_interval)
+
+    def try_sampling(self) -> bool:
+        """Return True and count the sample if the budget is not used up yet, else False."""
+        with self.lock:
+            return super()._try_sampling()
+
+    def force_sampled(self) -> None:
+        """Count a sample without checking the budget."""
+        with self.lock:
+            super()._incr_sampling_factor()
+
+    def reset_sampling_factor(self) -> None:
+        """Set the counter back to zero."""
+        with self.lock:
+            super()._set_sampling_factor(0)
+
+
+class SamplingServiceAsync(SamplingServiceBase):
+    """asyncio counterpart of the threaded service; start() is the reset loop and no lock is taken."""
+
+    def __init__(self):
+        super().__init__()
+        # Tasks stored here are kept alive by this strong reference.
+        self.strong_ref_set: Set[asyncio.Task[None]] = set()
+
+    async def start(self):
+        logger.debug('Started async sampling service sample_n_per_3_secs: %d', config.sample_n_per_3_secs)
+        while True:
+            await self.reset_sampling_factor()
+            await asyncio.sleep(self.reset_sampling_factor_interval)
+
+    def try_sampling(self) -> bool:
+        """Return True and count the sample if the budget is not used up yet, else False."""
+        return super()._try_sampling()
+
+    def force_sampled(self) -> None:
+        """Count a sample without checking the budget."""
+        super()._incr_sampling_factor()
+
+    async def reset_sampling_factor(self) -> None:
+        """Set the counter back to zero."""
+        super()._set_sampling_factor(0)
diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py
index cf79d0d6..2fee0249 100644
--- a/skywalking/trace/context.py
+++ b/skywalking/trace/context.py
@@ -20,6 +20,7 @@
 from skywalking import profile
 from skywalking.agent import agent
 from skywalking.profile.profile_status import ProfileStatusReference
+from skywalking import sampling
 from skywalking.trace import ID
 from skywalking.trace.carrier import Carrier
 from skywalking.trace.segment import Segment, SegmentRef
@@ -327,4 +328,7 @@ def get_context() -> SpanContext:
     if spans:
         return spans[-1].context
 
+    if sampling.sampling_service and not sampling.sampling_service.try_sampling():
+        return NoopContext()
+
     return SpanContext()
diff --git a/tests/unit/test_sampling.py b/tests/unit/test_sampling.py
new file mode 100644
index 00000000..b49e8d06
--- /dev/null
+++ b/tests/unit/test_sampling.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from skywalking import config
+from skywalking.sampling.sampling_service import SamplingService, SamplingServiceAsync
+
+
+class TestSampling(unittest.TestCase):
+
+    def setUp(self):
+        # Every test overwrites this process-wide setting; restore it afterwards.
+        self.addCleanup(setattr, config, 'sample_n_per_3_secs', config.sample_n_per_3_secs)
+
+    def test_try_sampling(self):
+        config.sample_n_per_3_secs = 2
+        sampling_service = SamplingService()
+        self.assertTrue(sampling_service.try_sampling())
+        self.assertTrue(sampling_service.try_sampling())
+        self.assertFalse(sampling_service.try_sampling())
+
+    def test_force_sampled(self):
+        config.sample_n_per_3_secs = 1
+        sampling_service = SamplingService()
+        self.assertTrue(sampling_service.try_sampling())
+        sampling_service.force_sampled()
+        self.assertEqual(sampling_service.sampling_factor, 2)
+
+    def test_reset_sampling_factor(self):
+        config.sample_n_per_3_secs = 1
+        sampling_service = SamplingService()
+        self.assertTrue(sampling_service.try_sampling())
+        self.assertFalse(sampling_service.try_sampling())
+        sampling_service.reset_sampling_factor()
+        self.assertTrue(sampling_service.try_sampling())
+
+
+class TestSamplingAsync(unittest.IsolatedAsyncioTestCase):
+
+    def setUp(self):
+        # Every test overwrites this process-wide setting; restore it afterwards.
+        self.addCleanup(setattr, config, 'sample_n_per_3_secs', config.sample_n_per_3_secs)
+
+    async def test_try_sampling(self):
+        config.sample_n_per_3_secs = 2
+        sampling_service = SamplingServiceAsync()
+        self.assertTrue(sampling_service.try_sampling())
+        self.assertTrue(sampling_service.try_sampling())
+        self.assertFalse(sampling_service.try_sampling())
+
+    async def test_force_sampled(self):
+        config.sample_n_per_3_secs = 1
+        sampling_service = SamplingServiceAsync()
+        self.assertTrue(sampling_service.try_sampling())
+        sampling_service.force_sampled()
+        self.assertEqual(sampling_service.sampling_factor, 2)
+
+    async def test_reset_sampling_factor(self):
+        config.sample_n_per_3_secs = 1
+        sampling_service = SamplingServiceAsync()
+        self.assertTrue(sampling_service.try_sampling())
+        self.assertFalse(sampling_service.try_sampling())
+        await sampling_service.reset_sampling_factor()
+        self.assertTrue(sampling_service.try_sampling())
+
+
+if __name__ == '__main__':
+    unittest.main()