Skip to content

Commit

Permalink
Merge pull request #162 from aodn/feature/5972-index-downloaded-data
Browse files Browse the repository at this point in the history
Feature/5972 index downloaded data
  • Loading branch information
HavierD authored Oct 29, 2024
2 parents 30ea32e + 7ffe30c commit 9ca28d4
Show file tree
Hide file tree
Showing 13 changed files with 392 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public interface AppConstants {
String PORTAL_RECORDS_MAPPING_JSON_FILE = "portal_records_index_schema.json";
String DATASET_INDEX_MAPPING_JSON_FILE = "dataset_index_schema.json";

String FORMAT_XML = "xml";
String FORMAT_ISO19115_3_2018 = "iso19115-3.2018";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package au.org.aodn.esindexer.configuration;

import au.org.aodn.esindexer.service.DataAccessServiceImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DatasetAccessConfig {

@Bean(name = "DataAccessService")
public DataAccessServiceImpl createDataAccessService(
@Value("${dataaccess.host:defaultForTesting}") String serverUrl
){
return new DataAccessServiceImpl(serverUrl);
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package au.org.aodn.esindexer.controller;

import au.org.aodn.esindexer.service.IndexerService;
import au.org.aodn.esindexer.model.Dataset;
import au.org.aodn.esindexer.service.DataAccessService;
import au.org.aodn.esindexer.service.GeoNetworkService;
import au.org.aodn.esindexer.service.IndexerService;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.xml.bind.JAXBException;
import lombok.extern.slf4j.Slf4j;
import org.opengis.referencing.FactoryException;
import org.opengis.referencing.operation.TransformException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.*;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -33,6 +38,9 @@ public class IndexerController {
@Autowired
GeoNetworkService geonetworkResourceService;

@Autowired
DataAccessService dataAccessService;

@GetMapping(path="/records/{uuid}", produces = "application/json")
@Operation(description = "Get a document from GeoNetwork by UUID directly - JSON format response")
public ResponseEntity<String> getMetadataRecordFromGeoNetworkByUUID(@PathVariable("uuid") String uuid) {
Expand Down Expand Up @@ -149,4 +157,67 @@ public ResponseEntity<String> addDocumentByUUID(@PathVariable("uuid") String uui
public ResponseEntity<String> deleteDocumentByUUID(@PathVariable("uuid") String uuid) throws IOException {
return indexerService.deleteDocumentByUUID(uuid);
}

@PostMapping(path="/{uuid}/dataset", produces = "application/json")
@Operation(security = {@SecurityRequirement(name = "X-API-Key") }, description = "Index a dataset by UUID")
public ResponseEntity<List<String>> addDatasetByUUID(@PathVariable("uuid") String uuid) {

// For making sure the dataset entry is not too big, they will be split into smaller chunks by yearmonth
// By default, we assume the dataset started from 1970-01, and until now
LocalDate maxDate = LocalDate.now();
LocalDate startDate = LocalDate.of(1970, 1, 1);
List<CompletableFuture<ResponseEntity<String>>> futures = new ArrayList<>();

try{
while (startDate.isBefore(maxDate)) {
// For speed optimizing, check whether data is existing in this year. If no data, skip to next year
var endDate = startDate.plusYears(1).minusDays(1);
var hasData = dataAccessService.doesDataExist(uuid, startDate, endDate);
if (!hasData) {
log.info("No data found for dataset {} from {} to {}", uuid, startDate, endDate);
startDate = startDate.plusYears(1);
continue;
}

futures.addAll(indexDatasetMonthly(uuid, startDate, endDate));
startDate = startDate.plusYears(1);
}

CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.join();
List<String> results = new ArrayList<>();
for (CompletableFuture<ResponseEntity<String>> future : futures) {
results.add(future.join().getBody());
}

return ResponseEntity.ok(results);
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(List.of(e.getMessage()));
}
}

private List<CompletableFuture<ResponseEntity<String>>> indexDatasetMonthly(
String uuid,
LocalDate startDate,
LocalDate maxDate
) throws InterruptedException, ExecutionException {
List<CompletableFuture<ResponseEntity<String>>> futures = new ArrayList<>();
var startDateToLoop = startDate;

while (startDateToLoop.isBefore(maxDate)) {
var endDate = startDateToLoop.plusMonths(1).minusDays(1);

Dataset dataset = dataAccessService.getIndexingDatasetBy(uuid, startDateToLoop, endDate);
if (dataset != null && dataset.data() != null && !dataset.data().isEmpty()) {
CompletableFuture<ResponseEntity<String>> future = indexerService.indexDataset(dataset);
futures.add(future);
log.info("Indexing dataset {} from {} to {}", uuid, startDateToLoop, endDate);
future.get();
}
startDateToLoop = startDateToLoop.plusMonths(1);
}

return futures;
}

}
10 changes: 10 additions & 0 deletions indexer/src/main/java/au/org/aodn/esindexer/model/Dataset.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package au.org.aodn.esindexer.model;

import java.time.YearMonth;
import java.util.List;

public record Dataset(
String uuid,
YearMonth yearMonth,
List<Datum> data
) {}
49 changes: 49 additions & 0 deletions indexer/src/main/java/au/org/aodn/esindexer/model/Datum.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package au.org.aodn.esindexer.model;

import lombok.Getter;
import lombok.Setter;

import java.time.LocalDate;

// If more fields are needed to be filtered, please add more columns here
// and don't forget updating the override equals() method
@Getter
@Setter
public class Datum {


private final LocalDate time;
private final double longitude;
private final double latitude;
private final double depth;

private long count = 1;

public Datum(LocalDate time, double longitude, double latitude, double depth) {
this.time = time;
this.longitude = longitude;
this.latitude = latitude;
this.depth = depth;
}

// putting all same record into one instance and increment the count is more efficient
public void incrementCount() {
count++;
}

// Don't include variable "count" in the equals() method.
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Datum that = (Datum) obj;
return Double.compare(that.longitude, longitude) == 0 &&
Double.compare(that.latitude, latitude) == 0 &&
Double.compare(that.depth, depth) == 0 &&
time.equals(that.time);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package au.org.aodn.esindexer.service;

import au.org.aodn.esindexer.model.Dataset;

import java.time.LocalDate;

public interface DataAccessService {
Dataset getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate endDate);
boolean doesDataExist(String uuid, LocalDate startDate, LocalDate endDate);
String getServiceUrl();
void setServiceUrl(String url);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package au.org.aodn.esindexer.service;

import au.org.aodn.esindexer.exception.MetadataNotFoundException;
import au.org.aodn.esindexer.model.Datum;
import au.org.aodn.esindexer.model.Dataset;
import org.springframework.http.*;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DataAccessServiceImpl implements DataAccessService {

private String serverUrl;

@Override
public String getServiceUrl() {
return serverUrl;
}

@Override
public void setServiceUrl(String url) {
this.serverUrl = url;
}

public DataAccessServiceImpl(String serverUrl) {
setServiceUrl(serverUrl);
}

private final RestTemplate restTemplate = new RestTemplate();
@Override
public Dataset getIndexingDatasetBy(String uuid, LocalDate startDate, LocalDate endDate) {

// currently, we force to get data in the same month and year to simplify the logic
if (startDate.getMonth() != endDate.getMonth() || startDate.getYear() != endDate.getYear()) {
throw new IllegalArgumentException("Start date and end date must be in the same month and year");
}

try {
HttpEntity<String> request = getRequestEntity(null, null);

Map<String, Object> params = new HashMap<>();
params.put("uuid", uuid);

String url = UriComponentsBuilder.fromHttpUrl(getDataAccessEndpoint() + "/data/{uuid}")
.queryParam("is_to_index", "true")
.queryParam("start_date", startDate)
.queryParam("end_date", endDate)
.buildAndExpand(uuid)
.toUriString();

ResponseEntity<Datum[]> responseEntity = restTemplate.exchange(
url,
HttpMethod.GET,
request,
Datum[].class,
params
);



if (responseEntity.getStatusCode().is2xxSuccessful()) {
List<Datum> data = new ArrayList<>();
if (responseEntity.getBody() != null) {
data = List.of(responseEntity.getBody());
}
var dataToIndex = aggregateData(data);
return new Dataset(
uuid,
YearMonth.of(startDate.getYear(), startDate.getMonth()),
dataToIndex
);
}
throw new RuntimeException("Unable to retrieve dataset with UUID: " + uuid );

} catch (HttpClientErrorException.NotFound e) {
throw new MetadataNotFoundException("Unable to find dataset with UUID: " + uuid + " in GeoNetwork");
} catch (Exception e) {
throw new RuntimeException("Exception thrown while retrieving dataset with UUID: " + uuid + e.getMessage(), e);
}
}

@Override
public boolean doesDataExist(String uuid, LocalDate startDate, LocalDate endDate) {
try {
HttpEntity<String> request = getRequestEntity(null, null);

Map<String, Object> params = new HashMap<>();
params.put("uuid", uuid);

String url = UriComponentsBuilder.fromHttpUrl(getDataAccessEndpoint() + "/data/{uuid}/has_data")
.queryParam("start_date", startDate)
.queryParam("end_date", endDate)
.buildAndExpand(uuid)
.toUriString();

ResponseEntity<Boolean> responseEntity = restTemplate.exchange(
url,
HttpMethod.GET,
request,
Boolean.class,
params
);

return Boolean.TRUE.equals(responseEntity.getBody());

} catch (HttpClientErrorException.NotFound e) {
throw new MetadataNotFoundException("Unable to find dataset with UUID: " + uuid + " in GeoNetwork");
} catch (Exception e) {
throw new RuntimeException("Exception thrown while retrieving dataset with UUID: " + uuid + e.getMessage(), e);
}
}

/**
* Summarize the data by counting the number if all the concerned fields are the same
* @param data the data to summarize
* @return the summarized data
*/
private List<Datum> aggregateData(List<Datum> data) {
var aggregatedData = new ArrayList<Datum>();
for (var datum: data) {
if (aggregatedData.contains(datum)) {
var existingDatum = aggregatedData.get(aggregatedData.indexOf(datum));
existingDatum.incrementCount();
} else {
aggregatedData.add(datum);
}
}
return aggregatedData;
}

private String getDataAccessEndpoint() {
return getServiceUrl() + "/api/v1/das/";
}


// parameters are not in use for now. May be useful in the future so just keep it
protected HttpEntity<String> getRequestEntity(MediaType accept, String body) {
HttpHeaders headers = new HttpHeaders();
headers.setAccept(List.of(
MediaType.TEXT_PLAIN,
MediaType.APPLICATION_JSON,
MediaType.valueOf("application/*+json"),
MediaType.ALL
));
return body == null ? new org.springframework.http.HttpEntity<>(headers) : new org.springframework.http.HttpEntity<>(body, headers);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package au.org.aodn.esindexer.service;

import au.org.aodn.esindexer.model.Dataset;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
Expand All @@ -20,6 +21,7 @@ interface Callback {
void onComplete(Object result);
}
CompletableFuture<ResponseEntity<String>> indexMetadata(String metadataValues) throws IOException, FactoryException, TransformException, JAXBException;
CompletableFuture<ResponseEntity<String>> indexDataset(Dataset dataset);
ResponseEntity<String> deleteDocumentByUUID(String uuid) throws IOException;
List<BulkResponse> indexAllMetadataRecordsFromGeoNetwork(String beginWithUuid, boolean confirm, Callback callback) throws IOException;
Hit<ObjectNode> getDocumentByUUID(String uuid) throws IOException;
Expand Down
Loading

0 comments on commit 9ca28d4

Please sign in to comment.