Skip to content

Commit

Permalink
Feat sampling service #(357)
Browse files Browse the repository at this point in the history
Signed-off-by: Tsonglew <[email protected]>
  • Loading branch information
tsonglew committed Nov 6, 2024
1 parent 55dc5ff commit fc8134f
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Feature:
- Drop support for 3.7 (#356)
- Support sampling rate setup. Provide `SW_SAMPLE_N_PER_3_SECS` environment variable to control it (#357)

- Fixes:
- Fix: user/password replacement is not allowed for relative URLs (#349)
Expand Down
4 changes: 2 additions & 2 deletions docs/en/contribution/CodingStyle.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Since Python 3.5 is end of life, we fully utilize the clarity and performance bo
Please do not use other styles - `+`, `%` or `.format` unless f-string is absolutely unfeasible in the context, or
it is a logger message, which is [optimized](https://docs.python.org/3/howto/logging.html#optimization) for the `%` style

Run `make dev-fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner
Run `make fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner
cases leading to a semantically different conversion.

### Quotes
Expand All @@ -23,7 +23,7 @@ foo = f"I'm a string"
bar = f"This repo is called 'skywalking-python'"
```

Run `make dev-fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complaints about it.
Run `make fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complaints about it.

## Debug messages
Please import the `logger_debug_enabled` variable and wrap your debug messages with a check.
Expand Down
4 changes: 4 additions & 0 deletions docs/en/setup/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,7 @@ export SW_AGENT_YourConfiguration=YourValue
| plugin_fastapi_collect_http_params | SW_PLUGIN_FASTAPI_COLLECT_HTTP_PARAMS | <class 'bool'> | False | This config item controls that whether the FastAPI plugin should collect the parameters of the request. |
| plugin_bottle_collect_http_params | SW_PLUGIN_BOTTLE_COLLECT_HTTP_PARAMS | <class 'bool'> | False | This config item controls that whether the Bottle plugin should collect the parameters of the request. |
| plugin_celery_parameters_length | SW_PLUGIN_CELERY_PARAMETERS_LENGTH | <class 'int'> | 512 | The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off |
### Sampling Configurations
| Configuration | Environment Variable | Type | Default Value | Description |
| :------------ | :------------ | :------------ | :------------ | :------------ |
| sample_n_per_3_secs | SW_SAMPLE_N_PER_3_SECS | <class 'int'> | 0 | The number of samples to take in every 3 seconds, 0 turns off |
15 changes: 8 additions & 7 deletions skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
# limitations under the License.
#

import asyncio
import atexit
import functools
import os
import sys
import asyncio
from queue import Queue, Full
from threading import Thread, Event
from queue import Full, Queue
from threading import Event, Thread
from typing import TYPE_CHECKING, Optional

from skywalking import config, plugins
from skywalking import loggings
from skywalking import meter
from skywalking import profile
from skywalking import config, loggings, meter, plugins, profile, sampling
from skywalking.agent.protocol import Protocol, ProtocolAsync
from skywalking.command import command_service, command_service_async
from skywalking.loggings import logger
Expand Down Expand Up @@ -306,6 +303,8 @@ def start(self) -> None:
profile.init()
if config.agent_meter_reporter_active:
meter.init(force=True) # force re-init after fork()
if config.sample_n_per_3_secs > 0:
sampling.init(force=True)

self.__bootstrap() # calls init_threading

Expand Down Expand Up @@ -517,6 +516,8 @@ async def __start_event_loop_async(self) -> None:
if config.agent_meter_reporter_active:
# meter.init(force=True)
await meter.init_async()
if config.sample_n_per_3_secs > 0:
await sampling.init_async()

self.__bootstrap() # gather all coroutines

Expand Down
6 changes: 5 additions & 1 deletion skywalking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import os
import re
import uuid
from typing import List, Pattern
import warnings
from typing import List, Pattern

RE_IGNORE_PATH: Pattern = re.compile('^$')
RE_HTTP_IGNORE_METHOD: Pattern = RE_IGNORE_PATH
Expand Down Expand Up @@ -213,6 +213,10 @@
# The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off
plugin_celery_parameters_length: int = int(os.getenv('SW_PLUGIN_CELERY_PARAMETERS_LENGTH', '512'))

# BEGIN: Sampling Configurations
# The number of samples to take in every 3 seconds, 0 turns off
sample_n_per_3_secs: int = int(os.getenv('SW_SAMPLE_N_PER_3_SECS', '0'))

# THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!
options = [key for key in globals() if key not in options] # THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!

Expand Down
50 changes: 50 additions & 0 deletions skywalking/sampling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
from typing import Optional


sampling_service = None


def init(force: bool = False):
"""
If the sampling service is not initialized, initialize it.
if force, we are in a fork(), we force re-initialization
"""
from skywalking.sampling.sampling_service import SamplingService
from skywalking.log import logger

global sampling_service
if sampling_service and not force:
return

logger.debug('Initializing sampling service')
sampling_service = SamplingService()
sampling_service.start()


async def init_async(async_event: Optional[asyncio.Event] = None):
from skywalking.sampling.sampling_service import SamplingServiceAsync

global sampling_service

sampling_service = SamplingServiceAsync()
if async_event is not None:
async_event.set()
task = asyncio.create_task(sampling_service.start())
sampling_service.strong_ref_set.add(task)
101 changes: 101 additions & 0 deletions skywalking/sampling/sampling_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from threading import Lock, Thread

import time
from typing import Set
from skywalking import config
from skywalking.log import logger

import asyncio


class SamplingServiceBase:

def __init__(self):
self.sampling_factor = 0

@property
def reset_sampling_factor_interval(self) -> int:
return 3

@property
def can_sampling(self):
return self.sampling_factor < config.sample_n_per_3_secs

def _try_sampling(self) -> bool:
if self.can_sampling:
self._incr_sampling_factor()
return True
logger.debug('%s try_sampling return false, sampling_factor: %d', self.__class__.__name__, self.sampling_factor)
return False

def _set_sampling_factor(self, val: int):
logger.debug('Set sampling factor to %d', val)
self.sampling_factor = val

def _incr_sampling_factor(self):
self.sampling_factor += 1


class SamplingService(Thread, SamplingServiceBase):

def __init__(self):
Thread.__init__(self, name='SamplingService', daemon=True)
SamplingServiceBase.__init__(self)
self.lock = Lock()

def run(self):
logger.debug('Started sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs)
while True:
self.reset_sampling_factor()
time.sleep(self.reset_sampling_factor_interval)

def try_sampling(self) -> bool:
with self.lock:
return super()._try_sampling()

def force_sampled(self) -> None:
with self.lock:
super()._incr_sampling_factor()

def reset_sampling_factor(self) -> None:
with self.lock:
super()._set_sampling_factor(0)


class SamplingServiceAsync(SamplingServiceBase):

def __init__(self):
super().__init__()
self.strong_ref_set: Set[asyncio.Task[None]] = set()

async def start(self):
logger.debug('Started async sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs)
while True:
await self.reset_sampling_factor()
await asyncio.sleep(self.reset_sampling_factor_interval)

def try_sampling(self) -> bool:
return super()._try_sampling()

def force_sampled(self):
super()._incr_sampling_factor()

async def reset_sampling_factor(self):
super()._set_sampling_factor(0)
4 changes: 4 additions & 0 deletions skywalking/trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from skywalking import profile
from skywalking.agent import agent
from skywalking.profile.profile_status import ProfileStatusReference
from skywalking import sampling
from skywalking.trace import ID
from skywalking.trace.carrier import Carrier
from skywalking.trace.segment import Segment, SegmentRef
Expand Down Expand Up @@ -327,4 +328,7 @@ def get_context() -> SpanContext:
if spans:
return spans[-1].context

if sampling.sampling_service and not sampling.sampling_service.try_sampling():
return NoopContext()

return SpanContext()
88 changes: 88 additions & 0 deletions tests/unit/test_sampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

from skywalking.sampling.sampling_service import SamplingService, SamplingServiceAsync


class TestSampling(unittest.TestCase):

def test_try_sampling(self):
from skywalking import config

config.sample_n_per_3_secs = 2
sampling_service = SamplingService()
assert sampling_service.try_sampling()
assert sampling_service.try_sampling()
assert not sampling_service.try_sampling()

def test_force_sampled(self):

from skywalking import config

config.sample_n_per_3_secs = 1
sampling_service = SamplingService()
assert sampling_service.try_sampling()
sampling_service.force_sampled()
assert sampling_service.sampling_factor == 2

def test_reset_sampling_factor(self):
from skywalking import config

config.sample_n_per_3_secs = 1
sampling_service = SamplingService()
assert sampling_service.try_sampling()
assert not sampling_service.try_sampling()
sampling_service.reset_sampling_factor()
assert sampling_service.try_sampling()


class TestSamplingAsync(unittest.IsolatedAsyncioTestCase):

async def test_try_sampling(self):
from skywalking import config

config.sample_n_per_3_secs = 2
sampling_service = SamplingServiceAsync()
assert sampling_service.try_sampling()
assert sampling_service.try_sampling()
assert not sampling_service.try_sampling()

async def test_force_sampled(self):

from skywalking import config

config.sample_n_per_3_secs = 1
sampling_service = SamplingServiceAsync()
assert sampling_service.try_sampling()
sampling_service.force_sampled()
assert sampling_service.sampling_factor == 2

async def test_reset_sampling_factor(self):
from skywalking import config

config.sample_n_per_3_secs = 1
sampling_service = SamplingServiceAsync()
assert sampling_service.try_sampling()
assert not sampling_service.try_sampling()
await sampling_service.reset_sampling_factor()
assert sampling_service.try_sampling()


if __name__ == '__main__':
unittest.main()

0 comments on commit fc8134f

Please sign in to comment.