From e9283b1515cd3eb30cfa9eff671de2b3fca779b9 Mon Sep 17 00:00:00 2001 From: Eman Elsabban Date: Fri, 19 Jan 2024 12:32:50 -0800 Subject: [PATCH] Fixing mypy issues and tests failing --- Makefile | 6 +- debian/rules | 13 +- dev/config/MASTER.yaml | 142 +++++++----------- requirements.txt | 5 - tests/commands/retry_test.py | 23 +++ tests/utils/scribereader_test.py | 8 +- tox.ini | 2 +- tron/commands/client.py | 5 +- tron/commands/display.py | 2 +- tron/config/config_utils.py | 8 +- tron/config/schema.py | 14 +- tron/config/static_config.py | 2 +- tron/core/action.py | 25 +-- tron/core/actionrun.py | 61 +++++--- tron/kubernetes.py | 28 +--- tron/mesos.py | 8 +- tron/metrics.py | 4 +- tron/serialize/filehandler.py | 2 +- .../runstate/dynamodb_state_store.py | 7 +- tron/serialize/runstate/shelvestore.py | 2 +- tron/ssh.py | 2 +- yelp_package/jammy/Dockerfile | 2 +- 22 files changed, 170 insertions(+), 201 deletions(-) diff --git a/Makefile b/Makefile index a79ce4d4f..357c7ccfd 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ endif NOOP = true ifeq ($(PAASTA_ENV),YELP) - export PIP_INDEX_URL ?= https://pypi.yelpcorp.com/simple + export PIP_INDEX_URL ?= http://169.254.255.254:20641/jammy/simple/ export NPM_CONFIG_REGISTRY ?= https://npm.yelpcorp.com/ ADD_MISSING_DEPS_MAYBE:=-diff --unchanged-line-format= --old-line-format= --new-line-format='%L' ./requirements.txt ./yelp_package/extra_requirements_yelp.txt >> ./requirements.txt else @@ -81,13 +81,13 @@ itest_%: debitest_% @echo "itest $* OK" dev: - SSH_AUTH_SOCK=$(SSH_AUTH_SOCK) .tox/py38/bin/trond --debug --working-dir=dev -l logging.conf --host=0.0.0.0 --web-path=/nail/home/emanelsabban/pg/Tron/tronweb + SSH_AUTH_SOCK=$(SSH_AUTH_SOCK) .tox/py38/bin/trond --debug --working-dir=dev -l logging.conf --host=0.0.0.0 example_cluster: tox -e example-cluster yelpy: - .tox/py38/bin/pip install -i https://pypi.yelpcorp.com/simple -r yelp_package/extra_requirements_yelp.txt + .tox/py38/bin/pip install -r yelp_package/extra_requirements_yelp.txt LAST_COMMIT_MSG = $(shell git log -1 --pretty=%B | sed -e 's/[\x27\x22]/\\\x27/g') release: diff --git a/debian/rules b/debian/rules index c4a1aa481..96c3162f4 100755 --- a/debian/rules +++ b/debian/rules @@ -19,15 +19,12 @@ override_dh_auto_test: override_dh_virtualenv: echo $(PIP_INDEX_URL) - dh_virtualenv --index-url $(PIP_INDEX_URL) \ - --python=/usr/bin/python3.8 \ - --preinstall no-manylinux1 \ - --preinstall cython \ - --preinstall pip-custom-platform \ + dh_virtualenv --index-url $(PIP_INDEX_URL) \ + --extra-pip-arg --trusted-host=169.254.255.254 \ + --python=/usr/bin/python3.8 \ + --preinstall cython==0.29.36 \ --preinstall pip==18.1 \ - --preinstall setuptools==46.1.3 \ - --pip-tool pip-custom-platform \ - --extra-pip-arg "-vvv" + --preinstall setuptools==46.1.3 @echo patching k8s client lib for configuration class patch debian/tron/opt/venvs/tron/lib/python3.8/site-packages/kubernetes/client/configuration.py contrib/patch-config-loggers.diff override_dh_installinit: diff --git a/dev/config/MASTER.yaml b/dev/config/MASTER.yaml index adac421dd..46b2dbbe2 100755 --- a/dev/config/MASTER.yaml +++ b/dev/config/MASTER.yaml @@ -1,7 +1,8 @@ +# Please visit for a guide on how to setup Tron for local development state_persistence: name: "tron_state" table_name: "tmp-tron-state" - store_type: "shelve" + store_type: "dynamodb" buffer_size: 1 dynamodb_region: us-west-1 @@ -10,16 +11,8 @@ ssh_options: agent: True nodes: -- hostname: localhost - name: localhost - username: batch -- hostname: localhost - name: paasta - username: batch + - hostname: localhost -k8s_options: - enabled: true - kubeconfig_path: /nail/home/emanelsabban/pg/Tron/tron.conf # Replace this with the path relative to your home dir to use # action_runner: # runner_type: "subprocess" @@ -27,84 +20,55 @@ k8s_options: # remote_exec_path: "pg/tron/.tox/py38/bin" jobs: - k8s: + testjob0: + enabled: true + node: localhost + schedule: "cron * * * * *" + run_limit: 5 + actions: + zeroth: + command: env + trigger_downstreams: + minutely: "{ymdhm}" + cpus: 1 + mem: 100 + + testjob1: + enabled: false + node: localhost + schedule: "cron * * * * *" + actions: + first: + command: "sleep 5" + cpus: 1 + mem: 100 + second: + command: "echo 'hello world'" + requires: [first] + triggered_by: + - "MASTER.testjob0.zeroth.minutely.{ymdhm}" + trigger_downstreams: + minutely: "{ymdhm}" + cpus: 1 + mem: 100 + + testjob2: + enabled: false + node: localhost + schedule: "cron * * * * *" + actions: + first: + command: "echo 'goodbye, world'" + cpus: 1 + mem: 100 + triggered_by: + - "MASTER.testjob1.second.minutely.{ymdhm}" + + retrier: + node: localhost + schedule: "cron 0 0 1 1 *" actions: - sleep: - annotations: - paasta.yelp.com/routable_ip: 'false' - cap_add: [] - cap_drop: - - SETPCAP - - MKNOD - - AUDIT_WRITE - - CHOWN - - NET_RAW - - DAC_OVERRIDE - - FOWNER - - FSETID - - KILL - - SETGID - - SETUID - - NET_BIND_SERVICE - - SYS_CHROOT - - SETFCAP - command: sleep 60; echo "I'm up and running $PAASTA_DOCKER_IMAGE" - cpus: 0.1 - disk: 1024 - docker_image: docker-paasta.yelpcorp.com:443/services-paasta-contract-monitor:paasta-a0c12fd6fbce6707947ce1fafcfdf7a7c6aaff9d - env: - ENABLE_PER_INSTANCE_LOGSPOUT: '1' - PAASTA_CLUSTER: infrastage - PAASTA_DEPLOY_GROUP: everything - PAASTA_DOCKER_IMAGE: services-paasta-contract-monitor:paasta-a0c12fd6fbce6707947ce1fafcfdf7a7c6aaff9d - PAASTA_GIT_SHA: a0c12fd6 - PAASTA_INSTANCE: k8s.sleep - PAASTA_INSTANCE_TYPE: tron - PAASTA_MONITORING_TEAM: compute_infra - PAASTA_RESOURCE_CPUS: '0.1' - PAASTA_RESOURCE_DISK: '1024' - PAASTA_RESOURCE_MEM: '50' - PAASTA_SERVICE: paasta-contract-monitor - executor: kubernetes - extra_volumes: [] - field_selector_env: - PAASTA_POD_IP: - field_path: status.podIP - labels: - paasta.yelp.com/cluster: infrastage - paasta.yelp.com/instance: k8s.sleep - paasta.yelp.com/pool: default - paasta.yelp.com/service: paasta-contract-monitor - yelp.com/owner: compute_infra_platform_experience - mem: 50 - node_selectors: - yelp.com/pool: default + failing: + command: exit 1 retries: 1 - secret_env: - PAASTA_SECRET_TEST: - key: goss - secret_name: tron-secret-paasta-contract-monitor-goss - secret_volumes: - - container_path: /super-secret-data - items: - - key: goss - path: secret.py - secret_name: goss - secret_volume_name: tron-secret-paasta-contract-monitor-goss - service_account_name: paasta--arn-aws-iam-528741615426-role-paasta-contract-monitor - node: paasta - queueing: false - monitoring: - alert_after: 25m - description: k8s actions failing on Tron locally - page: false - runbook: http://y/rb-tron - slack_channels: - - emans-super-cool-channel - team: noop - ticket: false - tip: Check "paasta logs" and read y/rb-paasta-contract-monitor - schedule: - type: cron - value: '*/1 * * * *' - use_k8s: true + retries_delay: 5m diff --git a/requirements.txt b/requirements.txt index 503d519d8..2d56568b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,6 @@ certifi==2022.12.7 cffi==1.12.3 cfn-lint==0.24.4 chardet==3.0.4 -clusterman-metrics==2.2.1 # used by tron for pre-scaling for Spark runs constantly==15.1.0 cryptography==39.0.1 dataclasses==0.6 @@ -76,7 +75,6 @@ requests-oauthlib==1.2.0 responses==0.10.6 rsa==4.9 s3transfer==0.6.0 -scribereader==0.14.1 # used by tron to get tronjob logs setuptools==65.5.1 six==1.15.0 sshpubkeys==3.1.0 @@ -91,7 +89,4 @@ websocket-client==0.56.0 Werkzeug==2.2.3 wrapt==1.11.2 xmltodict==0.12.0 -yelp-clog==7.0.1 # scribereader dependency -yelp-logging==4.17.0 # scribereader dependency -yelp-meteorite==2.1.1 # used by task-processing to emit metrics, clusterman-metrics dependency zope.interface==5.1.0 diff --git a/tests/commands/retry_test.py b/tests/commands/retry_test.py index 203f553ac..e601d4dcf 100644 --- a/tests/commands/retry_test.py +++ b/tests/commands/retry_test.py @@ -187,3 +187,26 @@ def test_wait_for_retry_deps_done(fake_retry_action, mock_client_request, event_ data=dict(command="retry", use_latest_command=1), user_attribution=True, ) + + +@mock.patch.object(retry, "RetryAction", autospec=True) +def test_retry_actions(mock_retry_action, mock_client, event_loop): + mock_wait_and_retry = mock_retry_action.return_value.wait_and_retry + mock_wait_and_retry.return_value = _empty_coro() + + r_actions = retry.retry_actions( + "http://localhost", + ["a_job.0.an_action_0", "another_job.1.an_action_1"], + use_latest_command=True, + deps_timeout_s=4, + ) + + assert r_actions == [mock_retry_action.return_value] * 2 + assert mock_retry_action.call_args_list == [ + mock.call(mock_client.return_value, "a_job.0.an_action_0", use_latest_command=True), + mock.call(mock_client.return_value, "another_job.1.an_action_1", use_latest_command=True), + ] + assert mock_wait_and_retry.call_args_list == [ + mock.call(deps_timeout_s=4, jitter=False), + mock.call(deps_timeout_s=4), + ] diff --git a/tests/utils/scribereader_test.py b/tests/utils/scribereader_test.py index 65e0f4863..58a0848fa 100644 --- a/tests/utils/scribereader_test.py +++ b/tests/utils/scribereader_test.py @@ -21,7 +21,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_today(): "tron.utils.scribereader.get_scribereader_host_and_port", autospec=True, return_value=("host", 1234), - ), mock.patch( + ), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch( "tron.utils.scribereader.scribereader.get_stream_reader", autospec=True, ) as mock_stream_reader, mock.patch( @@ -97,7 +97,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_different_days(): "tron.utils.scribereader.get_scribereader_host_and_port", autospec=True, return_value=("host", 1234), - ), mock.patch( + ), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch( "tron.utils.scribereader.scribereader.get_stream_reader", autospec=True, ) as mock_stream_reader, mock.patch( @@ -195,7 +195,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_in_past(): "tron.utils.scribereader.get_scribereader_host_and_port", autospec=True, return_value=("host", 1234), - ), mock.patch( + ), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch( "tron.utils.scribereader.scribereader.get_stream_reader", autospec=True, ) as mock_stream_reader, mock.patch( @@ -261,7 +261,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_for_long_output(): "tron.utils.scribereader.get_scribereader_host_and_port", autospec=True, return_value=("host", 1234), - ), mock.patch( + ), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch( "tron.utils.scribereader.scribereader.get_stream_reader", autospec=True, ) as mock_stream_reader, mock.patch( diff --git a/tox.ini b/tox.ini index 936b4d527..96479c51e 100644 --- a/tox.ini +++ b/tox.ini @@ -25,7 +25,7 @@ commands = check-requirements # optionally install yelpy requirements - this is after check-requirements since # check-requirements doesn't understand these extra requirements - -pip-custom-platform install --index-url https://pypi.yelpcorp.com/simple -r yelp_package/extra_requirements_yelp.txt + -pip install -r yelp_package/extra_requirements_yelp.txt # we then run tests at the very end so that we can run tests with yelpy requirements py.test -s {posargs:tests} diff --git a/tron/commands/client.py b/tron/commands/client.py index be036abe0..f80c8dad3 100644 --- a/tron/commands/client.py +++ b/tron/commands/client.py @@ -8,7 +8,6 @@ import urllib.request from collections import namedtuple from typing import Dict -from typing import Mapping import tron from tron.config.schema import MASTER_NAMESPACE @@ -18,7 +17,7 @@ assert simplejson # Pyflakes except ImportError: - import json as simplejson + import json as simplejson # type: ignore log = logging.getLogger(__name__) @@ -99,7 +98,7 @@ def build_get_url(url, data=None): return url -def ensure_user_attribution(headers: Mapping[str, str]) -> Dict[str, str]: +def ensure_user_attribution(headers: Dict[str, str]) -> Dict[str, str]: headers = headers.copy() if "User-Agent" not in headers: headers["User-Agent"] = USER_AGENT diff --git a/tron/commands/display.py b/tron/commands/display.py index c64813260..91e254518 100644 --- a/tron/commands/display.py +++ b/tron/commands/display.py @@ -269,7 +269,7 @@ class DisplayJobRuns(TableDisplay): colors = { "id": partial(Color.set, "yellow"), "state": add_color_for_state, - "manual": lambda value: Color.set("cyan" if value else None, value), + "manual": lambda value: Color.set("cyan" if value else None, value), # type: ignore # can't type a lambda } def format_value(self, field_idx, value): diff --git a/tron/config/config_utils.py b/tron/config/config_utils.py index d2ca4eabd..67250bd1d 100644 --- a/tron/config/config_utils.py +++ b/tron/config/config_utils.py @@ -275,8 +275,8 @@ def build_child_context(self, path): class NullConfigContext: path = "" - nodes = set() - command_context = {} + nodes = set() # type: ignore + command_context = {} # type: ignore namespace = MASTER_NAMESPACE partial = False @@ -292,8 +292,8 @@ class Validator: """ config_class = None - defaults = {} - validators = {} + defaults = {} # type: ignore + validators = {} # type: ignore optional = False def validate(self, in_dict, config_context): diff --git a/tron/config/schema.py b/tron/config/schema.py index eaca7cd19..6cdd78098 100644 --- a/tron/config/schema.py +++ b/tron/config/schema.py @@ -260,7 +260,7 @@ def config_object_factory(name, required=None, optional=None): ) -class ConfigSecretVolume(_ConfigSecretVolume): +class ConfigSecretVolume(_ConfigSecretVolume): # type: ignore def _asdict(self) -> dict: d = super()._asdict().copy() items = d.get("items", []) @@ -274,7 +274,7 @@ def _asdict(self) -> dict: for i, item in enumerate(items): if isinstance(item, ConfigSecretVolumeItem): d["items"][i] = item._asdict() - return d + return d # type: ignore ConfigSecretSource = config_object_factory( @@ -304,15 +304,15 @@ def _asdict(self) -> dict: optional=[], ) -StatePersistenceTypes = Enum( +StatePersistenceTypes = Enum( # type: ignore "StatePersistenceTypes", dict(shelve="shelve", yaml="yaml", dynamodb="dynamodb"), ) -ExecutorTypes = Enum("ExecutorTypes", dict(ssh="ssh", mesos="mesos", kubernetes="kubernetes", spark="spark")) +ExecutorTypes = Enum("ExecutorTypes", dict(ssh="ssh", mesos="mesos", kubernetes="kubernetes", spark="spark")) # type: ignore -ActionRunnerTypes = Enum("ActionRunnerTypes", dict(none="none", subprocess="subprocess")) +ActionRunnerTypes = Enum("ActionRunnerTypes", dict(none="none", subprocess="subprocess")) # type: ignore -VolumeModes = Enum("VolumeModes", dict(RO="RO", RW="RW")) +VolumeModes = Enum("VolumeModes", dict(RO="RO", RW="RW")) # type: ignore -ActionOnRerun = Enum("ActionOnRerun", dict(rerun="rerun")) +ActionOnRerun = Enum("ActionOnRerun", dict(rerun="rerun")) # type: ignore diff --git a/tron/config/static_config.py b/tron/config/static_config.py index 84804db15..7bcfe667f 100644 --- a/tron/config/static_config.py +++ b/tron/config/static_config.py @@ -1,6 +1,6 @@ from functools import partial -import staticconf +import staticconf # type: ignore from staticconf import config FILENAME = "/nail/srv/configs/tron.yaml" diff --git a/tron/core/action.py b/tron/core/action.py index 5e4b488ec..ad8a72b8f 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -5,6 +5,7 @@ from dataclasses import fields from typing import List from typing import Optional +from typing import Union from tron import node from tron.config.schema import CLEANUP_ACTION_NAME @@ -20,13 +21,13 @@ class ActionCommandConfig: """A configurable data object for one try of an Action.""" command: str - cpus: float = None - mem: float = None - disk: float = None + cpus: Optional[float] = None + mem: Optional[float] = None + disk: Optional[float] = None cap_add: List[str] = field(default_factory=list) cap_drop: List[str] = field(default_factory=list) constraints: set = field(default_factory=set) - docker_image: str = None + docker_image: Optional[str] = None # XXX: we can get rid of docker_parameters once we're off of Mesos docker_parameters: set = field(default_factory=set) env: dict = field(default_factory=dict) @@ -56,14 +57,14 @@ class Action: name: str command_config: ActionCommandConfig node_pool: str - retries: int = None - retries_delay: datetime.timedelta = None - expected_runtime: datetime.timedelta = None - executor: str = None - trigger_downstreams: (bool, dict) = None - triggered_by: set = None - on_upstream_rerun: str = None - trigger_timeout: datetime.timedelta = None + retries: Optional[int] = None + retries_delay: Optional[datetime.timedelta] = None + expected_runtime: Optional[datetime.timedelta] = None + executor: Optional[str] = None + trigger_downstreams: Optional[Union[bool, dict]] = None + triggered_by: Optional[set] = None + on_upstream_rerun: Optional[str] = None + trigger_timeout: Optional[datetime.timedelta] = None @property def is_cleanup(self): diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 682e5db47..08e69c8a0 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -6,8 +6,11 @@ import os from dataclasses import dataclass from dataclasses import fields +from typing import Dict from typing import List from typing import Optional +from typing import Set +from typing import Union from pyrsistent import InvariantException from twisted.internet import reactor @@ -17,7 +20,7 @@ from tron.actioncommand import ActionCommand from tron.actioncommand import NoActionRunnerFactory from tron.actioncommand import SubprocessActionRunnerFactory -from tron.bin.action_runner import build_environment +from tron.bin.action_runner import build_environment # type: ignore # mypy can't find library stub from tron.config.config_utils import StringFormatter from tron.config.schema import ExecutorTypes from tron.core import action @@ -38,7 +41,7 @@ log = logging.getLogger(__name__) MAX_RECOVER_TRIES = 5 INITIAL_RECOVER_DELAY = 3 -KUBERNETES_ACTIONRUN_EXECUTORS = {ExecutorTypes.kubernetes.value, ExecutorTypes.spark.value} +KUBERNETES_ACTIONRUN_EXECUTORS: Set[str] = {ExecutorTypes.kubernetes.value, ExecutorTypes.spark.value} # type: ignore # mypy can't seem to inspect this enum class ActionRunFactory: @@ -136,12 +139,12 @@ class ActionRunAttempt: """Stores state about one try of an action run.""" command_config: action.ActionCommandConfig - start_time: datetime.datetime = None - end_time: datetime.datetime = None - rendered_command: str = None - exit_status: int = None - mesos_task_id: str = None - kubernetes_task_id: str = None + start_time: Optional[datetime.datetime] = None + end_time: Optional[datetime.datetime] = None + rendered_command: Optional[str] = None + exit_status: Optional[int] = None + mesos_task_id: Optional[str] = None + kubernetes_task_id: Optional[str] = None def exit(self, exit_status, end_time=None): if self.end_time is None: @@ -466,7 +469,7 @@ def from_state( run.transition_and_notify("fail_unknown") return run - def start(self, original_command=True): + def start(self, original_command=True) -> Optional[Union[bool, ActionCommand]]: """Start this ActionRun.""" if self.in_delay is not None: log.warning(f"{self} cancelling suspend timer") @@ -492,7 +495,7 @@ def start(self, original_command=True): if not self.is_valid_command(new_attempt.rendered_command): log.error(f"{self} invalid command: {new_attempt.command_config.command}") self.fail(exitcode.EXIT_INVALID_COMMAND) - return + return None return self.submit_command(new_attempt) @@ -510,7 +513,7 @@ def create_attempt(self, original_command=True): self.attempts.append(new_attempt) return new_attempt - def submit_command(self, attempt): + def submit_command(self, attempt) -> Optional[Union[bool, ActionCommand]]: raise NotImplementedError() def stop(self): @@ -522,7 +525,7 @@ def kill(self, final=True): def recover(self): raise NotImplementedError() - def _done(self, target, exit_status=0): + def _done(self, target, exit_status=0) -> Optional[bool]: if self.machine.check(target): if self.triggered_by: EventBus.clear_subscriptions(self.__hash__()) @@ -539,6 +542,7 @@ def _done(self, target, exit_status=0): log.debug( f"{self} cannot transition from {self.state} via {target}", ) + return None def retry(self, original_command=True): """Invoked externally (via API) when action needs to be re-tried @@ -567,10 +571,10 @@ def start_after_delay(self): self.in_delay = None self.start() - def restart(self, original_command=True): + def restart(self, original_command=True) -> Optional[Union[bool, ActionCommand]]: """Used by `fail` when action run has to be re-tried""" if self.retries_delay is not None: - self.in_delay = reactor.callLater( + self.in_delay = reactor.callLater( # type: ignore # no twisted stubs self.retries_delay.total_seconds(), self.start_after_delay, ) @@ -586,12 +590,12 @@ def fail(self, exit_status=None): return self._done("fail", exit_status) - def _exit_unsuccessful(self, exit_status=None, retry_original_command=True): + def _exit_unsuccessful(self, exit_status=None, retry_original_command=True) -> Optional[Union[bool, ActionCommand]]: if self.is_done: log.info( f"{self} got exit code {exit_status} but already in terminal " f'state "{self.state}", not retrying', ) - return + return None if self.last_attempt is not None: self.last_attempt.exit(exit_status) if self.retries_remaining is not None: @@ -640,7 +644,7 @@ def rendered_triggers(self) -> List[str]: def remaining_triggers(self): return [trig for trig in self.rendered_triggers if not EventBus.has_event(trig)] - def success(self): + def success(self) -> Optional[bool]: transition_valid = self._done("success") if transition_valid: if self.trigger_downstreams: @@ -800,10 +804,11 @@ def __getattr__(self, name: str): def __str__(self): return f"ActionRun: {self.id}" - def transition_and_notify(self, target): + def transition_and_notify(self, target) -> Optional[bool]: if self.machine.transition(target): self.notify(self.state) return True + return None class SSHActionRun(ActionRun, Observer): @@ -1141,6 +1146,13 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: self.fail(exitcode.EXIT_KUBERNETES_NOT_CONFIGURED) return None + if attempt.rendered_command is None: + self.fail(exitcode.EXIT_INVALID_COMMAND) + return None + + if attempt.command_config.docker_image is None: + self.fail(exitcode.EXIT_KUBERNETES_TASK_INVALID) + return None try: task = k8s_cluster.create_task( action_run_id=self.id, @@ -1204,12 +1216,12 @@ def recover(self) -> Optional[KubernetesTask]: # We cannot recover if we can't transition to running if not self.machine.check("running"): log.error(f"{self} unable to transition from {self.machine.state} to running for recovery") - return + return None if not self.attempts or self.attempts[-1].kubernetes_task_id is None: log.error(f"{self} no task ID, cannot recover") self.fail_unknown() - return + return None last_attempt = self.attempts[-1] @@ -1244,7 +1256,7 @@ def recover(self) -> Optional[KubernetesTask]: f"invalid task ID {last_attempt.kubernetes_task_id!r}", ) self.fail_unknown() - return + return None self.watch(task) k8s_cluster.recover(task) @@ -1302,7 +1314,7 @@ def kill(self, final: bool = True) -> Optional[str]: def handle_action_command_state_change( self, action_command: ActionCommand, event: str, event_data=None - ) -> Optional[bool]: + ) -> Optional[Union[bool, ActionCommand]]: """ Observe ActionCommand state changes and transition the ActionCommand state machine to a new state. """ @@ -1323,6 +1335,7 @@ def handle_action_command_state_change( return self.success() return self._exit_unsuccessful(action_command.exit_status) + return None handler = handle_action_command_state_change @@ -1341,7 +1354,7 @@ class ActionRunCollection: def __init__(self, action_graph, run_map): self.action_graph = action_graph - self.run_map = run_map + self.run_map: Dict[str, ActionRun] = run_map # Setup proxies self.proxy_action_runs_with_cleanup = proxy.CollectionProxy( self.get_action_runs_with_cleanup, @@ -1391,7 +1404,7 @@ def update_action_config(self, action_graph): return updated @property - def cleanup_action_run(self) -> ActionRun: + def cleanup_action_run(self) -> Optional[ActionRun]: return self.run_map.get(action.CLEANUP_ACTION_NAME) @property diff --git a/tron/kubernetes.py b/tron/kubernetes.py index 7056b31e1..8b3d0d1e1 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -1,4 +1,3 @@ -import json import logging from logging import Logger from typing import cast @@ -41,17 +40,6 @@ KUBERNETES_LOST_NODE_EXIT_CODES = {exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION, exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN} log = logging.getLogger(__name__) -try: - import clog # type: ignore - - clog.config.configure( - scribe_host="169.254.255.254", - scribe_port=1463, - monk_disable=False, - scribe_disable=False, - ) -except ImportError: - clog = None def combine_volumes( @@ -273,22 +261,8 @@ def handle_event(self, event: Event) -> None: self.log.error("Unexpected failure, exiting") self.done() - except Exception as e: + except Exception: self.log.exception(f"unable to handle an event for id={event_id} for event={str(event)}") - # clog here and make sure the message is a string - if clog is None: - log.debug("Clog logger unavailable. Unable to log event") - else: - clog.log_line( - "tmp_missed_tronevents", - json.dumps( - { - "event": str(event), - "exception": type(e), - "exception_message": str(e), - } - ), - ) class KubernetesCluster: diff --git a/tron/mesos.py b/tron/mesos.py index bc2ee67b6..2070d637e 100644 --- a/tron/mesos.py +++ b/tron/mesos.py @@ -3,10 +3,12 @@ import re import socket import time +from typing import Any +from typing import Dict from urllib.parse import urlparse import requests -import staticconf +import staticconf # type: ignore # need to upgrade to get type stubs from task_processing.runners.subscription import Subscription from task_processing.task_processor import TaskProcessor from twisted.internet.defer import logError @@ -84,8 +86,8 @@ class MesosClusterRepository: secret = None name = "frameworks" - clusters = {} - state_data = {} + clusters: Dict[str, "MesosCluster"] = {} + state_data: Dict[str, Any] = {} state_watcher = None @classmethod diff --git a/tron/metrics.py b/tron/metrics.py index 1687b8e87..c069637df 100644 --- a/tron/metrics.py +++ b/tron/metrics.py @@ -1,10 +1,10 @@ -from pyformance.meters import Counter +from pyformance.meters import Counter # type: ignore from pyformance.meters import Histogram from pyformance.meters import Meter from pyformance.meters import SimpleGauge from pyformance.meters import Timer -all_metrics = {} +all_metrics = {} # type: ignore def get_metric(metric_type, name, dimensions, default): diff --git a/tron/serialize/filehandler.py b/tron/serialize/filehandler.py index 2bd8eccfa..6c3dd8190 100644 --- a/tron/serialize/filehandler.py +++ b/tron/serialize/filehandler.py @@ -186,7 +186,7 @@ def tail(self, filename, num_lines=None) -> List[str]: try: cmd = ("tail", "-n", str(num_lines), path) tail_sub = Popen(cmd, stdout=PIPE) - return list(line.rstrip().decode() for line in tail_sub.stdout) + return list(line.rstrip().decode() for line in (tail_sub.stdout if tail_sub.stdout else [])) except OSError as e: log.error(f"Could not tail {path}: {e}") return [] diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index a94219b72..57c5363ea 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -6,8 +6,9 @@ import time from collections import defaultdict from collections import OrderedDict +from typing import DefaultDict -import boto3 +import boto3 # type: ignore from tron.metrics import timer @@ -24,7 +25,7 @@ def __init__(self, name, dynamodb_region, stopping=False) -> None: self.dynamodb_region = dynamodb_region self.table = self.dynamodb.Table(name) self.stopping = stopping - self.save_queue = OrderedDict() + self.save_queue: OrderedDict = OrderedDict() self.save_lock = threading.Lock() self.save_errors = 0 self.save_thread = threading.Thread(target=self._save_loop, args=(), daemon=True) @@ -89,7 +90,7 @@ def _get_remaining_partitions(self, items: list): def _merge_items(self, first_items, remaining_items) -> dict: items = defaultdict(list) - raw_items = defaultdict(bytearray) + raw_items: DefaultDict[str, bytearray] = defaultdict(bytearray) # Merge all items based their keys and deserialize their values if remaining_items: first_items.extend(remaining_items) diff --git a/tron/serialize/runstate/shelvestore.py b/tron/serialize/runstate/shelvestore.py index 13e5b16ce..22d1bf8ac 100644 --- a/tron/serialize/runstate/shelvestore.py +++ b/tron/serialize/runstate/shelvestore.py @@ -5,7 +5,7 @@ import sys from io import BytesIO -import bsddb3 +import bsddb3 # type: ignore from tron.utils import maybe_decode diff --git a/tron/ssh.py b/tron/ssh.py index 453543b17..7d28cd7fe 100644 --- a/tron/ssh.py +++ b/tron/ssh.py @@ -65,7 +65,7 @@ def __str__(self): class NoPasswordAuthClient(default.SSHUserAuthClient): """Only support passwordless auth.""" - preferredOrder = ["publickey", "keyboard-interactive"] + preferredOrder = ["publickey", "keyboard-interactive"] # type: ignore auth_password = None def getGenericAnswers(self, name, instruction, prompts): diff --git a/yelp_package/jammy/Dockerfile b/yelp_package/jammy/Dockerfile index da742381c..5f4b4b6b7 100644 --- a/yelp_package/jammy/Dockerfile +++ b/yelp_package/jammy/Dockerfile @@ -46,5 +46,5 @@ RUN wget --quiet -O - https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - RUN echo "deb http://dl.yarnpkg.com/debian/ stable main" > /etc/apt/sources.list.d/yarn.list RUN apt-get -q update && apt-get -q install -y --no-install-recommends yarn -RUN pip3 install --index-url ${PIP_INDEX_URL} virtualenv==16.7.5 +RUN pip3 install --trusted-host 169.254.255.254 --index-url ${PIP_INDEX_URL} virtualenv==16.7.5 WORKDIR /work