feat: replace fisherman client with kafka msg & verification #405

Merged · 1 commit · Oct 13, 2023
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions graph-gateway/Cargo.toml
@@ -6,6 +6,7 @@ version = "14.0.1"
[dependencies]
anyhow.workspace = true
alloy-primitives.workspace = true
alloy-sol-types = "0.3.2"
axum = { version = "0.6.15", default-features = false, features = [
"json",
"tokio",
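The alloy-sol-types dependency added here supplies the Eip712Domain type that the rest of the PR threads through Context and IndexerQueryContext. As a rough illustration only (Eip712Domain::new is assumed from alloy-sol-types, and the "Graph Protocol"/"0" strings are placeholders rather than the gateway's confirmed domain parameters), the domain fed by the new attestation config might be built like this:

use std::borrow::Cow;

use alloy_primitives::{Address, U256};
use alloy_sol_types::Eip712Domain;

// Sketch: build the EIP-712 domain used for attestation verification from
// configured values. The name/version strings are placeholders.
fn attestation_domain(chain_id: u64, dispute_manager: Address) -> Eip712Domain {
    Eip712Domain::new(
        Some(Cow::Borrowed("Graph Protocol")), // domain name (placeholder)
        Some(Cow::Borrowed("0")),              // domain version (placeholder)
        Some(U256::from(chain_id)),            // chain ID from config
        Some(dispute_manager),                 // DisputeManager contract address
        None,                                  // no salt
    )
}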
101 changes: 34 additions & 67 deletions graph-gateway/src/client_query.rs
@@ -10,6 +10,7 @@ use std::{
};

use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use anyhow::{anyhow, bail, Context as _};
use axum::extract::OriginalUri;
use axum::http::{HeaderValue, Request, Uri};
@@ -31,7 +32,7 @@ use serde::Deserialize;
use serde_json::json;
use serde_json::value::RawValue;
use tokio::sync::mpsc;
use toolshed::thegraph::{BlockPointer, DeploymentId, SubgraphId};
use toolshed::thegraph::{attestation, BlockPointer, DeploymentId, SubgraphId};
use toolshed::url::Url;
use tracing::Instrument;
use uuid::Uuid;
@@ -43,22 +44,17 @@ use indexer_selection::{
};
use prelude::{buffer_queue::QueueWriter, double_buffer::DoubleBufferReader, unix_timestamp, GRT};

use crate::auth::{AuthHandler, AuthToken};
use crate::block_constraints::{block_constraints, make_query_deterministic, BlockConstraint};
use crate::budgets::{self, Budgeter};
use crate::indexer_client::{
check_block_error, Attestation, IndexerClient, IndexerError, ResponsePayload,
};
use crate::{
auth::{AuthHandler, AuthToken},
block_constraints::{block_constraints, make_query_deterministic, BlockConstraint},
chains::BlockCache,
fisherman_client::{ChallengeOutcome, FishermanClient},
indexing::IndexingStatus,
metrics::{with_metric, METRICS},
receipts::{ReceiptSigner, ReceiptStatus},
reports,
topology::{Deployment, GraphNetwork, Subgraph},
unattestable_errors::{miscategorized_attestable, miscategorized_unattestable},
};
use crate::chains::BlockCache;
use crate::indexer_client::{check_block_error, IndexerClient, IndexerError, ResponsePayload};
use crate::indexing::IndexingStatus;
use crate::metrics::{with_metric, METRICS};
use crate::receipts::{ReceiptSigner, ReceiptStatus};
use crate::reports::{self, serialize_attestation, KafkaClient};
use crate::topology::{Deployment, GraphNetwork, Subgraph};
use crate::unattestable_errors::{miscategorized_attestable, miscategorized_unattestable};

fn query_id() -> String {
lazy_static! {
@@ -100,7 +96,7 @@ pub enum Error {
#[derive(Clone)]
pub struct Context {
pub indexer_client: IndexerClient,
pub fisherman_client: Option<&'static FishermanClient>,
pub kafka_client: &'static KafkaClient,
pub graph_env_id: String,
pub auth_handler: &'static AuthHandler,
pub budgeter: &'static Budgeter,
@@ -110,6 +106,7 @@ pub struct Context {
pub network: GraphNetwork,
pub indexing_statuses: Eventual<Ptr<HashMap<Indexing, IndexingStatus>>>,
pub receipt_signer: &'static ReceiptSigner,
pub attestation_domain: &'static Eip712Domain,
pub indexings_blocklist: Eventual<Ptr<HashSet<Indexing>>>,
pub isa_state: DoubleBufferReader<indexer_selection::State>,
pub observations: QueueWriter<Update>,
@@ -637,8 +634,9 @@ async fn handle_client_query_inner(
.clone();
let indexer_query_context = IndexerQueryContext {
indexer_client: ctx.indexer_client.clone(),
fisherman_client: ctx.fisherman_client,
kafka_client: ctx.kafka_client,
receipt_signer: ctx.receipt_signer,
attestation_domain: ctx.attestation_domain,
observations: ctx.observations.clone(),
deployment,
latest_block: latest_block.number,
@@ -718,8 +716,9 @@
#[derive(Clone)]
struct IndexerQueryContext {
pub indexer_client: IndexerClient,
pub fisherman_client: Option<&'static FishermanClient>,
pub kafka_client: &'static KafkaClient,
pub receipt_signer: &'static ReceiptSigner,
pub attestation_domain: &'static Eip712Domain,
pub observations: QueueWriter<Update>,
pub deployment: Arc<Deployment>,
pub latest_block: u64,
@@ -874,60 +873,28 @@ async fn handle_indexer_query_inner(
}

if let Some(attestation) = &response.payload.attestation {
challenge_indexer_response(
ctx.fisherman_client,
ctx.observations.clone(),
selection.indexing,
allocation,
deterministic_query,
response.payload.body.clone(),
attestation.clone(),
let indexer = selection.indexing.indexer;
let verified = attestation::verify(
ctx.attestation_domain,
attestation,
&indexer,
&deterministic_query,
&response.payload.body,
);
// We send the Kafka message directly to avoid passing the request & response payloads
// through the normal reporting path. This is to reduce log bloat.
let response = response.payload.body.to_string();
let payload = serialize_attestation(attestation, indexer, deterministic_query, response);
ctx.kafka_client.send("gateway_attestations", &payload);
if let Err(attestation_verification_err) = verified {
tracing::debug!(%attestation_verification_err);
return Err(IndexerError::BadAttestation);
}
}

Ok(response.payload)
}

fn challenge_indexer_response(
fisherman_client: Option<&'static FishermanClient>,
observations: QueueWriter<Update>,
indexing: Indexing,
allocation: Address,
indexer_query: String,
indexer_response: Arc<String>,
attestation: Attestation,
) {
let fisherman = match fisherman_client {
Some(fisherman) => fisherman,
None => return,
};
tokio::spawn(async move {
let outcome = fisherman
.challenge(
&indexing,
&allocation,
&indexer_query,
&indexer_response,
&attestation,
)
.await;
tracing::trace!(?outcome);
let penalty = match outcome {
ChallengeOutcome::Unknown | ChallengeOutcome::AgreeWithTrustedIndexer => 0,
ChallengeOutcome::DisagreeWithUntrustedIndexer => 10,
ChallengeOutcome::DisagreeWithTrustedIndexer => 35,
ChallengeOutcome::FailedToProvideAttestation => 40,
};
if penalty > 0 {
tracing::info!(?outcome, "penalizing for challenge outcome");
let _ = observations.write(Update::Penalty {
indexing,
weight: penalty,
});
}
});
}

fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result<usize> {
let selection_sets = ctx
.operations
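Condensed sketch (not the exact PR code) of what replaces the fisherman challenge in handle_indexer_query_inner above: the attestation is verified locally against the EIP-712 domain, the attestation plus payloads are reported over Kafka, and a failed verification becomes IndexerError::BadAttestation. Call signatures follow how they appear in this diff; the helper name verify_and_report is illustrative.

use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use toolshed::thegraph::attestation::{self, Attestation};

use crate::indexer_client::IndexerError;
use crate::reports::{serialize_attestation, KafkaClient};

// Illustrative helper mirroring the hunk above: verify an indexer's
// attestation locally and report it over Kafka.
fn verify_and_report(
    domain: &Eip712Domain,
    kafka: &KafkaClient,
    indexer: Address,
    attestation: &Attestation,
    deterministic_query: &str,
    response_body: &str,
) -> Result<(), IndexerError> {
    // Local EIP-712 signature check replaces the fisherman challenge RPC.
    let verified = attestation::verify(
        domain,
        attestation,
        &indexer,
        deterministic_query,
        response_body,
    );

    // Send the attestation (with request & response) straight to Kafka so the
    // large payloads stay out of the normal reporting path and the logs.
    let payload = serialize_attestation(
        attestation,
        indexer,
        deterministic_query.to_string(),
        response_body.to_string(),
    );
    kafka.send("gateway_attestations", &payload);

    // A signature that doesn't check out is now its own indexer error.
    if let Err(attestation_verification_err) = verified {
        tracing::debug!(%attestation_verification_err);
        return Err(IndexerError::BadAttestation);
    }
    Ok(())
}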
10 changes: 7 additions & 3 deletions graph-gateway/src/config.rs
@@ -22,12 +22,10 @@ use crate::poi::ProofOfIndexingInfo;
pub struct Config {
/// Respect the payment state of API keys (disable for testnets)
pub api_key_payment_required: bool,
pub attestations: AttestationConfig,
pub chains: Vec<Chain>,
/// Ethereum RPC provider, or fixed exchange rate for testing
pub exchange_rate_provider: ExchangeRateProvider,
/// Fisherman RPC for challenges
#[serde_as(as = "Option<DisplayFromStr>")]
pub fisherman: Option<Url>,
/// GeoIP database path
pub geoip_database: Option<PathBuf>,
/// GeoIP blocked countries (ISO 3166-1 alpha-2 codes)
@@ -84,6 +82,12 @@ pub struct Config {
pub subscriptions: Option<Subscriptions>,
}

#[derive(Debug, Deserialize)]
pub struct AttestationConfig {
pub chain_id: String,
pub dispute_manager: Address,
}

#[serde_as]
#[derive(Debug, Deserialize)]
pub struct Chain {
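For reference, a minimal sketch of how the new attestations section might be deserialized in place of the removed fisherman URL: the struct matches the diff above, while the JSON values are made-up placeholders.

use alloy_primitives::Address;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct AttestationConfig {
    pub chain_id: String,
    pub dispute_manager: Address,
}

fn main() {
    // Placeholder values; a real config would use the network's chain ID and
    // the deployed DisputeManager contract address.
    let raw = r#"{
        "chain_id": "1",
        "dispute_manager": "0x0000000000000000000000000000000000000000"
    }"#;
    let config: AttestationConfig =
        serde_json::from_str(raw).expect("valid attestations config");
    println!("{config:?}");
}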
95 changes: 0 additions & 95 deletions graph-gateway/src/fisherman_client.rs

This file was deleted.

20 changes: 4 additions & 16 deletions graph-gateway/src/indexer_client.rs
@@ -1,10 +1,10 @@
use std::sync::Arc;

use alloy_primitives::{BlockNumber, B256};
use alloy_primitives::BlockNumber;
use axum::http::StatusCode;
use serde::{Deserialize, Serialize};

use indexer_selection::Selection;
use serde::Deserialize;
use toolshed::thegraph::attestation::Attestation;

#[derive(Debug)]
pub struct IndexerResponse {
@@ -23,6 +23,7 @@ pub enum IndexerError {
NoAllocation,
NoAttestation,
UnattestableError(StatusCode),
BadAttestation,
Timeout,
UnexpectedPayload,
BlockError(BlockError),
@@ -43,19 +44,6 @@ pub struct IndexerResponsePayload {
pub error: Option<String>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Attestation {
#[serde(rename = "requestCID")]
pub request_cid: B256,
#[serde(rename = "responseCID")]
pub response_cid: B256,
#[serde(rename = "subgraphDeploymentID")]
pub deployment: B256,
pub v: u8,
pub r: B256,
pub s: B256,
}

#[derive(Clone)]
pub struct IndexerClient {
pub client: reqwest::Client,
1 change: 0 additions & 1 deletion graph-gateway/src/lib.rs
@@ -9,7 +9,6 @@ pub mod budgets;
pub mod chains;
pub mod client_query;
pub mod config;
pub mod fisherman_client;
pub mod geoip;
pub mod indexer_client;
pub mod indexers_status;