Skip to content

Commit

Permalink
Returns xmessage result from processXMessage controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
chinmoy12c committed Dec 1, 2023
1 parent 607b0ab commit 6099d05
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import javax.ws.rs.BadRequestException;

Expand All @@ -16,10 +16,10 @@ public class XMessageController {
private ODKConsumerReactive odkConsumerReactive;

@PostMapping("/xmsg/processXMessage")
public void processXMessage(@RequestBody String xMessage) {
public Mono<String> processXMessage(@RequestBody String xMessage) {
if (xMessage == null || xMessage.isEmpty() || xMessage.isBlank()) {
throw new BadRequestException();
}
odkConsumerReactive.processMessage(xMessage);
return odkConsumerReactive.processMessage(xMessage);
}
}
76 changes: 37 additions & 39 deletions src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void onMessage() {
.doOnNext(new Consumer<ReceiverRecord<String, String>>() {
@Override
public void accept(ReceiverRecord<String, String> stringMessage) {
processMessage(stringMessage.value());
processMessage(stringMessage.value()).subscribe();
}
})
.doOnError(new Consumer<Throwable>() {
Expand All @@ -176,45 +176,43 @@ public void accept(Throwable e) {

}

public void processMessage(String stringMessage) {
final long startTime = System.currentTimeMillis();
final Date startDateTime = new Date();
try {
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes()));
logTimeTaken(startTime, 1);
Mono.just(msg)
.flatMap(this::transform)
.subscribeOn(Schedulers.parallel())
.subscribe(transformedMessage -> {
long endTime = System.currentTimeMillis();
long duration = (endTime - startTime);
log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date());
logTimeTaken(startTime, 2);
if (transformedMessage != null) {
try {
if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null
&& transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null
&& transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) {
log.info("CP-04" + transformedMessage.toXML());
kafkaProducer.send(genericTransformer, transformedMessage.toXML());

} else {
log.info("CP-05");
kafkaProducer.send(processOutboundTopic, transformedMessage.toXML());
}
} catch (JAXBException e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
}
public Mono<String> processMessage(String stringMessage) {
return Mono.defer(() -> {
final long startTime = System.currentTimeMillis();
final Date startDateTime = new Date();
return Mono.fromCallable(() -> {
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes()));
logTimeTaken(startTime, 1);
return msg;
})
.flatMap(this::transform)
.map(transformedMessage -> {
long endTime = System.currentTimeMillis();
long duration = (endTime - startTime);
log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date());
logTimeTaken(startTime, 2);
if (transformedMessage != null) {
try {
if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null
&& transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null
&& transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) {
log.info("CP-04" + transformedMessage.toXML());
kafkaProducer.send(genericTransformer, transformedMessage.toXML());
} else {
log.info("CP-05");
kafkaProducer.send(processOutboundTopic, transformedMessage.toXML());
}
});
} catch (NullPointerException e) {
log.error("An error occured : " + e.getMessage() + " at line no : " + e.getStackTrace()[0].getLineNumber()
+ " in class : " + e.getStackTrace()[0].getClassName());
} catch (Exception e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
}
return transformedMessage.toXML();
} catch (JAXBException e) {
log.error("An error occurred : " + e.getMessage());
e.printStackTrace();
return "";
}
}
return "";
})
.subscribeOn(Schedulers.parallel());
});
}

@Override
Expand Down

0 comments on commit 6099d05

Please sign in to comment.