Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: change naming from sdk cluster to non-product cluster #3982

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class AtlasElasticsearchDatabase {
private static volatile RestClient lowLevelClient;

private static volatile RestClient esProductClusterClient;
private static volatile RestClient esSDKClusterClient;
private static volatile RestClient esNonProductClusterClient;
public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.hostname";

public static List<HttpHost> getHttpHosts() throws AtlasException {
Expand Down Expand Up @@ -115,10 +115,6 @@ public static RestClient getProductClusterClient() {
if (esProductClusterClient == null) {
try {
HttpHost productHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_PRODUCT_SEARCH_CLUSTER_URL.getString());
if (productHost == null) {
LOG.error("Invalid product cluster URL configuration");
return null;
}

RestClientBuilder builder = RestClient.builder(productHost);
builder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
Expand All @@ -138,37 +134,33 @@ public static RestClient getProductClusterClient() {
return esProductClusterClient;
}

public static RestClient getSDKClusterClient() {
public static RestClient getNonProductSearchClusterClient() {
if (!AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION.getBoolean()) {
return null;
}

if (esSDKClusterClient == null) {
if (esNonProductClusterClient == null) {
synchronized (AtlasElasticsearchDatabase.class) {
if (esSDKClusterClient == null) {
if (esNonProductClusterClient == null) {
try {
HttpHost sdkHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_SDK_SEARCH_CLUSTER_URL.getString());
if (sdkHost == null) {
LOG.error("Invalid SDK cluster URL configuration");
return null;
}
HttpHost nonProductHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_NON_PRODUCT_SEARCH_CLUSTER_URL.getString());

RestClientBuilder builder = RestClient.builder(sdkHost);
RestClientBuilder builder = RestClient.builder(nonProductHost);
builder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setKeepAliveStrategy(((httpResponse, httpContext) -> 3600000)));
builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(AtlasConfiguration.INDEX_CLIENT_CONNECTION_TIMEOUT.getInt())
.setSocketTimeout(AtlasConfiguration.INDEX_CLIENT_SOCKET_TIMEOUT.getInt()));

esSDKClusterClient = builder.build();
esNonProductClusterClient = builder.build();
} catch (Exception e) {
LOG.error("Failed to initialize SDK cluster client", e);
LOG.error("Failed to initialize Non-product cluster client", e);
return null;
}
}
}
}
return esSDKClusterClient;
return esNonProductClusterClient;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class AtlasElasticsearchQuery implements AtlasIndexQuery<AtlasJanusVertex
private RestClient lowLevelRestClient;

private RestClient esProductClusterClient;
private RestClient esSDKClusterClient;
private RestClient esNonProductClusterClient;
private String index;
private SearchSourceBuilder sourceBuilder;
private SearchResponse searchResponse;
Expand All @@ -85,11 +85,11 @@ public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestHighLevelClient esClie
this.sourceBuilder = sourceBuilder;
}

public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestClient restClient, String index, SearchParams searchParams, RestClient esProductClusterClient, RestClient esSDKClusterClient) {
public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestClient restClient, String index, SearchParams searchParams, RestClient esProductClusterClient, RestClient esNonProductClusterClient) {
this(graph, index);
this.lowLevelRestClient = restClient;
this.esProductClusterClient = esProductClusterClient;
this.esSDKClusterClient = esSDKClusterClient;
this.esNonProductClusterClient = esNonProductClusterClient;
this.searchParams = searchParams;
}

Expand All @@ -99,8 +99,8 @@ private AtlasElasticsearchQuery(AtlasJanusGraph graph, String index) {
searchResponse = null;
}

public AtlasElasticsearchQuery(AtlasJanusGraph graph, String index, RestClient restClient,RestClient esProductClusterClient, RestClient esSDKClusterClient) {
this(graph, restClient, index, null, esProductClusterClient, esSDKClusterClient);
public AtlasElasticsearchQuery(AtlasJanusGraph graph, String index, RestClient restClient,RestClient esProductClusterClient, RestClient esNonProductClusterClient) {
this(graph, restClient, index, null, esProductClusterClient, esNonProductClusterClient);
}

public AtlasElasticsearchQuery(String index, RestClient restClient) {
Expand Down Expand Up @@ -134,7 +134,7 @@ private RestClient getESClient() {
return Optional.ofNullable(esProductClusterClient)
.orElse(lowLevelRestClient);
} else {
return Optional.ofNullable(esSDKClusterClient)
return Optional.ofNullable(esNonProductClusterClient)
.orElse(lowLevelRestClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getLowLevelClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getProductClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getSDKClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getNonProductSearchClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase.getGraphInstance;
import static org.apache.atlas.type.Constants.STATE_PROPERTY_KEY;

Expand All @@ -122,8 +122,8 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
private final RestHighLevelClient elasticsearchClient;
private final RestClient restClient;

private final RestClient esProductClusterClient;
private final RestClient esSDKClusterClient;
private final RestClient esProductClusterClient;
private final RestClient esNonProductClusterClient;

private final ThreadLocal<GremlinGroovyScriptEngine> scriptEngine = ThreadLocal.withInitial(() -> {
DefaultImportCustomizer.Builder builder = DefaultImportCustomizer.build()
Expand All @@ -134,10 +134,10 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
});

public AtlasJanusGraph() {
this(getGraphInstance(), getClient(), getLowLevelClient(), getProductClusterClient(), getSDKClusterClient());
this(getGraphInstance(), getClient(), getLowLevelClient(), getProductClusterClient(), getNonProductSearchClusterClient());
}

public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsearchClient, RestClient restClient,RestClient esProductClusterClient, RestClient esSDKClusterClient ) {
public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsearchClient, RestClient restClient,RestClient esProductClusterClient, RestClient esNonProductClusterClient) {
//determine multi-properties once at startup
JanusGraphManagement mgmt = null;

Expand All @@ -161,7 +161,7 @@ public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsear
this.restClient = restClient;
this.elasticsearchClient = elasticsearchClient;
this.esProductClusterClient = esProductClusterClient;
this.esSDKClusterClient = esSDKClusterClient;
this.esNonProductClusterClient = esNonProductClusterClient;
}

@Override
Expand Down Expand Up @@ -333,7 +333,7 @@ public AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> elasticsearchQuery(Stri
LOG.error("restClient is not initiated, failed to run query on ES");
throw new AtlasBaseException(INDEX_SEARCH_FAILED, "restClient is not initiated");
}
return new AtlasElasticsearchQuery(this, restClient, INDEX_PREFIX + indexName, searchParams, esProductClusterClient, esSDKClusterClient);
return new AtlasElasticsearchQuery(this, restClient, INDEX_PREFIX + indexName, searchParams, esProductClusterClient, esNonProductClusterClient);
}

@Override
Expand Down Expand Up @@ -389,7 +389,7 @@ public AtlasIndexQuery elasticsearchQuery(String indexName) throws AtlasBaseExce
LOG.error("restClient is not initiated, failed to run query on ES");
throw new AtlasBaseException(INDEX_SEARCH_FAILED, "restClient is not initiated");
}
return new AtlasElasticsearchQuery(this, indexName, restClient, esProductClusterClient, esSDKClusterClient);
return new AtlasElasticsearchQuery(this, indexName, restClient, esProductClusterClient, esNonProductClusterClient);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getLowLevelClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getProductClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getSDKClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getNonProductSearchClusterClient;


import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
Expand Down Expand Up @@ -357,7 +357,7 @@ public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraph() {

@Override
public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraphBulkLoading() {
return new AtlasJanusGraph(getBulkLoadingGraphInstance(), getClient(), getLowLevelClient(), getProductClusterClient(), getSDKClusterClient());
return new AtlasJanusGraph(getBulkLoadingGraphInstance(), getClient(), getLowLevelClient(), getProductClusterClient(), getNonProductSearchClusterClient());
}

private static void startEmbeddedSolr() throws AtlasException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public enum AtlasConfiguration {
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION("atlas.indexsearch.request.isolation.enable", false),
ATLAS_ELASTICSEARCH_PRODUCT_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.product.cluster.url","atlas-elasticsearch2-product-search-headless.atlas.svc.cluster.local:9200"),
ATLAS_ELASTICSEARCH_SDK_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.sdk.cluster.url","atlas-elasticsearch2-sdk-search-headless.atlas.svc.cluster.local:9200"),
ATLAS_ELASTICSEARCH_NON_PRODUCT_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.nonproduct.cluster.url","atlas-elasticsearch2-non-product-search-headless.atlas.svc.cluster.local:9200"),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION("atlas.indexsearch.enable.janus.optimization", false),
ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false),
Expand Down
Loading