Limiting network traffic rate for uploading stage. #48
ch_backup/config.py
Outdated
@@ -101,6 +101,11 @@ def _as_seconds(t: str) -> int:
"uploading_threads": 4,
# The maximum number of objects the stage's input queue can hold simultaneously, `0` is unbounded
"queue_size": 10,
# Upper bound of network loading per second on the uploading stage. (bytes/sec)
- It would be better to move these settings into a sub-dict under a key such as `"rate_limiter": {...}`.
- Let's use human-readable notation like `1MiB`. Check `humanfriendly.parse_size()`.
- How about renaming `uploading_traffic_limit_retry_time` to `retry_interval` and `uploading_traffic_limit` to `max_upload_rate`?
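A minimal sketch of how the suggested layout might look, assuming the key names proposed in this comment. `parse_rate` is a hypothetical standard-library stand-in for `humanfriendly.parse_size()` that handles only a few binary units, and the `retry_interval` value is purely illustrative.

```python
import re

# Binary multipliers only. This is a simplified stand-in written for this
# sketch, not the implementation of humanfriendly.parse_size().
_UNITS = {"": 1, "b": 1, "kib": 1024, "mib": 1024**2, "gib": 1024**3}


def parse_rate(value) -> int:
    """Convert a value such as '1MiB' or a plain integer to bytes per second."""
    if isinstance(value, int):
        return value
    match = re.fullmatch(r"\s*(\d+)\s*([A-Za-z]*)\s*", value)
    if match is None or match.group(2).lower() not in _UNITS:
        raise ValueError(f"cannot parse size: {value!r}")
    return int(match.group(1)) * _UNITS[match.group(2).lower()]


# Hypothetical shape of the storage section after the proposed renames.
storage_config = {
    "uploading_threads": 4,
    "queue_size": 10,
    "rate_limiter": {
        "max_upload_rate": "1MiB",  # bytes per second; 0 means no limit
        "retry_interval": 0.01,     # seconds to wait before asking again (illustrative)
    },
}

max_upload_rate = parse_rate(storage_config["rate_limiter"]["max_upload_rate"])
```

Keeping both settings under one key means the stage builder can pass the whole sub-dict to the limiter instead of picking out individually prefixed keys.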
@@ -134,7 +135,8 @@ def build_uploading_stage(
buffer_size = stage_config["buffer_size"]
chunk_size = stage_config["chunk_size"]
queue_size = stage_config["queue_size"]

traffic_limit_per_sec = stage_config["uploading_traffic_limit"]
I also suggest renaming this variable to match the new key names.
@@ -152,6 +154,12 @@ def build_uploading_stage(
),
maxsize=queue_size,
),
thread_map(
I think it must come before `ChunkingStage`, which is currently added first:
ch-backup/ch_backup/storage/async_pipeline/pipeline_builder.py
Lines 174 to 177 in aa1e6ac
self.append(
    thread_flat_map(ChunkingStage(chunk_size, buffer_size), maxsize=queue_size),
    *stages
)
@@ -83,6 +85,36 @@ def __call__(self, part: UploadingPart, index: int) -> UploadingPart:
return part


class TrafficLimitingStage(Handler):
Let's rename it to `RateLimiterStage`. I think we can use it to limit more than just traffic.
get_time_func: Callable = time.time,
sleep_func: Callable = time.sleep,
I think we don't need these parameters here, only in the `RateLimiter` class.
self._sleep_func = sleep_func

def __call__(self, part: UploadingPart, index: int) -> UploadingPart:
    while not self._rate_limiter.grant(len(part.data)):
I would propose returning here the number of bytes (tokens) that can pass through without exceeding the limit, and passing those bytes further down the pipeline. Note that they will be plain bytes, not an `UploadingPart`, which is created only by `StartMultipartUploadStage`.
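One way this proposal could look, as a rough sketch rather than the project's actual stage: the stage asks the limiter how many bytes may pass right now and forwards only that slice. The `extract_tokens` method name, the generator-style `__call__`, and `ScriptedLimiter` (a fake used only to make the example deterministic) are all assumptions; the real `Handler` interface may differ.

```python
import time
from typing import Iterator, List


class RateLimiterStage:
    """Forwards raw bytes in slices no larger than the limiter allows."""

    def __init__(self, rate_limiter, retry_interval: float) -> None:
        self._rate_limiter = rate_limiter
        self._retry_interval = retry_interval

    def __call__(self, data: bytes) -> Iterator[bytes]:
        offset = 0
        while offset < len(data):
            # Ask for everything that is left; the limiter answers with
            # however many bytes fit into the current budget.
            granted = self._rate_limiter.extract_tokens(len(data) - offset)
            if granted == 0:
                # Budget exhausted: wait and ask again.
                time.sleep(self._retry_interval)
                continue
            yield data[offset:offset + granted]
            offset += granted


class ScriptedLimiter:
    """Hypothetical fake limiter that hands out a fixed sequence of budgets."""

    def __init__(self, budgets: List[int]) -> None:
        self._budgets = list(budgets)

    def extract_tokens(self, desired: int) -> int:
        return min(desired, self._budgets.pop(0))


stage = RateLimiterStage(ScriptedLimiter([4, 0, 3, 100]), retry_interval=0.0)
print(list(stage(b"abcdefghij")))  # prints [b'abcd', b'efg', b'hij']
```

Because the stage handles plain bytes, it can sit in front of the chunking stage, and an input larger than the per-second limit is simply split instead of waiting forever for a grant that can never succeed.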
ch_backup/util.py
Outdated
if self._limit_per_sec == 0:
    return True
current_time = self._get_time_func()
lapse = current_time - self._bucket_last_update
Let's add comments to this code, such as "replenish bucket" and so on. Or move it to a separate function, `self._replenish_bucket()`.
ch_backup/util.py
Outdated
self._bucket_tokens = self._limit_per_sec
self._bucket_last_update = self._get_time_func()

def grant(self, tokens=1):
Let's add a function that extracts all available tokens. We can use it in `RateLimiterStage` instead of `grant()`. But let's keep `grant()`, because it can still be useful.
ch_backup/util.py
Outdated
)
if self._bucket_tokens >= tokens:
    self._bucket_tokens -= tokens
    self._bucket_last_update = current_time
I think `last_update` must be updated after every replenish, not only after a successful grant.
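The review points on the limiter (a separate replenish step, a method that extracts whatever is available, and a timestamp advanced on every replenish) could be combined roughly as follows. This is a sketch under assumptions, not the code that was merged: `extract_tokens` is a hypothetical method name, and the bucket is kept as a float so that frequent calls do not lose fractional tokens.

```python
import time
from typing import Callable


class RateLimiter:
    """Token bucket allowing at most `limit_per_sec` tokens per second."""

    def __init__(
        self, limit_per_sec: int, get_time_func: Callable[[], float] = time.time
    ) -> None:
        self._limit_per_sec = limit_per_sec
        self._get_time_func = get_time_func
        self._bucket_tokens = float(limit_per_sec)  # the bucket starts full
        self._bucket_last_update = get_time_func()

    def _replenish_bucket(self) -> None:
        """Add the tokens earned since the last update, capped at the limit."""
        current_time = self._get_time_func()
        lapse = current_time - self._bucket_last_update
        self._bucket_tokens = min(
            float(self._limit_per_sec),
            self._bucket_tokens + lapse * self._limit_per_sec,
        )
        # Advance the timestamp on every replenish, whether or not a
        # grant follows, so the same interval is never counted twice.
        self._bucket_last_update = current_time

    def grant(self, tokens: int = 1) -> bool:
        """Take `tokens` if all of them are available, otherwise take none."""
        if self._limit_per_sec == 0:  # zero means "no limit"
            return True
        self._replenish_bucket()
        if self._bucket_tokens >= tokens:
            self._bucket_tokens -= tokens
            return True
        return False

    def extract_tokens(self, desired: int) -> int:
        """Take up to `desired` tokens and return how many were taken."""
        if self._limit_per_sec == 0:
            return desired
        self._replenish_bucket()
        granted = min(desired, int(self._bucket_tokens))
        self._bucket_tokens -= granted
        return granted
```

Note that `grant(n)` can never succeed when `n` exceeds `limit_per_sec`, since the bucket is capped at the limit; partial extraction avoids that case.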
ch_backup/util.py
Outdated
@@ -371,6 +371,38 @@ def dataclass_from_dict(type_: Type[T], data: dict) -> T:
return type_(**{k: v for k, v in data.items() if k in class_fields})  # type: ignore[call-arg]


class RateLimiter:
Let's move this class out of `util.py` into a separate module.
And clickhouse01 has test clickhouse data test1
And we have executed queries on clickhouse01
"""
CREATE USER test_user IDENTIFIED WITH plaintext_password BY 'password';
Let's test only the data table, without ACL entities. In my opinion they complicate the test. Simplify it as far as possible.
Given ch-backup configuration on clickhouse01
"""
storage:
  uploading_traffic_limit_retry_time: 33554432
We can use an `Examples:` clause to parametrize both of these tests into one. Have a look at `deduplication.feature` for an example.
tests/unit/test_rate_limiter.py
Outdated
chunk_size=st.just(16),
buffer_size=st.just(128),
)
def test_traffic_rate_limiting(content_size, chunk_size, buffer_size):
Let's write comprehensive unit tests only for the `RateLimiter` class. Testing at the pipeline level looks cumbersome and overloaded with irrelevant details.
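Class-level tests of this kind might look like the sketch below, which replaces the real clock with a fake one so that no test has to sleep. The `RateLimiter` here is a stripped-down stand-in written for the example (only `grant()`), and `FakeClock` is a hypothetical helper; neither is taken from the pull request.

```python
import time
from typing import Callable


class RateLimiter:
    """Stripped-down stand-in for the class under review (token bucket)."""

    def __init__(
        self, limit_per_sec: int, get_time_func: Callable[[], float] = time.time
    ) -> None:
        self._limit = limit_per_sec
        self._now = get_time_func
        self._tokens = float(limit_per_sec)
        self._last_update = get_time_func()

    def grant(self, tokens: int = 1) -> bool:
        if self._limit == 0:  # zero means "no limit"
            return True
        now = self._now()
        earned = (now - self._last_update) * self._limit
        self._tokens = min(float(self._limit), self._tokens + earned)
        self._last_update = now
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False


class FakeClock:
    """Manually advanced clock injected in place of time.time."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


def test_bucket_starts_full():
    limiter = RateLimiter(10, get_time_func=FakeClock())
    assert limiter.grant(10)
    assert not limiter.grant(1)


def test_bucket_refills_with_time():
    clock = FakeClock()
    limiter = RateLimiter(10, get_time_func=clock)
    assert limiter.grant(10)
    clock.advance(0.5)  # half a second earns half of the limit
    assert limiter.grant(5)
    assert not limiter.grant(1)


def test_bucket_never_exceeds_limit():
    clock = FakeClock()
    limiter = RateLimiter(10, get_time_func=clock)
    clock.advance(60.0)  # a long idle period must not build up a burst
    assert not limiter.grant(11)
    assert limiter.grant(10)


def test_zero_limit_means_unlimited():
    assert RateLimiter(0, get_time_func=FakeClock()).grant(10**9)
```

Each test touches only the limiter and the clock, so a failure points directly at the bucket arithmetic rather than at queues, threads, or chunk sizes.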
@aalexfvk, fixed all requested changes in the last commit.
No description provided.