diff --git a/src/main/java/to/wetransform/hale/transformer/CustomTarget.java b/src/main/java/to/wetransform/hale/transformer/CustomTarget.java new file mode 100644 index 0000000..63c1f8d --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/CustomTarget.java @@ -0,0 +1,13 @@ +package to.wetransform.hale.transformer; + +import java.util.HashMap; +import java.util.Map; + +import eu.esdihumboldt.hale.common.core.io.Value; + +public record CustomTarget(String providerId, Map settings) { + + public CustomTarget(String providerId) { + this(providerId, new HashMap<>()); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/SourceConfig.java b/src/main/java/to/wetransform/hale/transformer/SourceConfig.java new file mode 100644 index 0000000..927bae2 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/SourceConfig.java @@ -0,0 +1,17 @@ +package to.wetransform.hale.transformer; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import eu.esdihumboldt.hale.common.core.io.Value; + +public record SourceConfig( + URI location, String providerId, Map settings, boolean transform, List attachments) { + + public SourceConfig(URI location, String providerId) { + this(location, providerId, new HashMap<>(), true, new ArrayList<>()); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/TargetConfig.java b/src/main/java/to/wetransform/hale/transformer/TargetConfig.java new file mode 100644 index 0000000..38a6548 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/TargetConfig.java @@ -0,0 +1,3 @@ +package to.wetransform.hale.transformer; + +public record TargetConfig(String filename, String preset, CustomTarget customTarget) {} diff --git a/src/main/java/to/wetransform/hale/transformer/Transformer.java b/src/main/java/to/wetransform/hale/transformer/Transformer.java index 9362a08..4f1465b 100644 --- a/src/main/java/to/wetransform/hale/transformer/Transformer.java +++ b/src/main/java/to/wetransform/hale/transformer/Transformer.java @@ -1,13 +1,80 @@ package to.wetransform.hale.transformer; +import java.io.InputStream; +import java.net.URI; +import java.text.MessageFormat; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import eu.esdihumboldt.hale.app.transform.ExecContext; +import eu.esdihumboldt.hale.common.core.HalePlatform; +import eu.esdihumboldt.hale.common.core.io.project.model.Project; +import eu.esdihumboldt.hale.common.core.io.supplier.DefaultInputSupplier; +import eu.esdihumboldt.util.io.IOUtils; +import org.osgi.framework.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import to.wetransform.halecli.internal.Init; public class Transformer { private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); - void transform() { - LOG.info("Transforming..."); + private CountDownLatch latch = new CountDownLatch(1); + + public void transform(/* TODO add parameters for data and project sources */ ) { + // TODO setup log files for reports and transformation log + + long heapMaxSize = Runtime.getRuntime().maxMemory(); + LOG.info("Maximum heap size configured as " + IOUtils.humanReadableByteCount(heapMaxSize, false)); + + Init.init(); + + Version version = HalePlatform.getCoreVersion(); + LOG.info(MessageFormat.format("Launching hale-transformer {0}...", version.toString())); + + ExecContext context = new ExecContext(); + + // URI projectUri = .... + // context.setProject(projectUri); + // Project project = loadProject(projectUri); + + // context.setSources(...) + // context.setSourceProviderIds(...) + // context.setSourcesSettings(...) + + // Value sourceCrs = null; + // TODO determine source CRS + + // TargetConfig targetConfig = configureTarget(project, sourceCrs); + + try { + // run the transformation + + LOG.info("Transforming..."); + TimeUnit.SECONDS.sleep(30); + // new ExecTransformation().run(context); + + LOG.info("Transformation complete."); + } catch (Throwable t) { + LOG.error("Failed to execute transformation: " + t.getMessage(), t); + } finally { + latch.countDown(); + } + } + + private Project loadProject(URI projectUri) { + DefaultInputSupplier supplier = new DefaultInputSupplier(projectUri); + Project result = null; + try (InputStream in = supplier.getInput()) { + result = Project.load(in); + } catch (Exception e) { + LOG.warn("Could not load project file to determine presets: " + e.getStackTrace()); + } + return result; + } + + public CountDownLatch getLatch() { + return latch; } } diff --git a/src/main/java/to/wetransform/hale/transformer/api/TransformerApiApplication.java b/src/main/java/to/wetransform/hale/transformer/api/TransformerApiApplication.java index b67b12e..d8c6d80 100644 --- a/src/main/java/to/wetransform/hale/transformer/api/TransformerApiApplication.java +++ b/src/main/java/to/wetransform/hale/transformer/api/TransformerApiApplication.java @@ -28,11 +28,15 @@ public class TransformerApiApplication { @Bean Queue queue() { + // TODO Queue should be declared passively, i.e. it should be created + // outside of this application return new Queue(QUEUE_NAME, false); } @Bean TopicExchange exchange() { + // TODO Exchange should be declared passively, i.e. it should be created + // outside of this application return new TopicExchange(TOPIC_EXCHANGE_NAME); } diff --git a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java index 15eb742..1f5d524 100644 --- a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java +++ b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java @@ -1,7 +1,7 @@ package to.wetransform.hale.transformer.api.messaging; import java.io.Serializable; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.annotation.JsonProperty; import org.slf4j.Logger; @@ -22,15 +22,25 @@ public record TransformationMessage( private static final Logger LOG = LoggerFactory.getLogger(TransformationMessageConsumer.class); - private CountDownLatch latch = new CountDownLatch(1); - @RabbitListener(queues = TransformerApiApplication.QUEUE_NAME) public void receiveMessage(final TransformationMessage message) { LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl); - latch.countDown(); - } - public CountDownLatch getLatch() { - return latch; + // TODO Implement mechanism to only accept a message from the queue if no + // transformation is currently running + + if (message.projectUrl != null && message.sourceDataUrl() != null) { + Transformer tx = new Transformer(); + + try { + tx.transform(); + tx.getLatch().await(10, TimeUnit.MINUTES); // TODO make configurable + } catch (InterruptedException e) { + // TODO What should be done when the transformation fails or times out? + // - Simply requeuing the message is probably not helpful + // - Send a message back so that the producer can react? + LOG.error("Transformation process timed out: " + e.getMessage(), e); + } + } } }