Skip to content

Commit

Permalink
Merge branch 'main' into platform-n-organisation-search
Browse files Browse the repository at this point in the history
  • Loading branch information
vietnguyengit authored Aug 30, 2024
2 parents 82d806f + 4dc3352 commit fa70f04
Show file tree
Hide file tree
Showing 47 changed files with 7,404 additions and 463 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -28,11 +30,20 @@ public RestClientTransport createRestClientTransport(@Value("${elasticsearch.ser
@Value("${elasticsearch.apiKey}") String apiKey) {
// Create the low-level client
RestClient restClient = RestClient
.builder(HttpHost.create(serverUrl))
.setDefaultHeaders(new Header[]{
new BasicHeader("Authorization", "ApiKey " + apiKey)
})
.build();
.builder(HttpHost.create(serverUrl))
.setDefaultHeaders(new Header[]{
new BasicHeader("Authorization", "ApiKey " + apiKey)
})
// Avoid issue 2024-08-25 07:17:25.862 WARN org.apache.http.client.protocol.ResponseProcessCookies -
// Invalid cookie header: "Set-Cookie: AWSALB=R21FGZ5zfcmfEoTzPXcvYYgIVrPX5I7qmbzhltwyuGTQLQ5jrn9uvU8
// spUPEFELYK1yZQLtfaQoLBu/tE451zrEaTlD5L6kaSnPvkR+OrhljaMAyG2cHhuiwtRxS;
// Expires=Sun, 01 Sep 2024 07:17:25 GMT; Path=/".
// Invalid 'expires' attribute: Sun, 01 Sep 2024 07:17:25 GMT
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setDefaultRequestConfig(RequestConfig.custom()
.setCookieSpec(CookieSpecs.STANDARD)
.build()))
.build();

// Create the transport with a Jackson mapper
return new RestClientTransport(restClient, new JacksonJsonpMapper());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package au.org.aodn.esindexer.configuration;

import au.org.aodn.esindexer.service.FIFOCache;
import au.org.aodn.esindexer.service.GeoNetworkServiceImpl;
import au.org.aodn.esindexer.utils.UrlUtils;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
Expand All @@ -8,6 +9,8 @@
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -19,6 +22,7 @@
import org.springframework.web.client.RestTemplate;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Set;

@Configuration
Expand Down Expand Up @@ -66,6 +70,15 @@ public RestClient createRestClientTransport1(@Value("${geonetwork.host}") String
new BasicHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE),
new BasicHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
})
// Avoid issue 2024-08-25 07:17:25.862 WARN org.apache.http.client.protocol.ResponseProcessCookies -
// Invalid cookie header: "Set-Cookie: AWSALB=R21FGZ5zfcmfEoTzPXcvYYgIVrPX5I7qmbzhltwyuGTQLQ5jrn9uvU8
// spUPEFELYK1yZQLtfaQoLBu/tE451zrEaTlD5L6kaSnPvkR+OrhljaMAyG2cHhuiwtRxS;
// Expires=Sun, 01 Sep 2024 07:17:25 GMT; Path=/".
// Invalid 'expires' attribute: Sun, 01 Sep 2024 07:17:25 GMT
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setDefaultRequestConfig(RequestConfig.custom()
.setCookieSpec(CookieSpecs.STANDARD)
.build()))
.build();
}

Expand All @@ -75,14 +88,30 @@ public GeoNetworkServiceImpl createGeoNetworkServiceImpl(
@Value("${geonetwork.host}") String server,
@Value("${geonetwork.search.api.index}") String indexName,
@Qualifier("gn4ElasticsearchClient") ElasticsearchClient gn4ElasticsearchClient,
RestTemplate indexerRestTemplate) {
RestTemplate indexerRestTemplate,
FIFOCache<String, Map<String, ?>> cache) {

return new GeoNetworkServiceImpl(server, indexName, gn4ElasticsearchClient, indexerRestTemplate);
return new GeoNetworkServiceImpl(
server,
indexName,
gn4ElasticsearchClient,
indexerRestTemplate,
cache
);
}

