diff --git a/debian/changelog b/debian/changelog
index 62b6b589d..bc7a5639a 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,13 @@
+tron (1.30.0) jammy; urgency=medium
+
+  * 1.30.0 tagged with 'make release'
+    Commit: Adding yelp_clog S3LogsReader (#949) * Upgrade yelp
+    scribereader deps * Enable S3LogsReader for action run logs *
+    Upgrade boto requirements * Upgrade mypy and update type ignores *
+    Pin more internal requirements
+
+ -- Yaroslav Liakhovskyi  Mon, 29 Apr 2024 04:57:37 -0700
+
 tron (1.29.5) jammy; urgency=medium

   * 1.29.5 tagged with 'make release'
diff --git a/mypy.ini b/mypy.ini
index a27a103f9..258b29266 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -12,3 +12,9 @@ warn_unused_ignores = True

 [mypy-clusterman_metrics.*]
 ignore_missing_imports = True
+
+[mypy-clog.*]
+ignore_missing_imports = True
+
+[mypy-scribereader.*]
+ignore_missing_imports = True
diff --git a/requirements-dev-minimal.txt b/requirements-dev-minimal.txt
index 500dc9bd2..eb9d38808 100644
--- a/requirements-dev-minimal.txt
+++ b/requirements-dev-minimal.txt
@@ -7,3 +7,6 @@ pylint
 pytest
 pytest-asyncio
 requirements-tools
+types-PyYAML
+types-requests<2.31.0.7  # newer types-requests requires urllib3>=2
+types-simplejson
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 36def9173..41f7131fc 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -10,8 +10,8 @@ iniconfig==1.1.1
 isort==4.3.18
 lazy-object-proxy==1.9.0
 mccabe==0.7.0
-mypy==0.812
-mypy-extensions==0.4.3
+mypy==1.9.0
+mypy-extensions==1.0.0
 nodeenv==1.3.3
 packaging==19.2
 platformdirs==2.5.2
@@ -28,5 +28,8 @@ requirements-tools==1.2.1
 toml==0.10.2
 tomli==2.0.1
 tomlkit==0.11.6
-typed-ast==1.4.0
+types-PyYAML==6.0.12
+types-requests==2.31.0.5
+types-simplejson==3.19.0.20240310
+types-urllib3==1.26.25.14
 virtualenv==20.17.1
diff --git a/requirements.txt b/requirements.txt
index 71910567a..d7cd92707 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,8 +7,8 @@ aws-sam-translator==1.15.1
 aws-xray-sdk==2.4.2
 backcall==0.1.0
 boto==2.49.0
-boto3==1.26.85
-botocore==1.29.86
+boto3==1.34.80
+botocore==1.34.80
 bsddb3==6.2.7
 cachetools==4.2.1
 certifi==2022.12.7
@@ -75,7 +75,7 @@ requests==2.25.0
 requests-oauthlib==1.2.0
 responses==0.10.6
 rsa==4.9
-s3transfer==0.6.0
+s3transfer==0.10.1
 setuptools==65.5.1
 six==1.15.0
 sshpubkeys==3.1.0
diff --git a/tests/utils/scribereader_test.py b/tests/utils/scribereader_test.py
index 65e0f4863..66e3ce8a8 100644
--- a/tests/utils/scribereader_test.py
+++ b/tests/utils/scribereader_test.py
@@ -3,12 +3,81 @@

 import pytest

+import tron.utils.scribereader
 from tron.utils.scribereader import read_log_stream_for_action_run

 try:
     import scribereader  # noqa: F401
+    from clog.readers import S3LogsReader  # noqa: F401
 except ImportError:
-    pytest.skip("scribereader not available, skipping tests", allow_module_level=True)
+    pytest.skip("yelp logs readers not available, skipping tests", allow_module_level=True)
+
+
+# used for an explicit patch of staticconf.read return value for an arbitrary namespace
+def static_conf_patch(args):
+    return lambda arg, namespace, default=None: args.get(arg)
+
+
+def test_read_log_stream_for_action_run_not_available():
+    with mock.patch("tron.utils.scribereader.scribereader_available", False), mock.patch(
+        "tron.utils.scribereader.s3reader_available", False
+    ):
+        output = tron.utils.scribereader.read_log_stream_for_action_run(
+            "namespace.job.1234.action",
+            component="stdout",
+            min_date=datetime.datetime.now(),
+            max_date=datetime.datetime.now(),
+            paasta_cluster="fake",
+        )
+        assert "unable to display logs" in output[0]
+
+
+def test_read_log_stream_for_action_run_yelp_clog():
+    with mock.patch(
+        "staticconf.read",
+        autospec=True,
+        side_effect=static_conf_patch({"logging.use_s3_reader": True, "logging.max_lines_to_display": 1000}),
+    ), mock.patch("tron.config.static_config.build_configuration_watcher", autospec=True,), mock.patch(
+        "tron.config.static_config.load_yaml_file",
+        autospec=True,
+    ), mock.patch(
+        "tron.utils.scribereader.S3LogsReader", autospec=True
+    ) as mock_s3_reader:
+
+        mock_s3_reader.return_value.get_log_reader.return_value = iter(
+            [
+                """{
+                    "tron_run_number": 1234,
+                    "component": "stdout",
+                    "message": "line 1",
+                    "timestamp": "2021-01-02T18:10:09.169421619Z",
+                    "cluster": "fake"
+                }""",
+                """{
+                    "tron_run_number": 1234,
+                    "component": "stdout",
+                    "message": "line 2",
+                    "timestamp": "2021-01-02T18:11:09.169421619Z",
+                    "cluster": "fake"
+                }""",
+                """{
+                    "tron_run_number": 1234,
+                    "component": "stderr",
+                    "message": "line 3",
+                    "timestamp": "2021-01-02T18:12:09.169421619Z",
+                    "cluster": "fake"
+                }""",
+            ]
+        )
+
+        output = read_log_stream_for_action_run(
+            "namespace.job.1234.action",
+            component="stdout",
+            min_date=datetime.datetime.now(),
+            max_date=datetime.datetime.now(),
+            paasta_cluster="fake",
+        )
+        assert output == ["line 1", "line 2"]


 def test_read_log_stream_for_action_run_min_date_and_max_date_today():
@@ -35,7 +104,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_today():
         "tron.config.static_config.build_configuration_watcher",
         autospec=True,
     ), mock.patch(
-        "staticconf.read", autospec=True, return_value=1000
+        "staticconf.read", autospec=True, side_effect=static_conf_patch({"logging.max_lines_to_display": 1000})
     ), mock.patch(
         "tron.config.static_config.load_yaml_file",
         autospec=True,
@@ -111,7 +180,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_different_days():
         "tron.config.static_config.build_configuration_watcher",
         autospec=True,
     ), mock.patch(
-        "staticconf.read", autospec=True, return_value=1000
+        "staticconf.read", autospec=True, side_effect=static_conf_patch({"logging.max_lines_to_display": 1000})
     ), mock.patch(
         "tron.config.static_config.load_yaml_file",
         autospec=True,
@@ -209,7 +278,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_in_past():
         "tron.config.static_config.build_configuration_watcher",
         autospec=True,
     ), mock.patch(
-        "staticconf.read", autospec=True, return_value=1000
+        "staticconf.read", autospec=True, side_effect=static_conf_patch({"logging.max_lines_to_display": 1000})
     ), mock.patch(
         "tron.config.static_config.load_yaml_file",
         autospec=True,
@@ -275,7 +344,7 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_for_long_output():
         "tron.config.static_config.build_configuration_watcher",
         autospec=True,
     ), mock.patch(
-        "staticconf.read", autospec=True, return_value=1000
+        "staticconf.read", autospec=True, side_effect=static_conf_patch({"logging.max_lines_to_display": 1000})
     ), mock.patch(
         "tron.config.static_config.load_yaml_file",
         autospec=True,
diff --git a/tron/__init__.py b/tron/__init__.py
index 119dd160b..e63b54537 100644
--- a/tron/__init__.py
+++ b/tron/__init__.py
@@ -1,4 +1,4 @@
-__version_info__ = (1, 29, 5)
+__version_info__ = (1, 30, 0)
 __version__ = ".".join("%s" % v for v in __version_info__)
 __author__ = "Yelp "
 __credits__ = [
diff --git a/tron/commands/backfill.py b/tron/commands/backfill.py
index dce607a60..137f5fec9 100644
--- a/tron/commands/backfill.py
+++ b/tron/commands/backfill.py
@@ -112,7 +112,7 @@ async def create(self) -> Optional[str]:

         match = re.match(r"^Created JobRun:([-.\w]+)$", result)
         if match:
-            self.run_name = match.groups(0)[0]
+            self.run_name = match.groups(0)[0]  # type: ignore[assignment]  # mypy wrongly identifies self.run_name type as "Union[str, int]"
             self.run_state = ActionRun.STARTING
             print(f"Job run '{self.run_name}' for {self.run_time_str} created")
         else:
diff --git a/tron/commands/display.py b/tron/commands/display.py
index 91e254518..c64813260 100644
--- a/tron/commands/display.py
+++ b/tron/commands/display.py
@@ -269,7 +269,7 @@ class DisplayJobRuns(TableDisplay):
     colors = {
         "id": partial(Color.set, "yellow"),
         "state": add_color_for_state,
-        "manual": lambda value: Color.set("cyan" if value else None, value),  # type: ignore  # can't type a lambda
+        "manual": lambda value: Color.set("cyan" if value else None, value),
     }

     def format_value(self, field_idx, value):
diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py
index 27d2d90f7..db66d3b0d 100644
--- a/tron/core/actionrun.py
+++ b/tron/core/actionrun.py
@@ -19,7 +19,7 @@
 from tron.actioncommand import ActionCommand
 from tron.actioncommand import NoActionRunnerFactory
 from tron.actioncommand import SubprocessActionRunnerFactory
-from tron.bin.action_runner import build_environment  # type: ignore  # mypy can't find library stub
+from tron.bin.action_runner import build_environment
 from tron.bin.action_runner import build_labels
 from tron.config.config_utils import StringFormatter
 from tron.config.schema import ExecutorTypes
diff --git a/tron/utils/scribereader.py b/tron/utils/scribereader.py
index ea068e570..21521867d 100644
--- a/tron/utils/scribereader.py
+++ b/tron/utils/scribereader.py
@@ -4,6 +4,7 @@
 import operator
 import socket
 from functools import lru_cache
+from typing import Iterator
 from typing import List
 from typing import Optional
 from typing import Tuple
@@ -15,10 +16,19 @@


 try:
-    from scribereader import scribereader  # type: ignore
-    from clog.readers import StreamTailerSetupError  # type: ignore
+    from scribereader import scribereader
+    from scribereader.clog.readers import StreamTailerSetupError
+
+    scribereader_available = True
 except ImportError:
-    scribereader = None  # sorry folks, you'll need to add your own way to retrieve logs
+    scribereader_available = False  # sorry folks, you'll need to add your own way to retrieve logs
+
+try:
+    from clog.readers import S3LogsReader
+
+    s3reader_available = True
+except ImportError:
+    s3reader_available = False


 log = logging.getLogger(__name__)
@@ -59,15 +69,7 @@ def get_ecosystem() -> str:


 @lru_cache(maxsize=1)
-def get_scribereader_host_and_port() -> Optional[Tuple[str, int]]:
-    try:
-        ecosystem = get_ecosystem()
-        superregion = get_superregion()
-        region = get_region()
-    except OSError:
-        log.warning("Unable to read location mapping files from disk, not returning scribereader host/port")
-        return None
-
+def get_scribereader_host_and_port(ecosystem: str, superregion: str, region: str) -> Tuple[str, int]:
     # NOTE: Passing in an ecosystem of prod is not supported by scribereader
     # as there's no mapping of ecosystem->scribe-kafka-services discovery hosts
     # for this ecosystem
@@ -79,6 +81,56 @@ def get_scribereader_host_and_port() -> Optional[Tuple[str, int]]:
     return host, port


+class PaaSTALogs:
+    def __init__(self, component: str, paasta_cluster: str, action_run_id: str) -> None:
+        self.component = component
+        self.paasta_cluster = paasta_cluster
+        self.action_run_id = action_run_id
+        namespace, job_name, run_num, action = action_run_id.split(".")
+        # in our logging infra, things are logged to per-instance streams - but
+        # since Tron PaaSTA instances are of the form `job_name.action`, we need
+        # to escape the period since some parts of our infra will reject streams
+        # containing them - thus, the "weird" __ separator
+        self.stream_name = f"stream_paasta_app_output_{namespace}_{job_name}__{action}"
+        self.run_num = int(run_num)
+        self.num_lines = 0
+        self.malformed_lines = 0
+        self.output: List[Tuple[str, str]] = []
+        self.truncated_output = False
+
+    def fetch(self, stream: Iterator[str], max_lines: Optional[int]) -> None:
+        for line in stream:
+            if max_lines is not None and self.num_lines == max_lines:
+                self.truncated_output = True
+                break
+            # it's possible for jobs to run multiple times a day and have obscenely large amounts of output
+            # so we can't just truncate after seeing X number of lines for the run number in question - we
+            # need to count how many total lines we've seen and bail out early to preserve tron's uptime
+            self.num_lines += 1
+
+            try:
+                payload = json.loads(line)
+            except json.decoder.JSONDecodeError:
+                log.error(
+                    f"Unable to decode log line from stream ({self.stream_name}) for {self.action_run_id}: {line}"
+                )
+                self.malformed_lines += 1
+                continue
+
+            if (
+                int(payload.get("tron_run_number", -1)) == self.run_num
+                and payload.get("component") == self.component
+                and payload.get("message") is not None
+                and payload.get("timestamp") is not None
+                and payload.get("cluster") == self.paasta_cluster
+            ):
+                self.output.append((payload["timestamp"], payload["message"]))
+
+    def sorted_lines(self) -> List[str]:
+        self.output.sort(key=operator.itemgetter(0))
+        return [line for _, line in self.output]
+
+
 def read_log_stream_for_action_run(
     action_run_id: str,
     component: str,
@@ -90,146 +142,109 @@ def read_log_stream_for_action_run(
     if min_date is None:
         return [f"{action_run_id} has not started yet."]

+    use_s3_reader = False
+
+    if s3reader_available:
+        config_watcher = get_config_watcher()
+        config_watcher.reload_if_changed()
+        use_s3_reader = staticconf.read("logging.use_s3_reader", namespace=NAMESPACE, default=False)
+    elif scribereader_available is False:
+        return ["Neither scribereader nor yelp_clog (internal Yelp packages) are available - unable to display logs."]
+
     if max_lines == USE_SRV_CONFIGS:
         config_watcher = get_config_watcher()
         config_watcher.reload_if_changed()
         max_lines = staticconf.read("logging.max_lines_to_display", namespace=NAMESPACE)

-    if scribereader is None:
-        return ["Scribereader (an internal Yelp package) is not available - unable to display logs."]
-    if get_scribereader_host_and_port() is None:
+    try:
+        ecosystem = get_ecosystem()
+        superregion = get_superregion()
+        region = get_region()
+    except OSError:
+        log.warning("Unable to read location mapping files from disk, not returning scribereader host/port")
         return [
             "Unable to determine where Tron is located. If you're seeing this inside Yelp, report this to #compute-infra"
         ]

-    host, port = get_scribereader_host_and_port()  # type: ignore  # the None case is covered by the check above
-    # this should never fail since get_scribereader_host_and_port() will have also called get_superregion() and we've ensured that
-    # that file exists by getting to this point
     if paasta_cluster is None:
-        paasta_cluster = get_superregion()
+        paasta_cluster = superregion
+
+    paasta_logs = PaaSTALogs(component, paasta_cluster, action_run_id)
+    stream_name = paasta_logs.stream_name

     today = datetime.date.today()
     start_date = min_date.date()
-    end_date = max_date.date() if max_date else None
-
-    use_tailer = today in {start_date, end_date}
-    use_reader = start_date != today and end_date is not None
-
-    if end_date is not None and end_date == today:
-        end_date -= datetime.timedelta(days=1)
-
-    namespace, job_name, run_num, action = action_run_id.split(".")
-    # in our logging infra, things are logged to per-instance streams - but
-    # since Tron PaaSTA instances are of the form `job_name.action`, we need
-    # to escape the period since some parts of our infra will reject streams
-    # containing them - thus, the "weird" __ separator
-    stream_name = f"stream_paasta_app_output_{namespace}_{job_name}__{action}"
-    output: List[Tuple[str, str]] = []
-
-    malformed_lines = 0
-    num_lines = 0
-    truncated_output = False
-
-    # We'll only use a stream reader for logs from not-today.
-    # that said, it's possible that an action spans more than a single day - in this case, we'll first read "historical" data from
-    # the reader and then follow-up with today's logs from a stream tailer.
-    # NOTE: this is more-or-less what our internal `scribereader` binary does
-    if use_reader:
-        with scribereader.get_stream_reader(
-            stream_name=stream_name,
-            min_date=min_date,
-            max_date=max_date,
-            reader_host=host,
-            reader_port=port,
-        ) as stream:
-            for line in stream:
-                if max_lines is not None and num_lines == max_lines:
-                    truncated_output = True
-                    break
-                # it's possible for jobs to run multiple times a day and have obscenely large amounts of output
-                # so we can't just truncate after seeing X number of lines for the run number in question - we
-                # need to count how many total lines we've seen and bail out early to preserve tron's uptime
-                num_lines += 1
-
-                try:
-                    payload = json.loads(line)
-                except json.decoder.JSONDecodeError:
-                    log.error(f"Unable to decode log line from stream ({stream_name}) for {action_run_id}: {line}")
-                    malformed_lines += 1
-                    continue
-
-                if (
-                    payload.get("tron_run_number") == int(run_num)
-                    and payload.get("component") == component
-                    and payload.get("message") is not None
-                    and payload.get("timestamp") is not None
-                    and payload.get("cluster") == paasta_cluster
-                ):
-                    output.append((payload["timestamp"], payload["message"]))
-
-    if use_tailer:
-        stream = scribereader.get_stream_tailer(
-            stream_name=stream_name,
-            tailing_host=host,
-            tailing_port=port,
-            lines=-1,
-        )
-        try:
-            for line in stream:
-                if num_lines == max_lines:
-                    truncated_output = True
-                    break
-                # it's possible for jobs to run multiple times a day and have obscenely large amounts of output
-                # so we can't just truncate after seeing X number of lines for the run number in question - we
-                # need to count how many total lines we've seen and bail out early to preserve tron's uptime
-                num_lines += 1
-
-                try:
-                    payload = json.loads(line)
-                except json.decoder.JSONDecodeError:
-                    log.error(f"Unable to decode log line from stream ({stream_name}) for {action_run_id}: {line}")
-                    malformed_lines += 1
-                    continue
-
-                if (
-                    payload.get("tron_run_number") == int(run_num)
-                    and payload.get("component") == component
-                    and payload.get("message") is not None
-                    and payload.get("timestamp") is not None
-                    and payload.get("cluster") == paasta_cluster
-                ):
-                    output.append((payload["timestamp"], payload["message"]))
-        except StreamTailerSetupError:
-            return [
-                f"No data in stream {stream_name} - if this is the first time this action has run and you expected "
-                "output, please wait a couple minutes and refresh."
-            ]
-        except socket.timeout:
-            return [
-                f"Unable to connect to stream {stream_name} - if this is the first time this action has run and you "
-                "expected output, please wait a couple minutes and refresh."
-            ]
-        finally:
-            stream.close()
-
-    # XXX: for some reason, we're occasionally getting data out of order from scribereader - so we'll sort based on
-    # timestamp until we can figure out what's causing this.
-    output.sort(key=operator.itemgetter(0))
-    lines = [line for _, line in output]
-    malformed = [f"{malformed_lines} encountered while retrieving logs"] if malformed_lines else []
+    end_date: Optional[datetime.date]
+
+    # yelp_clog S3LogsReader is a newer reader that is supposed to replace scribe readers eventually.
+    if use_s3_reader:
+        end_date = max_date.date() if max_date else today
+
+        log.debug("Using S3LogsReader to retrieve logs")
+        s3_reader = S3LogsReader(ecosystem).get_log_reader(log_name=stream_name, min_date=start_date, max_date=end_date)
+        paasta_logs.fetch(s3_reader, max_lines)
+    else:
+        end_date = max_date.date() if max_date else None
+        use_tailer = today in {start_date, end_date}
+        use_reader = start_date != today and end_date is not None
+
+        host, port = get_scribereader_host_and_port(ecosystem, superregion, region)
+
+        # We'll only use a stream reader for logs from not-today.
+        # that said, it's possible that an action spans more than a single day - in this case, we'll first read "historical" data from
+        # the reader and then follow-up with today's logs from a stream tailer.
+        # NOTE: this is more-or-less what our internal `scribereader` binary does
+        if use_reader:
+            with scribereader.get_stream_reader(
+                stream_name=stream_name,
+                min_date=min_date,
+                max_date=max_date,
+                reader_host=host,
+                reader_port=port,
+            ) as stream:
+                paasta_logs.fetch(stream, max_lines)
+
+        if use_tailer:
+            stream = scribereader.get_stream_tailer(
+                stream_name=stream_name,
+                tailing_host=host,
+                tailing_port=port,
+                lines=-1,
+            )
+            try:
+                paasta_logs.fetch(stream, max_lines)
+            except StreamTailerSetupError:
+                return [
+                    f"No data in stream {stream_name} - if this is the first time this action has run and you expected "
+                    "output, please wait a couple minutes and refresh."
+                ]
+            except socket.timeout:
+                return [
+                    f"Unable to connect to stream {stream_name} - if this is the first time this action has run and you "
+                    "expected output, please wait a couple minutes and refresh."
+                ]
+            finally:
+                stream.close()
+
+    # for logs that use Kafka topics with multiple partitions underneath or retrieved by S3LogsReader,
+    # data ordering is not guaranteed - so we'll sort based on log timestamp set by producer.
+    lines = paasta_logs.sorted_lines()
+    malformed = (
+        [f"{paasta_logs.malformed_lines} encountered while retrieving logs"] if paasta_logs.malformed_lines else []
+    )
     try:
         location_selector = f"-s {paasta_cluster}" if "prod" in paasta_cluster else f'-e {paasta_cluster.split("-")[1]}'
     except IndexError:
         location_selector = f"-s {paasta_cluster}"
     truncation_message = (
         [
-            f"This output is truncated. Use this command to view all lines: scribereader {location_selector} {stream_name} --min-date {min_date.date()} --max-date {max_date.date()} | jq --raw-output 'select(.tron_run_number=={int(run_num)} and .component == \"{component}\") | .message'"
+            f"This output is truncated. Use this command to view all lines: scribereader {location_selector} {stream_name} --min-date {min_date.date()} --max-date {max_date.date()} | jq --raw-output 'select(.tron_run_number=={int(paasta_logs.run_num)} and .component == \"{component}\") | .message'"
         ]
         if max_date
         else [
-            f"This output is truncated. Use this command to view all lines: scribereader {location_selector} {stream_name} --min-date {min_date.date()} | jq --raw-output 'select(.tron_run_number=={int(run_num)} and .component == \"{component}\") | .message'"
+            f"This output is truncated. Use this command to view all lines: scribereader {location_selector} {stream_name} --min-date {min_date.date()} | jq --raw-output 'select(.tron_run_number=={int(paasta_logs.run_num)} and .component == \"{component}\") | .message'"
         ]
     )
-    truncated = truncation_message if truncated_output else []
+    truncated = truncation_message if paasta_logs.truncated_output else []

     return lines + malformed + truncated
diff --git a/yelp_package/extra_requirements_yelp.txt b/yelp_package/extra_requirements_yelp.txt
index 4d3333148..20224d247 100644
--- a/yelp_package/extra_requirements_yelp.txt
+++ b/yelp_package/extra_requirements_yelp.txt
@@ -1,5 +1,14 @@
 clusterman-metrics==2.2.1  # used by tron for pre-scaling for Spark runs
-scribereader==0.14.1  # used by tron to get tronjob logs
-yelp-clog==5.2.3  # scribereader dependency
+dateglob==1.1.1  # required by yelp-logging
+geogrid==2.1.0  # required by yelp-logging
+monk==3.0.4  # required by yelp-clog
+ply==3.11  # required by thriftpy2
+scribereader==1.1.1  # used by tron to get tronjob logs
+simplejson==3.19.2  # required by yelp-logging
+srv-configs==1.3.4  # required by monk
+tenacity==8.2.3  # required by yelp-logging
+thriftpy2==0.4.20  # required by monk
+yelp-cgeom==1.3.1  # required by geogrid
+yelp-clog==7.1.1  # scribereader dependency
 yelp-logging==4.17.0  # scribereader dependency
 yelp-meteorite==2.1.1  # used by task-processing to emit metrics, clusterman-metrics dependency