Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate OpenSearchSink off of plugin Settings #5273

Open
wants to merge 65 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
8b993d0
restruction OpenSearchSinkConfiguration to use openSearchSinkConfig i…
Galactus22625 Dec 10, 2024
23afcbf
rebuild readIndexConfig and associated changes
Galactus22625 Dec 11, 2024
acbfb7b
rebuild readRetryConfig. Remove most instances of PluginSetting from…
Galactus22625 Dec 11, 2024
2c65480
save point
Galactus22625 Dec 11, 2024
d1cb3fa
Rework OpenSearchSinkConfigurationTests.java
Galactus22625 Dec 13, 2024
40f6266
Rework RetryConfigurationTests.java
Galactus22625 Dec 13, 2024
b47c0da
small changes
Galactus22625 Dec 13, 2024
ce07672
small changes
Galactus22625 Dec 13, 2024
19dc50e
condense many test yaml files into one
Galactus22625 Dec 13, 2024
915ce5c
Rework ConnectionConfigurationTests.java
Galactus22625 Dec 16, 2024
595b766
Rework IndexConfigurationTests.java.java
Galactus22625 Dec 16, 2024
d150343
copy files over from source
Galactus22625 Dec 17, 2024
7b78c78
adjust all unit tests
Galactus22625 Dec 18, 2024
55eb90b
spotless check
Galactus22625 Dec 18, 2024
e5aad51
Integration tests to OpenSearchSinkConfig from PluginSetting
Galactus22625 Dec 18, 2024
0024ef0
build.gradle change?
Galactus22625 Dec 18, 2024
b1d5425
remove hosts in integration test metadata
Galactus22625 Dec 19, 2024
c949f91
changing enforcement for required and non required fields
Galactus22625 Dec 19, 2024
53398fb
reset awsauthconfig to normal
Galactus22625 Dec 19, 2024
be265c0
replace authentication field from unit tests
Galactus22625 Dec 19, 2024
5f9b9f6
document_id_field to document_id since deprecated
Galactus22625 Dec 19, 2024
8450dfe
replace pluginSetting dependent parts of integration tests
Galactus22625 Dec 19, 2024
fcbbaf6
pipeline name missing from integration tests
Galactus22625 Dec 19, 2024
11b803b
find what is null
Galactus22625 Dec 19, 2024
9378889
initialize pipelineDescription Mock
Galactus22625 Dec 19, 2024
96ea7e1
initialize pluginSetting Mock
Galactus22625 Dec 19, 2024
d6aa92a
add pluginSetting name mock
Galactus22625 Dec 19, 2024
c53fb65
me when I see stack trace
Galactus22625 Dec 19, 2024
a0af91e
revert bulk retry strat
Galactus22625 Dec 19, 2024
1c9cd9e
revert reversion
Galactus22625 Dec 19, 2024
c8d6e5e
pipelineValue has to be null for documents to write. Originally pipe…
Galactus22625 Dec 21, 2024
21535f6
unit test update
Galactus22625 Dec 21, 2024
d42bb12
Integration test changes
Galactus22625 Dec 23, 2024
44f8529
integration test adjustments
Galactus22625 Dec 23, 2024
8fa8501
remove Routing Field since its not in documentation
Galactus22625 Dec 23, 2024
b0aaee2
remove routing field integration test
Galactus22625 Dec 23, 2024
507a935
integration test adjustment :/
Galactus22625 Dec 23, 2024
f6081ad
fix invalidactions integration test
Galactus22625 Dec 23, 2024
b8d1b03
addressing various pr comments
Galactus22625 Jan 2, 2025
7ee3442
action type as enum
Galactus22625 Jan 3, 2025
5443060
indexconfiguration constants to opensearchsinkconfig
Galactus22625 Jan 3, 2025
a0de738
readd deprecated fields
Galactus22625 Jan 3, 2025
6d7181f
fix integration tests to reuse deprecatd routing field and document i…
Galactus22625 Jan 3, 2025
e484b7d
action to enum integratin test fixes
Galactus22625 Jan 3, 2025
c8636fd
string should be fine for integration test
Galactus22625 Jan 3, 2025
da25170
enum string check
Galactus22625 Jan 3, 2025
df7c929
fix enum string check
Galactus22625 Jan 3, 2025
018857c
document id field should have testIdField as value in Integration Tests
Galactus22625 Jan 3, 2025
b1202b0
integraton tests should pass now
Galactus22625 Jan 3, 2025
5d59aef
Connection configuration switched back to using metadata
Galactus22625 Jan 3, 2025
2ab0612
yaml lines not needed conncection configuraion tests switched back to…
Galactus22625 Jan 3, 2025
0f5e6f1
style fix
Galactus22625 Jan 6, 2025
21b2b97
assert True issue
Galactus22625 Jan 6, 2025
6d87001
Merge branch 'opensearch-project:main' into openSearch-sink-configura…
Galactus22625 Jan 6, 2025
2e2c586
put routing field test back
Galactus22625 Jan 7, 2025
0eeada6
add license header to new files
Galactus22625 Jan 8, 2025
7edfe65
move build.gradle changes to data-prepper-plugins/opensearch/build.gr…
Galactus22625 Jan 8, 2025
ddc7faf
Merge branch 'main' into openSearch-sink-configuration-changes
Galactus22625 Jan 16, 2025
3d5d8c8
update actions artifacts version for integration tests
Galactus22625 Jan 16, 2025
2c78c46
fixed new test
Galactus22625 Jan 16, 2025
db8d37a
Merge branch 'opensearch-project:main' into openSearch-sink-configura…
Galactus22625 Jan 17, 2025
7ff3d00
FailedbulkOperationConveter takes the same input twice
Galactus22625 Jan 18, 2025
c7ac019
readd a routing field test
Galactus22625 Jan 22, 2025
122aaba
remove refernces to invalid authentication configuration
Galactus22625 Jan 23, 2025
faa5c17
we want to support authentication: username: password: config
Galactus22625 Jan 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
run: ./gradlew --parallel --max-workers 2 build
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should not show up as changes in this PR since they were already applied. You may need to rebase from main to correct this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed by merging main into branch

