Commit 00ff3e2

Limiting network traffic rate for uploading stage. (#48)

MikhailBurdukov committed Aug 29, 2023
1 parent 3440923 commit 00ff3e2
Showing 8 changed files with 221 additions and 1 deletion.
11 changes: 11 additions & 0 deletions ch_backup/config.py
@@ -102,6 +102,17 @@ def _as_seconds(t: str) -> int:
            # The maximum number of objects the stage's input queue can hold simultaneously; `0` is unbounded
            "queue_size": 10,
        },
        "rate_limiter": {
            # Upper bound of the network upload rate per worker on the uploading stage, in bytes/sec.
            # Example: with the following config:
            #     workers: 4
            #     max_upload_rate: 100
            # the total upload rate is 400 bytes/sec.
            # If the value is 0, the traffic rate is unlimited.
            "max_upload_rate": 0,
            # The wait time, in seconds, before the next attempt to upload a chunk to the storage.
            "retry_interval": 0.01,
        },
        # Same structure as 'storage' section, but for cloud storage
        "cloud_storage": {
            "encryption": True,
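For reference, a minimal sketch of how these settings would appear in a user-facing ch-backup YAML config (the `rate_limiter` section name follows the defaults above; the exact surrounding layout is assumed):

rate_limiter:
    # Per-worker cap; with workers: 4 this allows up to 20 MiB/s in total.
    max_upload_rate: 5242880   # 5 MiB/s per worker, 0 = unlimited
    retry_interval: 0.01       # seconds to wait before retrying a throttled chunk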
57 changes: 57 additions & 0 deletions ch_backup/storage/async_pipeline/base_pipeline/rate_limiter.py
@@ -0,0 +1,57 @@
"""
Rate limiter module.
"""
import time
from typing import Callable


class RateLimiter:
"""
Rate limiter based on token bucket algorithm without a separate replenishment process.
"""

def __init__(self, limit_per_sec: int, get_time_func: Callable = time.time):
self._limit_per_sec = limit_per_sec
self._get_time_func = get_time_func
self._bucket_tokens = self._limit_per_sec
self._bucket_last_update = self._get_time_func()

def _replenish_bucket(self):
"""
Replenish the bucket with tokens depending on the time of the last update.
"""
current_time = self._get_time_func()
lapse = current_time - self._bucket_last_update
self._bucket_tokens = min(
self._limit_per_sec, self._bucket_tokens + int(lapse * self._limit_per_sec)
)
self._bucket_last_update = current_time

    def extract_tokens(self, desired_quantity: int) -> int:
        """
        Extract tokens from the bucket: the minimum of the desired quantity and the
        number of tokens currently available.
        """
        if self._limit_per_sec == 0:
            return desired_quantity

        self._replenish_bucket()
        extracted = min(desired_quantity, self._bucket_tokens)

        self._bucket_tokens -= extracted
        return extracted

    def grant(self, tokens: int = 1) -> bool:
        """
        If the bucket holds enough tokens, extract the requested number and return True.
        Otherwise return False.
        """
        if self._limit_per_sec == 0:
            return True
        self._replenish_bucket()

        if self._bucket_tokens >= tokens:
            self._bucket_tokens -= tokens
            return True

        return False
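To make the bucket semantics concrete, a small usage sketch (the fake clock below is hypothetical, not part of the commit; it only makes the example deterministic):

from ch_backup.storage.async_pipeline.base_pipeline.rate_limiter import RateLimiter

now = [0.0]  # mutable fake clock state

limiter = RateLimiter(limit_per_sec=4, get_time_func=lambda: now[0])

assert limiter.extract_tokens(10) == 4  # the bucket starts full: 4 tokens
assert limiter.extract_tokens(10) == 0  # drained; nothing to extract yet
now[0] += 0.5                           # half a second passes...
assert limiter.extract_tokens(10) == 2  # ...replenishing int(0.5 * 4) = 2 tokens
assert limiter.grant(4) is False        # not enough tokens for a 4-token grant
now[0] += 1.0
assert limiter.grant(4) is True         # a full second refills the bucket to its cap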
10 changes: 10 additions & 0 deletions ch_backup/storage/async_pipeline/pipeline_builder.py
@@ -22,6 +22,7 @@
    DeleteMultipleStorageStage,
    DownloadStorageStage,
    EncryptStage,
    RateLimiterStage,
    ReadFileStage,
    ReadFilesTarballStage,
    StartMultipartUploadStage,
@@ -135,6 +136,11 @@ def build_uploading_stage(
        chunk_size = stage_config["chunk_size"]
        queue_size = stage_config["queue_size"]

        rate_limiter_config = self._config["rate_limiter"]

        max_upload_rate = rate_limiter_config["max_upload_rate"]
        retry_interval = rate_limiter_config["retry_interval"]

        storage = get_storage_engine(stage_config)

        if source_size > chunk_size:
@@ -164,6 +170,10 @@
        ]

        self.append(
            thread_flat_map(
                RateLimiterStage(max_upload_rate, retry_interval),
                maxsize=queue_size,
            ),
            thread_flat_map(ChunkingStage(chunk_size, buffer_size), maxsize=queue_size),
            *stages
        )
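To illustrate how the appended stages compose, a simplified, single-threaded stand-in for thread_flat_map (its semantics are assumed here: feed each queued item through the handler's __call__ and flatten the results; the real implementation runs in a worker thread):

from typing import Callable, Iterable, Iterator

def flat_map(handler: Callable, items: Iterable[bytes]) -> Iterator[bytes]:
    # Feed each buffer through the handler and flatten the yielded chunks.
    for index, item in enumerate(items):
        yield from handler(item, index)

# Given the append order above, the uploading pipeline conceptually becomes:
#   buffers -> RateLimiterStage -> ChunkingStage -> encrypt/upload stages
# so data appears to be throttled before it is cut into storage-sized chunks.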
1 change: 1 addition & 0 deletions ch_backup/storage/async_pipeline/stages/__init__.py
@@ -17,3 +17,4 @@
    StartMultipartUploadStage,
    StorageUploadingStage,
)
from .storage.rate_limiter_stage import RateLimiterStage
37 changes: 37 additions & 0 deletions ch_backup/storage/async_pipeline/stages/storage/rate_limiter_stage.py
@@ -0,0 +1,37 @@
"""
Data rate limiting stage.
"""

import time
from typing import Iterator

from ch_backup.storage.async_pipeline.base_pipeline.handler import Handler
from ch_backup.storage.async_pipeline.base_pipeline.rate_limiter import RateLimiter
from ch_backup.storage.async_pipeline.stages.types import StageType


class RateLimiterStage(Handler):
    """
    A throttling stage that limits the amount of data passed downstream to avoid
    excessive network load. Based on the token bucket algorithm.
    """

    stype = StageType.STORAGE

    def __init__(
        self, traffic_limit_per_sec: int, retry_interval: float = 0.01
    ) -> None:
        self._retry_interval = retry_interval
        self._rate_limiter = RateLimiter(limit_per_sec=traffic_limit_per_sec)

    def __call__(self, value: bytes, index: int) -> Iterator[bytes]:
        while len(value) > 0:
            available_tokens = self._rate_limiter.extract_tokens(len(value))

            pass_bytes = min(available_tokens, len(value))

            # Skip empty yields when the bucket is exhausted.
            if pass_bytes > 0:
                yield value[:pass_bytes]
                value = value[pass_bytes:]

            if len(value) > 0:
                time.sleep(self._retry_interval)
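A quick sketch of the stage in action (reaching into the private _rate_limiter and patching time.sleep are illustrative shortcuts for determinism, not part of the commit's API):

from unittest import mock

from ch_backup.storage.async_pipeline.base_pipeline.rate_limiter import RateLimiter
from ch_backup.storage.async_pipeline.stages import RateLimiterStage

now = [0.0]  # fake clock state shared by the limiter and the patched sleep

stage = RateLimiterStage(traffic_limit_per_sec=4, retry_interval=1.0)
stage._rate_limiter = RateLimiter(limit_per_sec=4, get_time_func=lambda: now[0])

def advance(seconds: float) -> None:
    now[0] += seconds

with mock.patch("time.sleep", advance):
    pieces = list(stage(b"0123456789", index=0))

print(pieces)  # [b'0123', b'4567', b'89'] -- 10 bytes throttled at 4 bytes/sec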
3 changes: 2 additions & 1 deletion tests/integration/ch_backup.featureset
@@ -19,4 +19,5 @@ features/single_table.feature
features/ssl_support.feature
features/table_engines.feature
features/udf_support.feature
- features/backup_misc_commands.feature
+ features/traffic_limiting.feature
+ features/backup_misc_commands.feature
33 changes: 33 additions & 0 deletions tests/integration/features/traffic_limiting.feature
@@ -0,0 +1,33 @@
Feature: Backup & Restore scenario with traffic rate limiting

  Background:
    Given default configuration
    And a working s3
    And a working zookeeper on zookeeper01
    And a working clickhouse on clickhouse01
    And clickhouse on clickhouse01 has test schema
    And clickhouse01 has test clickhouse data test1

  Scenario Outline: Backup and restore with the upload traffic rate limit set
    Given ch-backup configuration on clickhouse01
    """
    rate_limiter:
        max_upload_rate: <rate>
    """
    When we create clickhouse01 clickhouse backup
    Then we got the following backups on clickhouse01
      | num | state   | data_count | link_count | title  |
      | 0   | created | 4          | 0          | shared |

    When we restore clickhouse backup #0 to clickhouse02
    Given a working clickhouse on clickhouse02
    Then we got same clickhouse data at clickhouse01 clickhouse02

  Examples:
    | rate     |
    # unlimited
    | 0        |
    # 5 MiB/s
    | 5242880  |
    # 16 MiB/s
    | 16777216 |
70 changes: 70 additions & 0 deletions tests/unit/test_rate_limiter.py
@@ -0,0 +1,70 @@
"""
Unit tests for RateLimiter.
"""
from typing import List

import pytest

from ch_backup.storage.async_pipeline.base_pipeline.rate_limiter import RateLimiter


class TimeMocker:
    """
    Fake clock for driving RateLimiter deterministically in tests.
    """

    def __init__(self) -> None:
        self._timer = 0.0

    def time(self) -> float:
        return self._timer

    def sleep(self, sleep_time: float) -> None:
        self._timer = self._timer + sleep_time


@pytest.mark.parametrize(
    "data_size, rate, expected_time",
    [
        # expected_time = ceil(data_size / rate) - 1; 0 when the rate is unlimited
        (0, 0, 0),
        (0, 10000, 0),
        (1000, 0, 0),
        (10, 1, 9),
        (1, 10, 0),
        (10, 10, 0),
        (10, 4, 2),
        (123456, 5321, 23),
    ],
)
def test_rate_limiter_extract(data_size: int, rate: int, expected_time: int) -> None:
    timer = TimeMocker()
    data = bytes("a" * data_size, encoding="utf-8")
    rate_limiter = RateLimiter(rate, timer.time)

    while len(data) > 0:
        available = rate_limiter.extract_tokens(len(data))
        data = data[available:]
        if len(data) > 0:
            timer.sleep(1)

    assert timer.time() == expected_time


@pytest.mark.parametrize(
    "chunks_sizes, rate, expected_time",
    [
        ([100, 123, 531, 1], 0, 0),
        ([1], 1, 0),
        ([1, 1, 1], 2, 1),
        ([1, 2, 2, 1], 2, 3),
        ([10, 1, 9, 2, 11], 11, 2),
        ([1, 2, 1, 2, 3, 1, 1, 1], 3, 3),
    ],
)
def test_rate_limiter_grant(
    chunks_sizes: List[int], rate: int, expected_time: int
) -> None:
    timer = TimeMocker()

    rate_limiter = RateLimiter(rate, timer.time)
    for chunk_size in chunks_sizes:
        while not rate_limiter.grant(chunk_size):
            timer.sleep(1)
    assert timer.time() == expected_time
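Assuming a standard development environment for the repo, these tests would be run with something like:

python -m pytest tests/unit/test_rate_limiter.py -q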
