From 3c62dda4a3d397f313e73c67532db7df12581f63 Mon Sep 17 00:00:00 2001 From: philogicae Date: Wed, 11 Sep 2024 12:27:40 +0300 Subject: [PATCH] feat: create/import keystore wallet (password-encrypted) + add docstrings --- pyproject.toml | 1 + src/aleph/sdk/account.py | 24 +++++- src/aleph/sdk/chains/common.py | 140 +++++++++++++++++++++++++++++++-- src/aleph/sdk/conf.py | 17 ++-- 4 files changed, 167 insertions(+), 15 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0f62338e..21e739f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "aleph-superfluid>=0.2.1", "eth_typing==4.3.1", "web3==6.3.0", + "rich==13.7.1", ] [project.optional-dependencies] diff --git a/src/aleph/sdk/account.py b/src/aleph/sdk/account.py index 59eef815..1996a00d 100644 --- a/src/aleph/sdk/account.py +++ b/src/aleph/sdk/account.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import Optional, Type, TypeVar -from aleph.sdk.chains.common import get_fallback_private_key +from aleph.sdk.chains.common import get_fallback_private_key, load_key from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.chains.remote import RemoteAccount from aleph.sdk.conf import settings @@ -15,13 +15,33 @@ def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T: + """ + Loads an account from a hexadecimal string representation of a private key. + + Args: + private_key_str (str): The private key as a hexadecimal string. + account_type (Type[T]): The type of account to load. + + Returns: + T: An instance of the specified account type. + """ if private_key_str.startswith("0x"): private_key_str = private_key_str[2:] return account_type(bytes.fromhex(private_key_str)) def account_from_file(private_key_path: Path, account_type: Type[T]) -> T: - private_key = private_key_path.read_bytes() + """ + Loads an account from a private key stored in a file (plain text or keystore). + + Args: + private_key_path (Path): The path to the file containing the private key. + account_type (Type[T]): The type of account to load. + + Returns: + T: An instance of the specified account type. + """ + private_key = load_key(private_key_path) return account_type(private_key) diff --git a/src/aleph/sdk/chains/common.py b/src/aleph/sdk/chains/common.py index 0a90183c..486c6fd8 100644 --- a/src/aleph/sdk/chains/common.py +++ b/src/aleph/sdk/chains/common.py @@ -1,10 +1,15 @@ +import json import logging +import sys from abc import ABC, abstractmethod +from functools import lru_cache from pathlib import Path from typing import Dict, Optional from coincurve.keys import PrivateKey +from rich.prompt import Console, Prompt, Text from typing_extensions import deprecated +from web3 import Web3 from aleph.sdk.conf import settings from aleph.sdk.utils import enum_as_str @@ -143,22 +148,145 @@ async def decrypt(self, content: bytes) -> bytes: raise NotImplementedError -# Start of the ugly stuff def generate_key() -> bytes: + """ + Generate a new private key. + + Returns: + bytes: The generated private key as bytes. + """ + privkey = PrivateKey() return privkey.secret +def create_or_import_key() -> bytes: + """ + Create or import a private key. + + This function allows the user to either import an existing private key + or generate a new one. If the user chooses to import a key, they can + enter a private key in hexadecimal format or a mnemonic seed phrase. + + Returns: + bytes: The private key as bytes. + """ + if Prompt.ask("Import an existing wallet", choices=["y", "n"], default="n") == "y": + data = Prompt.ask("Enter your private key or mnemonic seed phrase") + # private key + if data.startswith("0x"): + data = data[2:] + if len(data) == 64: + return bytes.fromhex(data) + # mnemonic seed phrase + elif len(data.split()) in [12, 24]: + w3 = Web3() + w3.eth.account.enable_unaudited_hdwallet_features() + return w3.eth.account.from_mnemonic(data.strip()).key + else: + raise ValueError("Invalid private key or mnemonic seed phrase") + else: + return generate_key() + + +def save_key(private_key: bytes, path: Path): + """ + Save a private key to a file. + + Parameters: + private_key (bytes): The private key as bytes. + path (Path): The path to the private key file. + + Returns: + None + """ + w3 = Web3() + address = None + path.parent.mkdir(exist_ok=True, parents=True) + if path.name.endswith(".key") or "pytest" in sys.modules: + address = w3.to_checksum_address(w3.eth.account.from_key(private_key).address) + path.write_bytes(private_key) + elif path.name.endswith(".json"): + address = w3.to_checksum_address(w3.eth.account.from_key(private_key).address) + password = Prompt.ask( + "Enter a password to encrypt your keystore", password=True + ) + keystore = w3.eth.account.encrypt(private_key, password) + path.write_text(json.dumps(keystore)) + else: + raise ValueError("Unsupported private key file format") + confirmation = Text.assemble( + "\nYour address: ", + Text(address, style="cyan"), + "\nSaved file: ", + Text(str(path), style="orange1"), + "\n", + ) + Console().print(confirmation) + + +@lru_cache(maxsize=1) +def load_key(private_key_path: Path) -> bytes: + """ + Load a private key from a file. + + This function supports two types of private key files: + 1. Unencrypted .key files. + 2. Encrypted .json keystore files. + + Parameters: + private_key_path (Path): The path to the private key file. + + Returns: + bytes: The private key as bytes. + + Raises: + FileNotFoundError: If the private key file does not exist. + ValueError: If the private key file is not a .key or .json file. + """ + if not private_key_path.exists(): + raise FileNotFoundError("Private key file not found") + elif private_key_path.name.endswith(".key"): + return private_key_path.read_bytes() + elif private_key_path.name.endswith(".json"): + keystore = private_key_path.read_text() + password = Prompt.ask("Keystore password", password=True) + try: + return Web3().eth.account.decrypt(keystore, password) + except ValueError: + raise ValueError("Invalid password") + else: + raise ValueError("Unsupported private key file format") + + def get_fallback_private_key(path: Optional[Path] = None) -> bytes: + """ + Retrieve or create a fallback private key. + + This function attempts to load a private key from the specified path. + If the path is not provided, it defaults to the path specified in the + settings. If the file does not exist or is empty, a new private key + is generated and saved to the specified path. A symlink is also created + to use this key by default. + + Parameters: + path (Optional[Path]): The path to the private key file. If not provided, + the default path from settings is used. + + Returns: + bytes: The private key as bytes. + """ path = path or settings.PRIVATE_KEY_FILE private_key: bytes if path.exists() and path.stat().st_size > 0: - private_key = path.read_bytes() + private_key = load_key(path) else: - private_key = generate_key() - path.parent.mkdir(exist_ok=True, parents=True) - path.write_bytes(private_key) - + private_key = ( + generate_key() + if path.name.endswith(".key") or "pytest" in sys.modules + else create_or_import_key() + ) + save_key(private_key, path) default_key_path = path.parent / "default.key" # If the symlink exists but does not point to a file, delete it. diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 4236370a..5dbab331 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -13,8 +13,12 @@ class Settings(BaseSettings): CONFIG_HOME: Optional[str] = None - # In case the user does not want to bother with handling private keys himself, - # do an ugly and insecure write and read from disk to this file. + # Two methods for storing your private key: + # 1. *.key: The private key is written to and read from an unencrypted file. + # This method is less secure as the key is stored in plain text. + # 2. *.json: The private key is stored in a keystore file, encrypted with a password. + # This method is more secure as the key is protected by encryption. + # If the file is missing, a new private key will be created. PRIVATE_KEY_FILE: Path = Field( default=Path("ethereum.key"), description="Path to the private key used to sign messages and transactions", @@ -152,12 +156,11 @@ class Config: settings = Settings() +# Corrected private key file path (encrypted or not) assert settings.CONFIG_HOME -if str(settings.PRIVATE_KEY_FILE) == "ethereum.key": - settings.PRIVATE_KEY_FILE = Path( - settings.CONFIG_HOME, "private-keys", "ethereum.key" - ) - +pk_file = str(settings.PRIVATE_KEY_FILE.name) +if pk_file.endswith(".key") or pk_file.endswith(".json"): + settings.PRIVATE_KEY_FILE = Path(settings.CONFIG_HOME, "private-keys", pk_file) if str(settings.PRIVATE_MNEMONIC_FILE) == "substrate.mnemonic": settings.PRIVATE_MNEMONIC_FILE = Path( settings.CONFIG_HOME, "private-keys", "substrate.mnemonic"