Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
Support forking from transaction hash
Support providing IPC path as argument
Use `watchdog` to wait for IPC file creation instead of repeatedly pinging provider
Convert some attributes to properties
  • Loading branch information
BowTiedDevil committed Oct 22, 2024
1 parent 9fb4f6d commit 83238a8
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 67 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ requires-python = ">=3.10"
dependencies = [
"scipy >=1.14.1, <1.15",
"ujson >= 5.10.0, <6",
"watchdog >= 5.0.3",
"web3 >=7.2.0, <8",
]
license = {text = "MIT"}
Expand Down
209 changes: 142 additions & 67 deletions src/degenbot/anvil_fork.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
import contextlib
import os
import pathlib
import shutil
import socket
import subprocess
from collections.abc import Iterable
from queue import Queue
from typing import Any, Literal, cast

import watchdog.events
import watchdog.observers
from eth_typing import HexAddress
from web3 import IPCProvider, Web3
from web3.middleware import Middleware
from web3.types import RPCEndpoint

from .constants import MAX_UINT256
from .exceptions import DegenbotValueError
from .exceptions import DegenbotValueError, InvalidUint256
from .logging import logger


class AnvilNotFound(Exception):
def __init__(self) -> None: # pragma: no cover
super().__init__("Anvil path could not be located.")


class AnvilFork:
"""
Launch an Anvil fork as a separate process and expose methods for commonly-used RPC calls.
Expand All @@ -27,14 +36,14 @@ def __init__(
self,
fork_url: str,
fork_block: int | None = None,
fork_transaction_hash: str | None = None,
hardfork: str = "latest",
port: int | None = None,
chain_id: int | None = None,
mining_mode: Literal["auto", "interval", "none"] = "auto",
mining_interval: int = 12,
storage_caching: bool = True,
base_fee: int | None = None,
ipc_path: str | None = None,
ipc_path: pathlib.Path = pathlib.Path("/tmp/"),
mnemonic: str = (
# Default mnemonic used by Brownie for Ganache forks
"patient rude simple dog close planet oval animal hunt sketch suspect slim"
Expand All @@ -47,20 +56,22 @@ def __init__(
ipc_provider_kwargs: dict[str, Any] | None = None,
prune_history: bool = False,
):
def build_anvil_command() -> list[str]: # pragma: no cover
def build_anvil_command(path_to_anvil: pathlib.Path) -> list[str]: # pragma: no cover
command = [
"anvil",
str(path_to_anvil),
"--silent",
"--auto-impersonate",
"--no-rate-limit",
f"--fork-url={fork_url}",
f"--hardfork={hardfork}",
f"--port={self.port}",
f"--ipc={ipc_path}",
f"--ipc={self.ipc_filename}",
f"--mnemonic={mnemonic}",
]
if fork_block:
command.append(f"--fork-block-number={fork_block}")
if fork_transaction_hash:
command.append(f"--fork-transaction-hash={fork_transaction_hash}")
if chain_id:
command.append(f"--chain-id={chain_id}")
if base_fee:
Expand All @@ -79,44 +90,36 @@ def build_anvil_command() -> list[str]: # pragma: no cover
command.append("--no-mining")
command.append("--order=fifo")
case _:
raise DegenbotValueError(f"Unknown mining mode '{mining_mode}'.")
raise DegenbotValueError(message=f"Unknown mining mode '{mining_mode}'.")

return command

def get_free_port_number() -> int:
with socket.socket() as sock:
sock.bind(("", 0))
_, _port = sock.getsockname()
return cast(int, _port)

if shutil.which("anvil") is None: # pragma: no cover
raise Exception("Anvil is not installed or not accessible in the current path.")

self.port = port if port is not None else get_free_port_number()

ipc_path = f"/tmp/anvil-{self.port}.ipc" if ipc_path is None else ipc_path
_path_to_anvil = shutil.which("anvil")
if _path_to_anvil is None: # pragma: no cover
raise AnvilNotFound
path_to_anvil = pathlib.Path(_path_to_anvil)

self._process = subprocess.Popen(build_anvil_command())
self.fork_url = fork_url
self.http_url = f"http://localhost:{self.port}"
self.ws_url = f"ws://localhost:{self.port}"
self.port = self._get_free_port_number()
self.ipc_path = ipc_path
self.ipc_provider_kwargs = (
ipc_provider_kwargs if ipc_provider_kwargs is not None else dict()
)

if ipc_provider_kwargs is None:
ipc_provider_kwargs = dict()
self.w3 = Web3(IPCProvider(ipc_path=ipc_path, **ipc_provider_kwargs))

if middlewares is not None:
for middleware, layer in middlewares:
self.w3.middleware_onion.inject(middleware, layer=layer)

while self.w3.is_connected() is False:
continue
self._anvil_command = build_anvil_command(path_to_anvil=path_to_anvil)
self._process = self._setup_subprocess(
anvil_command=self._anvil_command, ipc_path=self.ipc_path
)
self.w3 = Web3(IPCProvider(ipc_path=self.ipc_filename, **self.ipc_provider_kwargs))

self._initial_block_number = (
fork_block if fork_block is not None else self.w3.eth.get_block_number()
)

if middlewares is not None:
for middleware, layer in middlewares:
self.w3.middleware_onion.inject(middleware, layer=layer)

if balance_overrides is not None:
for account, balance in balance_overrides:
self.set_balance(account, balance)
Expand All @@ -132,59 +135,133 @@ def get_free_port_number() -> int:
if coinbase is not None:
self.set_coinbase(coinbase)

@property
def http_url(self) -> str:
return f"http://localhost:{self.port}"

@property
def ipc_filename(self) -> pathlib.Path:
return self.ipc_path / f"anvil-{self.port}.ipc"

@property
def ws_url(self) -> str:
return f"ws://localhost:{self.port}"

@staticmethod
def _get_free_port_number() -> int:
with socket.socket() as sock:
sock.bind(("", 0))
_, _port = sock.getsockname()
return cast(int, _port)

def _setup_subprocess(
self, anvil_command: list[str], ipc_path: pathlib.Path
) -> subprocess.Popen[Any]:
"""
Launch an Anvil subprocess, waiting for the IPC file to be created.
"""

class WaitForIPCReady(watchdog.events.FileSystemEventHandler):
def __init__(self, queue: Queue[Any], ipc_filename: str):
self.queue = queue
self.ipc_filename = ipc_filename

def on_created(self, event: watchdog.events.FileSystemEvent) -> None:
if event.src_path == self.ipc_filename: # pragma: no branch
self.queue.put(object())

queue: Queue[Any] = Queue()
observer = watchdog.observers.Observer()
observer.schedule(
event_handler=WaitForIPCReady(
queue,
str(self.ipc_filename),
),
path=str(ipc_path),
)
observer.start()
process = subprocess.Popen(anvil_command)
queue.get(timeout=10)
observer.stop()
observer.join()

return process

def __del__(self) -> None:
self._process.terminate()
self._process.wait()
with contextlib.suppress(FileNotFoundError):
os.remove(self.ipc_path)

def create_access_list(self, transaction: dict[Any, Any]) -> Any:
# Exclude transaction values that are irrelevant for the JSON-RPC method
# ref: https://docs.infura.io/networks/ethereum/json-rpc-methods/eth_createaccesslist
keys_to_drop = ("gasPrice", "maxFeePerGas", "maxPriorityFeePerGas", "gas", "chainId")
sanitized_tx = {k: v for k, v in transaction.items() if k not in keys_to_drop}

# Apply int->hex conversion to some transaction values
# ref: https://docs.infura.io/networks/ethereum/json-rpc-methods/eth_createaccesslist
keys_to_hexify = ("value", "nonce")
for key in keys_to_hexify:
if key in sanitized_tx and isinstance(sanitized_tx[key], int):
sanitized_tx[key] = hex(sanitized_tx[key])

return self.w3.provider.make_request(
method=RPCEndpoint("eth_createAccessList"),
params=[sanitized_tx],
)["result"]["accessList"]
os.remove(self.ipc_filename)

def mine(self) -> None:
self.w3.provider.make_request(
method=RPCEndpoint("evm_mine"),
params=[],
)

def reset(
def _reset(
self,
fork_url: str | None = None,
block_number: int | None = None,
base_fee: int | None = None,
fork_url: str,
block_number: int | None,
) -> None:
forking_params: dict[str, Any] = {
"jsonRpcUrl": fork_url if fork_url is not None else self.fork_url,
"blockNumber": block_number if block_number is not None else self._initial_block_number,
}
forking_params: dict[str, Any] = {"jsonRpcUrl": fork_url}
if block_number:
forking_params["blockNumber"] = block_number

self.w3.provider.make_request(
method=RPCEndpoint("anvil_reset"),
params=[{"forking": forking_params}],
)

if fork_url is not None:
def reset(
self,
fork_url: str | None = None,
block_number: int | None = None,
) -> None:
self._reset(
fork_url=fork_url if fork_url is not None else self.fork_url,
block_number=block_number,
)
if fork_url:
self.fork_url = fork_url
if base_fee is not None:
self.set_next_base_fee(base_fee)

def reset_to_transaction_hash(self, transaction_hash: str) -> None:
"""
Reset to the state after a given transaction hash.
This method will launch a new Anvil process since the anvil_reset API endpoint only supports
resetting to a block number.
"""

self._process.terminate()
self._process.wait()
with contextlib.suppress(FileNotFoundError):
os.remove(self.ipc_filename)

self.port = self._get_free_port_number()

anvil_command = []
for option in self._anvil_command:
if any(
(
"--fork-block-number" in option,
"--fork-transaction-hash" in option,
"--ipc" in option,
"--port" in option,
)
):
continue
anvil_command.append(option)
anvil_command.append(f"--fork-transaction-hash={transaction_hash}")
anvil_command.append(f"--port={self.port}")
anvil_command.append(f"--ipc={self.ipc_filename}")

self._process = self._setup_subprocess(anvil_command=anvil_command, ipc_path=self.ipc_path)
self.w3 = Web3(IPCProvider(ipc_path=self.ipc_filename, **self.ipc_provider_kwargs))

def return_to_snapshot(self, id: int) -> bool:
if id < 0:
raise DegenbotValueError("ID cannot be negative")
raise DegenbotValueError(message="ID cannot be negative")
return bool(
self.w3.provider.make_request(
method=RPCEndpoint("evm_revert"),
Expand All @@ -194,9 +271,7 @@ def return_to_snapshot(self, id: int) -> bool:

def set_balance(self, address: str, balance: int) -> None:
if not (0 <= balance <= MAX_UINT256):
raise DegenbotValueError(
"Invalid balance, must be within range: 0 <= balance <= 2**256 - 1"
)
raise InvalidUint256

self.w3.provider.make_request(
method=RPCEndpoint("anvil_setBalance"),
Expand All @@ -217,7 +292,7 @@ def set_coinbase(self, address: str) -> None:

def set_next_base_fee(self, fee: int) -> None:
if not (0 <= fee <= MAX_UINT256):
raise DegenbotValueError("Fee outside valid range 0 <= fee <= 2**256-1")
raise InvalidUint256
self.w3.provider.make_request(
method=RPCEndpoint("anvil_setNextBlockBaseFeePerGas"),
params=[fee],
Expand Down

0 comments on commit 83238a8

Please sign in to comment.