
[Pull-based Ingestion] Introduce the new pull-based ingestion engine, APIs, and Kafka plugin #16958

Open. Wants to merge 36 commits into base: main.

@yupeng9 commented Jan 6, 2025

Description

This PR implements the foundations of pull-based ingestion as described in this RFC, including:

  1. The APIs for the pull-based ingestion source
  2. A Kafka plugin that implements the ingestion source API
  3. A new IngestionEngine that pulls data from the ingestion sources
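To make the split between the three pieces concrete, here is a minimal Java sketch of the pull model. It assumes a source that can be read from an arbitrary offset, as a Kafka partition can. All names (`PartitionSource`, `InMemorySource`, `PullingEngine`, `poll`, `pollOnce`) are hypothetical and are not the APIs introduced in this PR. An in-memory list stands in for Kafka so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PullIngestionSketch {

    /** Hypothetical: one message read from a stream partition, tagged with its offset. */
    public record Message(long offset, String payload) {}

    /** Hypothetical source API: return up to maxMessages messages starting at fromOffset. */
    public interface PartitionSource {
        List<Message> poll(long fromOffset, int maxMessages);
    }

    /** Hypothetical in-memory stand-in for a single Kafka partition. */
    public static class InMemorySource implements PartitionSource {
        private final List<String> log;

        public InMemorySource(List<String> log) {
            this.log = List.copyOf(log);
        }

        @Override
        public List<Message> poll(long fromOffset, int maxMessages) {
            List<Message> batch = new ArrayList<>();
            for (long o = fromOffset; o < log.size() && batch.size() < maxMessages; o++) {
                batch.add(new Message(o, log.get((int) o)));
            }
            return batch;
        }
    }

    /** Hypothetical engine: pulls batches from the source and applies them to a local "index". */
    public static class PullingEngine {
        private final PartitionSource source;
        private final Map<Long, String> index = new HashMap<>();
        private long nextOffset; // the position the next poll resumes from

        public PullingEngine(PartitionSource source, long startOffset) {
            this.source = source;
            this.nextOffset = startOffset;
        }

        /** Pulls one batch and applies it; returns how many messages were applied. */
        public int pollOnce(int maxMessages) {
            List<Message> batch = source.poll(nextOffset, maxMessages);
            for (Message m : batch) {
                index.put(m.offset(), m.payload()); // stand-in for indexing a document
                nextOffset = m.offset() + 1;
            }
            return batch.size();
        }

        public long nextOffset() { return nextOffset; }

        public int docCount() { return index.size(); }
    }

    public static void main(String[] args) {
        PullingEngine engine = new PullingEngine(new InMemorySource(List.of("a", "b", "c", "d", "e")), 0);
        while (engine.pollOnce(2) > 0) {
            // keep pulling until the source has nothing new
        }
        System.out.println(engine.docCount() + " docs, next offset " + engine.nextOffset()); // 5 docs, next offset 5
    }
}
```

The design point the sketch tries to show is that in a pull model the engine, not the producer, owns the read position. Because it tracks `nextOffset` and decides when to pull, it can in principle resume or rewind from a known position.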

This is currently a work in progress: a few improvements remain, and test coverage still needs to be increased.

Related Issues

Resolves #16927 #16929 #16928

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following the Developer Certificate of Origin and signing off your commits, please check here.

@github-actions bot added the enhancement (Enhancement or improvement to existing feature or request) and Indexing (Indexing, Bulk Indexing and anything related to indexing) labels on Jan 6, 2025
Contributor

github-actions bot commented Jan 6, 2025

❌ Gradle check result for 16dd9d0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Collaborator

@Bukhtawar left a comment

Curious how the FGAC (fine-grained access control) security model would work, especially with the security plugin, which intercepts transport actions to validate whether authorized users can perform bulk actions on certain indices. Is the intent to handle permissions at the Kafka "partition level"?
Another aspect is maintaining Kafka checkpoints durably. I have yet to read that part, but it would be good to understand how we are handling failovers and recoveries.
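On the checkpoint question, one common pattern (an assumption here, not a description of what this PR implements) is to persist the stream offset atomically with each durable commit. On failover or recovery, the shard then resumes polling from the stored offset and replays everything after it. Replay is only safe if applying a message is idempotent, for example an upsert keyed by document id. The Java sketch below simulates a crash in memory; `Msg`, `Commit`, `Shard`, and their methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CheckpointRecoverySketch {

    /** Hypothetical stream message: a document id and its latest body. */
    public record Msg(String docId, String body) {}

    /** Hypothetical durable commit: the indexed docs plus the offset to resume from. */
    public record Commit(Map<String, String> docs, long nextOffset) {
        public Commit {
            docs = Map.copyOf(docs); // snapshot so later in-memory writes don't leak in
        }
    }

    /** Hypothetical shard that keeps uncommitted writes in memory only. */
    public static class Shard {
        private final List<Msg> stream;
        private Commit lastCommit = new Commit(Map.of(), 0);
        private Map<String, String> live;
        private long nextOffset;

        public Shard(List<Msg> stream) {
            this.stream = List.copyOf(stream);
            recover();
        }

        /** Rebuild in-memory state from the last commit and resume from its stored offset. */
        public void recover() {
            live = new LinkedHashMap<>(lastCommit.docs());
            nextOffset = lastCommit.nextOffset();
        }

        /** Pull up to n messages; keyed upserts make replaying a message idempotent. */
        public void consume(int n) {
            for (int i = 0; i < n && nextOffset < stream.size(); i++) {
                Msg m = stream.get((int) nextOffset);
                live.put(m.docId(), m.body());
                nextOffset++;
            }
        }

        /** Persist docs and offset together so they cannot disagree after a crash. */
        public void commit() {
            lastCommit = new Commit(live, nextOffset);
        }

        public Map<String, String> docs() { return live; }

        public long nextOffset() { return nextOffset; }
    }

    public static void main(String[] args) {
        Shard shard = new Shard(List.of(
                new Msg("d1", "v1"), new Msg("d2", "v1"), new Msg("d1", "v2"), new Msg("d3", "v1")));
        shard.consume(2);
        shard.commit();    // durable: d1=v1, d2=v1, resume at offset 2
        shard.consume(1);  // d1=v2 exists only in memory
        shard.recover();   // simulate a crash: uncommitted work is lost
        System.out.println(shard.docs() + " @ " + shard.nextOffset());
        shard.consume(10); // replay from offset 2 to the end of the stream
        System.out.println(shard.docs() + " @ " + shard.nextOffset());
    }
}
```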

Signed-off-by: Yupeng Fu <[email protected]>
Contributor

❌ Gradle check result for 88e27f6: FAILURE

Contributor

❌ Gradle check result for cc41f8c: FAILURE

Contributor

❌ Gradle check result for a940910: FAILURE

Signed-off-by: Yupeng Fu <[email protected]>
Contributor

✅ Gradle check result for 8e7af9c: SUCCESS

codecov bot commented Jan 19, 2025

Codecov Report

Attention: Patch coverage is 59.49367% with 320 lines in your changes missing coverage. Please review.

Project coverage is 72.19%. Comparing base (13159c1) to head (4e0a4fa).
Report is 14 commits behind head on main.

Files with missing lines | Patch % | Lines
--- | --- | ---
...a/org/opensearch/index/engine/IngestionEngine.java | 43.10% | 208 Missing and 23 partials ⚠️
...rch/indices/pollingingest/DefaultStreamPoller.java | 84.76% | 11 Missing and 5 partials ⚠️
...ndices/pollingingest/MessageProcessorRunnable.java | 80.55% | 10 Missing and 4 partials ⚠️
...in/java/org/opensearch/indices/IndicesService.java | 7.69% | 11 Missing and 1 partial ⚠️
.../java/org/opensearch/plugin/kafka/KafkaOffset.java | 61.53% | 5 Missing and 5 partials ⚠️
...pensearch/plugin/kafka/KafkaPartitionConsumer.java | 87.71% | 5 Missing and 2 partials ⚠️
...g/opensearch/cluster/metadata/IngestionSource.java | 66.66% | 2 Missing and 4 partials ⚠️
...in/java/org/opensearch/index/shard/IndexShard.java | 14.28% | 2 Missing and 4 partials ⚠️
.../indices/pollingingest/IngestionEngineFactory.java | 0.00% | 6 Missing ⚠️
...org/opensearch/cluster/metadata/IndexMetadata.java | 78.26% | 2 Missing and 3 partials ⚠️
... and 4 more
@@             Coverage Diff              @@
##               main   #16958      +/-   ##
============================================
- Coverage     72.31%   72.19%   -0.13%     
- Complexity    65346    65449     +103     
============================================
  Files          5301     5314      +13     
  Lines        303805   304592     +787     
  Branches      44030    44126      +96     
============================================
+ Hits         219702   219902     +200     
- Misses        66055    66663     +608     
+ Partials      18048    18027      -21     
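The headline numbers in the coverage diff can be reproduced with a short calculation. The sketch assumes that coverage is hits / (hits + misses + partials), so partially covered lines count as uncovered, and that displayed values are rounded down to two decimals. Both are inferences from the figures above, not documented Codecov behavior quoted here. `CoverageMath` and its methods are hypothetical helpers.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class CoverageMath {

    /** Coverage in percent: hits / (hits + misses + partials) * 100; partials count as uncovered. */
    public static BigDecimal coverage(long hits, long misses, long partials) {
        long lines = hits + misses + partials;
        return BigDecimal.valueOf(hits * 100)
                .divide(BigDecimal.valueOf(lines), 10, RoundingMode.HALF_EVEN);
    }

    /** Round toward negative infinity at the given number of decimals (assumed display rule). */
    public static BigDecimal floor(BigDecimal value, int decimals) {
        return value.setScale(decimals, RoundingMode.FLOOR);
    }

    public static void main(String[] args) {
        // Figures from the Coverage Diff table (main vs. #16958).
        BigDecimal base = coverage(219702, 66055, 18048); // 303805 lines
        BigDecimal head = coverage(219902, 66663, 18027); // 304592 lines
        System.out.println("base:  " + floor(base, 2)); // 72.31
        System.out.println("head:  " + floor(head, 2)); // 72.19
        // The change is taken from the unrounded values, then floored.
        System.out.println("delta: " + floor(head.subtract(base), 2)); // -0.13
    }
}
```

Flooring the unrounded change is why the table shows -0.13% rather than the -0.12% you would get by subtracting the two displayed percentages. The same formula with 470 hits and 320 uncovered lines gives 59.49367%, matching the reported patch coverage and suggesting about 790 measured patch lines. Those two counts are inferred, not reported.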

Contributor

❌ Gradle check result for 7a4f25d: FAILURE

Contributor

❌ Gradle check result for 01126c5: FAILURE

Signed-off-by: Yupeng Fu <[email protected]>
Contributor

✅ Gradle check result for 4e0a4fa: SUCCESS

@@ -194,4 +194,7 @@ grant {
permission java.io.FilePermission "/sys/fs/cgroup/cpuacct/-", "read";
permission java.io.FilePermission "/sys/fs/cgroup/memory", "read";
permission java.io.FilePermission "/sys/fs/cgroup/memory/-", "read";

// allow awaitility in tests
permission java.lang.RuntimePermission "setDefaultUncaughtExceptionHandler";
Member

Can this be limited to test-framework.policy in the same package?

Author

This is needed by IngestionEngineTests in the server package.

I tried the following, but it does not work:

grant codeBase "${codebase.awaitility}" {
  permission java.lang.RuntimePermission "setDefaultUncaughtExceptionHandler";
};

Any thoughts?

Contributor

❌ Gradle check result for a756d18: FAILURE

Contributor

❌ Gradle check result for 47082b3: FAILURE

Contributor

❌ Gradle check result for 3d013c7: FAILURE

Signed-off-by: Yupeng Fu <[email protected]>
Contributor

❌ Gradle check result for e53e792: FAILURE

Labels
enhancement (Enhancement or improvement to existing feature or request), Indexing (Indexing, Bulk Indexing and anything related to indexing)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request] Pull-based ingestion source APIs
7 participants