Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core]Support async lookup in hash store #4423

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

neuyilan
Copy link
Member

@neuyilan neuyilan commented Oct 31, 2024

Currently, lookup join does not really support asynchronous, and the purpose of this PR is to support asynchronous lookup join.

This pr only support the hash store async lookup, in the next pr, I will do the rocksdb store async lookup.

@neuyilan neuyilan changed the title [Core]support async lookup in hash store [Core]Support async lookup in hash store Oct 31, 2024
@neuyilan neuyilan marked this pull request as draft October 31, 2024 12:44
@neuyilan neuyilan marked this pull request as ready for review October 31, 2024 13:06
@neuyilan neuyilan closed this Oct 31, 2024
@neuyilan neuyilan reopened this Oct 31, 2024
@neuyilan
Copy link
Member Author

neuyilan commented Nov 1, 2024

@JingsongLi PTAL, thanks

@JingsongLi JingsongLi closed this Nov 1, 2024
@JingsongLi JingsongLi reopened this Nov 1, 2024
Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @neuyilan , I took a rough look and found that there are many thread safety risks in certain areas. Can we go back to this requirement? Is it necessary for us to do multi-threaded access? Is this effective? Why not increase Flink's parallelism?

}

private synchronized Triple<BinaryRow, Integer, InternalRow> extractPartitionAndBucket(
InternalRow key) {
InternalRow adjustedKey = key;
if (keyRearrange != null) {
adjustedKey = keyRearrange.replaceRow(adjustedKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have encapsulated these as a separate function called extractPartartitionAndBucket. And it has been declared as synchronized, so there will be no thread safety issues.

}

private synchronized Triple<BinaryRow, Integer, InternalRow> extractPartitionAndBucket(
InternalRow key) {
InternalRow adjustedKey = key;
if (keyRearrange != null) {
adjustedKey = keyRearrange.replaceRow(adjustedKey);
}
extractor.setRecord(adjustedKey);
int bucket = extractor.bucket();
BinaryRow partition = extractor.partition();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

InternalRow adjustedKey = key;
if (keyRearrange != null) {
adjustedKey = keyRearrange.replaceRow(adjustedKey);
}
extractor.setRecord(adjustedKey);
int bucket = extractor.bucket();
BinaryRow partition = extractor.partition();

InternalRow trimmedKey = key;
if (trimmedKeyRearrange != null) {
trimmedKey = trimmedKeyRearrange.replaceRow(trimmedKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same as above

@neuyilan
Copy link
Member Author

neuyilan commented Nov 4, 2024

Hi @neuyilan , I took a rough look and found that there are many thread safety risks in certain areas. Can we go back to this requirement? Is it necessary for us to do multi-threaded access? Is this effective? Why not increase Flink's parallelism?

Yes, there are too many thread safety issues in the current design. So essentially, access is synchronous. Even if 'lookup. sync' is set to true in flink connector, it does not have any acceleration effect.

I think supporting asynchronous multi-threaded access is necessary. And in our testing, it was effective. Asynchronous multi-threaded access has the following benefits compared to increasing concurrency in Flink:

  1. Reduce the usage of memory resources. Each Flink subtask will occupy additional memory; The more sub tasks there are, the more memory will be occupied. Assuming one 4G TM. In our scenario, if asynchronous multi-threaded access is enabled, performance can be improved by approximately 7 times. This requires 7 TMs to achieve, but these 7 TMs will occupy an additional 4 * 7G of memory. This is even more useful in elastic resources. In situations where memory resources are a bottleneck, my job has a hard limit on memory, allowing only a maximum of 4G memory to be used (managed by Yarn), but the CPU can be exceeded (cgroup soft limit). So this feature can be used to enable asynchronous multi-threaded access, accelerating performance without adding additional resources. No additional costs and increase efficiency.

  2. Reduce cache disk usage, as currently cached data is exclusive to each task. If multi-threaded access can be utilized within a task, it can reduce the cache disk usage.

@neuyilan
Copy link
Member Author

neuyilan commented Nov 6, 2024

Hi, @JingsongLi Could you please review it again.

@neuyilan neuyilan requested a review from JingsongLi November 6, 2024 08:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants