Skip to content

Commit

Permalink
Fixing mypy issues and tests failing
Browse files Browse the repository at this point in the history
  • Loading branch information
EmanElsaban committed Jan 19, 2024
1 parent ad7cdb6 commit 33dfd68
Show file tree
Hide file tree
Showing 18 changed files with 79 additions and 98 deletions.
2 changes: 2 additions & 0 deletions requirements-minimal.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ task_processing[mesos_executor,k8s]
Twisted>=19.7.0
urllib3>=1.24.2
Werkzeug>=0.15.3
yelp-clog==7.0.1
yelp-meteorite==2.1.1
8 changes: 5 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ certifi==2022.12.7
cffi==1.12.3
cfn-lint==0.24.4
chardet==3.0.4
clusterman-metrics==2.2.1 # used by tron for pre-scaling for Spark runs
constantly==15.1.0
cryptography==39.0.1
dataclasses==0.6
Expand Down Expand Up @@ -47,11 +46,13 @@ lockfile==0.12.2
MarkupSafe==2.1.1
matplotlib-inline==0.1.3
mock==3.0.5
monk==3.0.4
moto==1.3.13
oauthlib==3.1.0
parso==0.7.0
pexpect==4.7.0
pickleshare==0.7.5
ply==3.11
prompt-toolkit==3.0.38
psutil==5.6.6
ptyprocess==0.6.0
Expand All @@ -76,12 +77,14 @@ requests-oauthlib==1.2.0
responses==0.10.6
rsa==4.9
s3transfer==0.6.0
scribereader==0.14.1 # used by tron to get tronjob logs
setuptools==65.5.1
six==1.15.0
srv-configs==1.3.3
sshpubkeys==3.1.0
stack-data==0.6.2
task-processing==0.12.2
tenacity==8.2.3
thriftpy2==0.4.17
traitlets==5.0.0
Twisted==22.10.0
typing-extensions==4.5.0
Expand All @@ -92,6 +95,5 @@ Werkzeug==2.2.3
wrapt==1.11.2
xmltodict==0.12.0
yelp-clog==7.0.1 # scribereader dependency
yelp-logging==4.17.0 # scribereader dependency
yelp-meteorite==2.1.1 # used by task-processing to emit metrics, clusterman-metrics dependency
zope.interface==5.1.0
8 changes: 4 additions & 4 deletions tests/utils/scribereader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_today():
"tron.utils.scribereader.get_scribereader_host_and_port",
autospec=True,
return_value=("host", 1234),
), mock.patch(
), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch(
"tron.utils.scribereader.scribereader.get_stream_reader",
autospec=True,
) as mock_stream_reader, mock.patch(
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_different_days():
"tron.utils.scribereader.get_scribereader_host_and_port",
autospec=True,
return_value=("host", 1234),
), mock.patch(
), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch(
"tron.utils.scribereader.scribereader.get_stream_reader",
autospec=True,
) as mock_stream_reader, mock.patch(
Expand Down Expand Up @@ -195,7 +195,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_in_past():
"tron.utils.scribereader.get_scribereader_host_and_port",
autospec=True,
return_value=("host", 1234),
), mock.patch(
), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch(
"tron.utils.scribereader.scribereader.get_stream_reader",
autospec=True,
) as mock_stream_reader, mock.patch(
Expand Down Expand Up @@ -261,7 +261,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_for_long_output():
"tron.utils.scribereader.get_scribereader_host_and_port",
autospec=True,
return_value=("host", 1234),
), mock.patch(
), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch(
"tron.utils.scribereader.scribereader.get_stream_reader",
autospec=True,
) as mock_stream_reader, mock.patch(
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ commands =
check-requirements
# optionally install yelpy requirements - this is after check-requirements since
# check-requirements doesn't understand these extra requirements
-pip-custom-platform install --index-url https://pypi.yelpcorp.com/simple -r yelp_package/extra_requirements_yelp.txt
-pip install --index-url https://pypi.yelpcorp.com/simple -r yelp_package/extra_requirements_yelp.txt
# we then run tests at the very end so that we can run tests with yelpy requirements
py.test -s {posargs:tests}

Expand Down
10 changes: 5 additions & 5 deletions tron/commands/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

assert simplejson # Pyflakes
except ImportError:
import json as simplejson
import json as simplejson # type: ignore

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -100,11 +100,11 @@ def build_get_url(url, data=None):


def ensure_user_attribution(headers: Mapping[str, str]) -> Dict[str, str]:
headers = headers.copy()
headers = headers.copy() # type: ignore
if "User-Agent" not in headers:
headers["User-Agent"] = USER_AGENT
headers["User-Agent"] += f' ({os.environ.get("USER", "anonymous")})'
return headers
headers["User-Agent"] = USER_AGENT # type: ignore
headers["User-Agent"] += f' ({os.environ.get("USER", "anonymous")})' # type: ignore
return headers # type: ignore


class Client:
Expand Down
2 changes: 1 addition & 1 deletion tron/commands/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class DisplayJobRuns(TableDisplay):
colors = {
"id": partial(Color.set, "yellow"),
"state": add_color_for_state,
"manual": lambda value: Color.set("cyan" if value else None, value),
"manual": lambda value: Color.set("cyan" if value else None, value), # type: ignore
}

def format_value(self, field_idx, value):
Expand Down
8 changes: 4 additions & 4 deletions tron/config/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ def build_child_context(self, path):

class NullConfigContext:
path = ""
nodes = set()
command_context = {}
nodes = set() # type: ignore
command_context = {} # type: ignore
namespace = MASTER_NAMESPACE
partial = False

Expand All @@ -292,8 +292,8 @@ class Validator:
"""

config_class = None
defaults = {}
validators = {}
defaults = {} # type: ignore
validators = {} # type: ignore
optional = False

def validate(self, in_dict, config_context):
Expand Down
14 changes: 7 additions & 7 deletions tron/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def config_object_factory(name, required=None, optional=None):
)


class ConfigSecretVolume(_ConfigSecretVolume):
class ConfigSecretVolume(_ConfigSecretVolume): # type: ignore
def _asdict(self) -> dict:
d = super()._asdict().copy()
items = d.get("items", [])
Expand All @@ -274,7 +274,7 @@ def _asdict(self) -> dict:
for i, item in enumerate(items):
if isinstance(item, ConfigSecretVolumeItem):
d["items"][i] = item._asdict()
return d
return d # type: ignore


ConfigSecretSource = config_object_factory(
Expand Down Expand Up @@ -304,15 +304,15 @@ def _asdict(self) -> dict:
optional=[],
)

StatePersistenceTypes = Enum(
StatePersistenceTypes = Enum( # type: ignore
"StatePersistenceTypes",
dict(shelve="shelve", yaml="yaml", dynamodb="dynamodb"),
)

ExecutorTypes = Enum("ExecutorTypes", dict(ssh="ssh", mesos="mesos", kubernetes="kubernetes", spark="spark"))
ExecutorTypes = Enum("ExecutorTypes", dict(ssh="ssh", mesos="mesos", kubernetes="kubernetes", spark="spark")) # type: ignore

ActionRunnerTypes = Enum("ActionRunnerTypes", dict(none="none", subprocess="subprocess"))
ActionRunnerTypes = Enum("ActionRunnerTypes", dict(none="none", subprocess="subprocess")) # type: ignore

VolumeModes = Enum("VolumeModes", dict(RO="RO", RW="RW"))
VolumeModes = Enum("VolumeModes", dict(RO="RO", RW="RW")) # type: ignore

ActionOnRerun = Enum("ActionOnRerun", dict(rerun="rerun"))
ActionOnRerun = Enum("ActionOnRerun", dict(rerun="rerun")) # type: ignore
2 changes: 1 addition & 1 deletion tron/config/static_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import partial

import staticconf
import staticconf # type: ignore
from staticconf import config

FILENAME = "/nail/srv/configs/tron.yaml"
Expand Down
24 changes: 12 additions & 12 deletions tron/core/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ class ActionCommandConfig:
"""A configurable data object for one try of an Action."""

command: str
cpus: float = None
mem: float = None
disk: float = None
cpus: Optional[float] = None
mem: Optional[float] = None
disk: Optional[float] = None
cap_add: List[str] = field(default_factory=list)
cap_drop: List[str] = field(default_factory=list)
constraints: set = field(default_factory=set)
docker_image: str = None
docker_image: Optional[str] = None
# XXX: we can get rid of docker_parameters once we're off of Mesos
docker_parameters: set = field(default_factory=set)
env: dict = field(default_factory=dict)
Expand Down Expand Up @@ -56,14 +56,14 @@ class Action:
name: str
command_config: ActionCommandConfig
node_pool: str
retries: int = None
retries_delay: datetime.timedelta = None
expected_runtime: datetime.timedelta = None
executor: str = None
trigger_downstreams: (bool, dict) = None
triggered_by: set = None
on_upstream_rerun: str = None
trigger_timeout: datetime.timedelta = None
retries: Optional[int] = None
retries_delay: Optional[datetime.timedelta] = None
expected_runtime: Optional[datetime.timedelta] = None
executor: Optional[str] = None
trigger_downstreams: (bool, dict) = None # type: ignore
triggered_by: Optional[set] = None
on_upstream_rerun: Optional[str] = None
trigger_timeout: Optional[datetime.timedelta] = None

@property
def is_cleanup(self):
Expand Down
41 changes: 21 additions & 20 deletions tron/core/actionrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from tron.actioncommand import ActionCommand
from tron.actioncommand import NoActionRunnerFactory
from tron.actioncommand import SubprocessActionRunnerFactory
from tron.bin.action_runner import build_environment
from tron.bin.action_runner import build_environment # type: ignore
from tron.config.config_utils import StringFormatter
from tron.config.schema import ExecutorTypes
from tron.core import action
Expand All @@ -38,7 +38,7 @@
log = logging.getLogger(__name__)
MAX_RECOVER_TRIES = 5
INITIAL_RECOVER_DELAY = 3
KUBERNETES_ACTIONRUN_EXECUTORS = {ExecutorTypes.kubernetes.value, ExecutorTypes.spark.value}
KUBERNETES_ACTIONRUN_EXECUTORS = {ExecutorTypes.kubernetes.value, ExecutorTypes.spark.value} # type: ignore


class ActionRunFactory:
Expand Down Expand Up @@ -136,12 +136,12 @@ class ActionRunAttempt:
"""Stores state about one try of an action run."""

command_config: action.ActionCommandConfig
start_time: datetime.datetime = None
end_time: datetime.datetime = None
rendered_command: str = None
exit_status: int = None
mesos_task_id: str = None
kubernetes_task_id: str = None
start_time: Optional[datetime.datetime] = None
end_time: Optional[datetime.datetime] = None
rendered_command: Optional[str] = None
exit_status: Optional[int] = None
mesos_task_id: Optional[str] = None
kubernetes_task_id: Optional[str] = None

def exit(self, exit_status, end_time=None):
if self.end_time is None:
Expand Down Expand Up @@ -492,7 +492,7 @@ def start(self, original_command=True):
if not self.is_valid_command(new_attempt.rendered_command):
log.error(f"{self} invalid command: {new_attempt.command_config.command}")
self.fail(exitcode.EXIT_INVALID_COMMAND)
return
return None

return self.submit_command(new_attempt)

Expand Down Expand Up @@ -522,7 +522,7 @@ def kill(self, final=True):
def recover(self):
raise NotImplementedError()

def _done(self, target, exit_status=0):
def _done(self, target, exit_status=0) -> Optional[bool]:
if self.machine.check(target):
if self.triggered_by:
EventBus.clear_subscriptions(self.__hash__())
Expand All @@ -539,6 +539,7 @@ def _done(self, target, exit_status=0):
log.debug(
f"{self} cannot transition from {self.state} via {target}",
)
return None

def retry(self, original_command=True):
"""Invoked externally (via API) when action needs to be re-tried
Expand Down Expand Up @@ -591,7 +592,7 @@ def _exit_unsuccessful(self, exit_status=None, retry_original_command=True):
log.info(
f"{self} got exit code {exit_status} but already in terminal " f'state "{self.state}", not retrying',
)
return
return None
if self.last_attempt is not None:
self.last_attempt.exit(exit_status)
if self.retries_remaining is not None:
Expand Down Expand Up @@ -640,7 +641,7 @@ def rendered_triggers(self) -> List[str]:
def remaining_triggers(self):
return [trig for trig in self.rendered_triggers if not EventBus.has_event(trig)]

def success(self):
def success(self) -> Optional[bool]:
transition_valid = self._done("success")
if transition_valid:
if self.trigger_downstreams:
Expand Down Expand Up @@ -800,10 +801,11 @@ def __getattr__(self, name: str):
def __str__(self):
return f"ActionRun: {self.id}"

def transition_and_notify(self, target):
def transition_and_notify(self, target) -> Optional[bool]:
if self.machine.transition(target):
self.notify(self.state)
return True
return None


class SSHActionRun(ActionRun, Observer):
Expand Down Expand Up @@ -1204,12 +1206,12 @@ def recover(self) -> Optional[KubernetesTask]:
# We cannot recover if we can't transition to running
if not self.machine.check("running"):
log.error(f"{self} unable to transition from {self.machine.state} to running for recovery")
return
return None

if not self.attempts or self.attempts[-1].kubernetes_task_id is None:
log.error(f"{self} no task ID, cannot recover")
self.fail_unknown()
return
return None

last_attempt = self.attempts[-1]

Expand Down Expand Up @@ -1244,7 +1246,7 @@ def recover(self) -> Optional[KubernetesTask]:
f"invalid task ID {last_attempt.kubernetes_task_id!r}",
)
self.fail_unknown()
return
return None

self.watch(task)
k8s_cluster.recover(task)
Expand Down Expand Up @@ -1300,9 +1302,7 @@ def kill(self, final: bool = True) -> Optional[str]:

return "\n".join(msgs)

def handle_action_command_state_change(
self, action_command: ActionCommand, event: str, event_data=None
) -> Optional[bool]:
def handle_action_command_state_change(self, action_command: ActionCommand, event: str, event_data=None):
"""
Observe ActionCommand state changes and transition the ActionCommand state machine to a new state.
"""
Expand All @@ -1323,6 +1323,7 @@ def handle_action_command_state_change(
return self.success()

return self._exit_unsuccessful(action_command.exit_status)
return None

handler = handle_action_command_state_change

Expand Down Expand Up @@ -1392,7 +1393,7 @@ def update_action_config(self, action_graph):

@property
def cleanup_action_run(self) -> ActionRun:
return self.run_map.get(action.CLEANUP_ACTION_NAME)
return self.run_map.get(action.CLEANUP_ACTION_NAME) # type: ignore

@property
def state_data(self):
Expand Down
Loading

0 comments on commit 33dfd68

Please sign in to comment.