Skip to content

Commit 7c48e84

Browse files
committed
Feat sampling service
Signed-off-by: Tsonglew <tsonglew@gmail.com>
1 parent 1ce6910 commit 7c48e84

File tree

11 files changed

+271
-12
lines changed

11 files changed

+271
-12
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
## Change Logs
22

3+
### 1.1.1
4+
5+
- Feature:
6+
- Users now can specify the `SW_SAMPLE_N_PER_3_SECS` environment variable to control the sampling rate (#357)
7+
38
### 1.1.0
49

510
- Feature:

docs/en/contribution/CodingStyle.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Since Python 3.5 is end of life, we fully utilize the clarity and performance bo
66
Please do not use other styles - `+`, `%` or `.format` unless f-string is absolutely unfeasible in the context, or
77
it is a logger message, which is [optimized](https://docs.python.org/3/howto/logging.html#optimization) for the `%` style
88

9-
Run `make dev-fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner
9+
Run `make fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner
1010
cases leading to a semantically different conversion.
1111

1212
### Quotes
@@ -23,7 +23,7 @@ foo = f"I'm a string"
2323
bar = f"This repo is called 'skywalking-python'"
2424
```
2525

26-
Run `make dev-fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complaints about it.
26+
Run `make fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complaints about it.
2727

2828
## Debug messages
2929
Please import the `logger_debug_enabled` variable and wrap your debug messages with a check.

docs/en/setup/Configuration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,7 @@ export SW_AGENT_YourConfiguration=YourValue
9797
| plugin_fastapi_collect_http_params | SW_PLUGIN_FASTAPI_COLLECT_HTTP_PARAMS | <class 'bool'> | False | This config item controls that whether the FastAPI plugin should collect the parameters of the request. |
9898
| plugin_bottle_collect_http_params | SW_PLUGIN_BOTTLE_COLLECT_HTTP_PARAMS | <class 'bool'> | False | This config item controls that whether the Bottle plugin should collect the parameters of the request. |
9999
| plugin_celery_parameters_length | SW_PLUGIN_CELERY_PARAMETERS_LENGTH | <class 'int'> | 512 | The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off |
100+
### Sampling Configurations
101+
| Configuration | Environment Variable | Type | Default Value | Description |
102+
| :------------ | :------------ | :------------ | :------------ | :------------ |
103+
| sample_n_per_3_secs | SW_SAMPLE_N_PER_3_SECS | <class 'int'> | -1 | The number of samples to take in every 3 seconds |

skywalking/agent/__init__.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,16 @@
1515
# limitations under the License.
1616
#
1717

18+
import asyncio
1819
import atexit
1920
import functools
2021
import os
2122
import sys
22-
import asyncio
23-
from queue import Queue, Full
24-
from threading import Thread, Event
23+
from queue import Full, Queue
24+
from threading import Event, Thread
2525
from typing import TYPE_CHECKING, Optional
2626

27-
from skywalking import config, plugins
28-
from skywalking import loggings
29-
from skywalking import meter
30-
from skywalking import profile
27+
from skywalking import config, loggings, meter, plugins, profile, sampling
3128
from skywalking.agent.protocol import Protocol, ProtocolAsync
3229
from skywalking.command import command_service, command_service_async
3330
from skywalking.loggings import logger
@@ -306,6 +303,7 @@ def start(self) -> None:
306303
profile.init()
307304
if config.agent_meter_reporter_active:
308305
meter.init(force=True) # force re-init after fork()
306+
sampling.init(force=True)
309307

310308
self.__bootstrap() # calls init_threading
311309

@@ -517,6 +515,7 @@ async def __start_event_loop_async(self) -> None:
517515
if config.agent_meter_reporter_active:
518516
# meter.init(force=True)
519517
await meter.init_async()
518+
await sampling.init_async()
520519

521520
self.__bootstrap() # gather all coroutines
522521

skywalking/config.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import os
3333
import re
3434
import uuid
35-
from typing import List, Pattern
3635
import warnings
36+
from typing import List, Pattern
3737

3838
RE_IGNORE_PATH: Pattern = re.compile('^$')
3939
RE_HTTP_IGNORE_METHOD: Pattern = RE_IGNORE_PATH
@@ -213,6 +213,10 @@
213213
# The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off
214214
plugin_celery_parameters_length: int = int(os.getenv('SW_PLUGIN_CELERY_PARAMETERS_LENGTH', '512'))
215215

216+
# BEGIN: Sampling Configurations
217+
# The number of samples to take in every 3 seconds
218+
sample_n_per_3_secs: int = int(os.getenv('SW_SAMPLE_N_PER_3_SECS', '-1'))
219+
216220
# THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!
217221
options = [key for key in globals() if key not in options] # THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!
218222

skywalking/meter/pvm/data_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,4 @@ def register(self):
2323
for name in dir(self):
2424
if name.endswith('generator'):
2525
generator = getattr(self, name)()
26-
Gauge.Builder('instance_pvm_' + name[:-10], generator).build()
26+
Gauge.Builder(f"instance_pvm_{name[:-10]}", generator).build()

skywalking/sampling/__init__.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
import asyncio
18+
from typing import Optional
19+
20+
21+
sampling_service = None
22+
23+
24+
def init(force: bool = False):
25+
"""
26+
If the sampling service is not initialized, initialize it.
27+
if force, we are in a fork(), we force re-initialization
28+
"""
29+
from skywalking.sampling.sampling_service import SamplingService
30+
from skywalking.log import logger
31+
32+
global sampling_service
33+
if sampling_service and not force:
34+
return
35+
36+
logger.debug('Initializing sampling service')
37+
sampling_service = SamplingService()
38+
sampling_service.start()
39+
40+
async def init_async(async_event: Optional[asyncio.Event] = None):
41+
from skywalking.sampling.sampling_service import SamplingServiceAsync
42+
43+
global sampling_service
44+
45+
sampling_service = SamplingServiceAsync()
46+
if async_event is not None:
47+
async_event.set()
48+
task = asyncio.create_task(sampling_service.start())
49+
sampling_service.strong_ref_set.add(task)
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from threading import Lock, Thread
19+
20+
import time
21+
from typing import Set
22+
from skywalking import config
23+
from skywalking.log import logger
24+
25+
import asyncio
26+
27+
class SamplingServiceBase:
28+
29+
def __init__(self):
30+
self.sampling_factor = 0
31+
32+
@property
33+
def reset_sampling_factor_interval(self) -> int:
34+
return 3
35+
36+
@property
37+
def on(self):
38+
return config.sample_n_per_3_secs >= 0
39+
40+
@property
41+
def can_sampling(self):
42+
return self.sampling_factor < config.sample_n_per_3_secs
43+
44+
def _try_sampling(self) -> bool:
45+
if not self.on or self.can_sampling:
46+
self._incr_sampling_factor()
47+
return True
48+
logger.debug('%s try_sampling return false, sampling_factor: %d', self.__class__.__name__, self.sampling_factor)
49+
return False
50+
51+
def _set_sampling_factor(self, val: int):
52+
logger.debug('Set sampling factor to %d', val)
53+
self.sampling_factor = val
54+
55+
def _incr_sampling_factor(self):
56+
self.sampling_factor += 1
57+
58+
59+
class SamplingService(Thread, SamplingServiceBase):
60+
61+
def __init__(self):
62+
Thread.__init__(self, name='SamplingService', daemon=True)
63+
SamplingServiceBase.__init__(self)
64+
self.lock = Lock()
65+
66+
def run(self):
67+
logger.debug('Started sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs)
68+
while True:
69+
if self.on:
70+
self.reset_sampling_factor()
71+
time.sleep(3)
72+
73+
def try_sampling(self) -> bool:
74+
with self.lock:
75+
return super()._try_sampling()
76+
77+
def force_sampled(self) -> None:
78+
with self.lock:
79+
super()._incr_sampling_factor()
80+
81+
def reset_sampling_factor(self) -> None:
82+
with self.lock:
83+
super()._set_sampling_factor(0)
84+
85+
86+
class SamplingServiceAsync(SamplingServiceBase):
87+
88+
def __init__(self):
89+
super().__init__()
90+
self.strong_ref_set: Set[asyncio.Task[None]] = set()
91+
92+
async def start(self):
93+
logger.debug('Started async sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs)
94+
while True:
95+
if self.on:
96+
await self.reset_sampling_factor()
97+
await asyncio.sleep(self.reset_sampling_factor_interval)
98+
99+
def try_sampling(self) -> bool:
100+
return super()._try_sampling()
101+
102+
def force_sampled(self):
103+
super()._incr_sampling_factor()
104+
105+
async def reset_sampling_factor(self):
106+
super()._set_sampling_factor(0)

skywalking/trace/context.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from skywalking import profile
2121
from skywalking.agent import agent
2222
from skywalking.profile.profile_status import ProfileStatusReference
23+
from skywalking import sampling
2324
from skywalking.trace import ID
2425
from skywalking.trace.carrier import Carrier
2526
from skywalking.trace.segment import Segment, SegmentRef
@@ -327,4 +328,7 @@ def get_context() -> SpanContext:
327328
if spans:
328329
return spans[-1].context
329330

331+
if sampling.sampling_service and not sampling.sampling_service.try_sampling():
332+
return NoopContext()
333+
330334
return SpanContext()

tests/unit/test_sampling.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
19+
import unittest
20+
21+
from skywalking.sampling.sampling_service import SamplingService, SamplingServiceAsync
22+
23+
24+
class TestSampling(unittest.TestCase):
25+
26+
def test_try_sampling(self):
27+
from skywalking import config
28+
29+
config.sample_n_per_3_secs = 2
30+
sampling_service = SamplingService()
31+
assert sampling_service.try_sampling()
32+
assert sampling_service.try_sampling()
33+
assert not sampling_service.try_sampling()
34+
35+
def test_force_sampled(self):
36+
37+
from skywalking import config
38+
39+
config.sample_n_per_3_secs = 1
40+
sampling_service = SamplingService()
41+
assert sampling_service.try_sampling()
42+
sampling_service.force_sampled()
43+
assert sampling_service.sampling_factor == 2
44+
45+
def test_reset_sampling_factor(self):
46+
from skywalking import config
47+
48+
config.sample_n_per_3_secs = 1
49+
sampling_service = SamplingService()
50+
assert sampling_service.try_sampling()
51+
assert not sampling_service.try_sampling()
52+
sampling_service.reset_sampling_factor()
53+
assert sampling_service.try_sampling()
54+
55+
class TestSamplingAsync(unittest.IsolatedAsyncioTestCase):
56+
57+
async def test_try_sampling(self):
58+
from skywalking import config
59+
60+
config.sample_n_per_3_secs = 2
61+
sampling_service = SamplingServiceAsync()
62+
assert sampling_service.try_sampling()
63+
assert sampling_service.try_sampling()
64+
assert not sampling_service.try_sampling()
65+
66+
async def test_force_sampled(self):
67+
68+
from skywalking import config
69+
70+
config.sample_n_per_3_secs = 1
71+
sampling_service = SamplingServiceAsync()
72+
assert sampling_service.try_sampling()
73+
sampling_service.force_sampled()
74+
assert sampling_service.sampling_factor == 2
75+
76+
async def test_reset_sampling_factor(self):
77+
from skywalking import config
78+
79+
config.sample_n_per_3_secs = 1
80+
sampling_service = SamplingServiceAsync()
81+
assert sampling_service.try_sampling()
82+
assert not sampling_service.try_sampling()
83+
await sampling_service.reset_sampling_factor()
84+
assert sampling_service.try_sampling()
85+
86+
87+
if __name__ == '__main__':
88+
unittest.main()

0 commit comments

Comments
 (0)