Pass the pipeline identifier as part of the data prepper config. #5131

Merged: 2 commits, Nov 7, 2024

@sb2k16 sb2k16 commented Oct 29, 2024

Description

This PR adds a pipeline_identifier option to the Data Prepper config so that multiple pipelines can subscribe to the same Kinesis Data Streams (KDS) stream when using the enhanced fan-out strategy. The option is not required: by default, the pipeline name is used as the identifier for the active subscription.

Issues Resolved

Resolves #1082

Check List

  • New functionality includes testing.
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@@ -96,8 +95,11 @@ public KinesisService(final KinesisSourceConfig kinesisSourceConfig,
this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
if (kinesisLeaseConfig.getPipelineIdentifier().isEmpty()) {

Member

Please also check for null.

Member Author

Thanks @dlvenable. I have made the changes.
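
The null-and-empty check requested in this thread can be sketched as follows. This is a minimal illustration, not the merged code: the class `ApplicationNameSketch` and the method `resolveApplicationName` are hypothetical names, and the only behavior assumed is what the description states, namely that the pipeline name is used when no pipeline identifier is configured.

```java
import java.util.Objects;

// Hypothetical helper, not part of the PR: picks the name used for the subscription.
final class ApplicationNameSketch {
    private ApplicationNameSketch() {
    }

    // Returns the configured identifier, or the pipeline name when the
    // identifier is null or empty.
    static String resolveApplicationName(final String pipelineIdentifier, final String pipelineName) {
        Objects.requireNonNull(pipelineName, "pipelineName must not be null");
        if (pipelineIdentifier == null || pipelineIdentifier.isEmpty()) {
            return pipelineName;
        }
        return pipelineIdentifier;
    }

    public static void main(final String[] args) {
        // Missing identifier (null or empty) falls back to the pipeline name.
        System.out.println(resolveApplicationName(null, "kinesis-pipeline-test"));
        System.out.println(resolveApplicationName("", "kinesis-pipeline-test"));
        // A configured identifier takes precedence over the pipeline name.
        System.out.println(resolveApplicationName("sample-kinesis-pipeline-0123", "kinesis-pipeline-test"));
    }
}
```

Checking for null before calling `isEmpty()` avoids a `NullPointerException` when the option is absent from the config.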

@@ -53,6 +53,7 @@ public class KinesisSourceTest {
private final String PIPELINE_NAME = "kinesis-pipeline-test";
private final String streamId = "stream-1";
private static final String codec_plugin_name = "json";
private static final String PIPELINE_IDENTIFIER = "sample-kinesis-pipeline-0123";

Member

Please generate this as a random string in the @BeforeEach setUp method.

Member Author

Thanks @dlvenable. I have made the changes.
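
One way to follow this suggestion is to replace the constant with a field that is regenerated before each test. The sketch below is dependency-free, so the JUnit 5 `@BeforeEach` annotation is only mentioned in a comment. The class name `RandomIdentifierSketch` is hypothetical, and using `UUID.randomUUID()` is an assumption about how the random string might be produced.

```java
import java.util.UUID;

// Hypothetical stand-in for a test class such as KinesisSourceTest.
final class RandomIdentifierSketch {
    // Was a hard-coded constant; now regenerated for every test.
    private String pipelineIdentifier;

    // In a real JUnit 5 test class this method would be annotated with @BeforeEach.
    void setUp() {
        pipelineIdentifier = UUID.randomUUID().toString();
    }

    String getPipelineIdentifier() {
        return pipelineIdentifier;
    }

    public static void main(final String[] args) {
        final RandomIdentifierSketch test = new RandomIdentifierSketch();
        test.setUp();
        System.out.println("identifier for this run: " + test.getPipelineIdentifier());
    }
}
```

A fresh value per test makes it harder for an assertion to pass by coincidence, for example because production code happens to contain the same literal.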

@@ -70,6 +70,7 @@ public class KinesisServiceTest {
private final String PIPELINE_NAME = "kinesis-pipeline-test";
private final String streamId = "stream-1";
private static final String codec_plugin_name = "json";
private static final String PIPELINE_IDENTIFIER = "sample-kinesis-pipeline-0123";

Member

Please generate this as a random string in the @BeforeEach setUp method.

Member Author

Thanks @dlvenable. I have made the changes.

@@ -149,6 +150,7 @@ void setup() {
kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class);
kinesisLeaseConfig = mock(KinesisLeaseConfig.class);
workerIdentifierGenerator = mock(WorkerIdentifierGenerator.class);
when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(PIPELINE_IDENTIFIER);

Member

This changes all the existing tests, so we are not sufficiently validating the new behavior. We should test the condition fully.

I see two options:

  1. Add a method to KinesisLeaseConfig that returns the identifier to use. It can contain the conditional that you added in KinesisService, and then you can test it in isolation. Be sure to add @JsonIgnore if you take this approach.
  2. Create new tests here that verify the behavior both with pipeline_identifier and without it. The test with pipeline_identifier should also use a different pipeline name so you can be sure the correct name is chosen.

Member Author

I have created separate tests in KinesisServiceTest that cover null and empty pipeline identifiers.
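
For comparison, option 1 from the review could look roughly like the sketch below. Everything here is illustrative: `LeaseConfigSketch` and `getEffectiveIdentifier` are hypothetical names, and passing the pipeline name as a parameter is an assumption, since the thread does not say how the config class would obtain it. The review asks for `@JsonIgnore` on such a method; it is left out here only to keep the example free of the Jackson dependency.

```java
// Hypothetical stand-in for KinesisLeaseConfig, reduced to the one option under discussion.
final class LeaseConfigSketch {
    // May be null when pipeline_identifier is not configured.
    private final String pipelineIdentifier;

    LeaseConfigSketch(final String pipelineIdentifier) {
        this.pipelineIdentifier = pipelineIdentifier;
    }

    String getPipelineIdentifier() {
        return pipelineIdentifier;
    }

    // Derived value, not a config property: the identifier to use for the subscription.
    String getEffectiveIdentifier(final String pipelineName) {
        if (pipelineIdentifier == null || pipelineIdentifier.isEmpty()) {
            return pipelineName;
        }
        return pipelineIdentifier;
    }

    private static void check(final boolean condition, final String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(final String[] args) {
        // The pipeline name differs from the identifier, so a wrong choice is detectable.
        final String pipelineName = "kinesis-pipeline-test";
        final String identifier = "sample-kinesis-pipeline-0123";

        check(identifier.equals(new LeaseConfigSketch(identifier).getEffectiveIdentifier(pipelineName)),
                "configured identifier should be used");
        check(pipelineName.equals(new LeaseConfigSketch(null).getEffectiveIdentifier(pipelineName)),
                "null identifier should fall back to the pipeline name");
        check(pipelineName.equals(new LeaseConfigSketch("").getEffectiveIdentifier(pipelineName)),
                "empty identifier should fall back to the pipeline name");
        System.out.println("all cases passed");
    }
}
```

Keeping the conditional on the config object means the three cases can be exercised without mocking the Kinesis clients, which is the isolation benefit the reviewer points to.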

@sb2k16 sb2k16 force-pushed the kds-pipeline-identifier branch from 7da1135 to c2e7f03 Compare October 30, 2024 23:36
@kkondaka kkondaka merged commit 8fd743d into opensearch-project:main Nov 7, 2024
47 checks passed
Development

Successfully merging this pull request may close these issues.

Support AWS Kinesis Data Streams as a Source
4 participants