From 359482c14dee7242433cad2364584edc06fab74a Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Tue, 29 Oct 2024 08:54:41 -0400 Subject: [PATCH] feat: replace legacy kafka reports --- README.md | 4 +- src/main.rs | 6 +- src/reports.rs | 279 +++++++++++++++++++++---------------------------- 3 files changed, 120 insertions(+), 169 deletions(-) diff --git a/README.md b/README.md index 434d2ba4..212c8aa6 100644 --- a/README.md +++ b/README.md @@ -81,10 +81,8 @@ respond to the request, then this process is repeated until all available indexe The gateway exports data into the following kafka topics: -- client requests (`gateway_client_query_results`) -- indexer requests (`gateway_indexer_attempts`) +- queries (`gateway_queries`) - attestations (`gateway_attestations`) -- indexer fees (TAP only) (`gateway_indexer_fees`) Optionally, the [titorelli](https://github.com/edgeandnode/titorelli/) system can do aggregations over these topics. For now, this is limited to creating `gateway_indexer_fees_hourly` to improve diff --git a/src/main.rs b/src/main.rs index 6a418a81..4c97944b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -143,10 +143,8 @@ async fn main() { conf.graph_env_id, conf.query_fees_target, reports::Topics { - client_request: "gateway_client_query_results", - indexer_request: "gateway_indexer_attempts", - attestation: "gateway_attestations", - indexer_fees: "gateway_indexer_fees", + queries: "gateway_queries", + attestations: "gateway_attestations", }, conf.kafka, ) diff --git a/src/reports.rs b/src/reports.rs index 6bd630f8..a51e406f 100644 --- a/src/reports.rs +++ b/src/reports.rs @@ -1,12 +1,11 @@ use anyhow::{anyhow, Context}; use ordered_float::NotNan; use prost::Message; -use serde_json::json; use thegraph_core::{Address, AllocationId, DeploymentId, IndexerId}; use tokio::sync::mpsc; use toolshed::concat_bytes; -use crate::{errors, indexer_client::IndexerResponse, receipts::Receipt, time::unix_timestamp}; +use crate::{errors, indexer_client::IndexerResponse, receipts::Receipt}; pub struct ClientRequest { pub id: String, @@ -47,10 +46,8 @@ pub struct Reporter { } pub struct Topics { - pub client_request: &'static str, - pub indexer_request: &'static str, - pub attestation: &'static str, - pub indexer_fees: &'static str, + pub queries: &'static str, + pub attestations: &'static str, } impl Reporter { @@ -87,7 +84,38 @@ impl Reporter { } fn report(&mut self, client_request: ClientRequest) -> anyhow::Result<()> { - let timestamp = unix_timestamp(); + let indexer_queries = client_request + .indexer_requests + .iter() + .map(|indexer_request| IndexerQueryProtobuf { + indexer: indexer_request.indexer.to_vec(), + deployment: indexer_request.deployment.as_ref().to_vec(), + allocation: indexer_request.receipt.allocation().to_vec(), + indexed_chain: indexer_request.subgraph_chain.clone(), + url: indexer_request.url.clone(), + fee_grt: indexer_request.receipt.grt_value() as f64 * 1e-18, + response_time_ms: indexer_request.response_time_ms as u32, + seconds_behind: indexer_request.seconds_behind, + result: indexer_request + .result + .as_ref() + .map(|_| "success".to_string()) + .unwrap_or_else(|err| err.to_string()), + indexer_errors: indexer_request + .result + .as_ref() + .map(|r| { + r.errors + .iter() + .map(|err| err.as_str()) + .collect::>() + .join("; ") + }) + .unwrap_or_default(), + blocks_behind: indexer_request.blocks_behind, + legacy_scalar: matches!(&indexer_request.receipt, Receipt::Legacy(_, _)), + }) + .collect(); let total_fees_grt: f64 = client_request .indexer_requests @@ -96,137 +124,32 @@ impl Reporter { .sum(); let total_fees_usd: f64 = total_fees_grt / *client_request.grt_per_usd; - let (legacy_status_message, legacy_status_code): (String, u32) = - match &client_request.result { - Ok(_) => ("200 OK".to_string(), 0), - Err(err) => match err { - errors::Error::BlockNotFound(_) => ("Unresolved block".to_string(), 604610595), - errors::Error::Internal(_) => ("Internal error".to_string(), 816601499), - errors::Error::Auth(_) => ("Invalid API key".to_string(), 888904173), - errors::Error::BadQuery(_) => ("Invalid query".to_string(), 595700117), - errors::Error::NoIndexers => ( - "No indexers found for subgraph deployment".to_string(), - 1621366907, - ), - errors::Error::BadIndexers(_) => ( - "No suitable indexer found for subgraph deployment".to_string(), - 510359393, - ), - errors::Error::SubgraphNotFound(_) => (err.to_string(), 2599148187), - }, - }; - - let indexed_chain = client_request - .indexer_requests - .first() - .map(|i| i.subgraph_chain.as_str()) - .unwrap_or(""); - let gateway_id = format!("{:?}", self.tap_signer); - let client_request_payload = json!({ - "gateway_id": &gateway_id, - "query_id": &client_request.id, - "ray_id": &client_request.id, - "network_chain": &self.graph_env, - "graph_env": &self.graph_env, - "timestamp": timestamp, - "api_key": &client_request.api_key, - "user": &client_request.user_address, - "deployment": client_request.indexer_requests.first().map(|i| i.deployment.to_string()).unwrap_or_default(), - "indexed_chain": indexed_chain, - "network": indexed_chain, - "response_time_ms": client_request.response_time_ms, - "request_bytes": client_request.request_bytes, - "response_bytes": client_request.response_bytes, - "budget": self.budget.to_string(), - "query_count": 1, - "fee": total_fees_grt as f32, - "fee_usd": total_fees_usd as f32, - "status": legacy_status_message, - "status_code": legacy_status_code, - }); - - for indexer_request in client_request.indexer_requests { - let indexer_errors = indexer_request + let client_query_msg = ClientQueryProtobuf { + gateway_id: self.graph_env.clone(), + receipt_signer: self.tap_signer.to_vec(), + query_id: client_request.id, + api_key: client_request.api_key, + result: client_request .result - .as_ref() - .map(|r| { - r.errors - .iter() - .map(|err| err.as_str()) - .collect::>() - .join("; ") - }) - .unwrap_or_default(); - let legacy_status_code: u32 = { - let (prefix, data) = match &indexer_request.result { - Ok(_) => (0x0, 200_u32.to_be()), - Err(errors::IndexerError::Internal(_)) => (0x1, 0x0), - Err(errors::IndexerError::Unavailable(_)) => (0x2, 0x0), - Err(errors::IndexerError::Timeout) => (0x3, 0x0), - Err(errors::IndexerError::BadResponse(_)) => (0x4, 0x0), - }; - (prefix << 28) | (data & (u32::MAX >> 4)) - }; - - let indexer_request_payload = json!({ - "gateway_id": &gateway_id, - "query_id": &client_request.id, - "ray_id": &client_request.id, - "network_chain": &self.graph_env, - "graph_env": &self.graph_env, - "timestamp": timestamp, - "api_key": &client_request.api_key, - "user_address": &client_request.user_address, - "deployment": &indexer_request.deployment, - "network": &indexer_request.subgraph_chain, - "indexed_chain": &indexer_request.subgraph_chain, - "indexer": &indexer_request.indexer, - "url": &indexer_request.url, - "fee": (indexer_request.receipt.grt_value() as f64 * 1e-18) as f32, - "legacy_scalar": matches!(&indexer_request.receipt, Receipt::Legacy(_, _)), - "utility": 1.0, - "seconds_behind": indexer_request.seconds_behind, - "blocks_behind": indexer_request.blocks_behind, - "response_time_ms": indexer_request.response_time_ms, - "allocation": &indexer_request.receipt.allocation(), - "indexer_errors": indexer_errors, - "status": indexer_request.result.as_ref().map(|_| "200 OK".into()).unwrap_or_else(|err| err.to_string()), - "status_code": legacy_status_code, - }); - serde_json::to_writer(&mut self.write_buf, &indexer_request_payload).unwrap(); - let record: rdkafka::producer::BaseRecord<(), [u8], ()> = - rdkafka::producer::BaseRecord::to(self.topics.indexer_request) - .payload(&self.write_buf); - self.kafka_producer - .send(record) - .map_err(|(err, _)| err) - .context(anyhow!( - "failed to send to topic {}", - self.topics.indexer_request - ))?; - self.write_buf.clear(); + .map(|()| "success".to_string()) + .unwrap_or_else(|err| err.to_string()), + response_time_ms: client_request.response_time_ms as u32, + request_bytes: client_request.request_bytes, + response_bytes: client_request.response_bytes, + total_fees_usd, + indexer_queries, + }; - if matches!(&indexer_request.receipt, Receipt::TAP(_)) { - IndexerFeesProtobuf { - signer: self.tap_signer.to_vec(), - receiver: indexer_request.indexer.to_vec(), - fee_grt: indexer_request.receipt.grt_value() as f64 * 1e-18, - } - .encode(&mut self.write_buf) - .unwrap(); - let record: rdkafka::producer::BaseRecord<(), [u8], ()> = - rdkafka::producer::BaseRecord::to(self.topics.indexer_fees) - .payload(&self.write_buf); - self.kafka_producer - .send(record) - .map_err(|(err, _)| err) - .context(anyhow!( - "failed to send to topic {}", - self.topics.indexer_fees - ))?; - self.write_buf.clear(); - } + client_query_msg.encode(&mut self.write_buf).unwrap(); + let record: rdkafka::producer::BaseRecord<(), [u8], ()> = + rdkafka::producer::BaseRecord::to(self.topics.queries).payload(&self.write_buf); + self.kafka_producer + .send(record) + .map_err(|(err, _)| err) + .context(anyhow!("failed to send to topic {}", self.topics.queries))?; + self.write_buf.clear(); + for indexer_request in client_request.indexer_requests { if let Some((original_response, attestation)) = indexer_request .result .ok() @@ -249,36 +172,80 @@ impl Reporter { .encode(&mut self.write_buf) .unwrap(); let record: rdkafka::producer::BaseRecord<(), [u8], ()> = - rdkafka::producer::BaseRecord::to(self.topics.attestation) + rdkafka::producer::BaseRecord::to(self.topics.attestations) .payload(&self.write_buf); self.kafka_producer .send(record) .map_err(|(err, _)| err) .context(anyhow!( "failed to send to topic {}", - self.topics.attestation + self.topics.attestations ))?; self.write_buf.clear(); } } - serde_json::to_writer(&mut self.write_buf, &client_request_payload).unwrap(); - let record: rdkafka::producer::BaseRecord<(), [u8], ()> = - rdkafka::producer::BaseRecord::to(self.topics.client_request).payload(&self.write_buf); - self.kafka_producer - .send(record) - .map_err(|(err, _)| err) - .context(anyhow!( - "failed to send to topic {}", - self.topics.client_request - ))?; - self.write_buf.clear(); - Ok(()) } } -#[derive(Clone, PartialEq, prost::Message)] +#[derive(prost::Message)] +pub struct ClientQueryProtobuf { + #[prost(string, tag = "1")] + gateway_id: String, + // 20 bytes + #[prost(bytes, tag = "2")] + receipt_signer: Vec, + #[prost(string, tag = "3")] + query_id: String, + #[prost(string, tag = "4")] + api_key: String, + #[prost(string, tag = "5")] + result: String, + #[prost(uint32, tag = "6")] + response_time_ms: u32, + #[prost(uint32, tag = "7")] + request_bytes: u32, + #[prost(uint32, optional, tag = "8")] + response_bytes: Option, + #[prost(double, tag = "9")] + total_fees_usd: f64, + #[prost(message, repeated, tag = "10")] + indexer_queries: Vec, +} + +#[derive(prost::Message)] +pub struct IndexerQueryProtobuf { + /// 20 bytes + #[prost(bytes, tag = "1")] + indexer: Vec, + /// 32 bytes + #[prost(bytes, tag = "2")] + deployment: Vec, + /// 20 bytes + #[prost(bytes, tag = "3")] + allocation: Vec, + #[prost(string, tag = "4")] + indexed_chain: String, + #[prost(string, tag = "5")] + url: String, + #[prost(double, tag = "6")] + fee_grt: f64, + #[prost(uint32, tag = "7")] + response_time_ms: u32, + #[prost(uint32, tag = "8")] + seconds_behind: u32, + #[prost(string, tag = "9")] + result: String, + #[prost(string, tag = "10")] + indexer_errors: String, + #[prost(uint64, tag = "11")] + blocks_behind: u64, + #[prost(bool, tag = "12")] + legacy_scalar: bool, +} + +#[derive(prost::Message)] pub struct AttestationProtobuf { #[prost(string, optional, tag = "1")] request: Option, @@ -300,15 +267,3 @@ pub struct AttestationProtobuf { #[prost(bytes, tag = "7")] signature: Vec, } - -#[derive(Clone, PartialEq, prost::Message)] -pub struct IndexerFeesProtobuf { - /// 20 bytes (address) - #[prost(bytes, tag = "1")] - signer: Vec, - /// 20 bytes (address) - #[prost(bytes, tag = "2")] - receiver: Vec, - #[prost(double, tag = "3")] - fee_grt: f64, -}