From 607b0ab0fd64730c464297ef745360cab8b71486 Mon Sep 17 00:00:00 2001 From: Chinmoy Chakraborty Date: Fri, 1 Dec 2023 16:42:20 +0530 Subject: [PATCH 1/3] Added endpoint to process XMessage. --- .../controllers/XMessageController.java | 25 ++++++ .../transformer/odk/ODKConsumerReactive.java | 83 ++++++++++--------- 2 files changed, 67 insertions(+), 41 deletions(-) create mode 100644 src/main/java/com/uci/transformer/controllers/XMessageController.java diff --git a/src/main/java/com/uci/transformer/controllers/XMessageController.java b/src/main/java/com/uci/transformer/controllers/XMessageController.java new file mode 100644 index 0000000..11674a2 --- /dev/null +++ b/src/main/java/com/uci/transformer/controllers/XMessageController.java @@ -0,0 +1,25 @@ +package com.uci.transformer.controllers; + +import com.uci.transformer.odk.ODKConsumerReactive; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import javax.ws.rs.BadRequestException; + +@RestController() +public class XMessageController { + + @Autowired + private ODKConsumerReactive odkConsumerReactive; + + @PostMapping("/xmsg/processXMessage") + public void processXMessage(@RequestBody String xMessage) { + if (xMessage == null || xMessage.isEmpty() || xMessage.isBlank()) { + throw new BadRequestException(); + } + odkConsumerReactive.processMessage(xMessage); + } +} diff --git a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java index 8dcb333..7a27338 100644 --- a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java +++ b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java @@ -163,47 +163,7 @@ public void onMessage() { .doOnNext(new Consumer>() { @Override public void accept(ReceiverRecord stringMessage) { - final long startTime = System.currentTimeMillis(); - final Date startDateTime = new Date(); - try { - XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes())); - logTimeTaken(startTime, 1); - Mono.just(msg) - .flatMap(message -> transform(message)) - .subscribeOn(Schedulers.parallel()) - .subscribe(transformedMessage -> { - long endTime = System.currentTimeMillis(); - long duration = (endTime - startTime); - log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date()); - logTimeTaken(startTime, 2); - if (transformedMessage != null) { - try { - if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null - && transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null - && transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) { - log.info("CP-04" + transformedMessage.toXML()); - kafkaProducer.send(genericTransformer, transformedMessage.toXML()); - - } else { - log.info("CP-05"); - kafkaProducer.send(processOutboundTopic, transformedMessage.toXML()); - } - } catch (JAXBException e) { - log.error("An error occured : " + e.getMessage()); - e.printStackTrace(); - } - } - }); - } catch (JAXBException e) { - log.error("An error occured : " + e.getMessage()); - e.printStackTrace(); - } catch (NullPointerException e) { - log.error("An error occured : " + e.getMessage() + " at line no : " + e.getStackTrace()[0].getLineNumber() - + " in class : " + e.getStackTrace()[0].getClassName()); - } catch (Exception e) { - log.error("An error occured : " + e.getMessage()); - e.printStackTrace(); - } + processMessage(stringMessage.value()); } }) .doOnError(new Consumer() { @@ -216,6 +176,47 @@ public void accept(Throwable e) { } + public void processMessage(String stringMessage) { + final long startTime = System.currentTimeMillis(); + final Date startDateTime = new Date(); + try { + XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); + logTimeTaken(startTime, 1); + Mono.just(msg) + .flatMap(this::transform) + .subscribeOn(Schedulers.parallel()) + .subscribe(transformedMessage -> { + long endTime = System.currentTimeMillis(); + long duration = (endTime - startTime); + log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date()); + logTimeTaken(startTime, 2); + if (transformedMessage != null) { + try { + if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null + && transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null + && transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) { + log.info("CP-04" + transformedMessage.toXML()); + kafkaProducer.send(genericTransformer, transformedMessage.toXML()); + + } else { + log.info("CP-05"); + kafkaProducer.send(processOutboundTopic, transformedMessage.toXML()); + } + } catch (JAXBException e) { + log.error("An error occured : " + e.getMessage()); + e.printStackTrace(); + } + } + }); + } catch (NullPointerException e) { + log.error("An error occured : " + e.getMessage() + " at line no : " + e.getStackTrace()[0].getLineNumber() + + " in class : " + e.getStackTrace()[0].getClassName()); + } catch (Exception e) { + log.error("An error occured : " + e.getMessage()); + e.printStackTrace(); + } + } + @Override public Mono transform(XMessage xMessage) { ArrayList transformers = xMessage.getTransformers(); From 6099d059e3f9bd8c7dc209aa48a853cac25a3289 Mon Sep 17 00:00:00 2001 From: Chinmoy Chakraborty Date: Fri, 1 Dec 2023 18:56:24 +0530 Subject: [PATCH 2/3] Returns xmessage result from `processXMessage` controller. --- .../controllers/XMessageController.java | 6 +- .../transformer/odk/ODKConsumerReactive.java | 76 +++++++++---------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/uci/transformer/controllers/XMessageController.java b/src/main/java/com/uci/transformer/controllers/XMessageController.java index 11674a2..b6dbc77 100644 --- a/src/main/java/com/uci/transformer/controllers/XMessageController.java +++ b/src/main/java/com/uci/transformer/controllers/XMessageController.java @@ -4,8 +4,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; import javax.ws.rs.BadRequestException; @@ -16,10 +16,10 @@ public class XMessageController { private ODKConsumerReactive odkConsumerReactive; @PostMapping("/xmsg/processXMessage") - public void processXMessage(@RequestBody String xMessage) { + public Mono processXMessage(@RequestBody String xMessage) { if (xMessage == null || xMessage.isEmpty() || xMessage.isBlank()) { throw new BadRequestException(); } - odkConsumerReactive.processMessage(xMessage); + return odkConsumerReactive.processMessage(xMessage); } } diff --git a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java index 7a27338..81b63f5 100644 --- a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java +++ b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java @@ -163,7 +163,7 @@ public void onMessage() { .doOnNext(new Consumer>() { @Override public void accept(ReceiverRecord stringMessage) { - processMessage(stringMessage.value()); + processMessage(stringMessage.value()).subscribe(); } }) .doOnError(new Consumer() { @@ -176,45 +176,43 @@ public void accept(Throwable e) { } - public void processMessage(String stringMessage) { - final long startTime = System.currentTimeMillis(); - final Date startDateTime = new Date(); - try { - XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); - logTimeTaken(startTime, 1); - Mono.just(msg) - .flatMap(this::transform) - .subscribeOn(Schedulers.parallel()) - .subscribe(transformedMessage -> { - long endTime = System.currentTimeMillis(); - long duration = (endTime - startTime); - log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date()); - logTimeTaken(startTime, 2); - if (transformedMessage != null) { - try { - if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null - && transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null - && transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) { - log.info("CP-04" + transformedMessage.toXML()); - kafkaProducer.send(genericTransformer, transformedMessage.toXML()); - - } else { - log.info("CP-05"); - kafkaProducer.send(processOutboundTopic, transformedMessage.toXML()); - } - } catch (JAXBException e) { - log.error("An error occured : " + e.getMessage()); - e.printStackTrace(); - } + public Mono processMessage(String stringMessage) { + return Mono.defer(() -> { + final long startTime = System.currentTimeMillis(); + final Date startDateTime = new Date(); + return Mono.fromCallable(() -> { + XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes())); + logTimeTaken(startTime, 1); + return msg; + }) + .flatMap(this::transform) + .map(transformedMessage -> { + long endTime = System.currentTimeMillis(); + long duration = (endTime - startTime); + log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date()); + logTimeTaken(startTime, 2); + if (transformedMessage != null) { + try { + if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null + && transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null + && transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) { + log.info("CP-04" + transformedMessage.toXML()); + kafkaProducer.send(genericTransformer, transformedMessage.toXML()); + } else { + log.info("CP-05"); + kafkaProducer.send(processOutboundTopic, transformedMessage.toXML()); } - }); - } catch (NullPointerException e) { - log.error("An error occured : " + e.getMessage() + " at line no : " + e.getStackTrace()[0].getLineNumber() - + " in class : " + e.getStackTrace()[0].getClassName()); - } catch (Exception e) { - log.error("An error occured : " + e.getMessage()); - e.printStackTrace(); - } + return transformedMessage.toXML(); + } catch (JAXBException e) { + log.error("An error occurred : " + e.getMessage()); + e.printStackTrace(); + return ""; + } + } + return ""; + }) + .subscribeOn(Schedulers.parallel()); + }); } @Override From 8cd4b9d7db30895506a365a901afaae04b227861 Mon Sep 17 00:00:00 2001 From: Chinmoy Chakraborty Date: Mon, 4 Dec 2023 14:45:35 +0530 Subject: [PATCH 3/3] Added endpoint to update forms. --- pom.xml | 2 +- .../application/TransformerApplication.java | 57 +------------------ .../controllers/ODKController.java | 18 ++++++ .../uci/transformer/odk/FormDownloader.java | 57 +++++++++++++++++-- .../transformer/odk/ODKConsumerReactive.java | 1 + 5 files changed, 72 insertions(+), 63 deletions(-) create mode 100644 src/main/java/com/uci/transformer/controllers/ODKController.java diff --git a/pom.xml b/pom.xml index fa601f4..ad3c884 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.uci transformer - 2.4.0 + 2.4.1 transformer Demo project for Spring Boot diff --git a/src/main/java/com/uci/transformer/application/TransformerApplication.java b/src/main/java/com/uci/transformer/application/TransformerApplication.java index 61b9a44..d1a5a0f 100644 --- a/src/main/java/com/uci/transformer/application/TransformerApplication.java +++ b/src/main/java/com/uci/transformer/application/TransformerApplication.java @@ -57,64 +57,9 @@ private void postConstruct() { if(!(downloadFormsFlag != null && downloadFormsFlag.equalsIgnoreCase("False"))) { downloadForms(); } - // testFormManager(); - } - - private void testFormManager() { - String formPath = ODKConsumerReactive.getFormPath("samagra_workflows_form"); - ServiceResponse response1 = new FormManager(null, null, null, formPath).start(); - log.debug("First response"); - log.debug(response1.getCurrentIndex(), response1.getNextMessage().getText()); } private void downloadForms() { - //Empty the database and folder - FormsDao dao; - try{ - File directoryToDelete = new File("/tmp/forms2"); - FileSystemUtils.deleteRecursively(directoryToDelete); - dao = new FormsDao(JsonDB.getInstance().getDB()); - dao.deleteFormsDatabase(); - }catch (Exception e){} - - //Create a folder /tmp/forms - new File("/tmp/forms2").mkdirs(); - - //Download fresh - OpenRosaHttpInterface openRosaHttpInterface = new OkHttpConnection( - new OkHttpOpenRosaServerClientProvider(new OkHttpClient()), - null, - "userAgent" - ); - WebCredentialsUtils webCredentialsUtils = new WebCredentialsUtils(); - OpenRosaAPIClient openRosaAPIClient = new OpenRosaAPIClient(openRosaHttpInterface, webCredentialsUtils); - FormListDownloader formListDownloader = new FormListDownloader( - openRosaAPIClient, - webCredentialsUtils); - HashMap formList = formListDownloader.downloadFormList(false); - int count = 0; - if (formList.size() > 0) { - ArrayList forms = new ArrayList<>(); - for (Map.Entry form : formList.entrySet()) { - forms.add(form.getValue()); - count += 1; - } - FormDownloader formDownloader = null; - dao = new FormsDao(JsonDB.getInstance().getDB()); - formDownloader = new FormDownloader(dao, openRosaAPIClient); - formDownloader.downloadForms(forms); - List
downloadedForms = dao.getForms(); - log.info("Total downloaded forms: " + downloadedForms.size()); - } + new FormDownloader().downloadFormsDelta(); } -// @Bean -// CommandLineRunner executeOnStartup(Scheduler scheduler, Task sampleOneTimeTask) { -// log.info("Scheduling one time task to now!"); -// -// return ignored -> scheduler.schedule( -// sampleOneTimeTask.instance("command-line-runner"), -// Instant.now() -// ); -// } } - diff --git a/src/main/java/com/uci/transformer/controllers/ODKController.java b/src/main/java/com/uci/transformer/controllers/ODKController.java new file mode 100644 index 0000000..051b189 --- /dev/null +++ b/src/main/java/com/uci/transformer/controllers/ODKController.java @@ -0,0 +1,18 @@ +package com.uci.transformer.controllers; + +import com.uci.transformer.odk.FormDownloader; +import lombok.extern.java.Log; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Slf4j +public class ODKController { + + @GetMapping(path = "/odk/updateAll") + public void updateAllODKForms() { + log.info("Updating forms...."); + new FormDownloader().downloadFormsDelta(); + } +} diff --git a/src/main/java/com/uci/transformer/odk/FormDownloader.java b/src/main/java/com/uci/transformer/odk/FormDownloader.java index 035c220..87872ce 100644 --- a/src/main/java/com/uci/transformer/odk/FormDownloader.java +++ b/src/main/java/com/uci/transformer/odk/FormDownloader.java @@ -5,15 +5,19 @@ import com.uci.transformer.odk.model.Form; import com.uci.transformer.odk.model.FormDetails; import com.uci.transformer.odk.openrosa.OpenRosaAPIClient; +import com.uci.transformer.odk.openrosa.OpenRosaHttpInterface; +import com.uci.transformer.odk.openrosa.okhttp.OkHttpConnection; +import com.uci.transformer.odk.openrosa.okhttp.OkHttpOpenRosaServerClientProvider; import com.uci.transformer.odk.persistance.FormsDao; -import com.uci.transformer.odk.utilities.DocumentFetchResult; -import com.uci.transformer.odk.utilities.FileUtils; -import com.uci.transformer.odk.utilities.MediaFile; +import com.uci.transformer.odk.persistance.JsonDB; +import com.uci.transformer.odk.utilities.*; import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; import org.javarosa.core.reference.ReferenceManager; import org.javarosa.core.reference.RootTranslator; import org.javarosa.xform.parse.XFormParser; import org.kxml2.kdom.Element; +import org.springframework.util.FileSystemUtils; import java.io.File; import java.io.FileOutputStream; @@ -23,6 +27,7 @@ import java.net.URISyntaxException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,9 +49,14 @@ public FormDownloader(OpenRosaAPIClient openRosaAPIClient) { private static final String NAMESPACE_OPENROSA_ORG_XFORMS_XFORMS_MANIFEST = "http:openrosa.org/xforms/xformsManifest"; - public FormDownloader(FormsDao formsDao, OpenRosaAPIClient openRosaAPIClient) { - this.formsDao = formsDao; - this.openRosaAPIClient = openRosaAPIClient; + public FormDownloader() { + this.formsDao = new FormsDao(JsonDB.getInstance().getDB()); + OpenRosaHttpInterface openRosaHttpInterface = new OkHttpConnection( + new OkHttpOpenRosaServerClientProvider(new OkHttpClient()), + null, + "userAgent" + ); + this.openRosaAPIClient = new OpenRosaAPIClient(openRosaHttpInterface, new WebCredentialsUtils()); } public static boolean isXformsManifestNamespacedElement(Element e) { @@ -67,6 +77,41 @@ private static class TaskCancelledException extends Exception { } } + public void resetForms() { + File directoryToDelete = new File("/tmp/forms2"); + FileSystemUtils.deleteRecursively(directoryToDelete); + } + + public void downloadFormsDelta() { + File formsDir = new File("/tmp/forms2"); + if (!formsDir.exists()) { + //Create a folder /tmp/forms2 + new File("/tmp/forms2").mkdirs(); + } + + //Download fresh + WebCredentialsUtils webCredentialsUtils = new WebCredentialsUtils(); + FormListDownloader formListDownloader = new FormListDownloader( + openRosaAPIClient, + webCredentialsUtils); + HashMap formList = formListDownloader.downloadFormList(false); + int count = 0; + if (formList.size() > 0) { + ArrayList forms = new ArrayList<>(); + for (Map.Entry form : formList.entrySet()) { + File existingForm = new File(formsDir.getAbsolutePath(), FormNameUtils.formatFilenameFromFormName(form.getValue().getFormName()) + ".xml"); + if (!existingForm.exists()) { + forms.add(form.getValue()); + count += 1; + } + } + downloadForms(forms); + List downloadedForms = formsDao.getForms(); + log.info("Total forms on disk: " + downloadedForms.size()); + log.info("Total forms downloaded: " + count); + } + } + public void downloadForms(List toDownload) { int total = toDownload.size(); int count = 1; diff --git a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java index 81b63f5..46e88d6 100644 --- a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java +++ b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java @@ -231,6 +231,7 @@ public Mono transform(XMessage xMessage) { String formPath = getFormPath(formID); log.info("current form path:" + formPath); if (formPath == null) { + new FormDownloader().downloadFormsDelta(); log.error("formPath null found return null value : " + formID); return Mono.empty(); }