From 6ca8016380ec8bf53c146974314b0a94fc609e7b Mon Sep 17 00:00:00 2001 From: anthony sottile <103459774+asottile-sentry@users.noreply.github.com> Date: Thu, 22 Aug 2024 10:00:07 -0400 Subject: [PATCH] [mock-data] collect test data... slowly (#551) --- .github/workflows/mock-data.yml | 15 +-- mini-relay/classified/.gitignore | 2 + mini-relay/classify_data.py | 161 ------------------------------- mini-relay/run_tests.py | 145 ++++++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 172 deletions(-) create mode 100644 mini-relay/classified/.gitignore delete mode 100644 mini-relay/classify_data.py create mode 100644 mini-relay/run_tests.py diff --git a/.github/workflows/mock-data.yml b/.github/workflows/mock-data.yml index 00becc17..85df7814 100644 --- a/.github/workflows/mock-data.yml +++ b/.github/workflows/mock-data.yml @@ -2,24 +2,17 @@ name: mock-data.yml on: push: branches: [master, test-me-*] - #pull_request: # currently fails - distracting when using empower PRs for CodeCov demo + pull_request: jobs: mock-data: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - run: docker compose up --wait - - run: while ! curl --fail --silent http://localhost:3000; do sleep .5; done - timeout-minutes: 1 + - run: docker compose build - uses: actions/setup-python@v4 with: python-version: 3.12 - run: pip install -r tda/requirements.txt - - run: | - SLEEP_LENGTH=0 BACKENDS=flask RUN_ID=${{ github.run_id }}_flask TDA_CONFIG=tda/config.local.yaml pytest tda/desktop_web - - run: | - echo 'waiting for eventual consistency...' - sleep 10 - python3 mini-relay/classify_data.py mini-relay/data out - tree out + - run: python3 -uS mini-relay/run_tests.py + - run: tree mini-relay/classified diff --git a/mini-relay/classified/.gitignore b/mini-relay/classified/.gitignore new file mode 100644 index 00000000..d6b7ef32 --- /dev/null +++ b/mini-relay/classified/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/mini-relay/classify_data.py b/mini-relay/classify_data.py deleted file mode 100644 index 36c8cc0b..00000000 --- a/mini-relay/classify_data.py +++ /dev/null @@ -1,161 +0,0 @@ -from __future__ import annotations - -import argparse -import collections -import json -import os.path -from typing import IO -from typing import NamedTuple - - -class Item(NamedTuple): - header: bytes - body: bytes - - @property - def header_type(self) -> str: - return json.loads(self.header)['type'] - - def get_se(self) -> str | None: - try: - return json.loads(self.body)['tags']['se'] - except (ValueError, KeyError): - return None - - -class Envelope(NamedTuple): - project_id: int - ts: float - header: bytes - items: tuple[Item, ...] - - def get_se(self) -> str | None: - se = None - for item in self.items: - cand_se = item.get_se() - if se is None and cand_se is not None: - se = cand_se - elif se is not None and cand_se is not None and cand_se != se: - raise AssertionError(f'mixed se? {se=} {cand_se=}') - return se - - def get_trace_id(self) -> str | None: - try: - return json.loads(self.header)['trace']['trace_id'] - except (ValueError, KeyError): - return None - - def is_unclassifiable(self) -> bool: - return all( - item.header_type in { - 'session', 'sessions', - # TODO: include tags somehow - 'statsd', 'metric_meta', - 'client_report', - } - for item in self.items - ) - - def debug(self) -> str: - return f'{self.project_id}/{self.ts} {", ".join(item.header_type for item in self.items)}' - - -def _parse_items(bio: IO[bytes]) -> tuple[Item, ...]: - ret = [] - while True: - header = bio.readline() - if not header: - break - contents = json.loads(header) - if 'length' in contents: - body = bio.read(contents['length']) + bio.read(1) - else: - body = bio.readline() - ret.append(Item(header=header, body=body)) - return tuple(ret) - - -def main() -> int: - parser = argparse.ArgumentParser() - parser.add_argument('src') - parser.add_argument('dest') - args = parser.parse_args() - - envelopes = [] - - for subdir in sorted(os.listdir(args.src)): - subdir = os.path.join(args.src, subdir) - if not os.path.isdir(subdir): - continue - project_id = int(os.path.basename(subdir)) - - for event_file in sorted(os.listdir(subdir), key=float): - event_file = os.path.join(subdir, event_file) - ts = float(os.path.basename(event_file)) - - with open(event_file, 'rb') as f: - header = f.readline() - - items = _parse_items(f) - envelopes.append( - Envelope( - project_id=project_id, - ts=ts, - header=header, - items=items, - ), - ) - - trace_id_to_se = {} - by_se = collections.defaultdict(list) - - # can't classify these :( - unclassifiable = [] - - while envelopes: - new = [] - - for envelope in envelopes: - se = envelope.get_se() - - trace_id = envelope.get_trace_id() - - if se is None and trace_id is not None: - se = trace_id_to_se.get(trace_id) - - if se is not None: - by_se[se].append(envelope) - - if trace_id is not None: - trace_id_to_se[trace_id] = se - - elif envelope.is_unclassifiable(): - unclassifiable.append(envelope) - else: - new.append(envelope) - - if len(new) == len(envelopes): - for envelope in new: - print(envelope.debug()) - raise AssertionError(f'unable to classify {len(new)} envelopes') - else: - envelopes = new - - print(f'ignoring {len(unclassifiable)} unclassifiable envelopes') - os.makedirs(args.dest, exist_ok=True) - for k, v in by_se.items(): - k = k.replace('/', '__') - for envelope in v: - fname = f'{args.dest}/{k}/{envelope.project_id}/{envelope.ts}' - os.makedirs(os.path.dirname(fname), exist_ok=True) - with open(fname, 'wb') as f: - f.write(envelope.header) - for item in envelope.items: - f.write(item.header) - f.write(item.body) - - return 0 - - -if __name__ == '__main__': - raise SystemExit(main()) diff --git a/mini-relay/run_tests.py b/mini-relay/run_tests.py new file mode 100644 index 00000000..f2aa0a05 --- /dev/null +++ b/mini-relay/run_tests.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import argparse +import contextlib +import datetime +import os +import shlex +import shutil +import subprocess +import time +import urllib.request +from typing import Generator + + +_PYTEST_ENV = { + 'SLEEP_LENGTH': '0', + 'BACKENDS': 'flask', + 'TDA_CONFIG': 'tda/config.local.yaml', +} + + +def _print_cmd(*cmd: str, **env: str) -> None: + env_s = ' '.join(f'{k}={shlex.quote(v)}' for k, v in env.items()) + sp = ' ' if env_s else '' + print(f'+ {env_s}{sp}{shlex.join(cmd)}') + + +def _run_q(*cmd: str) -> None: + _print_cmd(*cmd) + ret = subprocess.call( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + if ret: + raise SystemExit(f'`{shlex.join(cmd)}` raised {ret}') + + +def _discover_tests(args: list[str]) -> list[str]: + cmd = ('pytest', 'tda/desktop_web', '--collect-only', '--quiet', *args) + _print_cmd(*cmd, **_PYTEST_ENV) + + out = subprocess.run( + cmd, + env={**os.environ, **_PYTEST_ENV}, + capture_output=True, + text=True, + ) + if out.returncode: + raise SystemExit( + f'pytest discovery failed (exit {out.returncode}):\n' + f'{out.stdout}{out.stderr}'.rstrip(), + ) + + tests = [s for s in out.stdout.splitlines() if s.startswith('tda/')] + if not tests: + raise SystemExit('did not discover any tests!') + + return tests + + +def _wait_for_started(url: str) -> None: + for _ in range(10): + try: + urllib.request.urlopen(url).read() + except OSError: + print('... not started yet') + time.sleep(.5) + else: + break + else: + raise SystemExit(f'server at {url} never started!') + + +@contextlib.contextmanager +def _testctx(testname: str) -> Generator[None]: + print(f'running {testname}...') + + # clear out existing data before we collect new data + _run_q( + 'docker', 'compose', 'run', 'mini-relay', + 'bash', '-c', + 'find /data -mindepth 1 -maxdepth 1 -type d | ' + 'xargs --no-run-if-empty rm -r', + ) + + _run_q('docker', 'compose', 'up', '--wait') + try: + print('waiting for docker-compose to be up...') + _wait_for_started('http://localhost:3000') + except BaseException: + _run_q('docker', 'compose', 'down') + raise + + try: + yield + finally: + print('... waiting for eventual consistency') + time.sleep(10) + print('... trying SIGINT first') + _run_q( + 'docker', 'compose', 'kill', + # processes are much more likely to bind SIGINT than SIGTERM + '--signal=SIGINT', + # specifically leaving postgres / mini-relay running + 'react', 'ruby', 'flask', + ) + time.sleep(2) + _run_q('docker', 'compose', 'down') + + testpart = testname.rpartition('::')[-1] + testpart = testpart.partition('[')[0] + + print('... saving mock data') + dt = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + target = os.path.join('mini-relay', 'classified', f'{dt}_{testpart}') + os.makedirs(os.path.dirname(target), exist_ok=True) + shutil.copytree('mini-relay/data', target) + + +def main() -> int: + parser = argparse.ArgumentParser( + usage='%(prog)s [options] [PYTEST_OPTIONS]', + ) + _, rest = parser.parse_known_args() + + if not os.path.exists('tda'): + raise SystemExit('expected to run from root of `empower`') + + print('discovering tests...') + testlist = _discover_tests(rest) + print(f'=> discovered {len(testlist)} tests') + + ret = 0 + for testname in testlist: + with _testctx(testname): + cmd = ('pytest', '-qq', testname) + _print_cmd(*cmd, **_PYTEST_ENV) + ret = ret or subprocess.call(cmd, env={**os.environ, **_PYTEST_ENV}) + + return ret + + +if __name__ == '__main__': + raise SystemExit(main())