@Bean
@ConditionalOnMissingBean(UrlUtils.class)
public UrlUtils createUrlUtils() {
return new UrlUtils();
}
/**
* This cache is use to reduce load to query geonetwork
* @return - A first in first out cache
*/
@Bean
public FIFOCache<String, Map<String, ?>> createFIFOCache(
@Value("${geonetwork.search.fifoCacheSize:50}") int cacheSize) {
return new FIFOCache<>(cacheSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@Configuration
@EnableRetry
@EnableAsync
public class IndexerConfig {
/**
* We need to create component here because we do not want to run test with real http connection
Expand All @@ -21,4 +26,30 @@ public class IndexerConfig {
public VocabsIndexUtils createVocabsUtils() {
return new VocabsIndexUtils();
}
/**
* This executor is used to limit the number of concurrent call to index metadata so not to flood the
* geonetwork. This is useful because the geonetwork do not care about re-index call it invoke, hence
* the elastic of geonetwork may be flooded by its re-index call.
*
* A small thread size is require to not overload the geonetwork.
*
* @return - An async task executor with blocking queue to stop too many request. This is a limited queue
* and will throw error if new task cannot be added. It is up to the client to implement retry of the
* same task. Indexer will not help you to queue it.
*/
@Bean(name = "asyncIndexMetadata")
public Executor taskExecutor(
@Value("${app.indexing.pool.core:2}") Integer coreSize,
@Value("${app.indexing.pool.max:3}") Integer coreMax,
@Value("${app.indexing.pool.capacity:10}") Integer maxCapacity) {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(coreSize); // Number of concurrent threads
executor.setMaxPoolSize(coreMax); // Max number of concurrent threads
executor.setQueueCapacity(maxCapacity); // Size of the queue
executor.setThreadNamePrefix("Async-");
executor.initialize();

return executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@RestController
@RequestMapping(value = "/api/v1/indexer/index")
Expand All @@ -31,14 +33,6 @@ public class IndexerController {
@Autowired
GeoNetworkService geonetworkResourceService;

// @GetMapping(path="/gn_records/{uuid}", produces = "application/json")
// @Operation(description = "Get a document from GeoNetwork Elasticsearch by UUID")
// public ResponseEntity getMetadataRecordFromGeoNetworkElasticsearchByUUID(@PathVariable("uuid") String uuid) {
// logger.info("getting a document by UUID: " + uuid);
// JSONObject response = geonetworkResourceService.searchMetadataBy(uuid);
// return ResponseEntity.status(HttpStatus.OK).body(response.toString());
// }

@GetMapping(path="/records/{uuid}", produces = "application/json")
@Operation(description = "Get a document from GeoNetwork by UUID directly - JSON format response")
public ResponseEntity<String> getMetadataRecordFromGeoNetworkByUUID(@PathVariable("uuid") String uuid) {
Expand Down Expand Up @@ -125,9 +119,12 @@ public void onComplete(Object result) {

@PostMapping(path="/{uuid}", produces = "application/json")
@Operation(security = { @SecurityRequirement(name = "X-API-Key") }, description = "Index a metadata record by UUID")
public ResponseEntity<String> addDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException, FactoryException, JAXBException, TransformException {
public ResponseEntity<String> addDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException, FactoryException, JAXBException, TransformException, ExecutionException, InterruptedException {
String metadataValues = geonetworkResourceService.searchRecordBy(uuid);
return indexerService.indexMetadata(metadataValues);

CompletableFuture<ResponseEntity<String>> f = indexerService.indexMetadata(metadataValues);
// Return when done make it back to sync instead of async
return f.join();
}

@DeleteMapping(path="/{uuid}", produces = "application/json")
Expand Down
30 changes: 30 additions & 0 deletions indexer/src/main/java/au/org/aodn/esindexer/service/FIFOCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package au.org.aodn.esindexer.service;

import java.util.LinkedHashMap;
import java.util.Map;
/**
* A simple FIFO cache to store a few records to improve querying speed
* @param <K>
* @param <V>
*/
public class FIFOCache<K, V> {

private final Map<K, V> cache;

public FIFOCache(int maxSize) {
this.cache = new LinkedHashMap<>(maxSize + 1, 1.0f, false) {
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > maxSize;
}
};
}

public V get(K key) {
return cache.get(key);
}

public void put(K key, V value) {
cache.put(key, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public interface GeoNetworkService {
* @return
*/
boolean isMetadataRecordsCountLessThan(int c);
/**
* Ge the count of the docs
* @return The total number of records
*/
Long getAllMetadataCounts() throws IOException;

Map<String, ?> getAssociatedRecords(String uuid);
}
Loading

0 comments on commit fa70f04

Please sign in to comment.