-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
d8a5b99
commit 1b6041c
Showing
4 changed files
with
180 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
FROM condaforge/mambaforge | ||
|
||
RUN apt-get update -y | ||
RUN apt-get install gcc -y | ||
|
||
COPY environment.yml . | ||
|
||
RUN conda env create | ||
|
||
RUN mkdir -p /opt/postprocessing/log | ||
|
||
COPY postprocessing /opt/postprocessing/postprocessing | ||
COPY configuration/post_process_consumer.conf.local /etc/autoreduce/post_processing.conf | ||
RUN sed -i 's/localhost/activemq/' /etc/autoreduce/post_processing.conf | ||
|
||
ENV PYTHONPATH /opt/postprocessing | ||
|
||
RUN mkdir -p /SNS/EQSANS/IPTS-10674/0/30892/NeXus | ||
RUN touch /SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30892_event.nxs | ||
RUN mkdir -p /SNS/EQSANS/shared/autoreduce | ||
RUN echo "import sys;print(sys.argv[1:])" > /SNS/EQSANS/shared/autoreduce/reduce_EQSANS.py | ||
|
||
RUN echo "#!/bin/bash" > /usr/bin/run_postprocessing && \ | ||
echo ". activate post_processing_agent_py2" >> /usr/bin/run_postprocessing && \ | ||
echo "/opt/postprocessing/postprocessing/queueProcessor.py" >> /usr/bin/run_postprocessing && \ | ||
chmod +x /usr/bin/run_postprocessing | ||
|
||
CMD /usr/bin/run_postprocessing |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
version: '3.8' | ||
|
||
services: | ||
|
||
post_processing_agent: | ||
build: | ||
context: ../.. | ||
dockerfile: ./tests/integration/Dockerfile | ||
|
||
activemq: | ||
image: rmohr/activemq | ||
hostname: activemq | ||
ports: | ||
- 8161:8161 | ||
- 61613:61613 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import json | ||
import subprocess | ||
import pytest | ||
|
||
from stompest.config import StompConfig | ||
from stompest.sync import Stomp | ||
from stompest.error import StompConnectTimeout | ||
|
||
|
||
def test_missing_data(): | ||
message = { | ||
"information": "mac83808.sns.gov", | ||
"run_number": "30892", | ||
"instrument": "EQSANS", | ||
"ipts": "IPTS-10674", | ||
"facility": "SNS", | ||
"data_file": "/SNS/DOES_NOT_EXIST.nxs", | ||
} | ||
|
||
client = Stomp(StompConfig("tcp://localhost:61613")) | ||
try: | ||
client.connect() | ||
except StompConnectTimeout: | ||
pytest.skip("Requires activemq running") | ||
|
||
# send data ready | ||
client.send("/queue/REDUCTION.DATA_READY", json.dumps(message)) | ||
|
||
# expect a error for missing file | ||
client.subscribe("/queue/POSTPROCESS.ERROR") | ||
|
||
assert client.canRead(5) | ||
frame = client.receiveFrame() | ||
|
||
client.disconnect() | ||
|
||
msg = json.loads(frame.body) | ||
assert ( | ||
msg["error"] | ||
== "Data file does not exist or is not readable: /SNS/DOES_NOT_EXIST.nxs" | ||
) | ||
|
||
|
||
def test_disabled_reduction(): | ||
message = { | ||
"information": "mac83808.sns.gov", | ||
"run_number": "30892", | ||
"instrument": "INSTRUMENT", | ||
"ipts": "IPTS-10674", | ||
"facility": "SNS", | ||
"data_file": "/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30892_event.nxs", | ||
} | ||
|
||
client = Stomp(StompConfig("tcp://localhost:61613")) | ||
try: | ||
client.connect() | ||
except StompConnectTimeout: | ||
pytest.skip("Requires activemq running") | ||
|
||
# send data ready | ||
client.send("/queue/REDUCTION.DATA_READY", json.dumps(message)) | ||
|
||
# expect a reduction disabled | ||
client.subscribe("/queue/REDUCTION.DISABLED") | ||
|
||
assert client.canRead(5) | ||
frame = client.receiveFrame() | ||
|
||
client.disconnect() | ||
|
||
msg = json.loads(frame.body) | ||
assert msg["run_number"] == message["run_number"] | ||
assert msg["instrument"] == message["instrument"] | ||
assert msg["ipts"] == message["ipts"] | ||
assert msg["facility"] == message["facility"] | ||
assert msg["data_file"] == message["data_file"] | ||
|
||
|
||
def test_reduction(): | ||
message = { | ||
"information": "mac83808.sns.gov", | ||
"run_number": "30892", | ||
"instrument": "EQSANS", | ||
"ipts": "IPTS-10674", | ||
"facility": "SNS", | ||
"data_file": "/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30892_event.nxs", | ||
} | ||
|
||
client = Stomp(StompConfig("tcp://localhost:61613")) | ||
try: | ||
client.connect() | ||
except StompConnectTimeout: | ||
pytest.skip("Requires activemq running") | ||
|
||
# send data ready | ||
client.send("/queue/REDUCTION.DATA_READY", json.dumps(message)) | ||
|
||
# expect a reduction complete | ||
client.subscribe("/queue/REDUCTION.COMPLETE") | ||
|
||
assert client.canRead(5) | ||
frame = client.receiveFrame() | ||
|
||
client.disconnect() | ||
|
||
msg = json.loads(frame.body) | ||
assert msg["run_number"] == message["run_number"] | ||
assert msg["instrument"] == message["instrument"] | ||
assert msg["ipts"] == message["ipts"] | ||
assert msg["facility"] == message["facility"] | ||
assert msg["data_file"] == message["data_file"] | ||
|
||
# we can also check that the reduction did run by checking the reduction_log | ||
reduction_log = subprocess.check_output( | ||
"docker exec integration_post_processing_agent_1 cat /SNS/EQSANS/IPTS-10674/shared/autoreduce/reduction_log/EQSANS_30892_event.nxs.log", | ||
stderr=subprocess.STDOUT, | ||
shell=True, | ||
) | ||
|
||
assert ( | ||
reduction_log | ||
== "['/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30892_event.nxs', '/SNS/EQSANS/IPTS-10674/shared/autoreduce/']\n" | ||
) |