Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistence framework #66

Merged
merged 12 commits into from
Dec 1, 2018
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ target
bin
.idea
.iml
*.iml
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<jackson.version>2.9.4</jackson.version>
<github.global.server>github</github.global.server>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<developers>
<developer>
Expand Down
92 changes: 74 additions & 18 deletions rre-core/src/main/java/io/sease/rre/core/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@
import io.sease.rre.Func;
import io.sease.rre.core.domain.*;
import io.sease.rre.core.domain.metrics.Metric;
import io.sease.rre.persistence.PersistenceConfiguration;
import io.sease.rre.persistence.PersistenceHandler;
import io.sease.rre.persistence.PersistenceManager;
import io.sease.rre.search.api.QueryOrSearchResponse;
import io.sease.rre.search.api.SearchPlatform;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
Expand Down Expand Up @@ -60,6 +59,10 @@ public class Engine {
private ObjectMapper mapper = new ObjectMapper();

private List<String> versions;
private String versionTimestamp = null;

private final PersistenceManager persistenceManager;
private final PersistenceConfiguration persistenceConfiguration;

/**
* Builds a new {@link Engine} instance with the given data.
Expand All @@ -69,6 +72,10 @@ public class Engine {
* @param corporaFolderPath the corpora folder path.
* @param ratingsFolderPath the ratings folder path.
* @param templatesFolderPath the query templates folder path.
* @param metrics the list of metric classes to include in the output.
* @param fields the fields to retrieve with each result.
* @param exclude a list of folders to exclude when scanning the configuration folders.
* @param include a list of folders to include from the configuration folders.
*/
public Engine(
final SearchPlatform platform,
Expand All @@ -79,7 +86,8 @@ public Engine(
final List<String> metrics,
final String[] fields,
final List<String> exclude,
final List<String> include) {
final List<String> include,
final PersistenceConfiguration persistenceConfiguration) {
this.configurationsFolder = new File(configurationsFolderPath);
this.corporaFolder = new File(corporaFolderPath);
this.ratingsFolder = new File(ratingsFolderPath);
Expand All @@ -95,6 +103,10 @@ public Engine(
.map(Func::newMetricDefinition)
.filter(Objects::nonNull)
.collect(toList());

this.persistenceConfiguration = persistenceConfiguration;
this.persistenceManager = new PersistenceManager();
initialisePersistenceManager();
}

public String name(final JsonNode node) {
Expand All @@ -104,6 +116,19 @@ public String name(final JsonNode node) {
.orElse(UNNAMED);
}

/**
 * Instantiates, configures and registers every persistence handler
 * declared in the persistence configuration.
 * <p>
 * Each entry of {@code persistenceConfiguration.getHandlers()} maps a
 * handler name to the fully-qualified name of its implementation class.
 * A handler that cannot be instantiated (class not found, no accessible
 * no-argument constructor, constructor failure, or a class that is not a
 * {@link PersistenceHandler}) is logged and skipped, so that one broken
 * handler does not prevent the remaining handlers from being registered.
 */
private void initialisePersistenceManager() {
    persistenceConfiguration.getHandlers().forEach((name, className) -> {
        try {
            // Instantiate the handler through its no-argument constructor.
            // Constructor.newInstance wraps constructor failures in an
            // InvocationTargetException instead of rethrowing them undeclared.
            final PersistenceHandler handler =
                    (PersistenceHandler) Class.forName(className).getDeclaredConstructor().newInstance();
            handler.configure(name, persistenceConfiguration.getHandlerConfigurationByName(name));
            persistenceManager.registerHandler(handler);
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Pass the exception itself so the stack trace and cause are kept in the log.
            LOGGER.error("[" + name + "] Caught exception instantiating persistence handler :: " + e.getMessage(), e);
        }
    });
}

/**
* Executes the evaluation process.
*
Expand All @@ -116,11 +141,13 @@ public Evaluation evaluate(final Map<String, Object> configuration) {
LOGGER.info("RRE: New evaluation session is starting...");

platform.beforeStart(configuration);
persistenceManager.beforeStart();

LOGGER.info("RRE: Search Platform in use: " + platform.getName());
LOGGER.info("RRE: Starting " + platform.getName() + "...");

platform.start();
persistenceManager.start();

LOGGER.info("RRE: " + platform.getName() + " Search Platform successfully started.");

Expand All @@ -133,14 +160,14 @@ public Evaluation evaluate(final Map<String, Object> configuration) {
LOGGER.info("RRE: Ratings Set processing starts");

final String indexName =
requireNonNull(
ratingsNode.get(INDEX_NAME),
"WARNING!!! \"" + INDEX_NAME + "\" attribute not found!").asText();
requireNonNull(
ratingsNode.get(INDEX_NAME),
"WARNING!!! \"" + INDEX_NAME + "\" attribute not found!").asText();
final String idFieldName =
requireNonNull(
ratingsNode.get(ID_FIELD_NAME),
"WARNING!!! \"" + ID_FIELD_NAME + "\" attribute not found!")
.asText(DEFAULT_ID_FIELD_NAME);
requireNonNull(
ratingsNode.get(ID_FIELD_NAME),
"WARNING!!! \"" + ID_FIELD_NAME + "\" attribute not found!")
.asText(DEFAULT_ID_FIELD_NAME);

final File data = data(ratingsNode);
final String queryPlaceholder = ofNullable(ratingsNode.get("query_placeholder")).map(JsonNode::asText).orElse("$query");
Expand Down Expand Up @@ -194,9 +221,12 @@ public Evaluation evaluate(final Map<String, Object> configuration) {
query(queryNode, sharedTemplate, version),
fields,
Math.max(10, relevantDocuments.size()));
queryEvaluation.setTotalHits(response.totalHits(), version);
response.hits().forEach(hit -> queryEvaluation.collect(hit, rank.getAndIncrement(), version));
queryEvaluation.setTotalHits(response.totalHits(), persistVersion(version));
response.hits().forEach(hit -> queryEvaluation.collect(hit, rank.getAndIncrement(), persistVersion(version)));
});

// Persist the query result
persistenceManager.recordQuery(queryEvaluation);
});
});
});
Expand All @@ -207,7 +237,10 @@ public Evaluation evaluate(final Map<String, Object> configuration) {
return evaluation;
} finally {
platform.beforeStop();
persistenceManager.beforeStop();
LOGGER.info("RRE: " + platform.getName() + " Search Platform shutdown procedure executed.");
LOGGER.info("RRE: Stopping persistence manager");
persistenceManager.stop();
}
}

Expand Down Expand Up @@ -307,7 +340,8 @@ private List<Metric> availableMetrics(
return metric;
} catch (final Exception exception) {
throw new IllegalArgumentException(exception);
}})
}
})
.collect(toList());
}

Expand Down Expand Up @@ -358,7 +392,7 @@ private String templateContent(final File file) {
* @return the ratings / judgements for this evaluation suite.
*/
private Stream<JsonNode> ratings() {
final File [] ratingsFiles =
final File[] ratingsFiles =
requireNonNull(
ratingsFolder.listFiles(ONLY_JSON_FILES),
"Unable to find the ratings folder.");
Expand Down Expand Up @@ -390,8 +424,8 @@ private void prepareData(final String indexName, final File data) {
final File[] versionFolders =
safe(configurationsFolder.listFiles(
file -> ONLY_DIRECTORIES.accept(file)
&& (include.isEmpty() || include.contains(file.getName()) || include.stream().anyMatch(rule -> file.getName().matches(rule)))
&& (exclude.isEmpty() || (!exclude.contains(file.getName()) && exclude.stream().noneMatch(rule -> file.getName().matches(rule))))));
&& (include.isEmpty() || include.contains(file.getName()) || include.stream().anyMatch(rule -> file.getName().matches(rule)))
&& (exclude.isEmpty() || (!exclude.contains(file.getName()) && exclude.stream().noneMatch(rule -> file.getName().matches(rule))))));

if (versionFolders == null || versionFolders.length == 0) {
throw new IllegalArgumentException("RRE: no target versions available. Check the configuration set folder and include/exclude clauses.");
Expand All @@ -411,6 +445,15 @@ private void prepareData(final String indexName, final File data) {
.map(File::getName)
.collect(toList());

if (persistenceConfiguration.isUseTimestampAsVersion()) {
if (versions.size() == 1) {
versionTimestamp = String.valueOf(System.currentTimeMillis());
LOGGER.info("Using local system timestamp as version tag : " + versionTimestamp);
} else {
LOGGER.warn("Persistence.useTimestampAsVersion == true, but multiple configurations exist - ignoring");
}
}

LOGGER.info("RRE: target versions are " + String.join(",", versions));
}

Expand Down Expand Up @@ -444,4 +487,17 @@ private String query(final JsonNode queryNode, final Optional<String> defaultTem
}
return query;
}

/**
 * Resolves the version label under which query results are persisted.
 *
 * @param configVersion the configuration set version being evaluated.
 * @return the version timestamp when one has been set (i.e. there is a
 * single version and the persistence configuration asks for a timestamp
 * to be used as the version of this evaluation data), otherwise the
 * given configVersion unchanged.
 */
private String persistVersion(final String configVersion) {
    if (versionTimestamp != null) {
        return versionTimestamp;
    }
    return configVersion;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.sease.rre.core.domain;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.sease.rre.core.domain.metrics.Metric;
import io.sease.rre.core.domain.metrics.impl.AveragedMetric;
Expand Down Expand Up @@ -124,4 +125,9 @@ public void notifyCollectedMetrics() {
public Map<String, Metric> getMetrics() {
return metrics;
}

/**
 * Returns the parent of this domain member, if one is set.
 * <p>
 * Marked with {@link JsonIgnore} so the property is left out of the
 * Jackson-serialised output.
 *
 * @return an {@link Optional} holding the parent, or an empty one when
 * the parent reference is {@code null}.
 */
@JsonIgnore
public Optional<DomainMember> getParent() {
    return Optional.ofNullable(parent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.sease.rre.persistence;

import java.util.HashMap;
import java.util.Map;

/**
 * Configuration details for the persistence manager.
 * <p>
 * Configuration consists of a set of handler names, mapped to their
 * implementation classes. The handler names are then used to refer to
 * specific sets of configuration details, allowing the same implementation
 * to be used multiple times with separate destinations (for example).
 * <p>
 * Instances created through the public no-argument constructor start with
 * no handlers and no handler configuration; every accessor copes with that
 * state and returns an empty map rather than {@code null}.
 *
 * @author Matt Pearce ([email protected])
 */
public class PersistenceConfiguration {

    /**
     * Default configuration object, with configuration for persisting to
     * a single JSON output file.
     */
    public static final PersistenceConfiguration DEFAULT_CONFIG = defaultConfiguration();

    private boolean useTimestampAsVersion = false;
    // May remain null when the object is created via the no-argument constructor
    private Map<String, String> handlers;
    // Supplying type params for nested map breaks Maven initialisation
    private Map<String, Map> handlerConfiguration;

    /**
     * Creates an empty configuration: no handlers, no handler configuration
     * and {@code useTimestampAsVersion} disabled.
     */
    @SuppressWarnings("unused")
    public PersistenceConfiguration() {
        // Do nothing - required for Maven initialisation
    }

    private PersistenceConfiguration(boolean useTimestampAsVersion,
                                     Map<String, String> handlers,
                                     Map<String, Map> handlerConfiguration) {
        this.useTimestampAsVersion = useTimestampAsVersion;
        this.handlers = handlers;
        this.handlerConfiguration = handlerConfiguration;
    }

    /**
     * Should the persistence framework use a timestamp in place of the
     * configuration version? This timestamp should be consistent for
     * the duration of the evaluation.
     * <p>
     * This will take effect when there is only one configuration set
     * available - e.g. for users who modify the same configuration set
     * rather than creating a separate one each iteration.
     *
     * @return {@code true} if a timestamp should be used in place of the
     * version string when persisting query output.
     */
    public boolean isUseTimestampAsVersion() {
        return useTimestampAsVersion;
    }

    /**
     * @return a map of handler name to implementation classes. Never
     * {@code null}: an empty map is returned when no handlers have been
     * configured.
     */
    public Map<String, String> getHandlers() {
        return handlers != null ? handlers : new HashMap<>();
    }

    /**
     * @return a map of handler name to a map containing configuration
     * details for the handler. Never {@code null}: an empty map is returned
     * when no handler configuration has been supplied.
     */
    public Map<String, Map> getHandlerConfiguration() {
        return handlerConfiguration != null ? handlerConfiguration : new HashMap<>();
    }

    /**
     * Get the configuration for an individual handler, returning a map of
     * configuration items keyed by the String value of their name. If there
     * is no configuration, an empty map will be returned.
     * <p>
     * The returned map is a fresh copy, so modifying it does not affect
     * this configuration object.
     *
     * @param name the name of the handler whose configuration is required.
     * @return a String:Object map containing the configuration. Never
     * {@code null}.
     */
    public Map<String, Object> getHandlerConfigurationByName(String name) {
        final Map<String, Object> configMap = new HashMap<>();

        // handlerConfiguration itself is null after the no-argument constructor
        final Map<?, ?> rawConfig = handlerConfiguration != null ? handlerConfiguration.get(name) : null;
        if (rawConfig != null) {
            rawConfig.forEach((k, v) -> configMap.put(String.valueOf(k), v));
        }

        return configMap;
    }

    /**
     * Build a default PersistenceConfiguration, with handlers set to write
     * to the standard JSON output file.
     *
     * @return a PersistenceConfiguration object.
     */
    private static PersistenceConfiguration defaultConfiguration() {
        final String jsonKey = "json";
        Map<String, String> handlers = new HashMap<>();
        handlers.put(jsonKey, "io.sease.rre.persistence.impl.JsonPersistenceHandler");
        Map<String, Object> jsonConfig = new HashMap<>();
        jsonConfig.put("outputFile", "target/rre/evaluation.json");
        Map<String, Map> handlerConfig = new HashMap<>();
        handlerConfig.put(jsonKey, jsonConfig);

        return new PersistenceConfiguration(false, handlers, handlerConfig);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.sease.rre.persistence;

/**
 * Checked exception raised by the persistence framework.
 *
 * @author Matt Pearce ([email protected])
 */
public class PersistenceException extends Exception {

    /**
     * Creates an exception with neither a detail message nor a cause.
     */
    public PersistenceException() {
    }

    /**
     * Creates an exception carrying the given detail message.
     *
     * @param message the detail message.
     */
    public PersistenceException(String message) {
        super(message);
    }

    /**
     * Creates an exception carrying the given detail message and cause.
     *
     * @param message the detail message.
     * @param cause   the underlying cause of this exception.
     */
    public PersistenceException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception wrapping the given cause.
     *
     * @param cause the underlying cause of this exception.
     */
    public PersistenceException(Throwable cause) {
        super(cause);
    }
}
Loading