-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pass the pipeline identifier as part of the data prepper config. #5131
Pass the pipeline identifier as part of the data prepper config. #5131
Conversation
Signed-off-by: Souvik Bose <[email protected]>
@@ -96,8 +95,11 @@ public KinesisService(final KinesisSourceConfig kinesisSourceConfig, | |||
this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); | |||
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion()); | |||
this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); | |||
this.pipelineName = pipelineDescription.getPipelineName(); | |||
this.applicationName = pipelineName; | |||
if (kinesisLeaseConfig.getPipelineIdentifier().isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also check for null
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dlvenable . I have made the changes.
@@ -53,6 +53,7 @@ public class KinesisSourceTest { | |||
private final String PIPELINE_NAME = "kinesis-pipeline-test"; | |||
private final String streamId = "stream-1"; | |||
private static final String codec_plugin_name = "json"; | |||
private static final String PIPELINE_IDENTIFIER = "sample-kinesis-pipeline-0123"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please generate this as a random string in the @BeforeEach
setUp method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dlvenable . I have made the changes.
@@ -70,6 +70,7 @@ public class KinesisServiceTest { | |||
private final String PIPELINE_NAME = "kinesis-pipeline-test"; | |||
private final String streamId = "stream-1"; | |||
private static final String codec_plugin_name = "json"; | |||
private static final String PIPELINE_IDENTIFIER = "sample-kinesis-pipeline-0123"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please generate this as a random string in the @BeforeEach
setUp method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dlvenable . I have made the changes.
@@ -149,6 +150,7 @@ void setup() { | |||
kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class); | |||
kinesisLeaseConfig = mock(KinesisLeaseConfig.class); | |||
workerIdentifierGenerator = mock(WorkerIdentifierGenerator.class); | |||
when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(PIPELINE_IDENTIFIER); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is changing all the existing tests. Thus, we are not not sufficiently validating this. We should test the condition fully.
I see two options:
- Make method on
KinesisLeaseConfig
that returns the identifier to use. It can have the conditional that you added inKinesisService. Then you can test that in isolation. Be sure to add
@JsonIgnore` if you take this approach. - Create a new test in here that verifies the behavior with
pipeline_identifier
as well as with nopipeline_identifer
. The former test should also have a pipeline name that is different so you can be sure the correct name is chosen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have created separate tests to test against null, empty pipeline identifiers in the KinesisServiceTest
.
Signed-off-by: Souvik Bose <[email protected]>
7da1135
to
c2e7f03
Compare
Description
This PR is to add a
pipeline_identifier
to the data prepper config to ensure that the multiple pipelines can subscribe to the same KDS stream when using enhanced fan-out strategy. This is optional and by default would use the pipeline name as the identifier for the active subscription.Issues Resolved
Resolves #1082
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.