-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat: create/import keystore wallet (password-encrypted) + docstrings #164
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,15 @@ | ||
import json | ||
import logging | ||
import sys | ||
from abc import ABC, abstractmethod | ||
from functools import lru_cache | ||
from pathlib import Path | ||
from typing import Dict, Optional | ||
|
||
from coincurve.keys import PrivateKey | ||
from rich.prompt import Console, Prompt, Text | ||
from typing_extensions import deprecated | ||
from web3 import Web3 | ||
|
||
from aleph.sdk.conf import settings | ||
from aleph.sdk.utils import enum_as_str | ||
|
@@ -143,22 +148,145 @@ async def decrypt(self, content: bytes) -> bytes: | |
raise NotImplementedError | ||
|
||
|
||
# Start of the ugly stuff | ||
def generate_key() -> bytes: | ||
""" | ||
Generate a new private key. | ||
|
||
Returns: | ||
bytes: The generated private key as bytes. | ||
""" | ||
|
||
privkey = PrivateKey() | ||
return privkey.secret | ||
|
||
|
||
def create_or_import_key() -> bytes: | ||
""" | ||
Create or import a private key. | ||
|
||
This function allows the user to either import an existing private key | ||
or generate a new one. If the user chooses to import a key, they can | ||
enter a private key in hexadecimal format or a mnemonic seed phrase. | ||
|
||
Returns: | ||
bytes: The private key as bytes. | ||
""" | ||
if Prompt.ask("Import an existing wallet", choices=["y", "n"], default="n") == "y": | ||
data = Prompt.ask("Enter your private key or mnemonic seed phrase") | ||
# private key | ||
if data.startswith("0x"): | ||
data = data[2:] | ||
if len(data) == 64: | ||
return bytes.fromhex(data) | ||
# mnemonic seed phrase | ||
elif len(data.split()) in [12, 24]: | ||
w3 = Web3() | ||
w3.eth.account.enable_unaudited_hdwallet_features() | ||
return w3.eth.account.from_mnemonic(data.strip()).key | ||
else: | ||
raise ValueError("Invalid private key or mnemonic seed phrase") | ||
else: | ||
return generate_key() | ||
|
||
|
||
def save_key(private_key: bytes, path: Path): | ||
""" | ||
Save a private key to a file. | ||
|
||
Parameters: | ||
private_key (bytes): The private key as bytes. | ||
path (Path): The path to the private key file. | ||
|
||
Returns: | ||
None | ||
""" | ||
w3 = Web3() | ||
address = None | ||
path.parent.mkdir(exist_ok=True, parents=True) | ||
if path.name.endswith(".key") or "pytest" in sys.modules: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why |
||
address = w3.to_checksum_address(w3.eth.account.from_key(private_key).address) | ||
path.write_bytes(private_key) | ||
elif path.name.endswith(".json"): | ||
address = w3.to_checksum_address(w3.eth.account.from_key(private_key).address) | ||
password = Prompt.ask( | ||
"Enter a password to encrypt your keystore", password=True | ||
) | ||
Comment on lines
+211
to
+213
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Users should be prompted in the CLI, not in the SDK. |
||
keystore = w3.eth.account.encrypt(private_key, password) | ||
path.write_text(json.dumps(keystore)) | ||
else: | ||
raise ValueError("Unsupported private key file format") | ||
confirmation = Text.assemble( | ||
"\nYour address: ", | ||
Text(address, style="cyan"), | ||
"\nSaved file: ", | ||
Text(str(path), style="orange1"), | ||
"\n", | ||
) | ||
Console().print(confirmation) | ||
|
||
|
||
@lru_cache(maxsize=1) | ||
def load_key(private_key_path: Path) -> bytes: | ||
""" | ||
Load a private key from a file. | ||
|
||
This function supports two types of private key files: | ||
1. Unencrypted .key files. | ||
2. Encrypted .json keystore files. | ||
|
||
Parameters: | ||
private_key_path (Path): The path to the private key file. | ||
|
||
Returns: | ||
bytes: The private key as bytes. | ||
|
||
Raises: | ||
FileNotFoundError: If the private key file does not exist. | ||
ValueError: If the private key file is not a .key or .json file. | ||
""" | ||
if not private_key_path.exists(): | ||
raise FileNotFoundError("Private key file not found") | ||
elif private_key_path.name.endswith(".key"): | ||
return private_key_path.read_bytes() | ||
elif private_key_path.name.endswith(".json"): | ||
keystore = private_key_path.read_text() | ||
password = Prompt.ask("Keystore password", password=True) | ||
try: | ||
return Web3().eth.account.decrypt(keystore, password) | ||
except ValueError: | ||
raise ValueError("Invalid password") | ||
else: | ||
raise ValueError("Unsupported private key file format") | ||
|
||
|
||
def get_fallback_private_key(path: Optional[Path] = None) -> bytes: | ||
""" | ||
Retrieve or create a fallback private key. | ||
|
||
This function attempts to load a private key from the specified path. | ||
If the path is not provided, it defaults to the path specified in the | ||
settings. If the file does not exist or is empty, a new private key | ||
is generated and saved to the specified path. A symlink is also created | ||
to use this key by default. | ||
|
||
Parameters: | ||
path (Optional[Path]): The path to the private key file. If not provided, | ||
the default path from settings is used. | ||
|
||
Returns: | ||
bytes: The private key as bytes. | ||
""" | ||
path = path or settings.PRIVATE_KEY_FILE | ||
private_key: bytes | ||
if path.exists() and path.stat().st_size > 0: | ||
private_key = path.read_bytes() | ||
private_key = load_key(path) | ||
else: | ||
private_key = generate_key() | ||
path.parent.mkdir(exist_ok=True, parents=True) | ||
path.write_bytes(private_key) | ||
|
||
private_key = ( | ||
generate_key() | ||
if path.name.endswith(".key") or "pytest" in sys.modules | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this mention of |
||
else create_or_import_key() | ||
) | ||
save_key(private_key, path) | ||
default_key_path = path.parent / "default.key" | ||
|
||
# If the symlink exists but does not point to a file, delete it. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users should be prompted in the CLI, not in the SDK.