diff --git a/requirements-minimal.txt b/requirements-minimal.txt index 86a6425ae..d1dd6f739 100644 --- a/requirements-minimal.txt +++ b/requirements-minimal.txt @@ -24,3 +24,5 @@ task_processing[mesos_executor,k8s] Twisted>=19.7.0 urllib3>=1.24.2 Werkzeug>=0.15.3 +yelp-clog==7.0.1 +yelp-meteorite==2.1.1 diff --git a/requirements.txt b/requirements.txt index 503d519d8..55bfb9999 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,6 @@ certifi==2022.12.7 cffi==1.12.3 cfn-lint==0.24.4 chardet==3.0.4 -clusterman-metrics==2.2.1 # used by tron for pre-scaling for Spark runs constantly==15.1.0 cryptography==39.0.1 dataclasses==0.6 @@ -47,11 +46,13 @@ lockfile==0.12.2 MarkupSafe==2.1.1 matplotlib-inline==0.1.3 mock==3.0.5 +monk==3.0.4 moto==1.3.13 oauthlib==3.1.0 parso==0.7.0 pexpect==4.7.0 pickleshare==0.7.5 +ply==3.11 prompt-toolkit==3.0.38 psutil==5.6.6 ptyprocess==0.6.0 @@ -76,12 +77,14 @@ requests-oauthlib==1.2.0 responses==0.10.6 rsa==4.9 s3transfer==0.6.0 -scribereader==0.14.1 # used by tron to get tronjob logs setuptools==65.5.1 six==1.15.0 +srv-configs==1.3.3 sshpubkeys==3.1.0 stack-data==0.6.2 task-processing==0.12.2 +tenacity==8.2.3 +thriftpy2==0.4.17 traitlets==5.0.0 Twisted==22.10.0 typing-extensions==4.5.0 @@ -92,6 +95,5 @@ Werkzeug==2.2.3 wrapt==1.11.2 xmltodict==0.12.0 yelp-clog==7.0.1 # scribereader dependency -yelp-logging==4.17.0 # scribereader dependency yelp-meteorite==2.1.1 # used by task-processing to emit metrics, clusterman-metrics dependency zope.interface==5.1.0 diff --git a/tests/utils/scribereader_test.py b/tests/utils/scribereader_test.py index 65e0f4863..58a0848fa 100644 --- a/tests/utils/scribereader_test.py +++ b/tests/utils/scribereader_test.py @@ -21,7 +21,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_today(): "tron.utils.scribereader.get_scribereader_host_and_port", autospec=True, return_value=("host", 1234), - ), mock.patch( + ), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch( "tron.utils.scribereader.scribereader.get_stream_reader", autospec=True, ) as mock_stream_reader, mock.patch( @@ -97,7 +97,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_different_days(): "tron.utils.scribereader.get_scribereader_host_and_port", autospec=True, return_value=("host", 1234), - ), mock.patch( + ), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch( "tron.utils.scribereader.scribereader.get_stream_reader", autospec=True, ) as mock_stream_reader, mock.patch( @@ -195,7 +195,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_in_past(): "tron.utils.scribereader.get_scribereader_host_and_port", autospec=True, return_value=("host", 1234), - ), mock.patch( + ), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch( "tron.utils.scribereader.scribereader.get_stream_reader", autospec=True, ) as mock_stream_reader, mock.patch( @@ -261,7 +261,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_for_long_output(): "tron.utils.scribereader.get_scribereader_host_and_port", autospec=True, return_value=("host", 1234), - ), mock.patch( + ), mock.patch("tron.utils.scribereader.scribereader", autospec=True,), mock.patch( "tron.utils.scribereader.scribereader.get_stream_reader", autospec=True, ) as mock_stream_reader, mock.patch( diff --git a/tox.ini b/tox.ini index 936b4d527..a5329de79 100644 --- a/tox.ini +++ b/tox.ini @@ -25,7 +25,7 @@ commands = check-requirements # optionally install yelpy requirements - this is after check-requirements since # check-requirements doesn't understand these extra requirements - -pip-custom-platform install --index-url https://pypi.yelpcorp.com/simple -r yelp_package/extra_requirements_yelp.txt + -pip install --index-url https://pypi.yelpcorp.com/simple -r yelp_package/extra_requirements_yelp.txt # we then run tests at the very end so that we can run tests with yelpy requirements py.test -s {posargs:tests} diff --git a/tron/commands/client.py b/tron/commands/client.py index be036abe0..307480fda 100644 --- a/tron/commands/client.py +++ b/tron/commands/client.py @@ -18,7 +18,7 @@ assert simplejson # Pyflakes except ImportError: - import json as simplejson + import json as simplejson # type: ignore log = logging.getLogger(__name__) @@ -100,11 +100,11 @@ def build_get_url(url, data=None): def ensure_user_attribution(headers: Mapping[str, str]) -> Dict[str, str]: - headers = headers.copy() + headers = headers.copy() # type: ignore if "User-Agent" not in headers: - headers["User-Agent"] = USER_AGENT - headers["User-Agent"] += f' ({os.environ.get("USER", "anonymous")})' - return headers + headers["User-Agent"] = USER_AGENT # type: ignore + headers["User-Agent"] += f' ({os.environ.get("USER", "anonymous")})' # type: ignore + return headers # type: ignore class Client: diff --git a/tron/commands/display.py b/tron/commands/display.py index c64813260..14471fdbb 100644 --- a/tron/commands/display.py +++ b/tron/commands/display.py @@ -269,7 +269,7 @@ class DisplayJobRuns(TableDisplay): colors = { "id": partial(Color.set, "yellow"), "state": add_color_for_state, - "manual": lambda value: Color.set("cyan" if value else None, value), + "manual": lambda value: Color.set("cyan" if value else None, value), # type: ignore } def format_value(self, field_idx, value): diff --git a/tron/config/config_utils.py b/tron/config/config_utils.py index d2ca4eabd..67250bd1d 100644 --- a/tron/config/config_utils.py +++ b/tron/config/config_utils.py @@ -275,8 +275,8 @@ def build_child_context(self, path): class NullConfigContext: path = "" - nodes = set() - command_context = {} + nodes = set() # type: ignore + command_context = {} # type: ignore namespace = MASTER_NAMESPACE partial = False @@ -292,8 +292,8 @@ class Validator: """ config_class = None - defaults = {} - validators = {} + defaults = {} # type: ignore + validators = {} # type: ignore optional = False def validate(self, in_dict, config_context): diff --git a/tron/config/schema.py b/tron/config/schema.py index eaca7cd19..6cdd78098 100644 --- a/tron/config/schema.py +++ b/tron/config/schema.py @@ -260,7 +260,7 @@ def config_object_factory(name, required=None, optional=None): ) -class ConfigSecretVolume(_ConfigSecretVolume): +class ConfigSecretVolume(_ConfigSecretVolume): # type: ignore def _asdict(self) -> dict: d = super()._asdict().copy() items = d.get("items", []) @@ -274,7 +274,7 @@ def _asdict(self) -> dict: for i, item in enumerate(items): if isinstance(item, ConfigSecretVolumeItem): d["items"][i] = item._asdict() - return d + return d # type: ignore ConfigSecretSource = config_object_factory( @@ -304,15 +304,15 @@ def _asdict(self) -> dict: optional=[], ) -StatePersistenceTypes = Enum( +StatePersistenceTypes = Enum( # type: ignore "StatePersistenceTypes", dict(shelve="shelve", yaml="yaml", dynamodb="dynamodb"), ) -ExecutorTypes = Enum("ExecutorTypes", dict(ssh="ssh", mesos="mesos", kubernetes="kubernetes", spark="spark")) +ExecutorTypes = Enum("ExecutorTypes", dict(ssh="ssh", mesos="mesos", kubernetes="kubernetes", spark="spark")) # type: ignore -ActionRunnerTypes = Enum("ActionRunnerTypes", dict(none="none", subprocess="subprocess")) +ActionRunnerTypes = Enum("ActionRunnerTypes", dict(none="none", subprocess="subprocess")) # type: ignore -VolumeModes = Enum("VolumeModes", dict(RO="RO", RW="RW")) +VolumeModes = Enum("VolumeModes", dict(RO="RO", RW="RW")) # type: ignore -ActionOnRerun = Enum("ActionOnRerun", dict(rerun="rerun")) +ActionOnRerun = Enum("ActionOnRerun", dict(rerun="rerun")) # type: ignore diff --git a/tron/config/static_config.py b/tron/config/static_config.py index 84804db15..7bcfe667f 100644 --- a/tron/config/static_config.py +++ b/tron/config/static_config.py @@ -1,6 +1,6 @@ from functools import partial -import staticconf +import staticconf # type: ignore from staticconf import config FILENAME = "/nail/srv/configs/tron.yaml" diff --git a/tron/core/action.py b/tron/core/action.py index 5e4b488ec..b1b21976c 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -20,13 +20,13 @@ class ActionCommandConfig: """A configurable data object for one try of an Action.""" command: str - cpus: float = None - mem: float = None - disk: float = None + cpus: Optional[float] = None + mem: Optional[float] = None + disk: Optional[float] = None cap_add: List[str] = field(default_factory=list) cap_drop: List[str] = field(default_factory=list) constraints: set = field(default_factory=set) - docker_image: str = None + docker_image: Optional[str] = None # XXX: we can get rid of docker_parameters once we're off of Mesos docker_parameters: set = field(default_factory=set) env: dict = field(default_factory=dict) @@ -56,14 +56,14 @@ class Action: name: str command_config: ActionCommandConfig node_pool: str - retries: int = None - retries_delay: datetime.timedelta = None - expected_runtime: datetime.timedelta = None - executor: str = None - trigger_downstreams: (bool, dict) = None - triggered_by: set = None - on_upstream_rerun: str = None - trigger_timeout: datetime.timedelta = None + retries: Optional[int] = None + retries_delay: Optional[datetime.timedelta] = None + expected_runtime: Optional[datetime.timedelta] = None + executor: Optional[str] = None + trigger_downstreams: (bool, dict) = None # type: ignore + triggered_by: Optional[set] = None + on_upstream_rerun: Optional[str] = None + trigger_timeout: Optional[datetime.timedelta] = None @property def is_cleanup(self): diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 682e5db47..a391c7617 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -17,7 +17,7 @@ from tron.actioncommand import ActionCommand from tron.actioncommand import NoActionRunnerFactory from tron.actioncommand import SubprocessActionRunnerFactory -from tron.bin.action_runner import build_environment +from tron.bin.action_runner import build_environment # type: ignore from tron.config.config_utils import StringFormatter from tron.config.schema import ExecutorTypes from tron.core import action @@ -38,7 +38,7 @@ log = logging.getLogger(__name__) MAX_RECOVER_TRIES = 5 INITIAL_RECOVER_DELAY = 3 -KUBERNETES_ACTIONRUN_EXECUTORS = {ExecutorTypes.kubernetes.value, ExecutorTypes.spark.value} +KUBERNETES_ACTIONRUN_EXECUTORS = {ExecutorTypes.kubernetes.value, ExecutorTypes.spark.value} # type: ignore class ActionRunFactory: @@ -136,12 +136,12 @@ class ActionRunAttempt: """Stores state about one try of an action run.""" command_config: action.ActionCommandConfig - start_time: datetime.datetime = None - end_time: datetime.datetime = None - rendered_command: str = None - exit_status: int = None - mesos_task_id: str = None - kubernetes_task_id: str = None + start_time: Optional[datetime.datetime] = None + end_time: Optional[datetime.datetime] = None + rendered_command: Optional[str] = None + exit_status: Optional[int] = None + mesos_task_id: Optional[str] = None + kubernetes_task_id: Optional[str] = None def exit(self, exit_status, end_time=None): if self.end_time is None: @@ -492,7 +492,7 @@ def start(self, original_command=True): if not self.is_valid_command(new_attempt.rendered_command): log.error(f"{self} invalid command: {new_attempt.command_config.command}") self.fail(exitcode.EXIT_INVALID_COMMAND) - return + return None return self.submit_command(new_attempt) @@ -522,7 +522,7 @@ def kill(self, final=True): def recover(self): raise NotImplementedError() - def _done(self, target, exit_status=0): + def _done(self, target, exit_status=0) -> Optional[bool]: if self.machine.check(target): if self.triggered_by: EventBus.clear_subscriptions(self.__hash__()) @@ -539,6 +539,7 @@ def _done(self, target, exit_status=0): log.debug( f"{self} cannot transition from {self.state} via {target}", ) + return None def retry(self, original_command=True): """Invoked externally (via API) when action needs to be re-tried @@ -591,7 +592,7 @@ def _exit_unsuccessful(self, exit_status=None, retry_original_command=True): log.info( f"{self} got exit code {exit_status} but already in terminal " f'state "{self.state}", not retrying', ) - return + return None if self.last_attempt is not None: self.last_attempt.exit(exit_status) if self.retries_remaining is not None: @@ -640,7 +641,7 @@ def rendered_triggers(self) -> List[str]: def remaining_triggers(self): return [trig for trig in self.rendered_triggers if not EventBus.has_event(trig)] - def success(self): + def success(self) -> Optional[bool]: transition_valid = self._done("success") if transition_valid: if self.trigger_downstreams: @@ -800,10 +801,11 @@ def __getattr__(self, name: str): def __str__(self): return f"ActionRun: {self.id}" - def transition_and_notify(self, target): + def transition_and_notify(self, target) -> Optional[bool]: if self.machine.transition(target): self.notify(self.state) return True + return None class SSHActionRun(ActionRun, Observer): @@ -1204,12 +1206,12 @@ def recover(self) -> Optional[KubernetesTask]: # We cannot recover if we can't transition to running if not self.machine.check("running"): log.error(f"{self} unable to transition from {self.machine.state} to running for recovery") - return + return None if not self.attempts or self.attempts[-1].kubernetes_task_id is None: log.error(f"{self} no task ID, cannot recover") self.fail_unknown() - return + return None last_attempt = self.attempts[-1] @@ -1244,7 +1246,7 @@ def recover(self) -> Optional[KubernetesTask]: f"invalid task ID {last_attempt.kubernetes_task_id!r}", ) self.fail_unknown() - return + return None self.watch(task) k8s_cluster.recover(task) @@ -1300,9 +1302,7 @@ def kill(self, final: bool = True) -> Optional[str]: return "\n".join(msgs) - def handle_action_command_state_change( - self, action_command: ActionCommand, event: str, event_data=None - ) -> Optional[bool]: + def handle_action_command_state_change(self, action_command: ActionCommand, event: str, event_data=None): """ Observe ActionCommand state changes and transition the ActionCommand state machine to a new state. """ @@ -1323,6 +1323,7 @@ def handle_action_command_state_change( return self.success() return self._exit_unsuccessful(action_command.exit_status) + return None handler = handle_action_command_state_change @@ -1392,7 +1393,7 @@ def update_action_config(self, action_graph): @property def cleanup_action_run(self) -> ActionRun: - return self.run_map.get(action.CLEANUP_ACTION_NAME) + return self.run_map.get(action.CLEANUP_ACTION_NAME) # type: ignore @property def state_data(self): diff --git a/tron/kubernetes.py b/tron/kubernetes.py index 7056b31e1..8da45e9b0 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -1,4 +1,3 @@ -import json import logging from logging import Logger from typing import cast @@ -41,17 +40,6 @@ KUBERNETES_LOST_NODE_EXIT_CODES = {exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION, exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN} log = logging.getLogger(__name__) -try: - import clog # type: ignore - - clog.config.configure( - scribe_host="169.254.255.254", - scribe_port=1463, - monk_disable=False, - scribe_disable=False, - ) -except ImportError: - clog = None def combine_volumes( @@ -274,21 +262,9 @@ def handle_event(self, event: Event) -> None: self.done() except Exception as e: - self.log.exception(f"unable to handle an event for id={event_id} for event={str(event)}") - # clog here and make sure the message is a string - if clog is None: - log.debug("Clog logger unavailable. Unable to log event") - else: - clog.log_line( - "tmp_missed_tronevents", - json.dumps( - { - "event": str(event), - "exception": type(e), - "exception_message": str(e), - } - ), - ) + self.log.exception( + f"unable to handle an event for id={event_id} for event={str(event)}, exception type: {type(e)} and exception message: {str(e)}" + ) class KubernetesCluster: @@ -477,11 +453,11 @@ def create_task( self, action_run_id: str, serializer: OutputStreamSerializer, - command: str, + command: Optional[str], cpus: Optional[float], mem: Optional[float], disk: Optional[float], - docker_image: str, + docker_image: Optional[str], env: Dict[str, str], secret_env: Dict[str, ConfigSecretSource], secret_volumes: Collection[ConfigSecretVolume], diff --git a/tron/mesos.py b/tron/mesos.py index bc2ee67b6..140317b2f 100644 --- a/tron/mesos.py +++ b/tron/mesos.py @@ -6,7 +6,7 @@ from urllib.parse import urlparse import requests -import staticconf +import staticconf # type: ignore from task_processing.runners.subscription import Subscription from task_processing.task_processor import TaskProcessor from twisted.internet.defer import logError @@ -84,8 +84,8 @@ class MesosClusterRepository: secret = None name = "frameworks" - clusters = {} - state_data = {} + clusters = {} # type: ignore + state_data = {} # type: ignore state_watcher = None @classmethod diff --git a/tron/metrics.py b/tron/metrics.py index 1687b8e87..c069637df 100644 --- a/tron/metrics.py +++ b/tron/metrics.py @@ -1,10 +1,10 @@ -from pyformance.meters import Counter +from pyformance.meters import Counter # type: ignore from pyformance.meters import Histogram from pyformance.meters import Meter from pyformance.meters import SimpleGauge from pyformance.meters import Timer -all_metrics = {} +all_metrics = {} # type: ignore def get_metric(metric_type, name, dimensions, default): diff --git a/tron/serialize/filehandler.py b/tron/serialize/filehandler.py index 2bd8eccfa..c508e5545 100644 --- a/tron/serialize/filehandler.py +++ b/tron/serialize/filehandler.py @@ -186,7 +186,7 @@ def tail(self, filename, num_lines=None) -> List[str]: try: cmd = ("tail", "-n", str(num_lines), path) tail_sub = Popen(cmd, stdout=PIPE) - return list(line.rstrip().decode() for line in tail_sub.stdout) + return list(line.rstrip().decode() for line in tail_sub.stdout) # type: ignore except OSError as e: log.error(f"Could not tail {path}: {e}") return [] diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index a94219b72..a33293188 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -7,7 +7,7 @@ from collections import defaultdict from collections import OrderedDict -import boto3 +import boto3 # type: ignore from tron.metrics import timer @@ -24,7 +24,7 @@ def __init__(self, name, dynamodb_region, stopping=False) -> None: self.dynamodb_region = dynamodb_region self.table = self.dynamodb.Table(name) self.stopping = stopping - self.save_queue = OrderedDict() + self.save_queue = OrderedDict() # type: OrderedDict self.save_lock = threading.Lock() self.save_errors = 0 self.save_thread = threading.Thread(target=self._save_loop, args=(), daemon=True) @@ -89,7 +89,7 @@ def _get_remaining_partitions(self, items: list): def _merge_items(self, first_items, remaining_items) -> dict: items = defaultdict(list) - raw_items = defaultdict(bytearray) + raw_items = defaultdict(bytearray) # type: ignore # Merge all items based their keys and deserialize their values if remaining_items: first_items.extend(remaining_items) diff --git a/tron/serialize/runstate/shelvestore.py b/tron/serialize/runstate/shelvestore.py index 13e5b16ce..22d1bf8ac 100644 --- a/tron/serialize/runstate/shelvestore.py +++ b/tron/serialize/runstate/shelvestore.py @@ -5,7 +5,7 @@ import sys from io import BytesIO -import bsddb3 +import bsddb3 # type: ignore from tron.utils import maybe_decode diff --git a/tron/ssh.py b/tron/ssh.py index 453543b17..7d28cd7fe 100644 --- a/tron/ssh.py +++ b/tron/ssh.py @@ -65,7 +65,7 @@ def __str__(self): class NoPasswordAuthClient(default.SSHUserAuthClient): """Only support passwordless auth.""" - preferredOrder = ["publickey", "keyboard-interactive"] + preferredOrder = ["publickey", "keyboard-interactive"] # type: ignore auth_password = None def getGenericAnswers(self, name, instruction, prompts):