From e7d1b1975f375a9fdbfc9647fb7eeaccc5ac3ca5 Mon Sep 17 00:00:00 2001 From: Jordan Johnson-Doyle Date: Sat, 17 Nov 2018 19:57:55 +0000 Subject: [PATCH] Allow custom KCL config values to be passed --- README.md | 26 +++++++++++ lib/logstash/inputs/kinesis.rb | 56 +++++++++++++++++++---- spec/inputs/kinesis_spec.rb | 82 ++++++++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index f6b3087..ec9d6a7 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,32 @@ This are the properties you can configure and what are the default values: * **required**: false * **default value**: `"TRIM_HORIZON"` +### Additional KCL Settings + +Each configuration value defined in the KCL config classes listed below can be passed in snake_case. For example, to set `initialLeaseTableReadCapacity` in `LeaseManagementConfig` to 30, use the following configuration block: `"lease_management_additional_settings" => { "initial_lease_table_read_capacity" => 30 }` + +* `checkpoint_additional_settings`: Configuration values to set in [CheckpointConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java). + * **required**: false + * **default value**: `{}` +* `coordinator_additional_settings`: Configuration values to set in [CoordinatorConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java). + * **required**: false + * **default value**: `{}` +* `lease_management_additional_settings`: Configuration values to set in [LeaseManagementConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java). 
+ * **required**: false + * **default value**: `{}` +* `lifecycle_additional_settings`: Configuration values to set in [LifecycleConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java). + * **required**: false + * **default value**: `{}` +* `metrics_additional_settings`: Configuration values to set in [MetricsConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java). + * **required**: false + * **default value**: `{}` +* `retrieval_additional_settings`: Configuration values to set in [RetrievalConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java). + * **required**: false + * **default value**: `{}` +* `processor_additional_settings`: Configuration values to set in [ProcessorConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorConfig.java). + * **required**: false + * **default value**: `{}` + ## Authentication This plugin uses the default AWS SDK auth chain, [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html), to determine which credentials the client will use, unless `profile` is set, in which case [ProfileCredentialsProvider](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/profile/ProfileCredentialsProvider.html) is used. 
diff --git a/lib/logstash/inputs/kinesis.rb b/lib/logstash/inputs/kinesis.rb
index c49ed15..18eb23a 100644
--- a/lib/logstash/inputs/kinesis.rb
+++ b/lib/logstash/inputs/kinesis.rb
@@ -43,10 +43,14 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
 
   attr_reader(
     :kcl_config,
+    :kcl_worker,
+
+    :checkpoint_config,
+    :coordinator_config,
+    :lease_management_config,
+    :lifecycle_config,
     :metrics_config,
     :retrieval_config,
-    :lease_management_config,
-    :kcl_worker,
   )
 
   # The application name used for the dynamodb coordination table. Must be
@@ -72,6 +76,27 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
   # Select initial_position_in_stream. Accepts TRIM_HORIZON or LATEST
   config :initial_position_in_stream, :validate => ["TRIM_HORIZON", "LATEST"], :default => "TRIM_HORIZON"
 
+  # Any additional arbitrary kcl options configurable in the CheckpointConfig
+  config :checkpoint_additional_settings, :validate => :hash, :default => {}
+
+  # Any additional arbitrary kcl options configurable in the CoordinatorConfig
+  config :coordinator_additional_settings, :validate => :hash, :default => {}
+
+  # Any additional arbitrary kcl options configurable in the LeaseManagementConfig
+  config :lease_management_additional_settings, :validate => :hash, :default => {}
+
+  # Any additional arbitrary kcl options configurable in the LifecycleConfig
+  config :lifecycle_additional_settings, :validate => :hash, :default => {}
+
+  # Any additional arbitrary kcl options configurable in the MetricsConfig
+  config :metrics_additional_settings, :validate => :hash, :default => {}
+
+  # Any additional arbitrary kcl options configurable in the RetrievalConfig
+  config :retrieval_additional_settings, :validate => :hash, :default => {}
+
+  # Any additional arbitrary kcl options configurable in the ProcessorConfig
+  config :processor_additional_settings, :validate => :hash, :default => {}
+
   def initialize(params = {})
     super(params)
   end
@@ -112,13 +137,38 @@ def register
       worker_factory([])
     )
 
+    @checkpoint_config = send_additional_settings(@kcl_config.checkpoint_config, @checkpoint_additional_settings)
+
+    @coordinator_config = send_additional_settings(@kcl_config.coordinator_config, @coordinator_additional_settings)
+
+    @lifecycle_config = send_additional_settings(@kcl_config.lifecycle_config, @lifecycle_additional_settings)
+
     @metrics_config = @kcl_config.metrics_config.metrics_factory(metrics_factory)
+    @metrics_config = send_additional_settings(@metrics_config, @metrics_additional_settings)
 
     @retrieval_config = @kcl_config.retrieval_config.
       initial_position_in_stream_extended(software.amazon.kinesis.common.InitialPositionInStreamExtended.new_initial_position(initial_position_in_stream))
+    @retrieval_config = send_additional_settings(@retrieval_config, @retrieval_additional_settings)
 
     @lease_management_config = @kcl_config.lease_management_config.
       failover_time_millis(@checkpoint_interval_seconds * 1000 * 3)
+    @lease_management_config = send_additional_settings(@lease_management_config, @lease_management_additional_settings)
+  end
+
+  # Applies user-supplied additional settings to a KCL config object.
+  #
+  # Each key of +options+ is invoked on +obj+ as a method taking the paired
+  # value as its only argument; the return value becomes the receiver for the
+  # next key. public_send is used so that configuration keys can only reach
+  # public methods, not private ones such as Kernel#system.
+  #
+  # @param obj [Object] config object to start from
+  # @param options [Hash] method name => argument pairs
+  # @return [Object] result of the last call, or +obj+ when +options+ is empty
+  def send_additional_settings(obj, options)
+    options.reduce(obj) do |config, (setting, value)|
+      config.public_send(setting, value)
+    end
   end
 
   def run(output_queue)
