From 1ce84363b9d50a154c78102c6080c393dcff7723 Mon Sep 17 00:00:00 2001 From: Tsonglew Date: Tue, 5 Nov 2024 22:23:54 +0800 Subject: [PATCH] Feat sampling service Signed-off-by: Tsonglew --- CHANGELOG.md | 5 ++ docs/en/contribution/CodingStyle.md | 4 +- docs/en/setup/Configuration.md | 4 + skywalking/agent/__init__.py | 13 ++- skywalking/config.py | 6 +- skywalking/meter/pvm/data_source.py | 2 +- skywalking/sampling/__init__.py | 49 +++++++++++ skywalking/sampling/sampling_service.py | 106 ++++++++++++++++++++++++ skywalking/trace/context.py | 4 + tests/unit/test_sampling.py | 88 ++++++++++++++++++++ tools/config_doc_gen.py | 2 +- 11 files changed, 271 insertions(+), 12 deletions(-) create mode 100644 skywalking/sampling/__init__.py create mode 100644 skywalking/sampling/sampling_service.py create mode 100644 tests/unit/test_sampling.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 8aaedf01..a311f0ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ ## Change Logs +### 1.1.1 + +- Feature: + - Users now can specify the `SW_SAMPLE_N_PER_3_SECS` environment variable to control the sampling rate (#357) + ### 1.1.0 - Feature: diff --git a/docs/en/contribution/CodingStyle.md b/docs/en/contribution/CodingStyle.md index 1acf5484..467649b3 100644 --- a/docs/en/contribution/CodingStyle.md +++ b/docs/en/contribution/CodingStyle.md @@ -6,7 +6,7 @@ Since Python 3.5 is end of life, we fully utilize the clarity and performance bo Please do not use other styles - `+`, `%` or `.format` unless f-string is absolutely unfeasible in the context, or it is a logger message, which is [optimized](https://docs.python.org/3/howto/logging.html#optimization) for the `%` style -Run `make dev-fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner +Run `make fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner cases leading to 
a semantically different conversion. ### Quotes @@ -23,7 +23,7 @@ foo = f"I'm a string" bar = f"This repo is called 'skywalking-python'" ``` -Run `make dev-fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complaints about it. +Run `make fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complains about it. ## Debug messages Please import the `logger_debug_enabled` variable and wrap your debug messages with a check. diff --git a/docs/en/setup/Configuration.md b/docs/en/setup/Configuration.md index c03e706e..2ee6f26e 100644 --- a/docs/en/setup/Configuration.md +++ b/docs/en/setup/Configuration.md @@ -97,3 +97,7 @@ export SW_AGENT_YourConfiguration=YourValue | plugin_fastapi_collect_http_params | SW_PLUGIN_FASTAPI_COLLECT_HTTP_PARAMS | | False | This config item controls that whether the FastAPI plugin should collect the parameters of the request. | | plugin_bottle_collect_http_params | SW_PLUGIN_BOTTLE_COLLECT_HTTP_PARAMS | | False | This config item controls that whether the Bottle plugin should collect the parameters of the request. | | plugin_celery_parameters_length | SW_PLUGIN_CELERY_PARAMETERS_LENGTH | | 512 | The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | +### Sampling Configurations +| Configuration | Environment Variable | Type | Default Value | Description | +| :------------ | :------------ | :------------ | :------------ | :------------ | +| sample_n_per_3_secs | SW_SAMPLE_N_PER_3_SECS | | -1 | The maximum number of samples to take in every 3 seconds; a negative value (the default) disables the limit | diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 70b28e98..07ec2833 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -15,19 +15,16 @@ # limitations under the License. 
# +import asyncio import atexit import functools import os import sys -import asyncio -from queue import Queue, Full -from threading import Thread, Event +from queue import Full, Queue +from threading import Event, Thread from typing import TYPE_CHECKING, Optional -from skywalking import config, plugins -from skywalking import loggings -from skywalking import meter -from skywalking import profile +from skywalking import config, loggings, meter, plugins, profile, sampling from skywalking.agent.protocol import Protocol, ProtocolAsync from skywalking.command import command_service, command_service_async from skywalking.loggings import logger @@ -306,6 +303,7 @@ def start(self) -> None: profile.init() if config.agent_meter_reporter_active: meter.init(force=True) # force re-init after fork() + sampling.init(force=True) self.__bootstrap() # calls init_threading @@ -517,6 +515,7 @@ async def __start_event_loop_async(self) -> None: if config.agent_meter_reporter_active: # meter.init(force=True) await meter.init_async() + await sampling.init_async() self.__bootstrap() # gather all coroutines diff --git a/skywalking/config.py b/skywalking/config.py index 37a9381c..b4ddd845 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -32,8 +32,8 @@ import os import re import uuid -from typing import List, Pattern import warnings +from typing import List, Pattern RE_IGNORE_PATH: Pattern = re.compile('^$') RE_HTTP_IGNORE_METHOD: Pattern = RE_IGNORE_PATH @@ -213,6 +213,10 @@ # The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off plugin_celery_parameters_length: int = int(os.getenv('SW_PLUGIN_CELERY_PARAMETERS_LENGTH', '512')) +# BEGIN: Sampling Configurations +# The number of samples to take in every 3 seconds +sample_n_per_3_secs: int = int(os.getenv('SW_SAMPLE_N_PER_3_SECS', '-1')) + # THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS! 
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
from typing import Optional

