Skip to content

Commit

Permalink
feat: replace fisherman client with kafka msg & verification (#405)
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus authored Oct 13, 2023
1 parent a2491ac commit a4ade6c
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 204 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions graph-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ version = "14.0.1"
[dependencies]
anyhow.workspace = true
alloy-primitives.workspace = true
alloy-sol-types = "0.3.2"
axum = { version = "0.6.15", default-features = false, features = [
"json",
"tokio",
Expand Down
101 changes: 34 additions & 67 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
};

use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use anyhow::{anyhow, bail, Context as _};
use axum::extract::OriginalUri;
use axum::http::{HeaderValue, Request, Uri};
Expand All @@ -31,7 +32,7 @@ use serde::Deserialize;
use serde_json::json;
use serde_json::value::RawValue;
use tokio::sync::mpsc;
use toolshed::thegraph::{BlockPointer, DeploymentId, SubgraphId};
use toolshed::thegraph::{attestation, BlockPointer, DeploymentId, SubgraphId};
use toolshed::url::Url;
use tracing::Instrument;
use uuid::Uuid;
Expand All @@ -43,22 +44,17 @@ use indexer_selection::{
};
use prelude::{buffer_queue::QueueWriter, double_buffer::DoubleBufferReader, unix_timestamp, GRT};

use crate::auth::{AuthHandler, AuthToken};
use crate::block_constraints::{block_constraints, make_query_deterministic, BlockConstraint};
use crate::budgets::{self, Budgeter};
use crate::indexer_client::{
check_block_error, Attestation, IndexerClient, IndexerError, ResponsePayload,
};
use crate::{
auth::{AuthHandler, AuthToken},
block_constraints::{block_constraints, make_query_deterministic, BlockConstraint},
chains::BlockCache,
fisherman_client::{ChallengeOutcome, FishermanClient},
indexing::IndexingStatus,
metrics::{with_metric, METRICS},
receipts::{ReceiptSigner, ReceiptStatus},
reports,
topology::{Deployment, GraphNetwork, Subgraph},
unattestable_errors::{miscategorized_attestable, miscategorized_unattestable},
};
use crate::chains::BlockCache;
use crate::indexer_client::{check_block_error, IndexerClient, IndexerError, ResponsePayload};
use crate::indexing::IndexingStatus;
use crate::metrics::{with_metric, METRICS};
use crate::receipts::{ReceiptSigner, ReceiptStatus};
use crate::reports::{self, serialize_attestation, KafkaClient};
use crate::topology::{Deployment, GraphNetwork, Subgraph};
use crate::unattestable_errors::{miscategorized_attestable, miscategorized_unattestable};

fn query_id() -> String {
lazy_static! {
Expand Down Expand Up @@ -100,7 +96,7 @@ pub enum Error {
#[derive(Clone)]
pub struct Context {
pub indexer_client: IndexerClient,
pub fisherman_client: Option<&'static FishermanClient>,
pub kafka_client: &'static KafkaClient,
pub graph_env_id: String,
pub auth_handler: &'static AuthHandler,
pub budgeter: &'static Budgeter,
Expand All @@ -110,6 +106,7 @@ pub struct Context {
pub network: GraphNetwork,
pub indexing_statuses: Eventual<Ptr<HashMap<Indexing, IndexingStatus>>>,
pub receipt_signer: &'static ReceiptSigner,
pub attestation_domain: &'static Eip712Domain,
pub indexings_blocklist: Eventual<Ptr<HashSet<Indexing>>>,
pub isa_state: DoubleBufferReader<indexer_selection::State>,
pub observations: QueueWriter<Update>,
Expand Down Expand Up @@ -637,8 +634,9 @@ async fn handle_client_query_inner(
.clone();
let indexer_query_context = IndexerQueryContext {
indexer_client: ctx.indexer_client.clone(),
fisherman_client: ctx.fisherman_client,
kafka_client: ctx.kafka_client,
receipt_signer: ctx.receipt_signer,
attestation_domain: ctx.attestation_domain,
observations: ctx.observations.clone(),
deployment,
latest_block: latest_block.number,
Expand Down Expand Up @@ -718,8 +716,9 @@ async fn handle_client_query_inner(
#[derive(Clone)]
struct IndexerQueryContext {
pub indexer_client: IndexerClient,
pub fisherman_client: Option<&'static FishermanClient>,
pub kafka_client: &'static KafkaClient,
pub receipt_signer: &'static ReceiptSigner,
pub attestation_domain: &'static Eip712Domain,
pub observations: QueueWriter<Update>,
pub deployment: Arc<Deployment>,
pub latest_block: u64,
Expand Down Expand Up @@ -874,60 +873,28 @@ async fn handle_indexer_query_inner(
}

if let Some(attestation) = &response.payload.attestation {
challenge_indexer_response(
ctx.fisherman_client,
ctx.observations.clone(),
selection.indexing,
allocation,
deterministic_query,
response.payload.body.clone(),
attestation.clone(),
let indexer = selection.indexing.indexer;
let verified = attestation::verify(
ctx.attestation_domain,
attestation,
&indexer,
&deterministic_query,
&response.payload.body,
);
// We send the Kafka message directly to avoid passing the request & response payloads
// through the normal reporting path. This is to reduce log bloat.
let response = response.payload.body.to_string();
let payload = serialize_attestation(attestation, indexer, deterministic_query, response);
ctx.kafka_client.send("gateway_attestations", &payload);
if let Err(attestation_verification_err) = verified {
tracing::debug!(%attestation_verification_err);
return Err(IndexerError::BadAttestation);
}
}

Ok(response.payload)
}

fn challenge_indexer_response(
fisherman_client: Option<&'static FishermanClient>,
observations: QueueWriter<Update>,
indexing: Indexing,
allocation: Address,
indexer_query: String,
indexer_response: Arc<String>,
attestation: Attestation,
) {
let fisherman = match fisherman_client {
Some(fisherman) => fisherman,
None => return,
};
tokio::spawn(async move {
let outcome = fisherman
.challenge(
&indexing,
&allocation,
&indexer_query,
&indexer_response,
&attestation,
)
.await;
tracing::trace!(?outcome);
let penalty = match outcome {
ChallengeOutcome::Unknown | ChallengeOutcome::AgreeWithTrustedIndexer => 0,
ChallengeOutcome::DisagreeWithUntrustedIndexer => 10,
ChallengeOutcome::DisagreeWithTrustedIndexer => 35,
ChallengeOutcome::FailedToProvideAttestation => 40,
};
if penalty > 0 {
tracing::info!(?outcome, "penalizing for challenge outcome");
let _ = observations.write(Update::Penalty {
indexing,
weight: penalty,
});
}
});
}

fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result<usize> {
let selection_sets = ctx
.operations
Expand Down
10 changes: 7 additions & 3 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ use crate::poi::ProofOfIndexingInfo;
pub struct Config {
/// Respect the payment state of API keys (disable for testnets)
pub api_key_payment_required: bool,
pub attestations: AttestationConfig,
pub chains: Vec<Chain>,
/// Ethereum RPC provider, or fixed exchange rate for testing
pub exchange_rate_provider: ExchangeRateProvider,
/// Fisherman RPC for challenges
#[serde_as(as = "Option<DisplayFromStr>")]
pub fisherman: Option<Url>,
/// GeoIP database path
pub geoip_database: Option<PathBuf>,
/// GeoIP blocked countries (ISO 3166-1 alpha-2 codes)
Expand Down Expand Up @@ -84,6 +82,12 @@ pub struct Config {
pub subscriptions: Option<Subscriptions>,
}

#[derive(Debug, Deserialize)]
pub struct AttestationConfig {
pub chain_id: String,
pub dispute_manager: Address,
}

#[serde_as]
#[derive(Debug, Deserialize)]
pub struct Chain {
Expand Down
95 changes: 0 additions & 95 deletions graph-gateway/src/fisherman_client.rs

This file was deleted.

20 changes: 4 additions & 16 deletions graph-gateway/src/indexer_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::Arc;

use alloy_primitives::{BlockNumber, B256};
use alloy_primitives::BlockNumber;
use axum::http::StatusCode;
use serde::{Deserialize, Serialize};

use indexer_selection::Selection;
use serde::Deserialize;
use toolshed::thegraph::attestation::Attestation;

#[derive(Debug)]
pub struct IndexerResponse {
Expand All @@ -23,6 +23,7 @@ pub enum IndexerError {
NoAllocation,
NoAttestation,
UnattestableError(StatusCode),
BadAttestation,
Timeout,
UnexpectedPayload,
BlockError(BlockError),
Expand All @@ -43,19 +44,6 @@ pub struct IndexerResponsePayload {
pub error: Option<String>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Attestation {
#[serde(rename = "requestCID")]
pub request_cid: B256,
#[serde(rename = "responseCID")]
pub response_cid: B256,
#[serde(rename = "subgraphDeploymentID")]
pub deployment: B256,
pub v: u8,
pub r: B256,
pub s: B256,
}

#[derive(Clone)]
pub struct IndexerClient {
pub client: reqwest::Client,
Expand Down
1 change: 0 additions & 1 deletion graph-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub mod budgets;
pub mod chains;
pub mod client_query;
pub mod config;
pub mod fisherman_client;
pub mod geoip;
pub mod indexer_client;
pub mod indexers_status;
Expand Down
Loading

0 comments on commit a4ade6c

Please sign in to comment.