with:
name: data-prepper-test-results-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'
Expand All @@ -45,7 +45,7 @@ jobs:

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
path: test-results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
./gradlew :data-prepper-plugins:opensearch:integrationTest -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin -Dtests.opensearch.bundle=true -Dtests.opensearch.version=opendistro:${{ matrix.opendistro }}
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: data-prepper-opensearch-integration-tests-opendistro-${{ matrix.opendistro }}-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'
Expand All @@ -56,7 +56,7 @@ jobs:

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
path: test-results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
./gradlew :data-prepper-plugins:opensearch:integrationTest -Dtests.opensearch.host=localhost:9200 -Dtests.opensearch.user=admin -Dtests.opensearch.password=admin -Dtests.opensearch.bundle=true -Dtests.opensearch.version=opensearch:${{ matrix.opensearch }}
- name: Upload Unit Test Results
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: data-prepper-opensearch-integration-tests-opensearch-${{ matrix.opensearch }}-java-${{ matrix.java }}
path: '**/test-results/**/*.xml'
Expand All @@ -56,7 +56,7 @@ jobs:

steps:
- name: Download Artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
path: test-results

Expand Down
2 changes: 2 additions & 0 deletions data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/

dependencies {
compileOnly 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
implementation project(':data-prepper-api')
implementation libs.armeria.core
testImplementation project(':data-prepper-api').sourceSets.test.output
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -105,9 +104,6 @@ public final class BulkRetryStrategy {
private final PluginMetrics pluginMetrics;
private final Supplier<AccumulatingBulkRequest> bulkRequestSupplier;
private final int maxRetries;
private final String pluginId;
private final String pluginName;
private final String pipelineName;
private final ObjectMapper objectMapper;

private final Counter sentDocumentsCounter;
Expand Down Expand Up @@ -150,15 +146,13 @@ public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOpera
final PluginMetrics pluginMetrics,
final int maxRetries,
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier,
final PluginSetting pluginSetting) {
final String pipelineName,
final String pluginName) {
this.requestFunction = requestFunction;
this.logFailure = logFailure;
this.pluginMetrics = pluginMetrics;
this.bulkRequestSupplier = bulkRequestSupplier;
this.maxRetries = maxRetries;
this.pipelineName = pluginSetting.getPipelineName();
this.pluginId = pluginSetting.getName();
this.pluginName = pluginSetting.getName();
this.objectMapper = new ObjectMapper();

sentDocumentsCounter = pluginMetrics.counter(DOCUMENTS_SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.dataprepper.plugins.sink.opensearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.auth.AuthScope;
Expand All @@ -29,9 +28,12 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ServerlessOptions;

import org.opensearch.dataprepper.plugins.source.opensearch.AuthConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.arns.Arn;
Expand Down Expand Up @@ -61,36 +63,17 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DISTRIBUTION_VERSION;

public class ConnectionConfiguration {
static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";
private static final String AOS_SERVICE_NAME = "es";
private static final String AOSS_SERVICE_NAME = "aoss";
private static final String DEFAULT_AWS_REGION = "us-east-1";

public static final String HOSTS = "hosts";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String SOCKET_TIMEOUT = "socket_timeout";
public static final String CONNECT_TIMEOUT = "connect_timeout";
public static final String CERT_PATH = "cert";
public static final String INSECURE = "insecure";
public static final String AWS_OPTION = "aws";
public static final String AWS_SIGV4 = "aws_sigv4";
public static final String AWS_REGION = "aws_region";
public static final String AWS_STS_ROLE_ARN = "aws_sts_role_arn";
public static final String AWS_STS_EXTERNAL_ID = "aws_sts_external_id";
public static final String AWS_STS_HEADER_OVERRIDES = "aws_sts_header_overrides";
public static final String PROXY = "proxy";
public static final String SERVERLESS = "serverless";
public static final String SERVERLESS_OPTIONS = "serverless_options";
public static final String COLLECTION_NAME = "collection_name";
public static final String NETWORK_POLICY_NAME = "network_policy_name";
public static final String VPCE_ID = "vpce_id";
public static final String REQUEST_COMPRESSION_ENABLED = "enable_request_compression";

/**
Expand All @@ -112,7 +95,6 @@ public class ConnectionConfiguration {
private final String awsStsExternalId;
private final Map<String, String> awsStsHeaderOverrides;
private final Optional<String> proxy;
private final String pipelineName;
private final boolean serverless;
private final String serverlessNetworkPolicyName;
private final String serverlessCollectionName;
Expand Down Expand Up @@ -203,100 +185,83 @@ private ConnectionConfiguration(final Builder builder) {
this.serverlessCollectionName = builder.serverlessCollectionName;
this.serverlessVpceId = builder.serverlessVpceId;
this.requestCompressionEnabled = builder.requestCompressionEnabled;
this.pipelineName = builder.pipelineName;
this.authConfig = builder.authConfig;
}

public static ConnectionConfiguration readConnectionConfiguration(final PluginSetting pluginSetting){
@SuppressWarnings("unchecked")
final List<String> hosts = (List<String>) pluginSetting.getAttributeFromSettings(HOSTS);
public static ConnectionConfiguration readConnectionConfiguration(final OpenSearchSinkConfig openSearchSinkConfig){
final List<String> hosts = openSearchSinkConfig.getHosts();
ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(hosts);
final String username = (String) pluginSetting.getAttributeFromSettings(USERNAME);
final String password = (String) pluginSetting.getAttributeFromSettings(PASSWORD);
builder.withPipelineName(pluginSetting.getPipelineName());
final AuthConfig authConfig = AuthConfig.readAuthConfig(pluginSetting);
if (authConfig != null) {
if (username != null || password != null) {
throw new IllegalStateException("Deprecated username and password should not be set " +
"when authentication is configured.");
}
builder = builder.withAuthConfig(authConfig);
} else {
if (username != null) {
builder = builder.withUsername(username);
}
if (password != null) {
builder = builder.withPassword(password);
}
final String username = openSearchSinkConfig.getUsername();
final String password = openSearchSinkConfig.getPassword();
if (username != null) {
builder = builder.withUsername(username);
}
if (password != null) {
builder = builder.withPassword(password);
}
final Integer socketTimeout = pluginSetting.getIntegerOrDefault(SOCKET_TIMEOUT, null);
final Integer socketTimeout = openSearchSinkConfig.getSocketTimeout();
if (socketTimeout != null) {
builder = builder.withSocketTimeout(socketTimeout);
}
final Integer connectTimeout = pluginSetting.getIntegerOrDefault(CONNECT_TIMEOUT, null);
final Integer connectTimeout = openSearchSinkConfig.getConnectTimeout();
if (connectTimeout != null) {
builder = builder.withConnectTimeout(connectTimeout);
}

Map<String, Object> awsOption = pluginSetting.getTypedMap(AWS_OPTION, String.class, Object.class);
boolean awsOptionUsed = false;
builder.withAwsSigv4(false);
if (awsOption != null && !awsOption.isEmpty()) {
awsOptionUsed = true;
builder.withAwsSigv4(true);
builder.withAwsRegion((String)(awsOption.getOrDefault(AWS_REGION.substring(4), DEFAULT_AWS_REGION)));
builder.withAWSStsRoleArn((String)(awsOption.getOrDefault(AWS_STS_ROLE_ARN.substring(4), null)));
builder.withAWSStsExternalId((String)(awsOption.getOrDefault(AWS_STS_EXTERNAL_ID.substring(4), null)));
builder.withAwsStsHeaderOverrides((Map<String, String>)awsOption.get(AWS_STS_HEADER_OVERRIDES.substring(4)));
builder.withServerless(OBJECT_MAPPER.convertValue(
awsOption.getOrDefault(SERVERLESS, false), Boolean.class));

Map<String, String> serverlessOptions = (Map<String, String>) awsOption.get(SERVERLESS_OPTIONS);
if (serverlessOptions != null && !serverlessOptions.isEmpty()) {
builder.withServerlessNetworkPolicyName((String)(serverlessOptions.getOrDefault(NETWORK_POLICY_NAME, null)));
builder.withServerlessCollectionName((String)(serverlessOptions.getOrDefault(COLLECTION_NAME, null)));
builder.withServerlessVpceId((String)(serverlessOptions.getOrDefault(VPCE_ID, null)));
}
} else {
builder.withServerless(false);
}
boolean awsSigv4 = pluginSetting.getBooleanOrDefault(AWS_SIGV4, false);
final String awsOptionConflictMessage = String.format("%s option cannot be used along with %s option", AWS_SIGV4, AWS_OPTION);
if (awsSigv4) {
if (awsOptionUsed) {
throw new RuntimeException(awsOptionConflictMessage);
final AwsAuthenticationConfiguration awsAuthenticationConfiguration = openSearchSinkConfig.getAwsAuthenticationOptions();
boolean awsSigv4 = openSearchSinkConfig.isAwsSigv4();
if (awsAuthenticationConfiguration != null) {
builder = builder.withAwsSigv4(true)
.withAwsRegion(awsAuthenticationConfiguration.getAwsRegion().toString())
.withAWSStsRoleArn(awsAuthenticationConfiguration.getAwsStsRoleArn())
.withAWSStsExternalId(awsAuthenticationConfiguration.getAwsStsExternalId())
.withAwsStsHeaderOverrides(awsAuthenticationConfiguration.getAwsStsHeaderOverrides())
.withServerless(awsAuthenticationConfiguration.isServerlessCollection());

final ServerlessOptions serverlessOptions = awsAuthenticationConfiguration.getServerlessOptions();
if (serverlessOptions != null) {
builder = builder.withServerlessNetworkPolicyName(serverlessOptions.getNetworkPolicyName())
.withServerlessCollectionName(serverlessOptions.getCollectionName())
.withServerlessVpceId(serverlessOptions.getVpceId());
}
} else if (awsSigv4) {
builder = builder.withAwsSigv4(awsSigv4)
.withAwsRegion(openSearchSinkConfig.getAwsRegion())
.withAWSStsRoleArn(openSearchSinkConfig.getAwsStsRoleArn())
.withAWSStsExternalId(openSearchSinkConfig.getAwsStsExternalId())
.withAwsStsHeaderOverrides(openSearchSinkConfig.getAwsStsHeaderOverrides());

final ServerlessOptions serverlessOptions = openSearchSinkConfig.getServerlessOptions();
if (serverlessOptions != null) {
builder = builder.withServerlessNetworkPolicyName(serverlessOptions.getNetworkPolicyName())
.withServerlessCollectionName(serverlessOptions.getCollectionName())
.withServerlessVpceId(serverlessOptions.getVpceId());
}
builder.withAwsSigv4(true);
builder.withAwsRegion(pluginSetting.getStringOrDefault(AWS_REGION, DEFAULT_AWS_REGION));
builder.withAWSStsRoleArn(pluginSetting.getStringOrDefault(AWS_STS_ROLE_ARN, null));
builder.withAWSStsExternalId(pluginSetting.getStringOrDefault(AWS_STS_EXTERNAL_ID, null));
builder.withAwsStsHeaderOverrides(pluginSetting.getTypedMap(AWS_STS_HEADER_OVERRIDES, String.class, String.class));
} else {
builder.withServerless(false);
}

final String certPath = pluginSetting.getStringOrDefault(CERT_PATH, null);
final boolean insecure = pluginSetting.getBooleanOrDefault(INSECURE, false);
final String certPath = openSearchSinkConfig.getCertPath();
final boolean insecure = openSearchSinkConfig.isInsecure();
// Insecure == true will override configured certPath
if (insecure) {
builder.withInsecure(insecure);
} else if (certPath != null) {
builder.withCert(certPath);
}
final String proxy = pluginSetting.getStringOrDefault(PROXY, null);
builder = builder.withProxy(proxy);

final String proxy = openSearchSinkConfig.getProxy();
if (proxy != null) {
builder = builder.withProxy(proxy);
}

final String distributionVersionName = pluginSetting.getStringOrDefault(DISTRIBUTION_VERSION,
DistributionVersion.DEFAULT.getVersion());
final DistributionVersion distributionVersion = DistributionVersion.fromTypeName(distributionVersionName);
final boolean requestCompressionEnabled = pluginSetting.getBooleanOrDefault(
REQUEST_COMPRESSION_ENABLED, !DistributionVersion.ES6.equals(distributionVersion));
final boolean requestCompressionEnabled = openSearchSinkConfig.getEnableRequestCompression();
builder = builder.withRequestCompressionEnabled(requestCompressionEnabled);

return builder.build();
}

public String getPipelineName() {
return pipelineName;
}

public RestHighLevelClient createClient(AwsCredentialsSupplier awsCredentialsSupplier) {
final HttpHost[] httpHosts = new HttpHost[hosts.size()];
Expand Down Expand Up @@ -632,11 +597,6 @@ public Builder withProxy(final String proxy) {
return this;
}

public Builder withPipelineName(final String pipelineName) {
this.pipelineName = pipelineName;
return this;
}

public Builder withServerless(boolean serverless) {
this.serverless = serverless;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io.micrometer.core.instrument.Counter;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -13,7 +13,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class OpenSearchClientRefresher implements PluginComponentRefresher<OpenSearchClient, PluginSetting> {
public class OpenSearchClientRefresher implements PluginComponentRefresher<OpenSearchClient, OpenSearchSinkConfig> {
static final String CREDENTIALS_CHANGED = "credentialsChanged";
static final String CLIENT_REFRESH_ERRORS = "clientRefreshErrors";
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchClientRefresher.class);
Expand Down Expand Up @@ -54,8 +54,8 @@ public OpenSearchClient get() {
}

@Override
public void update(PluginSetting pluginSetting) {
final ConnectionConfiguration newConfig = ConnectionConfiguration.readConnectionConfiguration(pluginSetting);
public void update(OpenSearchSinkConfig openSearchSinkConfig) {
final ConnectionConfiguration newConfig = ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig);
if (basicAuthChanged(newConfig)) {
credentialsChangeCounter.increment();
readWriteLock.writeLock().lock();
Expand Down
Loading
Loading