# Module-wide sampling service instance; stays None until init() or init_async() has run.
sampling_service = None


def init(force: bool = False) -> None:
    """
    Create and start the thread-based sampling service.

    Does nothing when a service already exists, unless `force` is set: the agent passes
    force=True to re-initialize after a fork().
    """
    # Imported lazily, inside the function, as the original code does.
    from skywalking.loggings import logger
    from skywalking.sampling.sampling_service import SamplingService

    global sampling_service
    if sampling_service is not None and not force:
        return

    logger.debug('Initializing sampling service')
    sampling_service = SamplingService()
    sampling_service.start()


async def init_async(async_event: Optional[asyncio.Event] = None) -> None:
    """
    Create the asyncio sampling service and schedule its reset loop on the running event loop.

    If `async_event` is given it is set as soon as the service object has been created,
    before the reset loop is scheduled.
    """
    from skywalking.sampling.sampling_service import SamplingServiceAsync

    global sampling_service

    sampling_service = SamplingServiceAsync()
    if async_event is not None:
        async_event.set()
    task = asyncio.create_task(sampling_service.start())
    # The event loop only keeps weak references to tasks, so hold a strong one while the
    # task runs and release it again once the task is done.
    sampling_service.strong_ref_set.add(task)
    task.add_done_callback(sampling_service.strong_ref_set.discard)
#

import asyncio
import time
from threading import Lock, Thread
from typing import Set

from skywalking import config
from skywalking.loggings import logger


class SamplingServiceBase:
    """
    Counting logic shared by the thread-based and the asyncio sampling services.

    `sampling_factor` counts the samples taken since the last reset. While the limit is
    enabled, a sample is only granted as long as that counter is below
    `config.sample_n_per_3_secs`; subclasses are responsible for resetting the counter
    periodically.
    """

    def __init__(self):
        # Number of samples taken since the last reset.
        self.sampling_factor = 0

    @property
    def reset_sampling_factor_interval(self) -> int:
        """Seconds to wait between two resets of `sampling_factor`."""
        return 3

    @property
    def on(self) -> bool:
        """True when the limit is enabled, i.e. `config.sample_n_per_3_secs` is not negative."""
        return config.sample_n_per_3_secs >= 0

    @property
    def can_sampling(self) -> bool:
        """True while fewer than `config.sample_n_per_3_secs` samples were taken since the last reset."""
        return self.sampling_factor < config.sample_n_per_3_secs

    def _try_sampling(self) -> bool:
        """
        Try to take one sample.

        Returns True and increments the counter when the limit is disabled or not yet
        reached; returns False (leaving the counter untouched) otherwise.
        """
        if not self.on or self.can_sampling:
            self._incr_sampling_factor()
            return True
        logger.debug('%s try_sampling return false, sampling_factor: %d', self.__class__.__name__, self.sampling_factor)
        return False

    def _set_sampling_factor(self, val: int):
        """Overwrite the counter with `val`."""
        logger.debug('Set sampling factor to %d', val)
        self.sampling_factor = val

    def _incr_sampling_factor(self):
        """Count one more sample."""
        self.sampling_factor += 1