@@ -128,14 +178,18 @@ def run(output_queue)
 
   def kcl_builder(output_queue)
     Scheduler.new(
-      @kcl_config.checkpoint_config,
-      @kcl_config.coordinator_config,
+      @checkpoint_config,
+      @coordinator_config,
       @lease_management_config,
-      @kcl_config.lifecycle_config,
+      @lifecycle_config,
       @metrics_config,
       # checkpointing is done on processRecords so we need Kinesis to always call us so we don't lose this shard
       # even when we're active
-      ProcessorConfig.new(worker_factory(output_queue)).call_process_records_even_for_empty_record_list(true),
+      # additional processor settings are applied afterwards, so users can still override this default
+      send_additional_settings(
+        ProcessorConfig.new(worker_factory(output_queue)).call_process_records_even_for_empty_record_list(true),
+        @processor_additional_settings
+      ),
       @retrieval_config
     )
   end
diff --git a/spec/inputs/kinesis_spec.rb b/spec/inputs/kinesis_spec.rb index 2e9d696..e11c3bf 100644 --- 
a/spec/inputs/kinesis_spec.rb +++ b/spec/inputs/kinesis_spec.rb @@ -38,6 +38,64 @@ "initial_position_in_stream" => "LATEST" }} + # Config hash to test valid additional_settings + let(:config_with_valid_additional_settings) {{ + "application_name" => "my-processor", + "kinesis_stream_name" => "run-specs", + "codec" => codec, + "metrics" => metrics, + "checkpoint_interval_seconds" => 120, + "region" => "ap-southeast-1", + "profile" => nil, + "coordinator_additional_settings" => { + "max_initialization_attempts" => 2 + }, + "lifecycle_additional_settings" => { + "task_backoff_time_millis" => 20 + }, + "lease_management_additional_settings" => { + "initial_lease_table_read_capacity" => 25, + "initial_lease_table_write_capacity" => 100, + }, + "metrics_additional_settings" => { + "metrics_max_queue_size" => 20000 + }, + "retrieval_additional_settings" => { + "list_shards_backoff_time_in_millis" => 3000 + }, + "processor_additional_settings" => { + "call_process_records_even_for_empty_record_list" => true + } + }} + + # Config hash to test invalid additional_settings where the name is not found + let(:config_with_invalid_additional_settings_name_not_found) {{ + "application_name" => "my-processor", + "kinesis_stream_name" => "run-specs", + "codec" => codec, + "metrics" => metrics, + "checkpoint_interval_seconds" => 120, + "region" => "ap-southeast-1", + "profile" => nil, + "lease_management_additional_settings" => { + "foo" => "bar" + } + }} + + # Config hash to test invalid additional_settings where the type is complex or wrong + let(:config_with_invalid_additional_settings_wrong_type) {{ + "application_name" => "my-processor", + "kinesis_stream_name" => "run-specs", + "codec" => codec, + "metrics" => metrics, + "checkpoint_interval_seconds" => 120, + "region" => "ap-southeast-1", + "profile" => nil, + "coordinator_additional_settings" => { + "max_initialization_attempts" => "invalid_init_attempts" + } + }} + subject!(:kinesis) { LogStash::Inputs::Kinesis.new(config) } 
let(:kcl_worker) { double('kcl_worker') } let(:metrics) { nil } @@ -76,6 +134,30 @@ assert_credentials_provider.call kinesis.kcl_config.kinesisClient end + + subject!(:kinesis_with_valid_additional_settings) { LogStash::Inputs::Kinesis.new(config_with_valid_additional_settings) } + it "configures the KCL" do + kinesis_with_valid_additional_settings.register + expect(kinesis_with_valid_additional_settings.coordinator_config.applicationName).to eq("my-processor") + expect(kinesis_with_valid_additional_settings.retrieval_config.streamName).to eq("run-specs") + expect(kinesis_with_valid_additional_settings.coordinator_config.maxInitializationAttempts).to eq(2) + expect(kinesis_with_valid_additional_settings.lifecycle_config.taskBackoffTimeMillis).to eq(20) + expect(kinesis_with_valid_additional_settings.lease_management_config.initialLeaseTableReadCapacity).to eq(25) + expect(kinesis_with_valid_additional_settings.lease_management_config.initialLeaseTableWriteCapacity).to eq(100) + expect(kinesis_with_valid_additional_settings.metrics_config.metricsMaxQueueSize).to eq(20000) + expect(kinesis_with_valid_additional_settings.retrieval_config.listShardsBackoffTimeInMillis).to eq(3000) + end + + subject!(:kinesis_with_invalid_additional_settings_name_not_found) { LogStash::Inputs::Kinesis.new(config_with_invalid_additional_settings_name_not_found) } + it "raises NoMethodError for invalid configuration options" do + expect{ kinesis_with_invalid_additional_settings_name_not_found.register }.to raise_error(NoMethodError) + end + + subject!(:kinesis_with_invalid_additional_settings_wrong_type) { LogStash::Inputs::Kinesis.new(config_with_invalid_additional_settings_wrong_type) } + it "raises an error for invalid configuration values such as the wrong type" do + expect{ kinesis_with_invalid_additional_settings_wrong_type.register }.to raise_error(NameError) + end + subject!(:kinesis_with_profile) { LogStash::Inputs::Kinesis.new(config_with_profile) } it "uses 
ProfileCredentialsProvider if profile is specified" do