Skip to content

Commit

Permalink
EHSTSWRQ-54-HLSP: option for query parameters instead of telescope
Browse files Browse the repository at this point in the history
  • Loading branch information
jespinosaar committed Jan 16, 2025
1 parent 3814fe9 commit 50cdc17
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ public void setReadTimeout(int readTimeout) {
}


public List<DeletedObservation> getDeleted(String collection, String telescope, Date start, Date end, Integer maxrec) {
return readDeletedEntityList(new DeletionListReader(), collection, telescope, start, end, maxrec);
public List<DeletedObservation> getDeleted(String collection, String queryParams, Date start, Date end, Integer maxrec) {
return readDeletedEntityList(new DeletionListReader(), collection, queryParams, start, end, maxrec);
// TODO: make call(s) to the deletion endpoint until requested number of
// entries (like getObservationList)

Expand All @@ -311,19 +311,19 @@ public List<DeletedObservation> getDeleted(String collection, String telescope,
*/
}

public List<ObservationState> getObservationList(String collection, String telescope, Date start, Date end, Integer maxrec) throws AccessControlException {
return readObservationStateList(new ObservationStateListReader(), collection, telescope, start, end, maxrec);
public List<ObservationState> getObservationList(String collection, String queryParams, Date start, Date end, Integer maxrec) throws AccessControlException {
return readObservationStateList(new ObservationStateListReader(), collection, queryParams, start, end, maxrec);
}

public List<ObservationResponse> getList(String collection, String telescope, Date startDate, Date end,
public List<ObservationResponse> getList(String collection, String queryParams, Date startDate, Date end,
Integer numberOfObservations) throws InterruptedException,
ExecutionException {

// startDate = null;
// end = df.parse("2017-06-20T09:03:15.360");
List<ObservationResponse> list = new ArrayList<>();

List<ObservationState> stateList = getObservationList(collection, telescope, startDate, end, numberOfObservations);
List<ObservationState> stateList = getObservationList(collection, queryParams, startDate, end, numberOfObservations);

// Create tasks for each file
List<Callable<ObservationResponse>> tasks = new ArrayList<>();
Expand Down Expand Up @@ -430,14 +430,14 @@ public List<ObservationResponse> get(List<ObservationURI> listURI) throws Interr
return list;
}

public ObservationResponse get(String collection, String telescope, URI uri, Date start) {
public ObservationResponse get(String collection, String queryParams, URI uri, Date start) {
if (uri == null) {
throw new IllegalArgumentException("uri cannot be null");
}

log.debug("******************* getObservationList(collection, start, null, null) " + collection);

List<ObservationState> list = getObservationList(collection, telescope, start, null, null);
List<ObservationState> list = getObservationList(collection, queryParams, start, null, null);
ObservationState obsState = null;
for (ObservationState os : list) {
if (!os.getURI().getURI().equals(uri)) {
Expand All @@ -459,7 +459,7 @@ public ObservationResponse get(String collection, String telescope, URI uri, Dat
}
}

private List<ObservationState> readObservationStateList(ObservationStateListReader transformer, String collection, String telescope, Date start, Date end, Integer maxrec) {
private List<ObservationState> readObservationStateList(ObservationStateListReader transformer, String collection, String queryParams, Date start, Date end, Integer maxrec) {

List<ObservationState> accList = new ArrayList<>();
boolean tooBigRequest = maxrec == null || maxrec > DEFAULT_BATCH_SIZE;
Expand Down Expand Up @@ -489,8 +489,8 @@ private List<ObservationState> readObservationStateList(ObservationStateListRead
surl = surl + "&end=" + df.format(end);
}

if (telescope != null) {
surl = surl + "&telescope=" + telescope;
if (queryParams != null) {
surl = surl + "&" + queryParams;
}

log.debug("URL: " + surl);
Expand Down Expand Up @@ -587,7 +587,7 @@ public void read(InputStream in) throws IOException {
}
}

private List<DeletedObservation> readDeletedEntityList(DeletionListReader transformer, String collection, String telescope,
private List<DeletedObservation> readDeletedEntityList(DeletionListReader transformer, String collection, String queryParams,
Date start, Date end, Integer maxrec) {

List<DeletedObservation> accList = new ArrayList<>();
Expand Down Expand Up @@ -618,8 +618,8 @@ private List<DeletedObservation> readDeletedEntityList(DeletionListReader transf
if (end != null) {
surl = surl + "&end=" + df.format(end);
}
if (telescope != null) {
surl = surl + "&telescope=" + telescope;
if (queryParams != null) {
surl = surl + "&" + queryParams;
}
log.debug("URL: " + surl);

Expand Down
48 changes: 20 additions & 28 deletions icewind/src/main/java/org/opencadc/icewind/CaomHarvester.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public class CaomHarvester implements Runnable {
private final HarvestSource src;
private final HarvestDestination dest;
private final List<String> collections;
private final List<String> telescopes;
private final URI basePublisherID;

// optional
Expand All @@ -119,10 +118,9 @@ public class CaomHarvester implements Runnable {
* @param collections list of collections to process
* @param basePublisherID base to use in generating Plane publisherID values in destination database
*/
public CaomHarvester(HarvestSource src, List<String> collections, List<String> telescopes, HarvestDestination dest, URI basePublisherID) {
public CaomHarvester(HarvestSource src, List<String> collections, HarvestDestination dest, URI basePublisherID) {
this.src = src;
this.collections = collections;
this.telescopes = telescopes;
this.dest = dest;
this.basePublisherID = basePublisherID;

Expand Down Expand Up @@ -162,16 +160,19 @@ public void run() {

long sleep = 0;
boolean done = false;
String telescope;
String collection;
String queryParams;
while (!done) {
int ingested = 0;
for (String collection : collections) {
telescope = getTelescope(collection);
for (String inputCollection : collections) {
collection = getCollection(inputCollection);
queryParams = getQueryParams(collection);

log.info(src.getIdentifier(collection) + " -> " + dest);

if (validateMode) {
ObservationValidator validator = new ObservationValidator(src, collection, telescope, dest, batchSize, numThreads, false);
ObservationHarvester obsHarvester = new ObservationHarvester(src, collection, telescope, dest, basePublisherID,
ObservationValidator validator = new ObservationValidator(src, collection, queryParams, dest, batchSize, numThreads, false);
ObservationHarvester obsHarvester = new ObservationHarvester(src, collection, queryParams, dest, basePublisherID,
batchSize, numThreads, nochecksum);
obsHarvester.setSkipped(skipMode, null);
try {
Expand All @@ -183,11 +184,11 @@ public void run() {
log.warn("validate " + src.getIdentifier(collection) + " FAIL", ex);
}
} else {
ObservationHarvester obsHarvester = new ObservationHarvester(src, collection, telescope, dest, basePublisherID,
ObservationHarvester obsHarvester = new ObservationHarvester(src, collection, queryParams, dest, basePublisherID,
batchSize, numThreads, nochecksum);
obsHarvester.setSkipped(skipMode, retryErrorMessagePattern);

DeletionHarvester obsDeleter = new DeletionHarvester(DeletedObservation.class, src, collection, telescope, dest);
DeletionHarvester obsDeleter = new DeletionHarvester(DeletedObservation.class, src, collection, queryParams, dest);
boolean initDel = init;
if (!init) {
// check if we have ever harvested before
Expand Down Expand Up @@ -230,29 +231,20 @@ public void run() {
}

/**
* Retrieve the telescope parameter associated to the collection
* @param collection
* @return the telescope associated for HLSP collections, null if collection is different
* Retrieve the query parameters associated to the collection
* @param inputCollection
* @return the query parameters associated to the collection
*/
private String getTelescope(String collection) {
if (collection.equals(HLSP_CONSTANT)) {
// Retrieve the first element and remove it
return this.telescopes.remove(0);
}
return null;
private String getQueryParams(String inputCollection) {
String[] splitCollection = inputCollection.split("&", 2)
return splitCollection.length > 1 ? splitCollection[1] : null
}

/**
* Validate that, for each HLSP collection, a telescope is assigned
* Retrieve the clean collection from a collection possibly defined with query parameters
*/
private void validateCollectionAndTelescopes() {
int numberOfHLSPCollections = (int)this.collections.stream().filter(col -> col.equals(HLSP_CONSTANT)).count();
int numberOfTelescopes = this.telescopes.size();

if (numberOfHLSPCollections != numberOfTelescopes) {
throw new IllegalArgumentException(
String.format("Telescopes must be configured with the same number of HLSP collections"));
}
/**
 * Strip any query parameters from a configured collection entry.
 *
 * @param inputCollection collection entry, optionally followed by {@code &} and query parameters
 * @return the text before the first {@code &}, or the whole entry when it contains no {@code &}
 */
private String getCollection(String inputCollection) {
    // indexOf/substring avoids String.split dropping trailing empty tokens,
    // which would leave an empty array for an entry such as "&".
    int separator = inputCollection.indexOf('&');
    return separator < 0 ? inputCollection : inputCollection.substring(0, separator);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ public class DeletionHarvester extends Harvester implements Runnable {
* @param collection the collection to process
* @param batchSize ignored, always full list
*/
public DeletionHarvester(Class<?> entityClass, HarvestSource src, String collection, String telescope,
public DeletionHarvester(Class<?> entityClass, HarvestSource src, String collection, String queryParams,
HarvestDestination dest) {
super(entityClass, src, collection, telescope, dest);
super(entityClass, src, collection, queryParams, dest);
setBatchSize(DEFAULT_BATCH_SIZE);
init();
}
Expand Down Expand Up @@ -282,7 +282,7 @@ private Progress doit() {
entityList = deletedDAO.getList(collection, startDate, endDate, batchSize);
} else {
source = "repoClient";
entityList = repoClient.getDeleted(collection, telescope, startDate, endDate, batchSize);
entityList = repoClient.getDeleted(collection, queryParams, startDate, endDate, batchSize);
}

if (entityList == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public List<String> getCollections() {
}

/**
 * Build the identifier URI for a collection: the resource ID, a {@code ?}
 * separator, then the collection text with any {@code ?} it contains
 * rewritten to {@code &} so the result has a single query separator.
 *
 * @param collection collection text to append after the resource ID
 * @return the URI created from {@code resourceID + "?" + collection}
 * @throws IllegalArgumentException if the combined string is not a valid URI
 */
public URI getIdentifier(String collection) {
    // String.replace(char, char) substitutes literally; replaceAll expects
    // String regex arguments, and a bare "?" is not a valid pattern.
    return URI.create(resourceID + "?" + collection.replace('?', '&'));
}

@Override
Expand Down
4 changes: 1 addition & 3 deletions icewind/src/main/java/org/opencadc/icewind/Harvester.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,16 @@ public abstract class Harvester implements Runnable {
protected int batchSize;
protected HarvestSource src;
protected String collection;
// Optional extra query string that subclasses pass to the repository client
// list calls; null when the collection has no parameters configured.
protected String queryParams;
protected HarvestDestination dest;
protected HarvestStateDAO harvestStateDAO;

protected Harvester() {
}

/**
 * Create a harvester for a collection with no extra query parameters.
 *
 * @param entityClass the class of entity being harvested
 * @param src source to harvest from
 * @param collection the collection to process
 * @param dest destination to harvest to
 */
protected Harvester(Class entityClass, HarvestSource src, String collection, HarvestDestination dest) {
    this(entityClass, src, collection, null, dest);
}

/**
 * Create a harvester for a collection, optionally restricted by query parameters.
 *
 * @param entityClass the class of entity being harvested
 * @param src source to harvest from
 * @param collection the collection to process
 * @param queryParams extra query string for repository list requests, or null for none
 * @param dest destination to harvest to
 */
protected Harvester(Class entityClass, HarvestSource src, String collection, String queryParams,
        HarvestDestination dest) {
    this.entityClass = entityClass;
    this.src = src;
    this.collection = collection;
    this.queryParams = queryParams;
    this.dest = dest;
}

Expand Down
6 changes: 1 addition & 5 deletions icewind/src/main/java/org/opencadc/icewind/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public class Main {
private static final String RETRY_ERROR_PATTERN = CONFIG_PREFIX + ".retryErrorPattern";
private static final String REPO_SERVICE_CONFIG_KEY = CONFIG_PREFIX + ".repoService";
private static final String COLLECTION_CONFIG_KEY = CONFIG_PREFIX + ".collection";
private static final String TELESCOPE_CONFIG_KEY = CONFIG_PREFIX + ".telescope";
private static final String MAX_IDLE_CONFIG_KEY = CONFIG_PREFIX + ".maxIdle";
private static final String BATCH_SIZE_CONFIG_KEY = CONFIG_PREFIX + ".batchSize";
private static final String NUM_THREADS_CONFIG_KEY = CONFIG_PREFIX + ".numThreads";
Expand Down Expand Up @@ -162,8 +161,6 @@ public static void main(final String[] args) {
String.format("%s must be configured with a minimum of one collection", COLLECTION_CONFIG_KEY));
}

final List<String> configuredTelescopes = props.getProperty(TELESCOPE_CONFIG_KEY);

final URI configuredSourceRepoService;
String s = props.getFirstPropertyValue(REPO_SERVICE_CONFIG_KEY);
try {
Expand Down Expand Up @@ -255,8 +252,7 @@ public static void main(final String[] args) {
}
}

CaomHarvester harvester = new CaomHarvester(sourceHarvestResource, configuredCollections,
configuredTelescopes, destinationHarvestResource, basePublisherID);
CaomHarvester harvester = new CaomHarvester(sourceHarvestResource, configuredCollections, destinationHarvestResource, basePublisherID);
harvester.batchSize = batchSize;
harvester.numThreads = numThreads;
harvester.maxSleep = maxSleep;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ public class ObservationHarvester extends Harvester {
private int ingested = 0;
private String errorMessagePattern;

public ObservationHarvester(HarvestSource src, String collection, String telescope,
public ObservationHarvester(HarvestSource src, String collection, String queryParams,
HarvestDestination dest, URI basePublisherID,
Integer batchSize, int nthreads, boolean nochecksum) {
super(Observation.class, src, collection, telescope, dest);
super(Observation.class, src, collection, queryParams, dest);
setBatchSize(batchSize);
this.basePublisherID = basePublisherID;
this.nochecksum = nochecksum;
Expand Down Expand Up @@ -294,7 +294,7 @@ private Progress doit() {
if (srcObservationDAO != null) {
obsList = srcObservationDAO.getList(collection, startDate, endDate, batchSize + 1);
} else {
obsList = srcRepoClient.getList(collection, telescope, startDate, endDate, batchSize + 1);
obsList = srcRepoClient.getList(collection, queryParams, startDate, endDate, batchSize + 1);
}
entityList = wrap(obsList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ public class ObservationValidator extends Harvester {
HarvestSkipURIDAO harvestSkip = null;
private int numMismatches = 0;

public ObservationValidator(HarvestSource src, String collection, String telescope, HarvestDestination dest,
public ObservationValidator(HarvestSource src, String collection, String queryParams, HarvestDestination dest,
Integer batchSize, int nthreads, boolean dryrun) {
super(Observation.class, src, collection, telescope, dest);
super(Observation.class, src, collection, queryParams, dest);
setBatchSize(batchSize);
this.dryrun = dryrun;
init(nthreads);
Expand Down Expand Up @@ -199,7 +199,7 @@ private Progress doit() {
t = System.currentTimeMillis();

log.info("getObservationList: " + src.getIdentifier(collection));
List<ObservationState> tmpSrcState = srcObservationService.getObservationList(collection, telescope, null, null, null);
List<ObservationState> tmpSrcState = srcObservationService.getObservationList(collection, queryParams, null, null, null);
log.debug("found: " + tmpSrcState.size());

Set<ObservationState> srcState = new TreeSet<>(compStateUri);
Expand Down

0 comments on commit 50cdc17

Please sign in to comment.