
Commit

towards streaming reviews as outlined in Big-Bee-Network/bif#1 (comment)
Jorrit Poelen committed Aug 1, 2024
1 parent a8a871d commit 73a0243
Showing 7 changed files with 248 additions and 29 deletions.
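
For context on the streaming input: for each input line, CmdStream (changed below) parses a JSON object and takes the dataset namespace from its /namespace pointer, falling back to DatasetRegistryUtil.NAMESPACE_LOCAL when the field is absent. The following minimal sketch of that contract is not part of the commit; the namespace value and the "local" fallback string are illustrative assumptions.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StreamInputSketch {
    public static void main(String[] args) throws Exception {
        // one JSON object per input line; the namespace value here is hypothetical
        String line = "{ \"namespace\": \"globalbioticinteractions/template-dataset\" }";
        JsonNode jsonNode = new ObjectMapper().readTree(line);
        // CmdStream falls back to DatasetRegistryUtil.NAMESPACE_LOCAL when the field is missing;
        // "local" is assumed here for illustration
        String namespace = jsonNode.at("/namespace").asText("local");
        System.out.println("streaming review for namespace: " + namespace);
    }
}
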
62 changes: 40 additions & 22 deletions src/main/java/org/globalbioticinteractions/elton/cmd/CmdReview.java
@@ -34,6 +34,7 @@
import org.globalbioticinteractions.elton.Elton;
import org.globalbioticinteractions.elton.util.DatasetRegistryUtil;
import org.globalbioticinteractions.elton.util.NodeFactoryNull;
import org.globalbioticinteractions.elton.util.ProgressCursorFactory;
import org.globalbioticinteractions.elton.util.ProgressUtil;
import org.globalbioticinteractions.elton.util.SpecimenNull;
import picocli.CommandLine;
@@ -70,6 +71,7 @@
public class CmdReview extends CmdTabularWriterParams {
public static final String LOG_FORMAT_STRING = "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s";
public static final long LOG_NUMBER_OF_FIELDS = Arrays.stream(LOG_FORMAT_STRING.split("\t")).filter(x -> x.equals("%s")).count();
public static final String REVIEWER_DEFAULT = "GloBI automated reviewer (elton-" + Elton.getVersionString() + ")";

@CommandLine.Option(names = {"-n", "--lines"}, description = "print first n number of lines")
private Long maxLines = null;
@@ -80,7 +82,7 @@ public class CmdReview extends CmdTabularWriterParams {

private DateFactory dateFactory = Date::new;

private String reviewerName = "GloBI automated reviewer (elton-" + Elton.getVersionString() + ")";
private String reviewerName = REVIEWER_DEFAULT;

private String reviewId = UUID.randomUUID().toString();

@@ -112,7 +114,7 @@ public void run() {
new ResourceServiceLocalAndRemote(factory)
);

review(DatasetRegistryUtil.NAMESPACE_LOCAL, registryLocal, factory);
review(DatasetRegistryUtil.NAMESPACE_LOCAL, registryLocal, factory, shouldSkipHeader());
}

reviewCachedOrRemote(remoteNamespaces, factory);
@@ -124,35 +126,35 @@ public void run() {

private void reviewCachedOrRemote(List<String> namespaces, InputStreamFactory inputStreamFactory) throws StudyImporterException {
for (String namespace : namespaces) {
review(namespace, DatasetRegistryUtil.forCacheDir(getCacheDir(), new ResourceServiceLocal(inputStreamFactory)), inputStreamFactory);
review(namespace, DatasetRegistryUtil.forCacheDir(getCacheDir(), new ResourceServiceLocal(inputStreamFactory)), inputStreamFactory, shouldSkipHeader());
}
}

private void review(String namespace, DatasetRegistry registry, InputStreamFactory inputStreamFactory) throws StudyImporterException {
private void review(String namespace, DatasetRegistry registry, InputStreamFactory inputStreamFactory, boolean shouldSkipHeader) throws StudyImporterException {
ReviewReport report = createReport(namespace, CmdReview.this.reviewId, CmdReview.this.getReviewerName(), CmdReview.this.dateFactory);
ReviewReportLogger reviewReportLogger = new ReviewReportLogger(report, getStdout(), getMaxLines(), getProgressCursorFactory());
ReviewReportLogger logger = new ReviewReportLogger(report, getStdout(), getMaxLines(), getProgressCursorFactory());

try {
getStderr().print("creating review [" + namespace + "]... ");

Dataset dataset = new DatasetFactory(
registry,
inputStreamFactory)
.datasetFor(namespace);

String citationString = CitationUtil.citationFor(dataset);
if (StringUtils.startsWith(citationString, "<")
&& StringUtils.endsWith(citationString, ">")) {
reviewReportLogger.warn(null, "no citation found for dataset at [" + dataset.getArchiveURI() + "]");
}
NodeFactoryReview nodeFactory = new NodeFactoryReview(report.getInteractionCounter(), reviewReportLogger);
nodeFactory.getOrCreateDataset(dataset);
getStderr().print("creating review [" + namespace + "]... ");
if (!shouldSkipHeader()) {
logHeader(getStdout());
if (!shouldSkipHeader) {
logReviewHeader(getStdout());
}

NodeFactoryReview nodeFactory = new NodeFactoryReview(
report.getInteractionCounter(),
logger,
getProgressCursorFactory()
);

ParserFactoryLocal parserFactory = new ParserFactoryLocal(getClass());
DatasetImporterForRegistry studyImporter = new DatasetImporterForRegistry(parserFactory, nodeFactory, registry);
studyImporter.setLogger(reviewReportLogger);
studyImporter.setLogger(logger);
DatasetImportUtil.importDataset(
null,
dataset,
@@ -162,19 +164,19 @@ private void review(String namespace, DatasetRegistry registry, InputStreamFacto
);

if (report.getInteractionCounter().get() == 0) {
reviewReportLogger.warn(null, "no interactions found");
logger.warn(null, "no interactions found");
}
getStderr().println("done.");
log(null, dataset.getArchiveURI().toString(), ReviewCommentType.summary, report, getStdout());
} catch (DatasetRegistryException e) {
reviewReportLogger.warn(null, "no local repository at [" + getWorkDir().toString() + "]");
logger.warn(null, "no local repository at [" + getWorkDir().toString() + "]");
getStderr().println("failed.");
throw new StudyImporterException(e);
} catch (Throwable e) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
e.printStackTrace(new PrintWriter(out));
e.printStackTrace();
reviewReportLogger.severe(null, new String(out.toByteArray()));
logger.severe(null, new String(out.toByteArray()));
throw new StudyImporterException(e);
} finally {
log(null, report.getInteractionCounter().get() + " interaction(s)", ReviewCommentType.summary, report, getStdout());
@@ -196,7 +198,7 @@ private ReviewReport createReport(String namespace, String reviewId1, String rev
reviewId1, dateFactory1, reviewerName1, interactionCounter);
}

private void logHeader(PrintStream out) {
public static void logReviewHeader(PrintStream out) {
logReviewComment(out, "reviewId", "reviewDate", "reviewer", "namespace", "reviewCommentType", "reviewComment", "archiveURI", "referenceUrl", "institutionCode", "collectionCode", "collectionId", "catalogNumber", "occurrenceId", "sourceCitation", "dataContext");
}

@@ -292,24 +294,40 @@ public void setDesiredReviewCommentTypes(List<ReviewCommentType> commentTypes) {
desiredReviewCommentTypes = commentTypes;
}

private class NodeFactoryReview extends NodeFactoryNull {
public static class NodeFactoryReview extends NodeFactoryNull {
final AtomicLong counter;
final ImportLogger importLogger;

public NodeFactoryReview(AtomicLong counter, ImportLogger importLogger) {
this(counter, importLogger, new ProgressCursorFactoryNoop());
}

public NodeFactoryReview(AtomicLong counter, ImportLogger importLogger, ProgressCursorFactory progressCursorFactory) {
this.counter = counter;
this.importLogger = importLogger;
this.progressCursorFactory = progressCursorFactory;
}

private ProgressCursorFactory progressCursorFactory;
final Specimen specimen = new SpecimenNull() {
@Override
public void interactsWith(Specimen target, InteractType type, Location centroid) {
long count = counter.get();
ProgressUtil.logProgress(ProgressUtil.SPECIMEN_CREATED_PROGRESS_BATCH_SIZE, count, getProgressCursorFactory().createProgressCursor());
ProgressUtil.logProgress(ProgressUtil.SPECIMEN_CREATED_PROGRESS_BATCH_SIZE, count, progressCursorFactory.createProgressCursor());
counter.getAndIncrement();
}
};

@Override
public Dataset getOrCreateDataset(Dataset dataset) {
String citationString = CitationUtil.citationFor(dataset);
if (StringUtils.startsWith(citationString, "<")
&& StringUtils.endsWith(citationString, ">")) {
importLogger.warn(null, "no citation found for dataset at [" + dataset.getArchiveURI() + "]");
}
return super.getOrCreateDataset(dataset);
}


@Override
public Specimen createSpecimen(Study study, Taxon taxon) throws NodeFactoryException {
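
Not part of the diff above, but a sketch of what the refactoring enables: NodeFactoryReview is now a public static nested class, so it can be constructed outside CmdReview, and its two-argument constructor falls back to the new ProgressCursorFactoryNoop. The package placement and the NullImportLogger stand-in are illustrative assumptions.

package org.globalbioticinteractions.elton.cmd;

import java.util.concurrent.atomic.AtomicLong;

import org.eol.globi.data.NodeFactory;
import org.eol.globi.tool.NullImportLogger;

public class NodeFactoryReviewSketch {

    public static NodeFactory countingReviewFactory(AtomicLong interactionCounter) {
        // the two-argument constructor defaults to ProgressCursorFactoryNoop (no progress cursor);
        // getOrCreateDataset(...) warns when a dataset lacks a citation, and every
        // interactsWith(...) on a created specimen increments interactionCounter
        return new CmdReview.NodeFactoryReview(interactionCounter, new NullImportLogger());
    }
}
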
src/main/java/org/globalbioticinteractions/elton/cmd/CmdStream.java
@@ -9,8 +9,11 @@
import org.apache.commons.lang3.NotImplementedException;
import org.eol.globi.data.ImportLogger;
import org.eol.globi.data.NodeFactory;
import org.eol.globi.domain.LogContext;
import org.eol.globi.tool.NullImportLogger;
import org.globalbioticinteractions.elton.util.DatasetRegistryUtil;
import org.globalbioticinteractions.elton.util.ProgressCursor;
import org.globalbioticinteractions.elton.util.ProgressCursorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
Expand All @@ -19,8 +22,10 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@CommandLine.Command(
@@ -34,6 +39,10 @@ public class CmdStream extends CmdDefaultParams {

private final static Logger LOG = LoggerFactory.getLogger(CmdStream.class);

public void setRecordType(String recordType) {
this.recordType = recordType;
}

@CommandLine.Option(names = {"--record-type"},
description = "record types (e.g., interaction, name, review)"
)
@@ -51,20 +60,28 @@ public void run() {
JsonNode jsonNode = new ObjectMapper().readTree(line);
String namespace = jsonNode.at("/namespace").asText(DatasetRegistryUtil.NAMESPACE_LOCAL);
if (StringUtils.isNotBlank(namespace)) {
ImportLoggerFactory loggerFactory = new ImportLoggerFactoryImpl(recordType, namespace, Arrays.asList(ReviewCommentType.values()), getStdout());
try {
boolean shouldWriteHeader = isFirst.get();
StreamingDatasetsHandler namespaceHandler = new StreamingDatasetsHandler(
jsonNode,
this.getCacheDir(),
this.getStderr(),
this.createInputStreamFactory(),
new NodeFactoryFactoryImpl(shouldWriteHeader, recordType),
new ImportLoggerFactoryImpl(recordType)
new NodeFactoryFactoryImpl(shouldWriteHeader, recordType, loggerFactory.createImportLogger()),
loggerFactory
);
namespaceHandler.onNamespace(namespace);
isFirst.set(false);
} catch (Exception e) {
LOG.error("failed to add dataset associated with namespace [" + namespace + "]", e);
String msg = "failed to add dataset associated with namespace [" + namespace + "]";
loggerFactory.createImportLogger().warn(new LogContext() {
@Override
public String toString() {
return "{ \"namespace\": \"" + namespace + "\" }";
}
}, msg);
LOG.error(msg, e);
} finally {
FileUtils.forceDelete(new File(this.getCacheDir()));
}
@@ -81,16 +98,41 @@ public void run() {

public static class ImportLoggerFactoryImpl implements ImportLoggerFactory {
private final String recordType;

public ImportLoggerFactoryImpl(String recordType) {
private final String namespace;
private final List<ReviewCommentType> desiredReviewCommentTypes;
private final PrintStream stdout;

public ImportLoggerFactoryImpl(String recordType,
String namespace,
List<ReviewCommentType> desiredReviewCommentTypes,
PrintStream stdout) {
this.recordType = recordType;
this.namespace = namespace;
this.desiredReviewCommentTypes = desiredReviewCommentTypes;
this.stdout = stdout;
}

@Override
public ImportLogger createImportLogger() {
ImportLogger logger;
if (Arrays.asList("name", "interaction").contains(recordType)) {
logger = new NullImportLogger();
} else if (StringUtils.equals("review", recordType)) {
logger = new ReviewReportLogger(
new ReviewReport(namespace, desiredReviewCommentTypes),
stdout,
null,
new ProgressCursorFactory() {
@Override
public ProgressCursor createProgressCursor() {
return new ProgressCursor() {
@Override
public void increment() {

}
};
}
});
} else {
throw new NotImplementedException("no import logger for [" + recordType + "] available yet.");
}
@@ -103,10 +145,12 @@ public class NodeFactoryFactoryImpl implements NodeFactorFactory {

private final boolean shouldWriteHeader;
private final String recordType;
private ImportLogger logger;

public NodeFactoryFactoryImpl(boolean shouldWriteHeader, String recordType) {
public NodeFactoryFactoryImpl(boolean shouldWriteHeader, String recordType, ImportLogger logger) {
this.shouldWriteHeader = shouldWriteHeader;
this.recordType = recordType;
this.logger = logger;
}

@Override
Expand All @@ -116,10 +160,16 @@ public NodeFactory createNodeFactory() {
factory = WriterUtil.nodeFactoryForInteractionWriting(shouldWriteHeader, getStdout());
} else if (StringUtils.equals("name", recordType)) {
factory = WriterUtil.nodeFactoryForTaxonWriting(shouldWriteHeader, getStdout());
} else if (StringUtils.equals("review", recordType)) {
factory = WriterUtil.nodeFactoryForReviewWriting(shouldWriteHeader, getStdout(), logger);
} else {
throw new NotImplementedException("no node factory for [" + recordType + "] available yet.");
}
return factory;
}
}

public static class WriterFactory {

}
}
src/main/java/org/globalbioticinteractions/elton/cmd/ProgressCursorFactoryNoop.java (new file)
@@ -0,0 +1,16 @@
package org.globalbioticinteractions.elton.cmd;

import org.globalbioticinteractions.elton.util.ProgressCursor;
import org.globalbioticinteractions.elton.util.ProgressCursorFactory;

public class ProgressCursorFactoryNoop implements ProgressCursorFactory {
@Override
public ProgressCursor createProgressCursor() {
return new ProgressCursor() {
@Override
public void increment() {
//
}
};
}
}
src/main/java/org/globalbioticinteractions/elton/cmd/ReviewReport.java
@@ -1,6 +1,8 @@
package org.globalbioticinteractions.elton.cmd;

import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

public class ReviewReport {
@@ -15,6 +17,18 @@ public class ReviewReport {
private final DateFactory dateFactory;
private final String reviewerName;

ReviewReport(String namespace, List<ReviewCommentType> desiredReviewCommentTypes) {
this.infoCounter = new AtomicLong(0);
this.noteCounter = new AtomicLong(0);
this.interactionCounter = new AtomicLong(0);
this.namespace = namespace;
this.desiredReviewCommentTypes = desiredReviewCommentTypes;
this.lineCount = new AtomicLong(0);
this.dateFactory = () -> new Date();
this.reviewId = UUID.randomUUID().toString();
this.reviewerName = CmdReview.REVIEWER_DEFAULT;
}

ReviewReport(AtomicLong infoCounter, AtomicLong noteCounter, String namespace, List<ReviewCommentType> desiredReviewCommentTypes, AtomicLong lineCount, String reviewId, DateFactory dateFactory, String reviewerName, AtomicLong interactionCounter) {
this.infoCounter = infoCounter;
this.noteCounter = noteCounter;
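
Not part of the diff: what the new minimal constructor amounts to for a caller — zeroed counters, a freshly generated review id, and CmdReview.REVIEWER_DEFAULT as the reviewer. The namespace value below is an illustrative assumption.

package org.globalbioticinteractions.elton.cmd;

import java.util.Arrays;

public class ReviewReportSketch {
    public static void main(String[] args) {
        // request all comment types, as the streaming review path does
        ReviewReport report = new ReviewReport(
                "globalbioticinteractions/template-dataset",
                Arrays.asList(ReviewCommentType.values()));
        // counters start at zero; reviewId is a random UUID; reviewer is CmdReview.REVIEWER_DEFAULT
        System.out.println(report.getInteractionCounter().get());
    }
}
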
src/main/java/org/globalbioticinteractions/elton/cmd/WriterUtil.java
@@ -1,5 +1,6 @@
package org.globalbioticinteractions.elton.cmd;

import org.eol.globi.data.ImportLogger;
import org.eol.globi.data.NodeFactory;
import org.eol.globi.data.NodeFactoryException;
import org.eol.globi.domain.Interaction;
@@ -13,6 +14,7 @@
import org.globalbioticinteractions.elton.util.TaxonWriter;

import java.io.PrintStream;
import java.util.concurrent.atomic.AtomicLong;

public class WriterUtil {
static NodeFactory nodeFactoryForInteractionWriting(boolean shouldWriteHeader, PrintStream stdout) {
@@ -55,4 +57,14 @@ public Specimen createSpecimen(Study study, Taxon taxon) throws NodeFactoryExcep
}
};
}

public static NodeFactory nodeFactoryForReviewWriting(boolean shouldWriteHeader,
PrintStream out,
ImportLogger importLogger) {
if (shouldWriteHeader) {
CmdReview.logReviewHeader(out);
}
return new CmdReview.NodeFactoryReview(new AtomicLong(0), importLogger);
}

}
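
Not part of the diff: a sketch of how the new pieces compose for a streamed review. The minimal ReviewReport backs a ReviewReportLogger (no line cap, no progress cursor), which both records review comments and drives the header-writing review node factory; the 15-column header is written only for the first dataset in the stream. The helper name and parameters are illustrative assumptions.

package org.globalbioticinteractions.elton.cmd;

import java.io.PrintStream;
import java.util.Arrays;

import org.eol.globi.data.NodeFactory;

public class ReviewStreamingSketch {

    public static NodeFactory reviewWritingFactory(String namespace, PrintStream out, boolean firstInStream) {
        ReviewReportLogger logger = new ReviewReportLogger(
                new ReviewReport(namespace, Arrays.asList(ReviewCommentType.values())),
                out,
                null,                             // no cap on the number of review lines
                new ProgressCursorFactoryNoop()   // no progress cursor while streaming
        );
        // writes the review header only for the first dataset in the stream
        return WriterUtil.nodeFactoryForReviewWriting(firstInStream, out, logger);
    }
}
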
src/test/java/org/globalbioticinteractions/elton/cmd/CmdReviewTest.java
@@ -244,7 +244,7 @@ public void runCheckLocalNoRepo() {
try {
cmdReview.run();
} finally {
assertThat(errOs.toString(), is("failed.\n"));
assertThat(errOs.toString(), is("creating review [local]... failed.\n"));
}

}