diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 76555e5..e02d2a9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,6 +49,7 @@ jobs: strategy: matrix: python-version: ["3.9", "3.10", "3.11", "3.12"] + toxenv: ["test", "pytest7"] fail-fast: false steps: - uses: actions/checkout@v2 @@ -59,7 +60,10 @@ jobs: - name: Install dependencies run: python -m pip install --upgrade pip setuptools tox - name: Run tests - run: python -m tox --recreate -e test + env: + # we hit https://github.com/dateutil/dateutil/issues/1314 via pandas/plotly + PYTHONWARNINGS: ignore::DeprecationWarning:dateutil + run: python -m tox --recreate -e ${{ matrix.toxenv }} release: runs-on: ubuntu-latest diff --git a/deps/check.txt b/deps/check.txt index fc82781..34b90eb 100644 --- a/deps/check.txt +++ b/deps/check.txt @@ -5,14 +5,14 @@ # pip-compile --annotation-style=line --output-file=deps/check.txt deps/check.in # attrs==23.2.0 # via flake8-bugbear -autoflake==2.2.1 # via shed +autoflake==2.3.0 # via shed bandit==1.7.7 # via flake8-bandit -black==24.1.1 # via shed +black==24.2.0 # via shed click==8.1.7 # via black com2ann==0.3.0 # via shed flake8==7.0.0 # via -r deps/check.in, flake8-bandit, flake8-bugbear, flake8-builtins, flake8-comprehensions, flake8-docstrings, flake8-print, pep8-naming flake8-bandit==4.1.1 # via -r deps/check.in -flake8-bugbear==24.1.17 # via -r deps/check.in +flake8-bugbear==24.2.6 # via -r deps/check.in flake8-builtins==2.2.0 # via -r deps/check.in flake8-comprehensions==3.14.0 # via -r deps/check.in flake8-docstrings==1.7.0 # via -r deps/check.in @@ -33,7 +33,7 @@ pycodestyle==2.11.1 # via flake8, flake8-print pydocstyle==6.3.0 # via flake8-docstrings pyflakes==3.2.0 # via autoflake, flake8 pygments==2.17.2 # via rich -pyupgrade==3.15.0 # via shed +pyupgrade==3.15.1 # via shed pyyaml==6.0.1 # via bandit, libcst rich==13.7.0 # via bandit shed==2024.1.1 # via -r deps/check.in @@ -41,7 +41,7 @@ snowballstemmer==2.2.0 # via pydocstyle stevedore==5.1.0 # via bandit tokenize-rt==5.2.0 # via pyupgrade tomli==2.0.1 # via autoflake, black, mypy -types-requests==2.31.0.20240125 # via -r deps/check.in +types-requests==2.31.0.20240218 # via -r deps/check.in typing-extensions==4.9.0 # via black, libcst, mypy, typing-inspect typing-inspect==0.9.0 # via libcst -urllib3==2.2.0 # via types-requests +urllib3==2.2.1 # via types-requests diff --git a/deps/docs.txt b/deps/docs.txt index 2d9d62c..49770b2 100644 --- a/deps/docs.txt +++ b/deps/docs.txt @@ -7,7 +7,7 @@ alabaster==0.7.16 # via sphinx attrs==23.2.0 # via hypothesis babel==2.14.0 # via sphinx -black==24.1.1 # via hypofuzz (setup.py), hypothesis +black==24.2.0 # via hypofuzz (setup.py), hypothesis blinker==1.7.0 # via flask certifi==2024.2.2 # via requests charset-normalizer==3.3.2 # via requests @@ -19,8 +19,8 @@ dash-html-components==2.0.0 # via dash dash-table==5.0.0 # via dash docutils==0.20.1 # via myst-parser, pybtex-docutils, sphinx, sphinx-rtd-theme, sphinxcontrib-bibtex exceptiongroup==1.2.0 # via hypothesis, pytest -flask==3.0.1 # via dash -hypothesis[cli]==6.97.4 # via hypofuzz (setup.py) +flask==3.0.2 # via dash +hypothesis[cli]==6.98.8 # via hypofuzz (setup.py) idna==3.6 # via requests imagesize==1.4.1 # via sphinx importlib-metadata==7.0.1 # via dash @@ -30,24 +30,24 @@ jinja2==3.1.3 # via flask, myst-parser, sphinx latexcodec==2.0.1 # via pybtex libcst==1.1.0 # via hypofuzz (setup.py) markdown-it-py==3.0.0 # via mdit-py-plugins, myst-parser, rich -markupsafe==2.1.4 # via jinja2, werkzeug 
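The new CI matrix above fans each Python version out over two tox environments, `test` and `pytest7`, so the suite is exercised against both pytest 8 and the pytest 7 pins in `deps/test-old.txt` below. The `PYTHONWARNINGS` value uses the standard `action:message:category:module:lineno` filter syntax with empty fields as wildcards; it installs roughly the following in-process filter, shown here only as an illustration:

```python
import warnings

# Approximate in-process equivalent of
#   PYTHONWARNINGS=ignore::DeprecationWarning:dateutil
# i.e. action="ignore", any message, category=DeprecationWarning,
# and a module filter of "dateutil".
warnings.filterwarnings(
    "ignore",
    category=DeprecationWarning,
    module="dateutil",
)
```

Scoping the ignore to dateutil keeps deprecation warnings from other packages visible in CI.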
+markupsafe==2.1.5 # via jinja2, werkzeug mdit-py-plugins==0.4.0 # via myst-parser mdurl==0.1.2 # via markdown-it-py mypy-extensions==1.0.0 # via black, typing-inspect myst-parser==2.0.0 # via -r deps/docs.in nest-asyncio==1.6.0 # via dash -numpy==1.26.3 # via pandas +numpy==1.26.4 # via pandas packaging==23.2 # via black, plotly, pytest, sphinx pandas==2.2.0 # via hypofuzz (setup.py) pathspec==0.12.1 # via black platformdirs==4.2.0 # via black -plotly==5.18.0 # via dash +plotly==5.19.0 # via dash pluggy==1.4.0 # via pytest psutil==5.9.8 # via hypofuzz (setup.py) pybtex==0.24.0 # via pybtex-docutils, sphinxcontrib-bibtex pybtex-docutils==1.0.3 # via sphinxcontrib-bibtex pygments==2.17.2 # via rich, sphinx -pytest==8.0.0 # via hypofuzz (setup.py) +pytest==8.0.1 # via hypofuzz (setup.py) python-dateutil==2.8.2 # via pandas pytz==2024.1 # via pandas pyyaml==6.0.1 # via libcst, myst-parser, pybtex @@ -72,8 +72,8 @@ tenacity==8.2.3 # via plotly tomli==2.0.1 # via black, pytest typing-extensions==4.9.0 # via black, dash, libcst, typing-inspect typing-inspect==0.9.0 # via libcst -tzdata==2023.4 # via pandas -urllib3==2.2.0 # via requests +tzdata==2024.1 # via pandas +urllib3==2.2.1 # via requests werkzeug==3.0.1 # via dash, flask zipp==3.17.0 # via importlib-metadata diff --git a/deps/test-old.in b/deps/test-old.in new file mode 100644 index 0000000..f47b399 --- /dev/null +++ b/deps/test-old.in @@ -0,0 +1,2 @@ +-r ./test.in +pytest < 8.0 diff --git a/deps/test-old.txt b/deps/test-old.txt new file mode 100644 index 0000000..7b42438 --- /dev/null +++ b/deps/test-old.txt @@ -0,0 +1,64 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --annotation-style=line --output-file=deps/test-old.txt deps/test-old.in setup.py +# +attrs==23.2.0 # via hypothesis +black==24.2.0 # via hypofuzz (setup.py), hypothesis +blinker==1.7.0 # via flask +certifi==2024.2.2 # via requests +charset-normalizer==3.3.2 # via requests +click==8.1.7 # via black, flask, hypothesis +coverage[toml]==7.4.1 # via hypofuzz (setup.py), pytest-cov +dash==2.15.0 # via hypofuzz (setup.py) +dash-core-components==2.0.0 # via dash +dash-html-components==2.0.0 # via dash +dash-table==5.0.0 # via dash +exceptiongroup==1.2.0 # via hypothesis, pytest +execnet==2.0.2 # via pytest-xdist +flask==3.0.2 # via dash +hypothesis[cli]==6.98.8 # via hypofuzz (setup.py) +idna==3.6 # via requests +importlib-metadata==7.0.1 # via dash +iniconfig==2.0.0 # via pytest +itsdangerous==2.1.2 # via flask +jinja2==3.1.3 # via flask +libcst==1.1.0 # via hypofuzz (setup.py) +markdown-it-py==3.0.0 # via rich +markupsafe==2.1.5 # via jinja2, werkzeug +mdurl==0.1.2 # via markdown-it-py +mypy-extensions==1.0.0 # via black, typing-inspect +nest-asyncio==1.6.0 # via dash +numpy==1.26.4 # via pandas, pyarrow +packaging==23.2 # via black, plotly, pytest +pandas==2.2.0 # via hypofuzz (setup.py) +pathspec==0.12.1 # via black +platformdirs==4.2.0 # via black +plotly==5.19.0 # via dash +pluggy==1.4.0 # via pytest +psutil==5.9.8 # via hypofuzz (setup.py) +pyarrow==15.0.0 # via -r deps/./test.in +pygments==2.17.2 # via rich +pytest==7.4.4 # via -r deps/./test.in, -r deps/test-old.in, hypofuzz (setup.py), pytest-cov, pytest-xdist +pytest-cov==4.1.0 # via -r deps/./test.in +pytest-xdist==3.5.0 # via -r deps/./test.in +python-dateutil @ git+https://github.com/dateutil/dateutil.git@296d419fe6bf3b22897f8f210735ac9c4e1cb796 # via -r deps/./test.in, pandas +pytz==2024.1 # via pandas +pyyaml==6.0.1 # via libcst +requests==2.31.0 
# via dash, hypofuzz (setup.py) +retrying==1.3.4 # via dash +rich==13.7.0 # via hypothesis +six==1.16.0 # via python-dateutil, retrying +sortedcontainers==2.4.0 # via hypothesis +tenacity==8.2.3 # via plotly +tomli==2.0.1 # via black, coverage, pytest +typing-extensions==4.9.0 # via black, dash, libcst, typing-inspect +typing-inspect==0.9.0 # via libcst +tzdata==2024.1 # via pandas +urllib3==2.2.1 # via requests +werkzeug==3.0.1 # via dash, flask +zipp==3.17.0 # via importlib-metadata + +# The following packages are considered to be unsafe in a requirements file: +# setuptools diff --git a/deps/test.in b/deps/test.in index 2206063..aeba90d 100644 --- a/deps/test.in +++ b/deps/test.in @@ -2,3 +2,8 @@ pytest pytest-cov pytest-xdist + +pyarrow # avoid Pandas deprecation warning via plotly/dash + +# avoid https://github.com/dateutil/dateutil/issues/1314 +git+https://github.com/dateutil/dateutil.git@296d419fe6bf3b22897f8f210735ac9c4e1cb796 diff --git a/deps/test.txt b/deps/test.txt index 5636d1b..395dd2d 100644 --- a/deps/test.txt +++ b/deps/test.txt @@ -5,7 +5,7 @@ # pip-compile --annotation-style=line --output-file=deps/test.txt deps/test.in setup.py # attrs==23.2.0 # via hypothesis -black==24.1.1 # via hypofuzz (setup.py), hypothesis +black==24.2.0 # via hypofuzz (setup.py), hypothesis blinker==1.7.0 # via flask certifi==2024.2.2 # via requests charset-normalizer==3.3.2 # via requests @@ -17,8 +17,8 @@ dash-html-components==2.0.0 # via dash dash-table==5.0.0 # via dash exceptiongroup==1.2.0 # via hypothesis, pytest execnet==2.0.2 # via pytest-xdist -flask==3.0.1 # via dash -hypothesis[cli]==6.97.4 # via hypofuzz (setup.py) +flask==3.0.2 # via dash +hypothesis[cli]==6.98.8 # via hypofuzz (setup.py) idna==3.6 # via requests importlib-metadata==7.0.1 # via dash iniconfig==2.0.0 # via pytest @@ -26,23 +26,24 @@ itsdangerous==2.1.2 # via flask jinja2==3.1.3 # via flask libcst==1.1.0 # via hypofuzz (setup.py) markdown-it-py==3.0.0 # via rich -markupsafe==2.1.4 # via jinja2, werkzeug +markupsafe==2.1.5 # via jinja2, werkzeug mdurl==0.1.2 # via markdown-it-py mypy-extensions==1.0.0 # via black, typing-inspect nest-asyncio==1.6.0 # via dash -numpy==1.26.3 # via pandas +numpy==1.26.4 # via pandas, pyarrow packaging==23.2 # via black, plotly, pytest pandas==2.2.0 # via hypofuzz (setup.py) pathspec==0.12.1 # via black platformdirs==4.2.0 # via black -plotly==5.18.0 # via dash +plotly==5.19.0 # via dash pluggy==1.4.0 # via pytest psutil==5.9.8 # via hypofuzz (setup.py) +pyarrow==15.0.0 # via -r deps/test.in pygments==2.17.2 # via rich -pytest==8.0.0 # via -r deps/test.in, hypofuzz (setup.py), pytest-cov, pytest-xdist +pytest==8.0.1 # via -r deps/test.in, hypofuzz (setup.py), pytest-cov, pytest-xdist pytest-cov==4.1.0 # via -r deps/test.in pytest-xdist==3.5.0 # via -r deps/test.in -python-dateutil==2.8.2 # via pandas +python-dateutil @ git+https://github.com/dateutil/dateutil.git@296d419fe6bf3b22897f8f210735ac9c4e1cb796 # via -r deps/test.in, pandas pytz==2024.1 # via pandas pyyaml==6.0.1 # via libcst requests==2.31.0 # via dash, hypofuzz (setup.py) @@ -54,8 +55,8 @@ tenacity==8.2.3 # via plotly tomli==2.0.1 # via black, coverage, pytest typing-extensions==4.9.0 # via black, dash, libcst, typing-inspect typing-inspect==0.9.0 # via libcst -tzdata==2023.4 # via pandas -urllib3==2.2.0 # via requests +tzdata==2024.1 # via pandas +urllib3==2.2.1 # via requests werkzeug==3.0.1 # via dash, flask zipp==3.17.0 # via importlib-metadata diff --git a/docs-src/changelog.md b/docs-src/changelog.md index 
e5410b5..b838d1a 100644 --- a/docs-src/changelog.md +++ b/docs-src/changelog.md @@ -2,6 +2,9 @@ HypoFuzz uses [calendar-based versioning](https://calver.org/), with a `YY-MM-patch` format. +## 24.02.3 +Fixed compatibility with Pytest 8.x ([#32](https://github.com/Zac-HD/hypofuzz/issues/32)). + ## 24.02.2 Fixed a dashboard bug ([#31](https://github.com/Zac-HD/hypofuzz/issues/31)). diff --git a/src/hypofuzz/__init__.py b/src/hypofuzz/__init__.py index 877e021..063d9f6 100644 --- a/src/hypofuzz/__init__.py +++ b/src/hypofuzz/__init__.py @@ -1,4 +1,4 @@ """Adaptive fuzzing for property-based tests using Hypothesis.""" -__version__ = "24.2.2" +__version__ = "24.2.3" __all__: list = [] diff --git a/src/hypofuzz/dashboard.py b/src/hypofuzz/dashboard.py index 37dfccd..cde870e 100644 --- a/src/hypofuzz/dashboard.py +++ b/src/hypofuzz/dashboard.py @@ -1,363 +1,363 @@ -"""Live web dashboard for a fuzzing run.""" - -import atexit -import datetime -import os -import signal -from typing import List, Tuple - -import black -import dash -import flask -import plotly.express as px -import plotly.graph_objects as go -from dash import dcc, html -from dash.dependencies import Input, Output -from hypothesis.configuration import storage_directory - -from .patching import make_and_save_patches - -DATA_TO_PLOT = [{"nodeid": "", "elapsed_time": 0, "ninputs": 0, "branches": 0}] -LAST_UPDATE: dict = {} - -PYTEST_ARGS = None - -headings = ["nodeid", "elapsed time", "ninputs", "since new cov", "branches", "note"] -app = flask.Flask(__name__, static_folder=os.path.abspath("pycrunch-recordings")) - -try: - import flask_cors - from pycrunch_trace.oop.safe_filename import SafeFilename -except ImportError: - SafeFilename = None -else: - flask_cors.CORS(app) - - -@app.route("/", methods=["POST"]) # type: ignore -def recv_data() -> Tuple[str, int]: - data = flask.request.json - if not isinstance(data, list): - data = [data] - for d in data: - add_data(d) - return "", 200 - - -def add_data(d: dict) -> None: - if not LAST_UPDATE: - del DATA_TO_PLOT[0] - DATA_TO_PLOT.append( - {k: d[k] for k in ["nodeid", "elapsed_time", "ninputs", "branches"]} - ) - LAST_UPDATE[d["nodeid"]] = d - - -external_stylesheets = ["https://codepen.io/chriddyp/pen/bWLwgP.css"] -board = dash.Dash(__name__, server=app, external_stylesheets=external_stylesheets) -board.layout = html.Div( - children=[ - # represents the URL bar, doesn't render anything - dcc.Location(id="url", refresh=False), - html.H1( - children=[ - html.A("HypoFuzz", href="https://hypofuzz.com"), - " Live Dashboard", - ] - ), - html.Div(id="page-content"), - dcc.Interval(id="interval-component", interval=5000), # time in millis - ] -) - - -def row_for(data: dict, include_link: bool = True, *extra: object) -> html.Tr: - parts = [] - if include_link: - parts.append( - dcc.Link(data["nodeid"], href="/" + data["nodeid"].replace("/", "_")) - ) - if "elapsed_time" in data: - parts.append(str(datetime.timedelta(seconds=int(data["elapsed_time"])))) - else: - parts.append("") - for key in headings[2:]: - parts.append(data.get(key, "")) - return html.Tr([html.Td(p) for p in parts + [str(e) for e in extra]]) - - -def try_format(code: str) -> str: - try: - return black.format_str(code, mode=black.FileMode()) - except Exception: - return code - - -@board.callback( # type: ignore - Output("page-content", "children"), - [Input("url", "pathname")], -) -def display_page(pathname: str) -> html.Div: - # Main page - if pathname == "/" or pathname is None: - return html.Div( - children=[ - html.Div("Total 
branch coverage for each test."), - dcc.Graph(id="live-update-graph"), - html.Button("Toggle log-xaxis", id="xaxis-state", n_clicks=0), - html.Div( - dcc.Link( - "See patches with covering and/or failing examples", - href="/patches/", - refresh=True, - ) - ), - html.Div(html.Table(id="summary-table-rows")), - html.Div("Estimated number of inputs to discover new coverage or bugs"), - html.Div(html.Table(id="estimators-table-rows")), - ] - ) - - # Target-specific subpages - nodeid = pathname[1:] - trace = [ - d - for d in DATA_TO_PLOT - if d["nodeid"].replace("/", "_") == nodeid # type: ignore - ] - if not trace: - return html.Div( - children=[ - dcc.Link("Back to main dashboard", href="/"), - html.P(["No results for this test function yet."]), - ] - ) - fig1 = px.line( - trace, x="ninputs", y="branches", line_shape="hv", hover_data=["elapsed_time"] - ) - fig2 = px.line( - trace, x="elapsed_time", y="branches", line_shape="hv", hover_data=["ninputs"] - ) - last_update = LAST_UPDATE[trace[-1]["nodeid"]] - add: List[str] = [] - if "failures" in last_update: - for failures in last_update["failures"]: - failures[0] = try_format(failures[0]) - add.extend(html.Pre(children=[html.Code(children=[x])]) for x in failures) - if SafeFilename: - url = f"http://{flask.request.host}/pycrunch-recordings/{SafeFilename(nodeid)}/session.chunked" - link = dcc.Link( - "Debug failing example online with pytrace", - href=f"https://app.pytrace.com/?open={url}", - ) - add.append(html.Pre(children=[link])) - - _seen_cov_examples = set() - covering_examples = [] - for row in last_update.get("seed_pool", []): - example = try_format(row[1]), row[2] - if example not in _seen_cov_examples: - _seen_cov_examples.add(example) - covering_examples.append(example) - - return html.Div( - children=[ - dcc.Link("Back to main dashboard", href="/"), - html.P( - children=[ - "Example count by status: ", - str(last_update.get("status_counts", "???")), - ] - ), - html.Table( - children=[ - html.Tr( - [html.Th(h) for h in headings[1:]] + [html.Th(["seed count"])] - ), - row_for(last_update, False, len(last_update.get("seed_pool", []))), - ] - ), - *add, - dcc.Graph(id=f"graph-of-{pathname}-1", figure=fig1), - dcc.Graph(id=f"graph-of-{pathname}-2", figure=fig2), - html.H3(["Minimal covering examples"]), - html.P( - [ - "Each additional example shown below covers at least one branch " - "not covered by any previous, more-minimal, example." 
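The `try_format` helper above wraps Black's string-formatting API and falls back to the raw code whenever Black cannot parse it; a self-contained sketch of the same call:

```python
import black

code = "f(x=1,y=2,  z =3)"
try:
    pretty = black.format_str(code, mode=black.FileMode())
except Exception:
    pretty = code  # unparseable reprs are shown unformatted
print(pretty)  # -> f(x=1, y=2, z=3)
```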
- ] - ), - *(html.Pre([html.Code([*ex, "\n"])]) for ex in covering_examples), - ] - ) - - -FIRST_FAILED_AT = {} - - -@board.callback( # type: ignore - Output("live-update-graph", "figure"), - [Input("interval-component", "n_intervals"), Input("xaxis-state", "n_clicks")], -) -def update_graph_live(n: int, clicks: int) -> object: - fig = px.line( - DATA_TO_PLOT, - x="ninputs", - y="branches", - color="nodeid", - line_shape="hv", - hover_data=["elapsed_time"], - log_x=bool(clicks % 2), - ) - failing = { - d["nodeid"]: (d["ninputs"], d["branches"]) - for d in LAST_UPDATE.values() - if d.get("status_counts", {}).get("INTERESTING", 0) - } - for k, v in failing.items(): - if k not in FIRST_FAILED_AT: - FIRST_FAILED_AT[k] = v - for series, symbol in [(failing, "🔍"), (FIRST_FAILED_AT, "💥")]: - if series: - xs, ys = zip(*series.values()) - fig.add_trace( - go.Scatter( - x=xs, - y=ys, - mode="text", - text=symbol, - showlegend=False, - ) - ) - fig.update_layout( - height=800, - legend_yanchor="top", - legend_xanchor="left", - legend_y=-0.08, - legend_x=0, - ) - # Setting this to a constant prevents data updates clobbering zoom / selections - fig.layout.uirevision = "this key never changes" - return fig - - -@board.callback( # type: ignore - Output("summary-table-rows", "children"), - [Input("interval-component", "n_intervals")], -) -def update_table_live(n: int) -> object: - return [html.Tr([html.Th(h) for h in headings])] + [ - row_for(data) for name, data in sorted(LAST_UPDATE.items()) if name - ] - - -def estimators(data: dict) -> dict: - since_new_cov = data["since new cov"] - ninputs = data["ninputs"] - loaded_from_db = data["loaded_from_db"] - return { - "branch_lower": 1 / (since_new_cov + 1), - "branch_upper": 1 / (max(ninputs - loaded_from_db, 0) + 1), - "bug": 1 if data.get("failures") else 1 / max(ninputs + 1, loaded_from_db), - } - - -@board.callback( # type: ignore - Output("estimators-table-rows", "children"), - [Input("interval-component", "n_intervals")], -) -def update_estimators_table(n: int) -> object: - contents = [html.Col(), html.Colgroup(span=1)] + [ - html.Colgroup(span=2) for _ in range(2) - ] - colnames = ["nodeid", "branch-lower", "branch-upper", "bug"] - contents = [html.Tr([html.Th(h) for h in colnames])] - - for _, d in sorted(LAST_UPDATE.items()): - try: - est = estimators(d) - except KeyError: - continue - row = [ - d["nodeid"], - int(1 / est["branch_lower"]), - int(1 / est["branch_upper"]), - int(1 / est["bug"]), - ] - contents.append(html.Tr([html.Td(x) for x in row])) - return contents - - -@app.route("/pycrunch-recordings/") # type: ignore -def download_file(name: str) -> flask.Response: - return flask.send_from_directory( - directory="pycrunch-recordings", - path=name, - mimetype="application/octet-stream", - ) - - -@app.route("/patches/") # type: ignore -def download_patch(name: str) -> flask.Response: - return flask.send_from_directory( - directory=os.path.relpath(storage_directory("patches"), app.root_path), - path=name, - mimetype="application/octet-stream", - ) - - -@app.route("/patches/") # type: ignore -def patch_summary() -> flask.Response: - patches = make_and_save_patches(PYTEST_ARGS, LAST_UPDATE) - if not patches: - return """ - HypoFuzz patches - - Waiting for examples, please refresh the page in minute or so. - """ - show = "fail" - if show not in patches: - show = "cov" - patch_path = patches[show] - describe = {"fail": "failing", "cov": "covering", "all": "covering and failing"} - links = "\n".join( - f'
  • patch with {v} examples
  • ' - for k, v in describe.items() - if k in patches - ) - return f""" - - HypoFuzz patches - - - - - -

    Download links


    Latest {describe[show]} patch

    {patch_path.read_text()}
    - - """ - - -def start_dashboard_process( - port: int, *, pytest_args: list, host: str = "localhost", debug: bool = False -) -> None: - global PYTEST_ARGS - PYTEST_ARGS = pytest_args - - # Ensure that we dump whatever patches are ready before shutting down - def signal_handler(signum, frame): # type: ignore - make_and_save_patches(pytest_args, LAST_UPDATE) - if old_handler in (signal.SIG_DFL, None): - return old_handler - elif old_handler is not signal.SIG_IGN: - return old_handler(signum, frame) - raise NotImplementedError("Unreachable") - - old_handler = signal.signal(signal.SIGTERM, signal_handler) - atexit.register(make_and_save_patches, pytest_args, LAST_UPDATE) - - print(f"\n\tNow serving dashboard at http://{host}:{port}/\n") # noqa - app.run(host=host, port=port, debug=debug) +"""Live web dashboard for a fuzzing run.""" + +import atexit +import datetime +import os +import signal +from typing import List, Tuple + +import black +import dash +import flask +import plotly.express as px +import plotly.graph_objects as go +from dash import dcc, html +from dash.dependencies import Input, Output +from hypothesis.configuration import storage_directory + +from .patching import make_and_save_patches + +DATA_TO_PLOT = [{"nodeid": "", "elapsed_time": 0, "ninputs": 0, "branches": 0}] +LAST_UPDATE: dict = {} + +PYTEST_ARGS = None + +headings = ["nodeid", "elapsed time", "ninputs", "since new cov", "branches", "note"] +app = flask.Flask(__name__, static_folder=os.path.abspath("pycrunch-recordings")) + +try: + import flask_cors + from pycrunch_trace.oop.safe_filename import SafeFilename +except ImportError: + SafeFilename = None +else: + flask_cors.CORS(app) + + +@app.route("/", methods=["POST"]) # type: ignore +def recv_data() -> Tuple[str, int]: + data = flask.request.json + if not isinstance(data, list): + data = [data] + for d in data: + add_data(d) + return "", 200 + + +def add_data(d: dict) -> None: + if not LAST_UPDATE: + del DATA_TO_PLOT[0] + DATA_TO_PLOT.append( + {k: d[k] for k in ["nodeid", "elapsed_time", "ninputs", "branches"]} + ) + LAST_UPDATE[d["nodeid"]] = d + + +external_stylesheets = ["https://codepen.io/chriddyp/pen/bWLwgP.css"] +board = dash.Dash(__name__, server=app, external_stylesheets=external_stylesheets) +board.layout = html.Div( + children=[ + # represents the URL bar, doesn't render anything + dcc.Location(id="url", refresh=False), + html.H1( + children=[ + html.A("HypoFuzz", href="https://hypofuzz.com"), + " Live Dashboard", + ] + ), + html.Div(id="page-content"), + dcc.Interval(id="interval-component", interval=5000), # time in millis + ] +) + + +def row_for(data: dict, include_link: bool = True, *extra: object) -> html.Tr: + parts = [] + if include_link: + parts.append( + dcc.Link(data["nodeid"], href="/" + data["nodeid"].replace("/", "_")) + ) + if "elapsed_time" in data: + parts.append(str(datetime.timedelta(seconds=int(data["elapsed_time"])))) + else: + parts.append("") + for key in headings[2:]: + parts.append(data.get(key, "")) + return html.Tr([html.Td(p) for p in parts + [str(e) for e in extra]]) + + +def try_format(code: str) -> str: + try: + return black.format_str(code, mode=black.FileMode()) + except Exception: + return code + + +@board.callback( # type: ignore + Output("page-content", "children"), + [Input("url", "pathname")], +) +def display_page(pathname: str) -> html.Div: + # Main page + if pathname == "/" or pathname is None: + return html.Div( + children=[ + html.Div("Total branch coverage for each test."), + dcc.Graph(id="live-update-graph"), + 
html.Button("Toggle log-xaxis", id="xaxis-state", n_clicks=0), + html.Div( + dcc.Link( + "See patches with covering and/or failing examples", + href="/patches/", + refresh=True, + ) + ), + html.Div(html.Table(id="summary-table-rows")), + html.Div("Estimated number of inputs to discover new coverage or bugs"), + html.Div(html.Table(id="estimators-table-rows")), + ] + ) + + # Target-specific subpages + nodeid = pathname[1:] + trace = [ + d + for d in DATA_TO_PLOT + if d["nodeid"].replace("/", "_") == nodeid # type: ignore + ] + if not trace: + return html.Div( + children=[ + dcc.Link("Back to main dashboard", href="/"), + html.P(["No results for this test function yet."]), + ] + ) + fig1 = px.line( + trace, x="ninputs", y="branches", line_shape="hv", hover_data=["elapsed_time"] + ) + fig2 = px.line( + trace, x="elapsed_time", y="branches", line_shape="hv", hover_data=["ninputs"] + ) + last_update = LAST_UPDATE[trace[-1]["nodeid"]] + add: List[str] = [] + if "failures" in last_update: + for failures in last_update["failures"]: + failures[0] = try_format(failures[0]) + add.extend(html.Pre(children=[html.Code(children=[x])]) for x in failures) + if SafeFilename: + url = f"http://{flask.request.host}/pycrunch-recordings/{SafeFilename(nodeid)}/session.chunked" + link = dcc.Link( + "Debug failing example online with pytrace", + href=f"https://app.pytrace.com/?open={url}", + ) + add.append(html.Pre(children=[link])) + + _seen_cov_examples = set() + covering_examples = [] + for row in last_update.get("seed_pool", []): + example = try_format(row[1]), row[2] + if example not in _seen_cov_examples: + _seen_cov_examples.add(example) + covering_examples.append(example) + + return html.Div( + children=[ + dcc.Link("Back to main dashboard", href="/"), + html.P( + children=[ + "Example count by status: ", + str(last_update.get("status_counts", "???")), + ] + ), + html.Table( + children=[ + html.Tr( + [html.Th(h) for h in headings[1:]] + [html.Th(["seed count"])] + ), + row_for(last_update, False, len(last_update.get("seed_pool", []))), + ] + ), + *add, + dcc.Graph(id=f"graph-of-{pathname}-1", figure=fig1), + dcc.Graph(id=f"graph-of-{pathname}-2", figure=fig2), + html.H3(["Minimal covering examples"]), + html.P( + [ + "Each additional example shown below covers at least one branch " + "not covered by any previous, more-minimal, example." 
+ ] + ), + *(html.Pre([html.Code([*ex, "\n"])]) for ex in covering_examples), + ] + ) + + +FIRST_FAILED_AT = {} + + +@board.callback( # type: ignore + Output("live-update-graph", "figure"), + [Input("interval-component", "n_intervals"), Input("xaxis-state", "n_clicks")], +) +def update_graph_live(n: int, clicks: int) -> object: + fig = px.line( + DATA_TO_PLOT, + x="ninputs", + y="branches", + color="nodeid", + line_shape="hv", + hover_data=["elapsed_time"], + log_x=bool(clicks % 2), + ) + failing = { + d["nodeid"]: (d["ninputs"], d["branches"]) + for d in LAST_UPDATE.values() + if d.get("status_counts", {}).get("INTERESTING", 0) + } + for k, v in failing.items(): + if k not in FIRST_FAILED_AT: + FIRST_FAILED_AT[k] = v + for series, symbol in [(failing, "🔍"), (FIRST_FAILED_AT, "💥")]: + if series: + xs, ys = zip(*series.values()) + fig.add_trace( + go.Scatter( + x=xs, + y=ys, + mode="text", + text=symbol, + showlegend=False, + ) + ) + fig.update_layout( + height=800, + legend_yanchor="top", + legend_xanchor="left", + legend_y=-0.08, + legend_x=0, + ) + # Setting this to a constant prevents data updates clobbering zoom / selections + fig.layout.uirevision = "this key never changes" + return fig + + +@board.callback( # type: ignore + Output("summary-table-rows", "children"), + [Input("interval-component", "n_intervals")], +) +def update_table_live(n: int) -> object: + return [html.Tr([html.Th(h) for h in headings])] + [ + row_for(data) for name, data in sorted(LAST_UPDATE.items()) if name + ] + + +def estimators(data: dict) -> dict: + since_new_cov = data["since new cov"] + ninputs = data["ninputs"] + loaded_from_db = data["loaded_from_db"] + return { + "branch_lower": 1 / (since_new_cov + 1), + "branch_upper": 1 / (max(ninputs - loaded_from_db, 0) + 1), + "bug": 1 if data.get("failures") else 1 / max(ninputs + 1, loaded_from_db), + } + + +@board.callback( # type: ignore + Output("estimators-table-rows", "children"), + [Input("interval-component", "n_intervals")], +) +def update_estimators_table(n: int) -> object: + contents = [html.Col(), html.Colgroup(span=1)] + [ + html.Colgroup(span=2) for _ in range(2) + ] + colnames = ["nodeid", "branch-lower", "branch-upper", "bug"] + contents = [html.Tr([html.Th(h) for h in colnames])] + + for _, d in sorted(LAST_UPDATE.items()): + try: + est = estimators(d) + except KeyError: + continue + row = [ + d["nodeid"], + int(1 / est["branch_lower"]), + int(1 / est["branch_upper"]), + int(1 / est["bug"]), + ] + contents.append(html.Tr([html.Td(x) for x in row])) + return contents + + +@app.route("/pycrunch-recordings/") # type: ignore +def download_file(name: str) -> flask.Response: + return flask.send_from_directory( + directory="pycrunch-recordings", + path=name, + mimetype="application/octet-stream", + ) + + +@app.route("/patches/") # type: ignore +def download_patch(name: str) -> flask.Response: + return flask.send_from_directory( + directory=os.path.relpath(storage_directory("patches"), app.root_path), + path=name, + mimetype="application/octet-stream", + ) + + +@app.route("/patches/") # type: ignore +def patch_summary() -> flask.Response: + patches = make_and_save_patches(PYTEST_ARGS, LAST_UPDATE) + if not patches: + return """ + HypoFuzz patches + + Waiting for examples, please refresh the page in minute or so. + """ + show = "fail" + if show not in patches: + show = "cov" + patch_path = patches[show] + describe = {"fail": "failing", "cov": "covering", "all": "covering and failing"} + links = "\n".join( + f'
  • patch with {v} examples
  • ' + for k, v in describe.items() + if k in patches + ) + return f""" + + HypoFuzz patches + + + + + +

    Download links

      {links}

    Latest {describe[show]} patch

    {patch_path.read_text()}
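The `/pycrunch-recordings/` and `/patches/` download routes in this file rely on `flask.send_from_directory`, which joins the requested name under a fixed directory and refuses anything that would escape it. A self-contained sketch (the route and folder names are illustrative, and `<path:name>` is the usual converter for capturing such a filename parameter):

```python
import flask

app = flask.Flask(__name__)

@app.route("/patches/<path:name>")
def download_patch(name: str) -> flask.Response:
    # send_from_directory resolves `path` inside `directory` and returns
    # 404 for traversal attempts such as "../secrets".
    return flask.send_from_directory(
        directory="patches", path=name, mimetype="application/octet-stream"
    )
```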
    + + """ + + +def start_dashboard_process( + port: int, *, pytest_args: list, host: str = "localhost", debug: bool = False +) -> None: + global PYTEST_ARGS + PYTEST_ARGS = pytest_args + + # Ensure that we dump whatever patches are ready before shutting down + def signal_handler(signum, frame): # type: ignore + make_and_save_patches(pytest_args, LAST_UPDATE) + if old_handler in (signal.SIG_DFL, None): + return old_handler + elif old_handler is not signal.SIG_IGN: + return old_handler(signum, frame) + raise NotImplementedError("Unreachable") + + old_handler = signal.signal(signal.SIGTERM, signal_handler) + atexit.register(make_and_save_patches, pytest_args, LAST_UPDATE) + + print(f"\n\tNow serving dashboard at http://{host}:{port}/\n") # noqa + app.run(host=host, port=port, debug=debug) diff --git a/src/hypofuzz/entrypoint.py b/src/hypofuzz/entrypoint.py index c13a2f8..1f97ffa 100644 --- a/src/hypofuzz/entrypoint.py +++ b/src/hypofuzz/entrypoint.py @@ -56,6 +56,27 @@ def fuzz( This process will run forever unless stopped with e.g. ctrl-C. """ + dash_proc = _fuzz_impl( + numprocesses=numprocesses, + dashboard=dashboard, + port=port, + unsafe=unsafe, + pytest_args=pytest_args, + ) + if dash_proc: + dash_proc.kill() + dash_proc.join() + sys.exit(1) + raise NotImplementedError("unreachable") + + +def _fuzz_impl( + numprocesses: int, + dashboard: bool, + port: Optional[int], + unsafe: bool, + pytest_args: Tuple[str, ...], +) -> Optional[Process]: # Before doing anything with our arguments, we'll check that none # of HypoFuzz's arguments will be passed on to pytest instead. misplaced: set = set(pytest_args) & set().union(*(p.opts for p in fuzz.params)) @@ -81,12 +102,14 @@ def fuzz( if dashboard: from .dashboard import start_dashboard_process - Process( + dash_proc = Process( target=start_dashboard_process, kwargs={"port": port, "pytest_args": pytest_args}, - ).start() + ) + dash_proc.start() else: port = None + dash_proc = None if numprocesses <= 1: _fuzz_several( @@ -105,5 +128,4 @@ def fuzz( for p in processes: p.join() print("Found a failing input for every test!", file=sys.stderr) # noqa: T201 - sys.exit(1) - raise NotImplementedError("unreachable") + return dash_proc diff --git a/src/hypofuzz/hy.py b/src/hypofuzz/hy.py index bac6a36..a795410 100644 --- a/src/hypofuzz/hy.py +++ b/src/hypofuzz/hy.py @@ -1,425 +1,425 @@ -"""Adaptive fuzzing for property-based tests using Hypothesis.""" - -import contextlib -import itertools -import sys -import time -import traceback -from random import Random -from typing import Any, Callable, Dict, Generator, List, NoReturn, Optional, Union - -from hypothesis import settings -from hypothesis.core import ( - BuildContext, - Stuff, - deterministic_PRNG, - failure_exceptions_to_catch, - get_trimmed_traceback, - process_arguments_to_given, -) -from hypothesis.database import ExampleDatabase -from hypothesis.errors import StopTest, UnsatisfiedAssumption -from hypothesis.internal.conjecture.data import ConjectureData, Status -from hypothesis.internal.conjecture.engine import BUFFER_SIZE -from hypothesis.internal.conjecture.junkdrawer import stack_depth_of_caller -from hypothesis.internal.conjecture.shrinker import Shrinker -from hypothesis.internal.reflection import function_digest, get_signature -from hypothesis.reporting import with_reporter -from hypothesis.vendor.pretty import RepresentationPrinter -from sortedcontainers import SortedKeyList - -from .corpus import BlackBoxMutator, CrossOverMutator, EngineStub, HowGenerated, Pool -from .cov import 
CustomCollectionContext - -record_pytrace: Optional[Callable[..., Any]] -try: - from .debugger import record_pytrace -except ImportError: - record_pytrace = None - -Report = Dict[str, Union[int, float, str, list, Dict[str, int]]] - -UNDELIVERED_REPORTS: List[Report] = [] - - -@contextlib.contextmanager -def constant_stack_depth() -> Generator[None, None, None]: - # TODO: consider extracting this upstream so we can just import it. - recursion_limit = sys.getrecursionlimit() - depth = stack_depth_of_caller() - # Because we add to the recursion limit, to be good citizens we also add - # a check for unbounded recursion. The default limit is 1000, so this can - # only ever trigger if something really strange is happening and it's hard - # to imagine an intentionally-deeply-recursive use of this code. - assert depth <= 1000, ( - f"Hypothesis would usually add {recursion_limit} to the stack depth of " - f"{depth} here, but we are already much deeper than expected. Aborting " - "now, to avoid extending the stack limit in an infinite loop..." - ) - try: - sys.setrecursionlimit(depth + recursion_limit) - yield - finally: - sys.setrecursionlimit(recursion_limit) - - -class HitShrinkTimeoutError(Exception): - pass - - -class FuzzProcess: - """Maintain all the state associated with fuzzing a single target. - - This includes: - - - the coverage map and associated inputs - - references to Hypothesis' database for this test (for failure replay) - - a "run one" method, and an estimate of the value of running one input - - checkpointing tools so we can crash and restart without losing progess - - etc. The fuzz controller will then operate on a collection of these objects. - """ - - @classmethod - def from_hypothesis_test( - cls, - wrapped_test: Any, - *, - nodeid: Optional[str] = None, - extra_kw: Optional[Dict[str, object]] = None, - ) -> "FuzzProcess": - """Return a FuzzProcess for an @given-decorated test function.""" - _, _, stuff = process_arguments_to_given( - wrapped_test, - arguments=(), - kwargs=extra_kw or {}, - given_kwargs=wrapped_test.hypothesis._given_kwargs, - params=get_signature(wrapped_test).parameters, - ) - return cls( - test_fn=wrapped_test.hypothesis.inner_test, - stuff=stuff, - nodeid=nodeid, - database_key=function_digest(wrapped_test.hypothesis.inner_test), - hypothesis_database=getattr( - wrapped_test, "_hypothesis_internal_use_settings", settings.default - ).database - or settings.default.database, - ) - - def __init__( - self, - test_fn: Callable, - stuff: Stuff, - *, - random_seed: int = 0, - nodeid: Optional[str] = None, - database_key: bytes, - hypothesis_database: ExampleDatabase, - ) -> None: - """Construct a FuzzProcess from specific arguments.""" - # The actual fuzzer implementation - self.random = Random(random_seed) - self._test_fn = test_fn - self.__stuff = stuff - self.nodeid = nodeid or test_fn.__qualname__ - - # The seed pool is responsible for managing all seed state, including saving - # novel seeds to the database. This includes tracking how often each branch - # has been hit, minimal covering examples, and so on. 
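The `entrypoint.py` hunk above splits `fuzz` so that `_fuzz_impl` returns the dashboard `Process` handle instead of exiting directly, letting the caller tear the child down before `sys.exit(1)`. The shape of that shutdown, as a sketch with stand-in names:

```python
import sys
from multiprocessing import Process

def _serve_dashboard() -> None:  # stand-in for start_dashboard_process
    ...

dash_proc = Process(target=_serve_dashboard)
dash_proc.start()
try:
    ...  # run the fuzz workers until every target has a failing input
finally:
    dash_proc.kill()  # as in the hunk above: hard-stop the dashboard child
    dash_proc.join()  # then reap it so no zombie process is left behind
sys.exit(1)
```

Without the `kill()`/`join()`, the still-running dashboard child would keep the process tree alive after the fuzz workers finish.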
- self.pool = Pool(hypothesis_database, database_key) - self._mutator_blackbox = BlackBoxMutator(self.pool, self.random) - self._mutator_crossover = CrossOverMutator(self.pool, self.random) - - # Set up the basic data that we'll track while fuzzing - self.ninputs = 0 - self.elapsed_time = 0.0 - self.stop_shrinking_at = float("inf") - self.since_new_cov = 0 - self.status_counts = {s.name: 0 for s in Status} - self.shrinking = False - # Any new examples from the database will be added to this replay buffer - self._replay_buffer: List[bytes] = [] - # After replay, we stay in blackbox mode for a while, until we've generated - # 1000 consecutive examples without new coverage, and then switch to mutation. - self._early_blackbox_mode = True - - # We batch updates, since frequent HTTP posts are slow - self._last_post_time = self.elapsed_time - - def startup(self) -> None: - """Set up initial state and prepare to replay the saved behaviour.""" - assert self.ninputs == 0, "already started this FuzzProcess" - # Report that we've started this fuzz target, and run zero examples so far - self._report_change(self._json_description) - # Next, restore progress made in previous runs by loading our saved examples. - # This is meant to be the minimal set of inputs that exhibits all distinct - # behaviours we've observed to date. Replaying takes longer than restoring - # our data structures directly, but copes much better with changed behaviour. - self._replay_buffer.extend(self.pool.fetch()) - self._replay_buffer.append(b"\x00" * BUFFER_SIZE) - - def generate_prefix(self) -> bytes: - """Generate a test prefix by mutating previous examples. - - This is going to be the method to override when experimenting with - alternative fuzzing techniques. - - - for unguided fuzzing, return an empty b'' and the random postfix - generation in ConjectureData will do the rest. - - for coverage-guided fuzzing, mutate or splice together known inputs. - - This version is terrible, but any coverage guidance at all is enough to help... - """ - # Start by replaying any previous failures which we've retrieved from the - # database. This is useful to recover state at startup, or to share - # progress made in other processes. - if self._replay_buffer: - return self._replay_buffer.pop() - - # TODO: currently hard-coding a particular mutator; we want to do MOpt-style - # adaptive weighting of all the different mutators we could use. - # For now though, we'll just use a hardcoded swapover point - if self._early_blackbox_mode or self.random.random() < 0.05: - return self._mutator_blackbox.generate_buffer() - return self._mutator_crossover.generate_buffer() - - def run_one(self) -> None: - """Run a single input through the fuzz target, or maybe more. - - The "more" part is in cases where we discover new coverage, and shrink - to the minimal covering example. - """ - # If we've been stable for a little while, try loading new examples from the - # database. We do this unconditionally because even if this fuzzer doesn't - # know of other concurrent runs, there may be e.g. a test process sharing the - # database. We do make it infrequent to manage the overhead though. - if self.ninputs % 1000 == 0 and self.since_new_cov > 1000: - self._replay_buffer.extend(self.pool.fetch()) - - # seen_count = len(self.pool.arc_counts) - - # Run the input - result = self._run_test_on(self.generate_prefix(), extend=BUFFER_SIZE) - - if result.status is Status.INTERESTING: - # Shrink to our minimal failing example, since we'll stop after this. 
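The `since_new_cov`, `ninputs`, and `loaded_from_db` counters maintained in this class feed the dashboard's "estimated number of inputs to discover new coverage" table, whose `estimators` helper applies a Laplace rule-of-succession estimate: after n consecutive inputs without new coverage, the next input is assumed to find some with probability 1/(n+1). A worked example with invented values:

```python
# Mirrors the dashboard's `estimators` arithmetic.
since_new_cov, ninputs, loaded_from_db = 999, 5_000, 50

branch_lower = 1 / (since_new_cov + 1)                     # 0.001
branch_upper = 1 / (max(ninputs - loaded_from_db, 0) + 1)  # ~0.000202
print(int(1 / branch_lower))  # 1000 inputs expected before new coverage
print(int(1 / branch_upper))  # 4951, the optimistic bound
```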
- self.shrinking = True - passing_buffers = frozenset( - b for b, r in self.pool.results.items() if r.status == Status.VALID - ) - shrinker = Shrinker( - EngineStub(self._run_test_on, self.random, passing_buffers), - result, - predicate=lambda d: d.status is Status.INTERESTING, - allow_transition=None, - explain=False, # TODO: enable explain mode - ) - self.stop_shrinking_at = self.elapsed_time + 300 - with contextlib.suppress(HitShrinkTimeoutError): - shrinker.shrink() - self.shrinking = False - if record_pytrace: - # Replay minimal example under our time-travelling debug tracer - self._run_test_on( - shrinker.shrink_target.buffer, - collector=record_pytrace(self.nodeid), - ) - UNDELIVERED_REPORTS.append(self._json_description) - self._report_change(UNDELIVERED_REPORTS) - del UNDELIVERED_REPORTS[:] - - # Consider switching out of blackbox mode. - if self.since_new_cov >= 1000 and not self._replay_buffer: - self._early_blackbox_mode = False - - # NOTE: this distillation logic works fine, it's just discovering new coverage - # much more slowly than jumping directly to mutational mode. - # if len(self.pool.arc_counts) > seen_count and not self._early_blackbox_mode: - # self.pool.distill(self._run_test_on, self.random) - - def _run_test_on( - self, - buffer: bytes, - *, - error_on_discard: bool = False, - extend: int = 0, - source: HowGenerated = HowGenerated.shrinking, - collector: Optional[contextlib.AbstractContextManager] = None, - ) -> ConjectureData: - """Run the test_fn on a given buffer of bytes, in a way a Shrinker can handle. - - In normal operation, it's called via run_one (above), but we might also - delegate to the shrinker to find minimal covering examples. - """ - start = time.perf_counter() - self.ninputs += 1 - collector = collector or CustomCollectionContext() # type: ignore - assert collector is not None - reports: List[str] = [] - data = ConjectureData( - max_length=min(BUFFER_SIZE, len(buffer) + extend), - prefix=buffer, - random=self.random, - ) - try: - with deterministic_PRNG(), BuildContext( - data, is_final=True - ) as context, constant_stack_depth(), with_reporter(reports.append): - # Note that the data generation and test execution happen in the same - # coverage context. We may later split this, or tag each separately. - with collector: - if self.__stuff.selfy is not None: - data.hypothesis_runner = self.__stuff.selfy - # Generate all arguments to the test function. - args = self.__stuff.args - kwargs = dict(self.__stuff.kwargs) - kw, argslices = context.prep_args_kwargs_from_strategies( - self.__stuff.given_kwargs - ) - kwargs.update(kw) - - printer = RepresentationPrinter(context=context) - printer.repr_call( - self._test_fn.__name__, - args, - kwargs, - force_split=True, - arg_slices=argslices, - leading_comment=( - "# " + context.data.slice_comments[(0, 0)] - if (0, 0) in context.data.slice_comments - else None - ), - ) - data.extra_information.call_repr = printer.getvalue() - - self._test_fn(*args, **kwargs) - except StopTest: - data.status = Status.OVERRUN - except UnsatisfiedAssumption: - data.status = Status.INVALID - except failure_exceptions_to_catch() as e: - data.status = Status.INTERESTING - tb = get_trimmed_traceback() - filename, lineno, *_ = traceback.extract_tb(tb)[-1] - data.interesting_origin = (type(e), filename, lineno) - data.extra_information.traceback = "".join( - traceback.format_exception(type(e), value=e, tb=tb) - ) - except KeyboardInterrupt: - # If you have a test function which raises KI, this is pretty useful. 
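Shrinking is given a five-minute budget: `run_one` sets `stop_shrinking_at`, `_run_test_on` raises `HitShrinkTimeoutError` once `elapsed_time` passes it, and `contextlib.suppress` converts the exception into a clean early stop. The pattern in isolation (`shrink_step` is a hypothetical callable returning True while more shrinking remains):

```python
import contextlib
import time

class HitShrinkTimeoutError(Exception):
    pass

def shrink_with_budget(shrink_step, budget: float = 300.0) -> None:
    deadline = time.perf_counter() + budget
    with contextlib.suppress(HitShrinkTimeoutError):
        while shrink_step():
            # Raising here aborts mid-shrink; suppress() makes it a
            # normal exit, keeping the best result found so far.
            if time.perf_counter() > deadline:
                raise HitShrinkTimeoutError
```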
- print(f"Got a KeyboardInterrupt in {self.nodeid}, exiting...") # noqa - raise - finally: - data.extra_information.reports = "\n".join(map(str, reports)) - - # In addition to coverage branches, use psudeo-coverage information provided via - # the `hypothesis.event()` function - exploiting user-defined partitions - # designed for diagnostic output to guide generation. See - # https://hypothesis.readthedocs.io/en/latest/details.html#hypothesis.event - data.extra_information.branches = frozenset( - getattr(collector, "branches", ()) # might be a debug tracer instead - ).union( - f"event:{k}:{v}" - for k, v in data.events.items() - if not k.startswith(("invalid because", "Retried draw from ")) - ) - - data.freeze() - # Update the pool and report any changes immediately for new coverage. If no - # new coverage, occasionally send an update anyway so we don't look stalled. - self.status_counts[data.status.name] += 1 - if self.pool.add(data.as_result(), source): - self.since_new_cov = 0 - else: - self.since_new_cov += 1 - if 0 in (self.since_new_cov, self.ninputs % 100): - UNDELIVERED_REPORTS.append(self._json_description) - - self.elapsed_time += time.perf_counter() - start - if UNDELIVERED_REPORTS and (self._last_post_time + 10 < self.elapsed_time): - self._report_change(UNDELIVERED_REPORTS) - del UNDELIVERED_REPORTS[:] - - if self.elapsed_time > self.stop_shrinking_at: - raise HitShrinkTimeoutError - - # The shrinker relies on returning the data object to be inspected. - return data.as_result() - - def _report_change(self, data: Union[Report, List[Report]]) -> None: - """Replace this method to send JSON data to the dashboard.""" - - @property - def _json_description(self) -> Report: - """Summarise current state to send to dashboard.""" - if self.ninputs == 0: - return { - "nodeid": self.nodeid, - "note": "starting up...", - "ninputs": 0, - "branches": 0, - "elapsed_time": 0, - } - report: Report = { - "nodeid": self.nodeid, - "elapsed_time": self.elapsed_time, - "timestamp": time.time(), - "ninputs": self.ninputs, - "branches": len(self.pool.arc_counts), - "since new cov": self.since_new_cov, - "loaded_from_db": len(self.pool._loaded_from_database), - "status_counts": self.status_counts, - "seed_pool": self.pool.json_report, - "note": ( - "replaying saved examples" - if self._replay_buffer - else ("shrinking known examples" if self.pool._in_distill_phase else "") - ), - } - if self.pool.interesting_examples: - report["note"] = ( - f"raised {list(self.pool.interesting_examples)[0][0].__name__} " - f"({'shrinking...' if self.shrinking else 'finished'})" - ) - report["failures"] = [ - ls for _, ls in self.pool.interesting_examples.values() - ] - del report["since new cov"] - return report - - @property - def has_found_failure(self) -> bool: - """If we've already found a failing example we might reprioritize.""" - return bool(self.pool.interesting_examples) - - -def fuzz_several(*targets_: FuzzProcess, random_seed: Optional[int] = None) -> NoReturn: - """Take N fuzz targets and run them all.""" - # TODO: this isn't actually multi-process yet, and that's bad. - rand = Random(random_seed) - targets = SortedKeyList(targets_, lambda t: t.since_new_cov) - - # Loop forever: at each timestep, we choose a target using an epsilon-greedy - # strategy for simplicity (TODO: improve this later) and run it once. - # TODO: make this aware of test runtime, so it adapts for branches-per-second - # rather than branches-per-input. 
- for t in targets: - t.startup() - for i in itertools.count(): - if i % 20 == 0: - t = targets.pop(rand.randrange(len(targets))) - t.run_one() - targets.add(t) - else: - targets[0].run_one() - if len(targets) > 1 and targets.key(targets[0]) > targets.key(targets[1]): - # pay our log-n cost to keep the list sorted - targets.add(targets.pop(0)) - elif targets[0].has_found_failure: - print(f"found failing example for {targets[0].nodeid}") # noqa - targets.pop(0) - if not targets: - raise Exception("Found failures for all tests!") - raise NotImplementedError("unreachable") +"""Adaptive fuzzing for property-based tests using Hypothesis.""" + +import contextlib +import itertools +import sys +import time +import traceback +from random import Random +from typing import Any, Callable, Dict, Generator, List, Optional, Union + +from hypothesis import settings +from hypothesis.core import ( + BuildContext, + Stuff, + deterministic_PRNG, + failure_exceptions_to_catch, + get_trimmed_traceback, + process_arguments_to_given, +) +from hypothesis.database import ExampleDatabase +from hypothesis.errors import StopTest, UnsatisfiedAssumption +from hypothesis.internal.conjecture.data import ConjectureData, Status +from hypothesis.internal.conjecture.engine import BUFFER_SIZE +from hypothesis.internal.conjecture.junkdrawer import stack_depth_of_caller +from hypothesis.internal.conjecture.shrinker import Shrinker +from hypothesis.internal.reflection import function_digest, get_signature +from hypothesis.reporting import with_reporter +from hypothesis.vendor.pretty import RepresentationPrinter +from sortedcontainers import SortedKeyList + +from .corpus import BlackBoxMutator, CrossOverMutator, EngineStub, HowGenerated, Pool +from .cov import CustomCollectionContext + +record_pytrace: Optional[Callable[..., Any]] +try: + from .debugger import record_pytrace +except ImportError: + record_pytrace = None + +Report = Dict[str, Union[int, float, str, list, Dict[str, int]]] + +UNDELIVERED_REPORTS: List[Report] = [] + + +@contextlib.contextmanager +def constant_stack_depth() -> Generator[None, None, None]: + # TODO: consider extracting this upstream so we can just import it. + recursion_limit = sys.getrecursionlimit() + depth = stack_depth_of_caller() + # Because we add to the recursion limit, to be good citizens we also add + # a check for unbounded recursion. The default limit is 1000, so this can + # only ever trigger if something really strange is happening and it's hard + # to imagine an intentionally-deeply-recursive use of this code. + assert depth <= 1000, ( + f"Hypothesis would usually add {recursion_limit} to the stack depth of " + f"{depth} here, but we are already much deeper than expected. Aborting " + "now, to avoid extending the stack limit in an infinite loop..." + ) + try: + sys.setrecursionlimit(depth + recursion_limit) + yield + finally: + sys.setrecursionlimit(recursion_limit) + + +class HitShrinkTimeoutError(Exception): + pass + + +class FuzzProcess: + """Maintain all the state associated with fuzzing a single target. + + This includes: + + - the coverage map and associated inputs + - references to Hypothesis' database for this test (for failure replay) + - a "run one" method, and an estimate of the value of running one input + - checkpointing tools so we can crash and restart without losing progess + + etc. The fuzz controller will then operate on a collection of these objects. 
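`fuzz_several`, at the end of this file, schedules targets epsilon-greedily over a `SortedKeyList` keyed by `since_new_cov`: mostly exploit the target closest to its last new coverage, occasionally explore one at random. Distilled into a sketch (failure handling omitted; `run_one` and `since_new_cov` as in the class above):

```python
import itertools
from random import Random
from sortedcontainers import SortedKeyList

def schedule(targets, seed=None):
    rand = Random(seed)
    pool = SortedKeyList(targets, key=lambda t: t.since_new_cov)
    for i in itertools.count():
        if i % 20 == 0:  # explore: every 20th step, pick uniformly
            t = pool.pop(rand.randrange(len(pool)))
            t.run_one()
            pool.add(t)  # re-insert at its possibly-changed position
        else:  # exploit: the most recently productive target
            pool[0].run_one()
            if len(pool) > 1 and pool.key(pool[0]) > pool.key(pool[1]):
                pool.add(pool.pop(0))  # O(log n) to restore sort order
```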
+ """ + + @classmethod + def from_hypothesis_test( + cls, + wrapped_test: Any, + *, + nodeid: Optional[str] = None, + extra_kw: Optional[Dict[str, object]] = None, + ) -> "FuzzProcess": + """Return a FuzzProcess for an @given-decorated test function.""" + _, _, stuff = process_arguments_to_given( + wrapped_test, + arguments=(), + kwargs=extra_kw or {}, + given_kwargs=wrapped_test.hypothesis._given_kwargs, + params=get_signature(wrapped_test).parameters, + ) + return cls( + test_fn=wrapped_test.hypothesis.inner_test, + stuff=stuff, + nodeid=nodeid, + database_key=function_digest(wrapped_test.hypothesis.inner_test), + hypothesis_database=getattr( + wrapped_test, "_hypothesis_internal_use_settings", settings.default + ).database + or settings.default.database, + ) + + def __init__( + self, + test_fn: Callable, + stuff: Stuff, + *, + random_seed: int = 0, + nodeid: Optional[str] = None, + database_key: bytes, + hypothesis_database: ExampleDatabase, + ) -> None: + """Construct a FuzzProcess from specific arguments.""" + # The actual fuzzer implementation + self.random = Random(random_seed) + self._test_fn = test_fn + self.__stuff = stuff + self.nodeid = nodeid or test_fn.__qualname__ + + # The seed pool is responsible for managing all seed state, including saving + # novel seeds to the database. This includes tracking how often each branch + # has been hit, minimal covering examples, and so on. + self.pool = Pool(hypothesis_database, database_key) + self._mutator_blackbox = BlackBoxMutator(self.pool, self.random) + self._mutator_crossover = CrossOverMutator(self.pool, self.random) + + # Set up the basic data that we'll track while fuzzing + self.ninputs = 0 + self.elapsed_time = 0.0 + self.stop_shrinking_at = float("inf") + self.since_new_cov = 0 + self.status_counts = {s.name: 0 for s in Status} + self.shrinking = False + # Any new examples from the database will be added to this replay buffer + self._replay_buffer: List[bytes] = [] + # After replay, we stay in blackbox mode for a while, until we've generated + # 1000 consecutive examples without new coverage, and then switch to mutation. + self._early_blackbox_mode = True + + # We batch updates, since frequent HTTP posts are slow + self._last_post_time = self.elapsed_time + + def startup(self) -> None: + """Set up initial state and prepare to replay the saved behaviour.""" + assert self.ninputs == 0, "already started this FuzzProcess" + # Report that we've started this fuzz target, and run zero examples so far + self._report_change(self._json_description) + # Next, restore progress made in previous runs by loading our saved examples. + # This is meant to be the minimal set of inputs that exhibits all distinct + # behaviours we've observed to date. Replaying takes longer than restoring + # our data structures directly, but copes much better with changed behaviour. + self._replay_buffer.extend(self.pool.fetch()) + self._replay_buffer.append(b"\x00" * BUFFER_SIZE) + + def generate_prefix(self) -> bytes: + """Generate a test prefix by mutating previous examples. + + This is going to be the method to override when experimenting with + alternative fuzzing techniques. + + - for unguided fuzzing, return an empty b'' and the random postfix + generation in ConjectureData will do the rest. + - for coverage-guided fuzzing, mutate or splice together known inputs. + + This version is terrible, but any coverage guidance at all is enough to help... + """ + # Start by replaying any previous failures which we've retrieved from the + # database. 
This is useful to recover state at startup, or to share + # progress made in other processes. + if self._replay_buffer: + return self._replay_buffer.pop() + + # TODO: currently hard-coding a particular mutator; we want to do MOpt-style + # adaptive weighting of all the different mutators we could use. + # For now though, we'll just use a hardcoded swapover point + if self._early_blackbox_mode or self.random.random() < 0.05: + return self._mutator_blackbox.generate_buffer() + return self._mutator_crossover.generate_buffer() + + def run_one(self) -> None: + """Run a single input through the fuzz target, or maybe more. + + The "more" part is in cases where we discover new coverage, and shrink + to the minimal covering example. + """ + # If we've been stable for a little while, try loading new examples from the + # database. We do this unconditionally because even if this fuzzer doesn't + # know of other concurrent runs, there may be e.g. a test process sharing the + # database. We do make it infrequent to manage the overhead though. + if self.ninputs % 1000 == 0 and self.since_new_cov > 1000: + self._replay_buffer.extend(self.pool.fetch()) + + # seen_count = len(self.pool.arc_counts) + + # Run the input + result = self._run_test_on(self.generate_prefix(), extend=BUFFER_SIZE) + + if result.status is Status.INTERESTING: + # Shrink to our minimal failing example, since we'll stop after this. + self.shrinking = True + passing_buffers = frozenset( + b for b, r in self.pool.results.items() if r.status == Status.VALID + ) + shrinker = Shrinker( + EngineStub(self._run_test_on, self.random, passing_buffers), + result, + predicate=lambda d: d.status is Status.INTERESTING, + allow_transition=None, + explain=False, # TODO: enable explain mode + ) + self.stop_shrinking_at = self.elapsed_time + 300 + with contextlib.suppress(HitShrinkTimeoutError): + shrinker.shrink() + self.shrinking = False + if record_pytrace: + # Replay minimal example under our time-travelling debug tracer + self._run_test_on( + shrinker.shrink_target.buffer, + collector=record_pytrace(self.nodeid), + ) + UNDELIVERED_REPORTS.append(self._json_description) + self._report_change(UNDELIVERED_REPORTS) + del UNDELIVERED_REPORTS[:] + + # Consider switching out of blackbox mode. + if self.since_new_cov >= 1000 and not self._replay_buffer: + self._early_blackbox_mode = False + + # NOTE: this distillation logic works fine, it's just discovering new coverage + # much more slowly than jumping directly to mutational mode. + # if len(self.pool.arc_counts) > seen_count and not self._early_blackbox_mode: + # self.pool.distill(self._run_test_on, self.random) + + def _run_test_on( + self, + buffer: bytes, + *, + error_on_discard: bool = False, + extend: int = 0, + source: HowGenerated = HowGenerated.shrinking, + collector: Optional[contextlib.AbstractContextManager] = None, + ) -> ConjectureData: + """Run the test_fn on a given buffer of bytes, in a way a Shrinker can handle. + + In normal operation, it's called via run_one (above), but we might also + delegate to the shrinker to find minimal covering examples. 
+        """
+        start = time.perf_counter()
+        self.ninputs += 1
+        collector = collector or CustomCollectionContext()  # type: ignore
+        assert collector is not None
+        reports: List[str] = []
+        data = ConjectureData(
+            max_length=min(BUFFER_SIZE, len(buffer) + extend),
+            prefix=buffer,
+            random=self.random,
+        )
+        try:
+            with deterministic_PRNG(), BuildContext(
+                data, is_final=True
+            ) as context, constant_stack_depth(), with_reporter(reports.append):
+                # Note that the data generation and test execution happen in the same
+                # coverage context. We may later split this, or tag each separately.
+                with collector:
+                    if self.__stuff.selfy is not None:
+                        data.hypothesis_runner = self.__stuff.selfy
+                    # Generate all arguments to the test function.
+                    args = self.__stuff.args
+                    kwargs = dict(self.__stuff.kwargs)
+                    kw, argslices = context.prep_args_kwargs_from_strategies(
+                        self.__stuff.given_kwargs
+                    )
+                    kwargs.update(kw)
+
+                    printer = RepresentationPrinter(context=context)
+                    printer.repr_call(
+                        self._test_fn.__name__,
+                        args,
+                        kwargs,
+                        force_split=True,
+                        arg_slices=argslices,
+                        leading_comment=(
+                            "# " + context.data.slice_comments[(0, 0)]
+                            if (0, 0) in context.data.slice_comments
+                            else None
+                        ),
+                    )
+                    data.extra_information.call_repr = printer.getvalue()
+
+                    self._test_fn(*args, **kwargs)
+        except StopTest:
+            data.status = Status.OVERRUN
+        except UnsatisfiedAssumption:
+            data.status = Status.INVALID
+        except failure_exceptions_to_catch() as e:
+            data.status = Status.INTERESTING
+            tb = get_trimmed_traceback()
+            filename, lineno, *_ = traceback.extract_tb(tb)[-1]
+            data.interesting_origin = (type(e), filename, lineno)
+            data.extra_information.traceback = "".join(
+                traceback.format_exception(type(e), value=e, tb=tb)
+            )
+        except KeyboardInterrupt:
+            # If you have a test function which raises KeyboardInterrupt,
+            # this is pretty useful.
+            print(f"Got a KeyboardInterrupt in {self.nodeid}, exiting...")  # noqa
+            raise
+        finally:
+            data.extra_information.reports = "\n".join(map(str, reports))
+
+        # In addition to coverage branches, use pseudo-coverage information
+        # provided via the `hypothesis.event()` function, exploiting user-defined
+        # partitions designed for diagnostic output to guide generation. See
+        # https://hypothesis.readthedocs.io/en/latest/details.html#hypothesis.event
+        data.extra_information.branches = frozenset(
+            getattr(collector, "branches", ())  # might be a debug tracer instead
+        ).union(
+            f"event:{k}:{v}"
+            for k, v in data.events.items()
+            if not k.startswith(("invalid because", "Retried draw from "))
+        )
+
+        data.freeze()
+        # Update the pool and report any changes immediately for new coverage. If no
+        # new coverage, occasionally send an update anyway so we don't look stalled.
+        self.status_counts[data.status.name] += 1
+        if self.pool.add(data.as_result(), source):
+            self.since_new_cov = 0
+        else:
+            self.since_new_cov += 1
+        if 0 in (self.since_new_cov, self.ninputs % 100):
+            UNDELIVERED_REPORTS.append(self._json_description)
+
+        self.elapsed_time += time.perf_counter() - start
+        if UNDELIVERED_REPORTS and (self._last_post_time + 10 < self.elapsed_time):
+            self._report_change(UNDELIVERED_REPORTS)
+            del UNDELIVERED_REPORTS[:]
+
+        if self.elapsed_time > self.stop_shrinking_at:
+            raise HitShrinkTimeoutError
+
+        # The shrinker relies on returning the data object to be inspected.
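+        # (data was frozen above, so as_result() yields the immutable summary
+        # view that both the pool and the shrinker consume.)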
+        return data.as_result()
+
+    def _report_change(self, data: Union[Report, List[Report]]) -> None:
+        """Replace this method to send JSON data to the dashboard."""
+
+    @property
+    def _json_description(self) -> Report:
+        """Summarise current state to send to dashboard."""
+        if self.ninputs == 0:
+            return {
+                "nodeid": self.nodeid,
+                "note": "starting up...",
+                "ninputs": 0,
+                "branches": 0,
+                "elapsed_time": 0,
+            }
+        report: Report = {
+            "nodeid": self.nodeid,
+            "elapsed_time": self.elapsed_time,
+            "timestamp": time.time(),
+            "ninputs": self.ninputs,
+            "branches": len(self.pool.arc_counts),
+            "since new cov": self.since_new_cov,
+            "loaded_from_db": len(self.pool._loaded_from_database),
+            "status_counts": self.status_counts,
+            "seed_pool": self.pool.json_report,
+            "note": (
+                "replaying saved examples"
+                if self._replay_buffer
+                else ("shrinking known examples" if self.pool._in_distill_phase else "")
+            ),
+        }
+        if self.pool.interesting_examples:
+            report["note"] = (
+                f"raised {list(self.pool.interesting_examples)[0][0].__name__} "
+                f"({'shrinking...' if self.shrinking else 'finished'})"
+            )
+            report["failures"] = [
+                ls for _, ls in self.pool.interesting_examples.values()
+            ]
+            del report["since new cov"]
+        return report
+
+    @property
+    def has_found_failure(self) -> bool:
+        """If we've already found a failing example, we might reprioritize."""
+        return bool(self.pool.interesting_examples)
+
+
+def fuzz_several(*targets_: FuzzProcess, random_seed: Optional[int] = None) -> None:
+    """Take N fuzz targets and run them all."""
+    # TODO: this isn't actually multi-process yet, and that's bad.
+    rand = Random(random_seed)
+    targets = SortedKeyList(targets_, lambda t: t.since_new_cov)
+
+    # Loop forever: at each timestep, we choose a target using an epsilon-greedy
+    # strategy for simplicity (TODO: improve this later) and run it once.
+    # TODO: make this aware of test runtime, so it adapts for branches-per-second
+    # rather than branches-per-input.
+    for t in targets:
+        t.startup()
+    for i in itertools.count():
+        if i % 20 == 0:
+            t = targets.pop(rand.randrange(len(targets)))
+            t.run_one()
+            targets.add(t)
+        else:
+            targets[0].run_one()
+            if len(targets) > 1 and targets.key(targets[0]) > targets.key(targets[1]):
+                # pay our log-n cost to keep the list sorted
+                targets.add(targets.pop(0))
+            elif targets[0].has_found_failure:
+                print(f"found failing example for {targets[0].nodeid}")  # noqa
+                targets.pop(0)
+                if not targets:
+                    return
+    raise NotImplementedError("unreachable")
diff --git a/src/hypofuzz/interface.py b/src/hypofuzz/interface.py
index 5392d65..3d1ad42 100644
--- a/src/hypofuzz/interface.py
+++ b/src/hypofuzz/interface.py
@@ -4,7 +4,8 @@
 import sys
 from contextlib import redirect_stdout, suppress
 from functools import partial
-from typing import TYPE_CHECKING, Iterable, List, NoReturn, Optional, Tuple
+from inspect import signature
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 
 import pytest
 import requests
@@ -34,9 +35,13 @@ def pytest_collection_finish(self, session: pytest.Session) -> None:
         ).name2fixturedefs
         # However, autouse fixtures are ubiquitous enough that we'll skip them;
         # until we get full pytest compatibility it's an expedient approximation.
-        _, all_autouse, _ = manager.getfixtureclosure(
-            tuple(manager._getautousenames(item.nodeid)), item
-        )
+        # The relevant internals changed in pytest 8.0, so handle both cases...
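+        # (We detect the new API by inspecting the signature rather than
+        # comparing pytest version strings, so the check keeps working as long
+        # as the signature itself is stable.)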
+        if "ignore_args" in signature(manager.getfixtureclosure).parameters:
+            all_autouse = set(manager._getautousenames(item.nodeid))
+        else:
+            _, all_autouse, _ = manager.getfixtureclosure(
+                tuple(manager._getautousenames(item.nodeid)), item
+            )
         if set(name2fixturedefs).difference(all_autouse):
             continue
         # For parametrized tests, we have to pass the parametrized args into
@@ -90,7 +95,7 @@ def _post(port: int, data: dict) -> None:
 def _fuzz_several(
     pytest_args: Tuple[str, ...], nodeids: List[str], port: Optional[int] = None
-) -> NoReturn:
+) -> None:
     """Collect and fuzz tests.
 
     Designed to be used inside a multiprocessing.Process started with the spawn()
@@ -107,4 +112,3 @@
     t._report_change = partial(_post, port)  # type: ignore
 
     fuzz_several(*tests)
-    raise NotImplementedError("unreachable")
diff --git a/tests/test_collection.py b/tests/test_collection.py
new file mode 100644
index 0000000..ee11610
--- /dev/null
+++ b/tests/test_collection.py
@@ -0,0 +1,37 @@
+"""Tests for the hypofuzz library."""
+
+from hypofuzz import interface
+
+TEST_CODE = """
+import pytest
+from hypothesis import given, settings, strategies as st
+
+@pytest.fixture(autouse=True)
+def fixture():
+    pass
+
+@given(st.none())
+def test_autouse(x):
+    pass
+
+
+@pytest.fixture()
+def other_fixture():
+    pass
+
+@given(st.none())
+def test_with_fixture(x, other_fixture):
+    pass
+"""
+
+
+def test_collects_despite_autouse_fixtures(tmp_path):
+    test_fname = tmp_path / "test_demo.py"
+    test_fname.write_text(TEST_CODE, encoding="utf-8")
+    try:
+        fps = interface._get_hypothesis_tests_with_pytest(
+            ["-p", "no:dash", str(test_fname)]
+        )
+    except SystemExit as err:
+        raise AssertionError from err
+    assert len(fps) == 1
diff --git a/tests/test_e2e.py b/tests/test_e2e.py
new file mode 100644
index 0000000..1b51a34
--- /dev/null
+++ b/tests/test_e2e.py
@@ -0,0 +1,45 @@
+"""Tests for the hypofuzz library."""
+
+import pytest
+import requests
+
+from hypofuzz import entrypoint
+
+TEST_CODE = """
+from hypothesis import given, settings, strategies as st
+from hypothesis.database import InMemoryExampleDatabase
+
+settings.register_profile("ephemeral", settings(database=InMemoryExampleDatabase()))
+settings.load_profile("ephemeral")
+
+n = st.integers(0, 127)
+
+@given(a=n, b=n, c=n)
+def test(a, b, c):
+    # Expected number of cases to find this is (128**3)/2 using random search,
+    # but with fuzzing is about (128 + 2*128 + 3*128)/2, many, many times faster.
+    # Our heuristics only complicate that a bit, but it's still only going to
+    # work as an e2e test if the coverage guidance is working.
+    if a == 3:
+        if b == 4:
+            assert c != 5
+"""
+
+
+@pytest.mark.parametrize("numprocesses", [1])  # consider multiprocess someday?
+def test_end_to_end(numprocesses, tmp_path):
+    """An end-to-end test to start the fuzzer and access the dashboard."""
+    test_fname = tmp_path / "test_demo.py"
+    test_fname.write_text(TEST_CODE, encoding="utf-8")
+    dash_proc = entrypoint._fuzz_impl(
+        numprocesses=numprocesses,
+        dashboard=True,
+        port=7777,
+        unsafe=False,
+        pytest_args=["-p", "no:dash", str(test_fname)],
+    )
+    assert dash_proc
+    resp = requests.get("http://localhost:7777", allow_redirects=True, timeout=10)
+    resp.raise_for_status()
+    dash_proc.kill()
+    dash_proc.join()
diff --git a/tox.ini b/tox.ini
index 8ccb6f7..1e2f691 100644
--- a/tox.ini
+++ b/tox.ini
@@ -21,10 +21,11 @@ commands =
     pip install --no-deps --editable .
sphinx-build -W --keep-going docs-src docs/docs {posargs} -[testenv:test] +[testenv:{test,pytest7}] description = Runs pytest with posargs - `tox -e test -- -v` == `pytest -v` deps = - --requirement deps/test.txt + test: --requirement deps/test.txt + pytest7: --requirement deps/test-old.txt commands = pip install --no-deps --editable . pytest {posargs:-n auto} @@ -34,9 +35,10 @@ description = Updates test corpora and the pinned dependencies in `deps/*.txt` deps = pip-tools commands = - pip-compile --annotation-style=line --quiet --upgrade --rebuild --output-file=deps/check.txt deps/check.in - pip-compile --annotation-style=line --quiet --upgrade --rebuild --output-file=deps/docs.txt deps/docs.in setup.py - pip-compile --annotation-style=line --quiet --upgrade --rebuild --output-file=deps/test.txt deps/test.in setup.py + pip-compile --annotation-style=line --quiet --upgrade --rebuild --no-strip-extras --output-file=deps/check.txt deps/check.in + pip-compile --annotation-style=line --quiet --upgrade --rebuild --no-strip-extras --output-file=deps/docs.txt deps/docs.in setup.py + pip-compile --annotation-style=line --quiet --upgrade --rebuild --no-strip-extras --output-file=deps/test.txt deps/test.in setup.py + pip-compile --annotation-style=line --quiet --upgrade --rebuild --no-strip-extras --output-file=deps/test-old.txt deps/test-old.in setup.py # Settings for other tools