Skip to content

Commit

Permalink
Merge pull request #29 from neutrons/py2to3
Browse files Browse the repository at this point in the history
Run python 2->3 and fix tests
  • Loading branch information
peterfpeterson authored Dec 5, 2023
2 parents 9f422b4 + f38413a commit eaff951
Show file tree
Hide file tree
Showing 18 changed files with 91 additions and 103 deletions.
9 changes: 9 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
coverage:
status:
project:
default:
target: 60%
threshold: 5%

github_checks:
annotations: false
15 changes: 7 additions & 8 deletions environment.yml
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
name: post_processing_agent_py2
name: post_processing_agent_py3
channels:
- conda-forge
- defaults
dependencies:
- python=2.7
- pip<21.0
# if pip doesn't work https://stackoverflow.com/questions/49940813/pip-no-module-named-internal
- gcc_linux-64
- python=3.6
- pip
- plotly
- pre-commit
- pytest
- pytest-cov
- pytest-mock
- requests
- twisted
- pip: # will need to be installed separately if pip is broken
- twisted==12.2.0
- stompest==2.1.6
- stompest-async==2.1.6
- stompest
- stompest-async
# needed for wheel - add when switching to python3
# - build
15 changes: 5 additions & 10 deletions postprocessing/Configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import json
import logging
import importlib