class SamplingService(Thread, SamplingServiceBase):
    """Sampling service driven by a daemon thread; counter access is guarded by a lock."""

    def __init__(self):
        Thread.__init__(self, name='SamplingService', daemon=True)
        SamplingServiceBase.__init__(self)
        self.lock = Lock()

    def run(self):
        """Thread body: reset the counter once per interval, forever."""
        logger.debug('Started sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs)
        while True:
            if self.on:
                self.reset_sampling_factor()
            # Use the shared interval property (same as the asyncio service) instead of a literal.
            time.sleep(self.reset_sampling_factor_interval)

    def try_sampling(self) -> bool:
        """Try to take one sample; see `SamplingServiceBase._try_sampling`."""
        with self.lock:
            return self._try_sampling()

    def force_sampled(self) -> None:
        """Count one sample unconditionally, without checking the limit."""
        with self.lock:
            self._incr_sampling_factor()

    def reset_sampling_factor(self) -> None:
        """Reset the counter to zero."""
        with self.lock:
            self._set_sampling_factor(0)


class SamplingServiceAsync(SamplingServiceBase):
    """Sampling service whose reset loop runs as a coroutine; it takes no lock."""

    def __init__(self):
        super().__init__()
        # Strong references to the tasks running this service; filled by the code that creates them.
        self.strong_ref_set: Set[asyncio.Task[None]] = set()

    async def start(self):
        """Coroutine body: reset the counter once per interval, forever."""
        logger.debug('Started async sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs)
        while True:
            if self.on:
                await self.reset_sampling_factor()
            await asyncio.sleep(self.reset_sampling_factor_interval)

    def try_sampling(self) -> bool:
        """Try to take one sample; see `SamplingServiceBase._try_sampling`."""
        return self._try_sampling()

    def force_sampled(self):
        """Count one sample unconditionally, without checking the limit."""
        self._incr_sampling_factor()

    async def reset_sampling_factor(self):
        """Reset the counter to zero."""
        self._set_sampling_factor(0)
#


import unittest

from skywalking import config
from skywalking.sampling.sampling_service import SamplingService, SamplingServiceAsync


class _RestoreSamplingLimitMixin:
    """Puts `config.sample_n_per_3_secs` back after each test so the limit set by one test cannot leak into another."""

    def setUp(self):
        super().setUp()
        self.addCleanup(setattr, config, 'sample_n_per_3_secs', config.sample_n_per_3_secs)


class TestSampling(_RestoreSamplingLimitMixin, unittest.TestCase):
    """Counting behaviour of the thread-based service; the thread itself is never started."""

    def test_try_sampling(self):
        config.sample_n_per_3_secs = 2
        sampling_service = SamplingService()
        self.assertTrue(sampling_service.try_sampling())
        self.assertTrue(sampling_service.try_sampling())
        self.assertFalse(sampling_service.try_sampling())

    def test_force_sampled(self):
        config.sample_n_per_3_secs = 1
        sampling_service = SamplingService()
        self.assertTrue(sampling_service.try_sampling())
        sampling_service.force_sampled()
        self.assertEqual(sampling_service.sampling_factor, 2)

    def test_reset_sampling_factor(self):
        config.sample_n_per_3_secs = 1
        sampling_service = SamplingService()
        self.assertTrue(sampling_service.try_sampling())
        self.assertFalse(sampling_service.try_sampling())
        sampling_service.reset_sampling_factor()
        self.assertTrue(sampling_service.try_sampling())

    def test_sampling_disabled(self):
        # A negative limit switches the limit off: every attempt is granted.
        config.sample_n_per_3_secs = -1
        sampling_service = SamplingService()
        for _ in range(10):
            self.assertTrue(sampling_service.try_sampling())


class TestSamplingAsync(_RestoreSamplingLimitMixin, unittest.IsolatedAsyncioTestCase):
    """Counting behaviour of the asyncio service; the reset loop itself is never scheduled."""

    async def test_try_sampling(self):
        config.sample_n_per_3_secs = 2
        sampling_service = SamplingServiceAsync()
        self.assertTrue(sampling_service.try_sampling())
        self.assertTrue(sampling_service.try_sampling())
        self.assertFalse(sampling_service.try_sampling())

    async def test_force_sampled(self):
        config.sample_n_per_3_secs = 1
        sampling_service = SamplingServiceAsync()
        self.assertTrue(sampling_service.try_sampling())
        sampling_service.force_sampled()
        self.assertEqual(sampling_service.sampling_factor, 2)

    async def test_reset_sampling_factor(self):
        config.sample_n_per_3_secs = 1
        sampling_service = SamplingServiceAsync()
        self.assertTrue(sampling_service.try_sampling())
        self.assertFalse(sampling_service.try_sampling())
        await sampling_service.reset_sampling_factor()
        self.assertTrue(sampling_service.try_sampling())

    async def test_sampling_disabled(self):
        # A negative limit switches the limit off: every attempt is granted.
        config.sample_n_per_3_secs = -1
        sampling_service = SamplingServiceAsync()
        for _ in range(10):
            self.assertTrue(sampling_service.try_sampling())


if __name__ == '__main__':
    unittest.main()