Skip to content

Commit

Permalink
Allow send custom VRF request_id (#29)
Browse files Browse the repository at this point in the history
* Problem: Is not possible to send a custom request_id on the API, because it creates it automatically.

Solution: Allow to pass a custom request_id field. If the request_id is already used, we will return it. If not, we will use it on the VRF. If it's not set, we create a new one using uuid, like before.

* Fix: Added method to check the message integrity checking also the previous send messages of all executors.

* Fix: Solved some PR issues.

* Fix: Solved last PR comment issue.

* Fix: Accept request_id parameter also on the coordinator API.

* Fix: Solved error code to return in case of failure and 2 more issues related with SDK updates.

---------

Co-authored-by: Andres D. Molins <[email protected]>
  • Loading branch information
nesitor and Andres D. Molins authored Jan 24, 2024
1 parent c5e2f84 commit dfd2194
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 44 deletions.
8 changes: 4 additions & 4 deletions src/aleph_vrf/coordinator/executor_selection.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import abc
import json
from pathlib import Path
from typing import List, Dict, Any, AsyncIterator
import random
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List

import aiohttp
from aleph_message.models import ItemHash

from aleph_vrf.exceptions import NotEnoughExecutors, AlephNetworkError
from aleph_vrf.models import Executor, Node, AlephExecutor, ComputeResourceNode
from aleph_vrf.exceptions import AlephNetworkError, NotEnoughExecutors
from aleph_vrf.models import AlephExecutor, ComputeResourceNode, Executor, Node
from aleph_vrf.settings import settings


Expand Down
21 changes: 15 additions & 6 deletions src/aleph_vrf/coordinator/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
from typing import Union
from typing import Optional, Union

from pydantic import BaseModel

from aleph_vrf.settings import settings

Expand All @@ -10,11 +12,11 @@
from aleph.sdk.vm.cache import VmCache

logger.debug("import fastapi")
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException

logger.debug("local imports")
from aleph_vrf.coordinator.vrf import generate_vrf
from aleph_vrf.models import APIResponse, PublishedVRFResponse, APIError
from aleph_vrf.models import APIError, APIResponse, PublishedVRFResponse

logger.debug("imports done")

Expand All @@ -23,6 +25,10 @@
cache = VmCache()


class VRFRequest(BaseModel):
request_id: Optional[str]


@app.get("/")
async def index():
return {
Expand All @@ -34,7 +40,9 @@ async def index():


@app.post("/vrf")
async def receive_vrf() -> APIResponse[Union[PublishedVRFResponse, APIError]]:
async def receive_vrf(
request: Optional[VRFRequest] = None,
) -> APIResponse[Union[PublishedVRFResponse, APIError]]:
"""
Goes through the VRF random number generation process and returns a random number
along with details on how the number was generated.
Expand All @@ -44,9 +52,10 @@ async def receive_vrf() -> APIResponse[Union[PublishedVRFResponse, APIError]]:

response: Union[PublishedVRFResponse, APIError]

request_id = request.request_id if request and request.request_id else None
try:
response = await generate_vrf(account)
response = await generate_vrf(account=account, request_id=request_id)
except Exception as err:
response = APIError(error=str(err))
raise HTTPException(status_code=500, detail=str(err))

return APIResponse(data=response)
147 changes: 129 additions & 18 deletions src/aleph_vrf/coordinator/vrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import json
import logging
from hashlib import sha3_256
from typing import Dict, List, Type, TypeVar, Union, Optional
from typing import Dict, List, Optional, Type, TypeVar, Union
from uuid import uuid4

import aiohttp
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AuthenticatedAlephClient
from aleph_message.models import ItemHash
from aleph_message.models import ItemHash, MessageType, PostMessage
from aleph_message.status import MessageStatus
from hexbytes import HexBytes
from pydantic import BaseModel
Expand All @@ -19,37 +19,33 @@
ExecutorSelectionPolicy,
)
from aleph_vrf.exceptions import (
HashValidationFailed,
AlephNetworkError,
ExecutorHttpError,
RandomNumberPublicationFailed,
RandomNumberGenerationFailed,
HashesDoNotMatch,
HashValidationFailed,
PublishedHashesDoNotMatch,
PublishedHashValidationFailed,
RandomNumberGenerationFailed,
RandomNumberPublicationFailed,
)
from aleph_vrf.models import (
Executor,
ExecutorVRFResponse,
VRFRequest,
VRFResponse,
PublishedVRFRandomNumberHash,
PublishedVRFRandomNumber,
Executor,
PublishedVRFRandomNumberHash,
PublishedVRFResponse,
VRFRequest,
VRFResponse,
)
from aleph_vrf.settings import settings
from aleph_vrf.types import RequestId, Nonce
from aleph_vrf.utils import (
generate_nonce,
verify,
xor_all,
)
from aleph_vrf.types import Nonce, RequestId
from aleph_vrf.utils import generate_nonce, verify, xor_all

VRF_FUNCTION_GENERATE_PATH = "generate"
VRF_FUNCTION_PUBLISH_PATH = "publish"


logger = logging.getLogger(__name__)


M = TypeVar("M", bound=BaseModel)


Expand All @@ -66,12 +62,28 @@ async def post_executor_api_request(url: str, model: Type[M]) -> M:
return model.parse_obj(response["data"])


async def prepare_executor_api_request(url: str) -> bool:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=120) as resp:
try:
resp.raise_for_status()
except aiohttp.ClientResponseError as error:
raise ExecutorHttpError(
url=url, status_code=resp.status, response_text=await resp.text()
) from error

response = await resp.json()

return response["name"] == "vrf_generate_api"


async def _generate_vrf(
aleph_client: AuthenticatedAlephClient,
nb_executors: int,
nb_bytes: int,
vrf_function: ItemHash,
executor_selection_policy: ExecutorSelectionPolicy,
request_id: Optional[str] = None,
) -> PublishedVRFResponse:
executors = await executor_selection_policy.select_executors(nb_executors)
selected_nodes_json = json.dumps(
Expand All @@ -80,12 +92,20 @@ async def _generate_vrf(

nonce = generate_nonce()

if request_id:
existing_message = await get_existing_vrf_message(aleph_client, request_id)
if existing_message:
message = PublishedVRFResponse.from_vrf_post_message(existing_message)
await check_message_integrity(aleph_client, message)

return message

vrf_request = VRFRequest(
nb_bytes=nb_bytes,
nb_executors=nb_executors,
nonce=nonce,
vrf_function=vrf_function,
request_id=RequestId(str(uuid4())),
request_id=RequestId(request_id or str(uuid4())),
node_list_hash=sha3_256(selected_nodes_json).hexdigest(),
)

Expand Down Expand Up @@ -136,6 +156,7 @@ async def _generate_vrf(

async def generate_vrf(
account: ETHAccount,
request_id: Optional[str] = None,
nb_executors: Optional[int] = None,
nb_bytes: Optional[int] = None,
vrf_function: Optional[ItemHash] = None,
Expand All @@ -152,6 +173,7 @@ async def generate_vrf(
) as aleph_client:
return await _generate_vrf(
aleph_client=aleph_client,
request_id=request_id,
nb_executors=nb_executors or settings.NB_EXECUTORS,
nb_bytes=nb_bytes or settings.NB_BYTES,
vrf_function=vrf_function or settings.FUNCTION,
Expand Down Expand Up @@ -292,3 +314,92 @@ async def publish_data(
)

return message.item_hash


async def get_existing_vrf_message(
aleph_client: AuthenticatedAlephClient,
request_id: str,
) -> Optional[PostMessage]:
channel = f"vrf_{request_id}"
ref = f"vrf_{request_id}"

logger.debug(
f"Getting VRF messages on {aleph_client.api_server} from request id {request_id}"
)

messages = await aleph_client.get_messages(
message_type=MessageType.post,
channels=[channel],
refs=[ref],
)

if messages.messages:
if len(messages.messages) > 1:
logger.warning(f"Multiple VRF messages found for request id {request_id}")
return messages.messages[0]
else:
logger.debug(f"Existing VRF message for request id {request_id} not found")
return None


async def get_existing_message(
aleph_client: AuthenticatedAlephClient,
item_hash: ItemHash,
) -> Optional[PostMessage]:
logger.debug(
f"Getting VRF message on {aleph_client.api_server} for item_hash {item_hash}"
)

message = await aleph_client.get_message(
item_hash=item_hash,
)

if not message:
raise AlephNetworkError(
f"Message could not be read for item_hash {message.item_hash}"
)

return message


async def check_message_integrity(
aleph_client: AuthenticatedAlephClient, vrf_response: PublishedVRFResponse
):
logger.debug(
f"Checking VRF response message on {aleph_client.api_server} for item_hash {vrf_response.message_hash}"
)

for executor in vrf_response.executors:
generation_message = await get_existing_message(
aleph_client, executor.generation_message_hash
)
loaded_generation_message = PublishedVRFRandomNumberHash.from_published_message(
generation_message
)
publish_message = await get_existing_message(
aleph_client, executor.publication_message_hash
)
loaded_publish_message = PublishedVRFRandomNumber.from_published_message(
publish_message
)

if (
loaded_generation_message.random_number_hash
!= loaded_publish_message.random_number_hash
):
raise PublishedHashesDoNotMatch(
executor=executor,
generation_hash=loaded_generation_message.random_number_hash,
publication_hash=loaded_publish_message.random_number_hash,
)

if not verify(
HexBytes(loaded_publish_message.random_number),
loaded_generation_message.nonce,
loaded_generation_message.random_number_hash,
):
raise PublishedHashValidationFailed(
executor=executor,
random_number=loaded_publish_message,
random_number_hash=loaded_generation_message.random_number_hash,
)
44 changes: 43 additions & 1 deletion src/aleph_vrf/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from aleph_vrf.models import Executor, PublishedVRFRandomNumber
from aleph_vrf.models import Executor, ExecutorVRFResponse, PublishedVRFRandomNumber


class VrfException(Exception):
Expand Down Expand Up @@ -72,6 +72,25 @@ def __str__(self):
)


class PublishedHashesDoNotMatch(VrfException):
"""
The random number hash received from /publish is different from the one received from /generate.
"""

def __init__(
self, executor: ExecutorVRFResponse, generation_hash: str, publication_hash: str
):
self.executor = executor
self.generation_hash = generation_hash
self.publication_hash = publication_hash

def __str__(self):
return (
f"Published random number hash ({self.publication_hash})"
f"does not match the generated one ({self.generation_hash})."
)


class HashValidationFailed(VrfException):
"""
A random number does not match the SHA3 hash sent by the executor.
Expand All @@ -95,6 +114,29 @@ def __str__(self):
)


class PublishedHashValidationFailed(VrfException):
"""
A random number does not match the SHA3 hash sent by the executor.
"""

def __init__(
self,
random_number: PublishedVRFRandomNumber,
random_number_hash: str,
executor: ExecutorVRFResponse,
):
self.random_number = random_number
self.random_number_hash = random_number_hash
self.executor = executor

def __str__(self):
return (
f"The random number published by {self.executor.url} "
f"(execution ID: {self.random_number.execution_id}) "
"does not match the hash."
)


class NotEnoughExecutors(VrfException):
"""
There are not enough executors available to satisfy the user requirements.
Expand Down
Loading

0 comments on commit dfd2194

Please sign in to comment.