
chore: connection pipeline cache does not shrink #4491

Open

wants to merge 3 commits into base: main
Conversation

kostasrim
Contributor

@kostasrim kostasrim commented Jan 21, 2025

Add a test to show that the pipeline cache won't shrink once it's filled if clients ping-pong between async and sync dispatch.

Proves #4461

@kostasrim kostasrim self-assigned this Jan 21, 2025
@kostasrim kostasrim changed the title chore: connection pipeline cache grows without shrinking chore: connection pipeline cache does not shrink Jan 21, 2025
# Once pipeline_cache_bytes has grown because too many messages were recycled, they won't
# gradually be released if even one command (one connection out of `n` connections)
# dispatches async. A single async dispatch out of n connections is enough to prevent
# the pipeline cache from being gradually released.
for i in range(30):
Contributor Author

We can drain the pipeline cache bytes once we stop dispatching async. But in a large pool of connections, only one command needs to dispatch async and we internally reset the counter. If this pattern continues, the size of the cache remains constant and is never gradually released.
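To make the failure mode concrete, here is a minimal C++ model of the pattern described above (illustrative names, not the PR's actual code): a quiet-period counter gates shrinking, and any async dispatch resets it, so one async dispatch per interval across n connections pins the cache size forever.

```cpp
#include <cstddef>

// Illustrative model only: shows why a single async dispatch per
// interval, out of n connections, keeps the cache from ever shrinking.
struct PipelinePoolModel {
  size_t cached_msgs = 0;  // messages parked in the pipeline cache
  size_t quiet_ticks = 0;  // ticks since the last async dispatch

  static constexpr size_t kQuietTicks = 10;

  // Any connection dispatching async resets the drain clock.
  void OnAsyncDispatch() { quiet_ticks = 0; }

  // Called periodically; releases one cached message per tick, but only
  // after a sustained quiet period. One async dispatch anywhere in the
  // pool of connections restarts the wait.
  void Tick() {
    if (++quiet_ticks >= kQuietTicks && cached_msgs > 0)
      --cached_msgs;
  }
};
```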

info = await good_client.info()

# Drained
assert info["pipeline_cache_bytes"] == 0
Contributor Author

drained completely

@@ -316,6 +314,36 @@ QueueBackpressure& GetQueueBackpressure() {

thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;

class PipelineCacheSizePaceMaker {
Collaborator

nit: maybe PipelineWatermarkTracker

@@ -316,6 +314,36 @@ QueueBackpressure& GetQueueBackpressure() {

thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;

class PipelineCacheSizePaceMaker {
public:
bool WatermarkReached(size_t pipeline_sz) {
Collaborator

nit: maybe CheckAndUpdateWatermark
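Based only on the visible signature, one plausible shape for this class (adopting the names suggested in the two nits above) is to track the minimum pool size over a fixed window and allow a shrink when the window closes with a nonzero minimum. A hedged sketch, not the PR's actual implementation:

```cpp
#include <algorithm>
#include <cstddef>
#include <limits>

// Sketch of a watermark tracker using the reviewer-suggested names
// (PipelineWatermarkTracker / CheckAndUpdateWatermark). Assumed behavior:
// a nonzero minimum pool size over the window means that many cached
// messages were never used, so the pool can afford to shrink.
class PipelineWatermarkTracker {
 public:
  // Called on every recycle with the current pool size. Returns true once
  // per window if the pool never fully drained, i.e. a shrink is safe.
  bool CheckAndUpdateWatermark(size_t pipeline_sz) {
    min_sz_ = std::min(min_sz_, pipeline_sz);
    if (++samples_ < kWindow)
      return false;
    bool watermark_reached = min_sz_ > 0;
    samples_ = 0;
    min_sz_ = std::numeric_limits<size_t>::max();
    return watermark_reached;
  }

 private:
  static constexpr size_t kWindow = 100;
  size_t samples_ = 0;
  size_t min_sz_ = std::numeric_limits<size_t>::max();
};
```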


@dfly_args({"proactor_threads": 1})
async def test_pipeline_cache_size(df_factory):
Collaborator

please add some comments on this test

@adiholden
Collaborator

The main purpose of the pipeline cache is to reduce the number of allocations.
I would like to see a test showing that if we have one or several connections running pipelined commands, we utilize the cache optimally: while the commands are executing the cache does not repeatedly grow and shrink, and once execution finishes the cache shrinks.
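Read against the watermark sketch above, that requirement would be met by shrinking only when a full window passes in which the pool never drained, rather than freeing eagerly. A hypothetical call site (names are stand-ins, not the PR's API):

```cpp
#include <memory>
#include <vector>

// Hypothetical wiring of the tracker sketched earlier into a thread-local
// message pool: bursts keep the cache warm, and only a window in which the
// pool never drained triggers a release, avoiding grow/shrink thrash.
struct PipelineMessage { /* buffers elided */ };

thread_local std::vector<std::unique_ptr<PipelineMessage>> msg_pool;
thread_local PipelineWatermarkTracker tracker;

void Recycle(std::unique_ptr<PipelineMessage> msg) {
  msg_pool.push_back(std::move(msg));
  if (tracker.CheckAndUpdateWatermark(msg_pool.size()) && !msg_pool.empty())
    msg_pool.pop_back();  // bleed one message per saturated window
}
```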

@adiholden
Collaborator

Also, let's try to think about when this algorithm does not perform well.