class Configuration(object):
Expand Down Expand Up @@ -137,22 +138,16 @@ def __init__(self, config_file):
toks = p.split(".")
if len(toks) == 2:
# for instance, emulate `from oncat_processor import ONCatProcessor`
# TODO replace with importlib.import_module()
processor_module = __import__(
"postprocessing.processors.%s" % toks[0],
globals(),
locals(),
[
toks[1],
],
-1,
processor_module = importlib.import_module( # noqa: F841
"postprocessing.processors.%s" % toks[0]
)
try:
processor_class = eval("processor_module.%s" % toks[1])
self.queues.append(processor_class.get_input_queue_name())
except: # noqa: E722
logging.error(
"Configuration: Error loading processor: %s", sys.exc_value
"Configuration: Error loading processor: %s",
sys.exc_info()[1],
)
else:
logging.error(
Expand Down
23 changes: 14 additions & 9 deletions postprocessing/Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def run(self):
try:
client = yield client.disconnected
except:
logging.error("Connection error: %s" % sys.exc_value)
logging.error("Connection error: %s" % sys.exc_info()[1])
reactor.callLater(5, self.run)

def consume(self, client, frame):
Expand Down Expand Up @@ -120,10 +120,12 @@ def consume(self, client, frame):
self.instrument_jobs[instrument] = []
client.ack(frame)
except:
logging.error(sys.exc_value)
logging.error(sys.exc_info()[1])
# Raising an exception here may result in an ActiveMQ result being sent.
# We therefore pick a message that will mean someone to the users.
raise RuntimeError, "Error processing incoming message: contact post-processing expert"
raise RuntimeError(
"Error processing incoming message: contact post-processing expert"
)

try:
# Put together the command to execute, including any optional arguments
Expand All @@ -140,10 +142,11 @@ def consume(self, client, frame):
# Format the data argument
if self.config.task_script_data_arg is not None:
command_args.append(self.config.task_script_data_arg)
command_args.append(str(data).replace(" ", ""))
command_args.append(str(data.decode()).replace(" ", ""))

logging.debug("Command: %s" % str(command_args))
logging.warning("Command: %s" % str(command_args))
proc = subprocess.Popen(command_args)
logging.warning("end")
self.procList.append(proc)
if instrument is not None:
self.instrument_jobs[instrument].append(proc)
Expand All @@ -167,10 +170,12 @@ def consume(self, client, frame):
)
self.update_processes()
except:
logging.error(sys.exc_value)
logging.error(sys.exc_info()[1])
# Raising an exception here may result in an ActiveMQ result being sent.
# We therefore pick a message that will mean someone to the users.
raise RuntimeError, "Error processing message: contact post-processing expert"
raise RuntimeError(
"Error processing message: contact post-processing expert"
)

def update_processes(self):
"""
Expand Down Expand Up @@ -206,10 +211,10 @@ def heartbeat(self, destination=None, data_dict={}):
"pid": str(os.getpid()),
}
)
stomp.send(destination, json.dumps(data_dict))
stomp.send(destination, json.dumps(data_dict).encode())
stomp.disconnect()
except:
logging.error("Could not send heartbeat: %s" % sys.exc_value)
logging.error("Could not send heartbeat: %s" % sys.exc_info()[1])

def ack_ping(self, data):
"""
Expand Down
44 changes: 20 additions & 24 deletions postprocessing/PostProcessAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
@copyright: 2014 Oak Ridge National Laboratory
"""
import logging, json, socket, os, sys, subprocess
import processors.job_handling as job_handling
import importlib
from postprocessing.processors import job_handling
from stompest.config import StompConfig
from stompest.sync import Stomp

Expand Down Expand Up @@ -49,7 +50,7 @@ def _process_data(self, data):
provided with an incoming message.
@param data: data dictionary
"""
if data.has_key("data_file"):
if "data_file" in data:
self.data_file = str(data["data_file"])
if os.access(self.data_file, os.R_OK) is False:
raise ValueError(
Expand All @@ -58,22 +59,22 @@ def _process_data(self, data):
else:
raise ValueError("data_file is missing: %s" % self.data_file)

if data.has_key("facility"):
if "facility" in data:
self.facility = str(data["facility"]).upper()
else:
raise ValueError("Facility is missing")

if data.has_key("instrument"):
if "instrument" in data:
self.instrument = str(data["instrument"]).upper()
else:
raise ValueError("Instrument is missing")

if data.has_key("ipts"):
if "ipts" in data:
self.proposal = str(data["ipts"]).upper()
else:
raise ValueError("IPTS is missing")

if data.has_key("run_number"):
if "run_number" in data:
self.run_number = str(data["run_number"])
else:
raise ValueError("Run number is missing")
Expand Down Expand Up @@ -170,23 +171,23 @@ def reduce(self):
else:
self.send("/queue/" + self.conf.reduction_error, json.dumps(self.data))
except: # noqa: E722
logging.error("reduce: %s" % sys.exc_value)
self.data["error"] = "Reduction: %s " % sys.exc_value
logging.error("reduce: %s" % sys.exc_info()[1])
self.data["error"] = "Reduction: %s " % sys.exc_info()[1]
self.send("/queue/" + self.conf.reduction_error, json.dumps(self.data))

def create_reduction_script(self):
"""
Create a new reduction script from a template
"""
try:
import reduction_script_writer
from postprocessing import reduction_script_writer

writer = reduction_script_writer.ScriptWriter(self.data["instrument"])
writer.process_request(
self.data, configuration=self.conf, send_function=self.send
)
except: # noqa: E722
logging.error("create_reduction_script: %s" % sys.exc_value)
logging.error("create_reduction_script: %s" % sys.exc_info()[1])

def send(self, destination, data):
"""
Expand All @@ -196,13 +197,13 @@ def send(self, destination, data):
"""
logging.info("%s: %s" % (destination, data))
self.client.connect()
self.client.send(destination, data)
self.client.send(destination, data.encode())
self.client.disconnect()


if __name__ == "__main__":
import argparse
from Configuration import read_configuration
from postprocessing.Configuration import read_configuration

parser = argparse.ArgumentParser(description="Post-processing agent")
parser.add_argument(
Expand Down Expand Up @@ -265,14 +266,8 @@ def send(self, destination, data):
for p in configuration.processors:
toks = p.split(".")
if len(toks) == 2:
processor_module = __import__(
"postprocessing.processors.%s" % toks[0],
globals(),
locals(),
[
toks[1],
],
-1,
processor_module = importlib.import_module(
"postprocessing.processors.%s" % toks[0]
)
try:
processor_class = eval("processor_module.%s" % toks[1])
Expand All @@ -287,7 +282,8 @@ def send(self, destination, data):
proc()
except: # noqa: E722
logging.error(
"PostProcessAdmin: Processor error: %s" % sys.exc_value
"PostProcessAdmin: Processor error: %s"
% sys.exc_info()[1]
)
else:
logging.error(
Expand All @@ -297,7 +293,7 @@ def send(self, destination, data):
except: # noqa: E722
# If we have a proper data dictionary, send it back with an error message
if isinstance(data, dict):
data["error"] = str(sys.exc_value)
data["error"] = str(sys.exc_info()[1])
stomp = Stomp(
StompConfig(
configuration.failover_uri,
Expand All @@ -306,8 +302,8 @@ def send(self, destination, data):
)
)
stomp.connect()
stomp.send(configuration.postprocess_error, json.dumps(data))
stomp.send(configuration.postprocess_error, json.dumps(data).encode())
stomp.disconnect()
raise
except: # noqa: E722
logging.error("PostProcessAdmin: %s" % sys.exc_value)
logging.error("PostProcessAdmin: %s" % sys.exc_info()[1])
16 changes: 8 additions & 8 deletions postprocessing/processors/base_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
@copyright: 2014-2015 Oak Ridge National Laboratory
"""
from __future__ import print_function

import os
import logging
import json
import job_handling
from . import job_handling


class BaseProcessor(object):
Expand Down Expand Up @@ -94,7 +94,7 @@ def _process_data(self, data):
provided with an incoming message.
@param data: data dictionary
"""
if data.has_key("data_file"):
if "data_file" in data:
self.data_file = str(data["data_file"])
if os.access(self.data_file, os.R_OK) is False:
raise ValueError(
Expand All @@ -103,22 +103,22 @@ def _process_data(self, data):
else:
raise ValueError("data_file is missing: %s" % self.data_file)

if data.has_key("facility"):
if "facility" in data:
self.facility = str(data["facility"]).upper()
else:
raise ValueError("Facility is missing")

if data.has_key("instrument"):
if "instrument" in data:
self.instrument = str(data["instrument"]).upper()
else:
raise ValueError("Instrument is missing")

if data.has_key("ipts"):
if "ipts" in data:
self.proposal = str(data["ipts"]).upper()
else:
raise ValueError("IPTS is missing")

if data.has_key("run_number"):
if "run_number" in data:
self.run_number = str(data["run_number"])
else:
raise ValueError("Run number is missing")
Expand All @@ -139,7 +139,7 @@ def process_error(self, destination, message):
error_message = "%s: %s" % (type(self).__name__, message)
logging.error(error_message)
self.data["error"] = error_message
self.send("/queue/%s" % destination, json.dumps(self.data))
self.send("/queue/%s" % destination, json.dumps(self.data).encode())
# Reset the error and information
if "information" in self.data:
del self.data["information"]
Expand Down
2 changes: 1 addition & 1 deletion postprocessing/processors/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
@copyright: 2014 Oak Ridge National Laboratory
"""
from base_processor import BaseProcessor
from .base_processor import BaseProcessor
import json


Expand Down
6 changes: 3 additions & 3 deletions postprocessing/publish_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Utility functions to post plot data
"""
from __future__ import print_function

import sys
import logging
from postprocessing.Configuration import Configuration, CONFIG_FILE
Expand Down Expand Up @@ -154,7 +154,7 @@ def plot1d(
try:
return publish_plot(instrument, run_number, files={"file": plot_div})
except: # noqa: E722
logging.error("Publish plot failed: %s", sys.exc_value)
logging.error("Publish plot failed: %s", sys.exc_info()[1])
return None
else:
return plot_div
Expand Down Expand Up @@ -248,7 +248,7 @@ def plot_heatmap(
try:
return publish_plot(instrument, run_number, files={"file": plot_div})
except: # noqa: E722
logging.error("Publish plot failed: %s", sys.exc_value)
logging.error("Publish plot failed: %s", sys.exc_info()[1])
return None
else:
return plot_div
Loading

0 comments on commit eaff951

Please sign in to comment.