diff --git a/Cargo.lock b/Cargo.lock
index 69c14d5d..f49b7b22 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1426,6 +1426,7 @@ name = "graph-gateway"
 version = "14.0.1"
 dependencies = [
  "alloy-primitives",
+ "alloy-sol-types",
  "anyhow",
  "assert_matches",
  "axum",
diff --git a/graph-gateway/Cargo.toml b/graph-gateway/Cargo.toml
index 10d4e610..a72f05d9 100644
--- a/graph-gateway/Cargo.toml
+++ b/graph-gateway/Cargo.toml
@@ -6,6 +6,7 @@ version = "14.0.1"
 [dependencies]
 anyhow.workspace = true
 alloy-primitives.workspace = true
+alloy-sol-types = "0.3.2"
 axum = { version = "0.6.15", default-features = false, features = [
     "json",
     "tokio",
diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs
index 16fe65fe..09889b90 100644
--- a/graph-gateway/src/client_query.rs
+++ b/graph-gateway/src/client_query.rs
@@ -10,6 +10,7 @@ use std::{
 };

 use alloy_primitives::Address;
+use alloy_sol_types::Eip712Domain;
 use anyhow::{anyhow, bail, Context as _};
 use axum::extract::OriginalUri;
 use axum::http::{HeaderValue, Request, Uri};
@@ -31,7 +32,7 @@ use serde::Deserialize;
 use serde_json::json;
 use serde_json::value::RawValue;
 use tokio::sync::mpsc;
-use toolshed::thegraph::{BlockPointer, DeploymentId, SubgraphId};
+use toolshed::thegraph::{attestation, BlockPointer, DeploymentId, SubgraphId};
 use toolshed::url::Url;
 use tracing::Instrument;
 use uuid::Uuid;
@@ -43,22 +44,17 @@ use indexer_selection::{
 };

 use prelude::{buffer_queue::QueueWriter, double_buffer::DoubleBufferReader, unix_timestamp, GRT};
+use crate::auth::{AuthHandler, AuthToken};
+use crate::block_constraints::{block_constraints, make_query_deterministic, BlockConstraint};
 use crate::budgets::{self, Budgeter};
-use crate::indexer_client::{
-    check_block_error, Attestation, IndexerClient, IndexerError, ResponsePayload,
-};
-use crate::{
-    auth::{AuthHandler, AuthToken},
-    block_constraints::{block_constraints, make_query_deterministic, BlockConstraint},
-    chains::BlockCache,
-    fisherman_client::{ChallengeOutcome, FishermanClient},
-    indexing::IndexingStatus,
-    metrics::{with_metric, METRICS},
-    receipts::{ReceiptSigner, ReceiptStatus},
-    reports,
-    topology::{Deployment, GraphNetwork, Subgraph},
-    unattestable_errors::{miscategorized_attestable, miscategorized_unattestable},
-};
+use crate::chains::BlockCache;
+use crate::indexer_client::{check_block_error, IndexerClient, IndexerError, ResponsePayload};
+use crate::indexing::IndexingStatus;
+use crate::metrics::{with_metric, METRICS};
+use crate::receipts::{ReceiptSigner, ReceiptStatus};
+use crate::reports::{self, serialize_attestation, KafkaClient};
+use crate::topology::{Deployment, GraphNetwork, Subgraph};
+use crate::unattestable_errors::{miscategorized_attestable, miscategorized_unattestable};

 fn query_id() -> String {
     lazy_static! {
@@ -100,7 +96,7 @@ pub enum Error {
 #[derive(Clone)]
 pub struct Context {
     pub indexer_client: IndexerClient,
-    pub fisherman_client: Option<&'static FishermanClient>,
+    pub kafka_client: &'static KafkaClient,
     pub graph_env_id: String,
     pub auth_handler: &'static AuthHandler,
     pub budgeter: &'static Budgeter,
@@ -110,6 +106,7 @@ pub struct Context {
     pub network: GraphNetwork,
     pub indexing_statuses: Eventual<Ptr<HashMap<Indexing, IndexingStatus>>>,
     pub receipt_signer: &'static ReceiptSigner,
+    pub attestation_domain: &'static Eip712Domain,
     pub indexings_blocklist: Eventual<Ptr<HashSet<Indexing>>>,
     pub isa_state: DoubleBufferReader<indexer_selection::State>,
     pub observations: QueueWriter<Update>,
@@ -637,8 +634,9 @@ async fn handle_client_query_inner(
         .clone();
     let indexer_query_context = IndexerQueryContext {
         indexer_client: ctx.indexer_client.clone(),
-        fisherman_client: ctx.fisherman_client,
+        kafka_client: ctx.kafka_client,
         receipt_signer: ctx.receipt_signer,
+        attestation_domain: ctx.attestation_domain,
         observations: ctx.observations.clone(),
         deployment,
         latest_block: latest_block.number,
@@ -718,8 +716,9 @@ async fn handle_client_query_inner(
 #[derive(Clone)]
 struct IndexerQueryContext {
     pub indexer_client: IndexerClient,
-    pub fisherman_client: Option<&'static FishermanClient>,
+    pub kafka_client: &'static KafkaClient,
     pub receipt_signer: &'static ReceiptSigner,
+    pub attestation_domain: &'static Eip712Domain,
     pub observations: QueueWriter<Update>,
     pub deployment: Arc<Deployment>,
     pub latest_block: u64,
@@ -874,60 +873,28 @@ async fn handle_indexer_query_inner(
     }

     if let Some(attestation) = &response.payload.attestation {
-        challenge_indexer_response(
-            ctx.fisherman_client,
-            ctx.observations.clone(),
-            selection.indexing,
-            allocation,
-            deterministic_query,
-            response.payload.body.clone(),
-            attestation.clone(),
+        let indexer = selection.indexing.indexer;
+        let verified = attestation::verify(
+            ctx.attestation_domain,
+            attestation,
+            &indexer,
+            &deterministic_query,
+            &response.payload.body,
         );
+        // We send the Kafka message directly to avoid passing the request & response payloads
+        // through the normal reporting path. This is to reduce log bloat.
+        let response = response.payload.body.to_string();
+        let payload = serialize_attestation(attestation, indexer, deterministic_query, response);
+        ctx.kafka_client.send("gateway_attestations", &payload);
+        if let Err(attestation_verification_err) = verified {
+            tracing::debug!(%attestation_verification_err);
+            return Err(IndexerError::BadAttestation);
+        }
     }

     Ok(response.payload)
 }

-fn challenge_indexer_response(
-    fisherman_client: Option<&'static FishermanClient>,
-    observations: QueueWriter<Update>,
-    indexing: Indexing,
-    allocation: Address,
-    indexer_query: String,
-    indexer_response: Arc<String>,
-    attestation: Attestation,
-) {
-    let fisherman = match fisherman_client {
-        Some(fisherman) => fisherman,
-        None => return,
-    };
-    tokio::spawn(async move {
-        let outcome = fisherman
-            .challenge(
-                &indexing,
-                &allocation,
-                &indexer_query,
-                &indexer_response,
-                &attestation,
-            )
-            .await;
-        tracing::trace!(?outcome);
-        let penalty = match outcome {
-            ChallengeOutcome::Unknown | ChallengeOutcome::AgreeWithTrustedIndexer => 0,
-            ChallengeOutcome::DisagreeWithUntrustedIndexer => 10,
-            ChallengeOutcome::DisagreeWithTrustedIndexer => 35,
-            ChallengeOutcome::FailedToProvideAttestation => 40,
-        };
-        if penalty > 0 {
-            tracing::info!(?outcome, "penalizing for challenge outcome");
-            let _ = observations.write(Update::Penalty {
-                indexing,
-                weight: penalty,
-            });
-        }
-    });
-}
-
 fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result<usize> {
     let selection_sets = ctx
         .operations
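For reviewers unfamiliar with the new flow: instead of forwarding attestations to a fisherman service for a challenge, the gateway now verifies each one locally against the protocol's EIP-712 attestation domain and returns `IndexerError::BadAttestation` on failure. The sketch below restates that check in isolation; it assumes only the `toolshed::thegraph::attestation` API as it is called in the hunk above, and the helper name and error formatting are illustrative, not part of this change.

```rust
use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use toolshed::thegraph::attestation::{self, Attestation};

/// Hypothetical helper mirroring handle_indexer_query_inner: returns Ok(()) only if
/// `attestation` is a valid indexer signature over the (request, response) pair
/// under the given EIP-712 domain.
fn check_attestation(
    domain: &Eip712Domain,
    attestation: &Attestation,
    indexer: &Address,
    request: &str,
    response: &str,
) -> anyhow::Result<()> {
    attestation::verify(domain, attestation, indexer, request, response)
        .map_err(|err| anyhow::anyhow!("bad attestation: {err:?}"))
}
```

A failed check surfaces as `IndexerError::BadAttestation`, which the reporting path in reports.rs and the selection penalty in indexer-selection pick up further down in this patch.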
diff --git a/graph-gateway/src/config.rs b/graph-gateway/src/config.rs
index 0a602857..fd00aa82 100644
--- a/graph-gateway/src/config.rs
+++ b/graph-gateway/src/config.rs
@@ -22,12 +22,10 @@ use crate::poi::ProofOfIndexingInfo;
 pub struct Config {
     /// Respect the payment state of API keys (disable for testnets)
     pub api_key_payment_required: bool,
+    pub attestations: AttestationConfig,
     pub chains: Vec<Chain>,
     /// Ethereum RPC provider, or fixed exchange rate for testing
     pub exchange_rate_provider: ExchangeRateProvider,
-    /// Fisherman RPC for challenges
-    #[serde_as(as = "Option<DisplayFromStr>")]
-    pub fisherman: Option<Url>,
     /// GeoIP database path
     pub geoip_database: Option<PathBuf>,
     /// GeoIP blocked countries (ISO 3166-1 alpha-2 codes)
@@ -84,6 +82,12 @@ pub struct Config {
     pub subscriptions: Option<Subscriptions>,
 }

+#[derive(Debug, Deserialize)]
+pub struct AttestationConfig {
+    pub chain_id: String,
+    pub dispute_manager: Address,
+}
+
 #[serde_as]
 #[derive(Debug, Deserialize)]
 pub struct Chain {
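The new `attestations` section replaces the removed `fisherman` URL: it carries the two values needed to reconstruct the protocol's attestation EIP-712 domain. A rough sketch of the fragment it deserializes from, assuming a JSON-style config file and purely illustrative values (the struct is copied from the hunk above so the snippet stands alone):

```rust
use alloy_primitives::Address;
use serde::Deserialize;

// Copied from config.rs above.
#[derive(Debug, Deserialize)]
pub struct AttestationConfig {
    pub chain_id: String,
    pub dispute_manager: Address,
}

fn parse_example() -> anyhow::Result<AttestationConfig> {
    // chain_id stays a string because main.rs parses it with
    // U256::from_str_radix(&chain_id, 10); the address here is a placeholder.
    let fragment = r#"{
        "chain_id": "42161",
        "dispute_manager": "0x0000000000000000000000000000000000000000"
    }"#;
    Ok(serde_json::from_str(fragment)?)
}
```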
diff --git a/graph-gateway/src/fisherman_client.rs b/graph-gateway/src/fisherman_client.rs
deleted file mode 100644
index fabc6c78..00000000
--- a/graph-gateway/src/fisherman_client.rs
+++ /dev/null
@@ -1,95 +0,0 @@
-use alloy_primitives::Address;
-use serde::Deserialize;
-use serde_json::json;
-use toolshed::url::Url;
-
-use indexer_selection::Indexing;
-
-use crate::indexer_client::Attestation;
-
-#[derive(Clone, Copy, Debug, Deserialize)]
-pub enum ChallengeOutcome {
-    AgreeWithTrustedIndexer,
-    DisagreeWithTrustedIndexer,
-    DisagreeWithUntrustedIndexer,
-    FailedToProvideAttestation,
-    Unknown,
-}
-
-#[derive(Clone)]
-pub struct FishermanClient {
-    client: reqwest::Client,
-    url: Url,
-}
-
-impl FishermanClient {
-    pub async fn challenge(
-        &self,
-        indexing: &Indexing,
-        allocation: &Address,
-        indexer_query: &str,
-        indexer_response: &str,
-        attestation: &Attestation,
-    ) -> ChallengeOutcome {
-        match self
-            .send_challenge(
-                indexing,
-                allocation,
-                indexer_query,
-                indexer_response,
-                attestation,
-            )
-            .await
-        {
-            Ok(outcome) => outcome,
-            Err(fisherman_challenge_err) => {
-                tracing::error!(%fisherman_challenge_err);
-                ChallengeOutcome::Unknown
-            }
-        }
-    }
-}
-
-impl FishermanClient {
-    pub fn new(client: reqwest::Client, url: Url) -> Self {
-        Self { client, url }
-    }
-
-    async fn send_challenge(
-        &self,
-        indexing: &Indexing,
-        allocation: &Address,
-        indexer_query: &str,
-        indexer_response: &str,
-        attestation: &Attestation,
-    ) -> anyhow::Result<ChallengeOutcome> {
-        let challenge = serde_json::to_string(&json!({
-            "jsonrpc": "2.0",
-            "id": 0,
-            "method": "challenge",
-            "params": {
-                "attestation": serde_json::to_value(attestation)?,
-                "subgraphDeploymentID": format!("0x{}", hex::encode(indexing.deployment.0)),
-                "allocationID": allocation.to_string(),
-                "query": indexer_query,
-                "response": indexer_response,
-            },
-        }))?;
-        tracing::trace!(?indexing, %challenge);
-        self.client
-            .post(self.url.0.clone())
-            .header("Content-Type", "application/json")
-            .body(challenge)
-            .send()
-            .await?
-            .json::<RPCResponse>()
-            .await
-            .map(|response| response.result)
-            .map_err(Into::into)
-    }
-}
-
-#[derive(Deserialize)]
-struct RPCResponse {
-    result: ChallengeOutcome,
-}
diff --git a/graph-gateway/src/indexer_client.rs b/graph-gateway/src/indexer_client.rs
index b9df3bb9..a460263c 100644
--- a/graph-gateway/src/indexer_client.rs
+++ b/graph-gateway/src/indexer_client.rs
@@ -1,10 +1,10 @@
 use std::sync::Arc;

-use alloy_primitives::{BlockNumber, B256};
+use alloy_primitives::BlockNumber;
 use axum::http::StatusCode;
-use serde::{Deserialize, Serialize};
-
 use indexer_selection::Selection;
+use serde::Deserialize;
+use toolshed::thegraph::attestation::Attestation;

 #[derive(Debug)]
 pub struct IndexerResponse {
@@ -23,6 +23,7 @@ pub enum IndexerError {
     NoAllocation,
     NoAttestation,
     UnattestableError(StatusCode),
+    BadAttestation,
     Timeout,
     UnexpectedPayload,
     BlockError(BlockError),
@@ -43,19 +44,6 @@ pub struct IndexerResponsePayload {
     pub error: Option<String>,
 }

-#[derive(Clone, Debug, Deserialize, Serialize)]
-pub struct Attestation {
-    #[serde(rename = "requestCID")]
-    pub request_cid: B256,
-    #[serde(rename = "responseCID")]
-    pub response_cid: B256,
-    #[serde(rename = "subgraphDeploymentID")]
-    pub deployment: B256,
-    pub v: u8,
-    pub r: B256,
-    pub s: B256,
-}
-
 #[derive(Clone)]
 pub struct IndexerClient {
     pub client: reqwest::Client,
diff --git a/graph-gateway/src/lib.rs b/graph-gateway/src/lib.rs
index 34de40d2..b77deefa 100644
--- a/graph-gateway/src/lib.rs
+++ b/graph-gateway/src/lib.rs
@@ -9,7 +9,6 @@ pub mod budgets;
 pub mod chains;
 pub mod client_query;
 pub mod config;
-pub mod fisherman_client;
 pub mod geoip;
 pub mod indexer_client;
 pub mod indexers_status;
diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs
index 20149135..eebdeaeb 100644
--- a/graph-gateway/src/main.rs
+++ b/graph-gateway/src/main.rs
@@ -1,16 +1,15 @@
+use std::collections::hash_map::{Entry, HashMap};
+use std::collections::hash_set::HashSet;
+use std::env;
+use std::fs::read_to_string;
+use std::io::Write as _;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::path::PathBuf;
+use std::sync::Arc;
 use std::time::Duration;
-use std::{
-    collections::hash_map::{Entry, HashMap},
-    collections::hash_set::HashSet,
-    env,
-    fs::read_to_string,
-    io::Write as _,
-    net::{IpAddr, Ipv4Addr, SocketAddr},
-    path::PathBuf,
-    sync::Arc,
-};

-use alloy_primitives::Address;
+use alloy_primitives::{Address, U256};
+use alloy_sol_types::Eip712Domain;
 use anyhow::{self, Context};
 use axum::{
     extract::{ConnectInfo, DefaultBodyLimit, State},
@@ -26,7 +25,7 @@ use prometheus::{self, Encoder as _};
 use serde_json::json;
 use simple_rate_limiter::RateLimiter;
 use tokio::spawn;
-use toolshed::thegraph::DeploymentId;
+use toolshed::thegraph::{attestation, DeploymentId};
 use tower_http::cors::{self, CorsLayer};

 use graph_gateway::indexings_blocklist::indexings_blocklist;
@@ -35,7 +34,6 @@ use graph_gateway::{
     chains::{ethereum, BlockCache},
     client_query,
     config::{Config, ExchangeRateProvider},
-    fisherman_client::FishermanClient,
     geoip::GeoIP,
     indexer_client::IndexerClient,
     indexing::{indexing_statuses, IndexingStatus},
@@ -70,7 +68,7 @@ async fn main() {
     let config_repr = format!("{config:#?}");

     // Instantiate the Kafka client
-    let kafka_client = match KafkaClient::new(&config.kafka.into()) {
+    let kafka_client: &'static KafkaClient = match KafkaClient::new(&config.kafka.into()) {
         Ok(kafka_client) => Box::leak(Box::new(kafka_client)),
         Err(kafka_client_err) => {
             tracing::error!(%kafka_client_err);
@@ -145,6 +143,12 @@ async fn main() {
     let receipt_signer: &'static ReceiptSigner =
         Box::leak(Box::new(ReceiptSigner::new(signer_key)));
+    let attestation_domain: &'static Eip712Domain =
+        Box::leak(Box::new(attestation::eip712_domain(
+            U256::from_str_radix(&config.attestations.chain_id, 10)
+                .expect("failed to parse attestation domain chain_id"),
+            config.attestations.dispute_manager,
+        )));

     let ipfs = ipfs::Client::new(http_client.clone(), config.ipfs, 50);

     let network = GraphNetwork::new(network_subgraph_data.subgraphs, ipfs).await;
@@ -233,11 +237,6 @@ async fn main() {
         .expect("invalid query_fees_target");
     let budgeter: &'static Budgeter = Box::leak(Box::new(Budgeter::new(query_fees_target)));

-    let fisherman_client = config.fisherman.map(|url| {
-        Box::leak(Box::new(FishermanClient::new(http_client.clone(), url)))
-            as &'static FishermanClient
-    });
-
     tracing::info!("Waiting for exchange rate...");
     usd_to_grt.value().await.unwrap();
     tracing::info!("Waiting for ISA setup...");
@@ -249,13 +248,14 @@ async fn main() {
         indexer_client: IndexerClient {
             client: http_client.clone(),
         },
+        kafka_client,
         graph_env_id: config.graph_env_id.clone(),
         auth_handler,
         budgeter,
         network,
         indexing_statuses,
+        attestation_domain,
         indexings_blocklist,
-        fisherman_client,
         block_caches,
         observations: update_writer,
         receipt_signer,
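main.rs builds the attestation domain once at startup and leaks it, matching the other long-lived components that are shared as `&'static` references through `Context`. A condensed sketch of just that wiring, assuming the same `toolshed` helper used above (the function name is illustrative):

```rust
use alloy_primitives::{Address, U256};
use alloy_sol_types::Eip712Domain;
use toolshed::thegraph::attestation;

/// Hypothetical extraction of the startup step above: parse the configured chain ID
/// and leak the resulting domain so it lives for the whole process.
fn leak_attestation_domain(chain_id: &str, dispute_manager: Address) -> &'static Eip712Domain {
    let chain_id = U256::from_str_radix(chain_id, 10)
        .expect("failed to parse attestation domain chain_id");
    Box::leak(Box::new(attestation::eip712_domain(chain_id, dispute_manager)))
}
```

Leaking is deliberate here: the domain is needed by every query for the lifetime of the process, so a `&'static` reference avoids reference counting in the hot path.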
diff --git a/graph-gateway/src/reports.rs b/graph-gateway/src/reports.rs
index 9015e4ed..8e8166b7 100644
--- a/graph-gateway/src/reports.rs
+++ b/graph-gateway/src/reports.rs
@@ -1,10 +1,12 @@
 use std::error::Error;
 use std::fmt;

+use alloy_primitives::Address;
 use prost::Message as _;
 use rdkafka::error::KafkaResult;
 use serde::Deserialize;
 use serde_json::{json, Map};
+use toolshed::{concat_bytes, thegraph::attestation::Attestation};
 use tracing::span;
 use tracing_subscriber::{filter::FilterFn, layer, prelude::*, registry, EnvFilter, Layer};

@@ -499,7 +501,7 @@ pub fn indexer_attempt_status_code(result: &Result<ResponsePayload, IndexerError>) -> u32 {
     let (prefix, data) = match &result {
         Ok(_) => (0x0, 200_u32.to_be()),
-        Err(IndexerError::NoAttestation) => (0x1, 0x0),
+        Err(IndexerError::NoAttestation) | Err(IndexerError::BadAttestation) => (0x1, 0x0),
         Err(IndexerError::UnattestableError(_)) => (0x2, 0x0),
         Err(IndexerError::Timeout) => (0x3, 0x0),
         Err(IndexerError::UnexpectedPayload) => (0x4, 0x0),
@@ -510,3 +512,44 @@ pub fn indexer_attempt_status_code(result: &Result<ResponsePayload, IndexerError>) -> u32 {
     };
     (prefix << 28) | (data >> 4)
 }
+
+pub fn serialize_attestation(
+    attestation: &Attestation,
+    indexer: Address,
+    request: String,
+    response: String,
+) -> Vec<u8> {
+    AttestationProtobuf {
+        request,
+        response,
+        indexer: indexer.0 .0.into(),
+        subgraph_deployment: attestation.deployment.0.into(),
+        request_cid: attestation.request_cid.0.into(),
+        response_cid: attestation.response_cid.0.into(),
+        signature: concat_bytes!(65, [&[attestation.v], &attestation.r.0, &attestation.s.0]).into(),
+    }
+    .encode_to_vec()
+}
+
+#[derive(Clone, PartialEq, prost::Message)]
+pub struct AttestationProtobuf {
+    #[prost(string, tag = "1")]
+    request: String,
+    #[prost(string, tag = "2")]
+    response: String,
+    /// 20 bytes
+    #[prost(bytes, tag = "3")]
+    indexer: Vec<u8>,
+    /// 32 bytes
+    #[prost(bytes, tag = "4")]
+    subgraph_deployment: Vec<u8>,
+    /// 32 bytes
+    #[prost(bytes, tag = "5")]
+    request_cid: Vec<u8>,
+    /// 32 bytes
+    #[prost(bytes, tag = "6")]
+    response_cid: Vec<u8>,
+    /// 65 bytes, ECDSA signature (v, r, s)
+    #[prost(bytes, tag = "7")]
+    signature: Vec<u8>,
+}
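Attestations are now published to the `gateway_attestations` topic as the protobuf message defined above rather than routed through the tracing-based reports. A sketch of how a downstream consumer might decode one of these payloads and unpack the 65-byte packed signature back into its (v, r, s) parts; everything other than the message layout is illustrative, and the struct is repeated so the snippet stands alone:

```rust
use prost::Message as _;

// Mirrors AttestationProtobuf from reports.rs above.
#[derive(Clone, PartialEq, prost::Message)]
pub struct AttestationProtobuf {
    #[prost(string, tag = "1")]
    request: String,
    #[prost(string, tag = "2")]
    response: String,
    #[prost(bytes, tag = "3")]
    indexer: Vec<u8>,
    #[prost(bytes, tag = "4")]
    subgraph_deployment: Vec<u8>,
    #[prost(bytes, tag = "5")]
    request_cid: Vec<u8>,
    #[prost(bytes, tag = "6")]
    response_cid: Vec<u8>,
    #[prost(bytes, tag = "7")]
    signature: Vec<u8>,
}

/// Hypothetical consumer-side decoder: parse the Kafka payload and split the
/// signature written by serialize_attestation (v || r || s).
fn decode_attestation(payload: &[u8]) -> anyhow::Result<(AttestationProtobuf, u8, [u8; 32], [u8; 32])> {
    let msg = AttestationProtobuf::decode(payload)?;
    anyhow::ensure!(msg.signature.len() == 65, "unexpected signature length");
    let v = msg.signature[0];
    let r: [u8; 32] = msg.signature[1..33].try_into()?;
    let s: [u8; 32] = msg.signature[33..65].try_into()?;
    Ok((msg, v, r, s))
}
```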
diff --git a/indexer-selection/src/indexing.rs b/indexer-selection/src/indexing.rs
index f112107d..73e00c51 100644
--- a/indexer-selection/src/indexing.rs
+++ b/indexer-selection/src/indexing.rs
@@ -96,7 +96,9 @@ impl IndexingState {
         self.perf_failure.current_mut().observe(duration);
         match err {
             IndexerErrorObservation::Other => (),
-            IndexerErrorObservation::Timeout => self.reliability.current_mut().penalize(50),
+            IndexerErrorObservation::Timeout | IndexerErrorObservation::BadAttestation => {
+                self.reliability.current_mut().penalize(30)
+            }
             IndexerErrorObservation::IndexingBehind {
                 latest_query_block,
                 latest_block,
diff --git a/indexer-selection/src/lib.rs b/indexer-selection/src/lib.rs
index e3e5d166..82d3ba8f 100644
--- a/indexer-selection/src/lib.rs
+++ b/indexer-selection/src/lib.rs
@@ -107,6 +107,7 @@ pub enum IndexerErrorObservation {
         /// Latest block indexed at the time of query execution, reported by indexer
         reported_block: Option<u64>,
     },
+    BadAttestation,
     Other,
 }