feat: replace legacy kafka reports
Theodus committed Nov 4, 2024
1 parent 136ad69 commit 359482c
Showing 3 changed files with 120 additions and 169 deletions.
README.md (4 changes: 1 addition & 3 deletions)

@@ -81,10 +81,8 @@ respond to the request, then this process is repeated until all available indexe

 The gateway exports data into the following kafka topics:
 
-- client requests (`gateway_client_query_results`)
-- indexer requests (`gateway_indexer_attempts`)
+- queries (`gateway_queries`)
 - attestations (`gateway_attestations`)
-- indexer fees (TAP only) (`gateway_indexer_fees`)
 
 Optionally, the [titorelli](https://github.com/edgeandnode/titorelli/) system can do aggregations
 over these topics. For now, this is limited to creating `gateway_indexer_fees_hourly` to improve
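For downstream consumers, the payloads on the new `gateway_queries` topic can be decoded with a hand-written prost mirror of the message this commit defines; protobuf decoding is tag-based, so a consumer only needs to declare the fields it cares about. A minimal sketch, not part of this commit (the broker address and group id are assumed, and `ClientQuery` mirrors a subset of `ClientQueryProtobuf` tags from src/reports.rs):

use prost::Message as _;
use rdkafka::{
    consumer::{Consumer, StreamConsumer},
    ClientConfig, Message,
};

// Consumer-side mirror of `ClientQueryProtobuf` (src/reports.rs).
// Tags must match; fields omitted here are skipped during decoding.
#[derive(prost::Message)]
struct ClientQuery {
    #[prost(string, tag = "3")]
    query_id: String,
    #[prost(string, tag = "5")]
    result: String,
    #[prost(uint32, tag = "6")]
    response_time_ms: u32,
    #[prost(double, tag = "9")]
    total_fees_usd: f64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // assumed broker address
        .set("group.id", "example-gateway-consumer") // hypothetical group id
        .create()?;
    consumer.subscribe(&["gateway_queries"])?;
    loop {
        let msg = consumer.recv().await?;
        if let Some(payload) = msg.payload() {
            let report = ClientQuery::decode(payload)?;
            println!(
                "{}: {} ({} ms, {:.6} USD)",
                report.query_id, report.result, report.response_time_ms, report.total_fees_usd
            );
        }
    }
}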
src/main.rs (6 changes: 2 additions & 4 deletions)

@@ -143,10 +143,8 @@ async fn main() {
         conf.graph_env_id,
         conf.query_fees_target,
         reports::Topics {
-            client_request: "gateway_client_query_results",
-            indexer_request: "gateway_indexer_attempts",
-            attestation: "gateway_attestations",
-            indexer_fees: "gateway_indexer_fees",
+            queries: "gateway_queries",
+            attestations: "gateway_attestations",
         },
         conf.kafka,
     )
src/reports.rs (279 changes: 117 additions & 162 deletions)

@@ -1,12 +1,11 @@
 use anyhow::{anyhow, Context};
 use ordered_float::NotNan;
 use prost::Message;
-use serde_json::json;
 use thegraph_core::{Address, AllocationId, DeploymentId, IndexerId};
 use tokio::sync::mpsc;
 use toolshed::concat_bytes;
 
-use crate::{errors, indexer_client::IndexerResponse, receipts::Receipt, time::unix_timestamp};
+use crate::{errors, indexer_client::IndexerResponse, receipts::Receipt};

 pub struct ClientRequest {
     pub id: String,
@@ -47,10 +46,8 @@ pub struct Reporter {
 }
 
 pub struct Topics {
-    pub client_request: &'static str,
-    pub indexer_request: &'static str,
-    pub attestation: &'static str,
-    pub indexer_fees: &'static str,
+    pub queries: &'static str,
+    pub attestations: &'static str,
 }

 impl Reporter {
@@ -87,7 +84,38 @@ impl Reporter {
     }
 
     fn report(&mut self, client_request: ClientRequest) -> anyhow::Result<()> {
-        let timestamp = unix_timestamp();
+        let indexer_queries = client_request
+            .indexer_requests
+            .iter()
+            .map(|indexer_request| IndexerQueryProtobuf {
+                indexer: indexer_request.indexer.to_vec(),
+                deployment: indexer_request.deployment.as_ref().to_vec(),
+                allocation: indexer_request.receipt.allocation().to_vec(),
+                indexed_chain: indexer_request.subgraph_chain.clone(),
+                url: indexer_request.url.clone(),
+                fee_grt: indexer_request.receipt.grt_value() as f64 * 1e-18,
+                response_time_ms: indexer_request.response_time_ms as u32,
+                seconds_behind: indexer_request.seconds_behind,
+                result: indexer_request
+                    .result
+                    .as_ref()
+                    .map(|_| "success".to_string())
+                    .unwrap_or_else(|err| err.to_string()),
+                indexer_errors: indexer_request
+                    .result
+                    .as_ref()
+                    .map(|r| {
+                        r.errors
+                            .iter()
+                            .map(|err| err.as_str())
+                            .collect::<Vec<&str>>()
+                            .join("; ")
+                    })
+                    .unwrap_or_default(),
+                blocks_behind: indexer_request.blocks_behind,
+                legacy_scalar: matches!(&indexer_request.receipt, Receipt::Legacy(_, _)),
+            })
+            .collect();

         let total_fees_grt: f64 = client_request
             .indexer_requests
@@ -96,137 +124,32 @@
             .sum();
         let total_fees_usd: f64 = total_fees_grt / *client_request.grt_per_usd;

-        let (legacy_status_message, legacy_status_code): (String, u32) =
-            match &client_request.result {
-                Ok(_) => ("200 OK".to_string(), 0),
-                Err(err) => match err {
-                    errors::Error::BlockNotFound(_) => ("Unresolved block".to_string(), 604610595),
-                    errors::Error::Internal(_) => ("Internal error".to_string(), 816601499),
-                    errors::Error::Auth(_) => ("Invalid API key".to_string(), 888904173),
-                    errors::Error::BadQuery(_) => ("Invalid query".to_string(), 595700117),
-                    errors::Error::NoIndexers => (
-                        "No indexers found for subgraph deployment".to_string(),
-                        1621366907,
-                    ),
-                    errors::Error::BadIndexers(_) => (
-                        "No suitable indexer found for subgraph deployment".to_string(),
-                        510359393,
-                    ),
-                    errors::Error::SubgraphNotFound(_) => (err.to_string(), 2599148187),
-                },
-            };
-
-        let indexed_chain = client_request
-            .indexer_requests
-            .first()
-            .map(|i| i.subgraph_chain.as_str())
-            .unwrap_or("");
-        let gateway_id = format!("{:?}", self.tap_signer);
-        let client_request_payload = json!({
-            "gateway_id": &gateway_id,
-            "query_id": &client_request.id,
-            "ray_id": &client_request.id,
-            "network_chain": &self.graph_env,
-            "graph_env": &self.graph_env,
-            "timestamp": timestamp,
-            "api_key": &client_request.api_key,
-            "user": &client_request.user_address,
-            "deployment": client_request.indexer_requests.first().map(|i| i.deployment.to_string()).unwrap_or_default(),
-            "indexed_chain": indexed_chain,
-            "network": indexed_chain,
-            "response_time_ms": client_request.response_time_ms,
-            "request_bytes": client_request.request_bytes,
-            "response_bytes": client_request.response_bytes,
-            "budget": self.budget.to_string(),
-            "query_count": 1,
-            "fee": total_fees_grt as f32,
-            "fee_usd": total_fees_usd as f32,
-            "status": legacy_status_message,
-            "status_code": legacy_status_code,
-        });

-        for indexer_request in client_request.indexer_requests {
-            let indexer_errors = indexer_request
+        let client_query_msg = ClientQueryProtobuf {
+            gateway_id: self.graph_env.clone(),
+            receipt_signer: self.tap_signer.to_vec(),
+            query_id: client_request.id,
+            api_key: client_request.api_key,
+            result: client_request
                 .result
                 .as_ref()
-                .map(|r| {
-                    r.errors
-                        .iter()
-                        .map(|err| err.as_str())
-                        .collect::<Vec<&str>>()
-                        .join("; ")
-                })
-                .unwrap_or_default();
-            let legacy_status_code: u32 = {
-                let (prefix, data) = match &indexer_request.result {
-                    Ok(_) => (0x0, 200_u32.to_be()),
-                    Err(errors::IndexerError::Internal(_)) => (0x1, 0x0),
-                    Err(errors::IndexerError::Unavailable(_)) => (0x2, 0x0),
-                    Err(errors::IndexerError::Timeout) => (0x3, 0x0),
-                    Err(errors::IndexerError::BadResponse(_)) => (0x4, 0x0),
-                };
-                (prefix << 28) | (data & (u32::MAX >> 4))
-            };
-
-            let indexer_request_payload = json!({
-                "gateway_id": &gateway_id,
-                "query_id": &client_request.id,
-                "ray_id": &client_request.id,
-                "network_chain": &self.graph_env,
-                "graph_env": &self.graph_env,
-                "timestamp": timestamp,
-                "api_key": &client_request.api_key,
-                "user_address": &client_request.user_address,
-                "deployment": &indexer_request.deployment,
-                "network": &indexer_request.subgraph_chain,
-                "indexed_chain": &indexer_request.subgraph_chain,
-                "indexer": &indexer_request.indexer,
-                "url": &indexer_request.url,
-                "fee": (indexer_request.receipt.grt_value() as f64 * 1e-18) as f32,
-                "legacy_scalar": matches!(&indexer_request.receipt, Receipt::Legacy(_, _)),
-                "utility": 1.0,
-                "seconds_behind": indexer_request.seconds_behind,
-                "blocks_behind": indexer_request.blocks_behind,
-                "response_time_ms": indexer_request.response_time_ms,
-                "allocation": &indexer_request.receipt.allocation(),
-                "indexer_errors": indexer_errors,
-                "status": indexer_request.result.as_ref().map(|_| "200 OK".into()).unwrap_or_else(|err| err.to_string()),
-                "status_code": legacy_status_code,
-            });
-            serde_json::to_writer(&mut self.write_buf, &indexer_request_payload).unwrap();
-            let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
-                rdkafka::producer::BaseRecord::to(self.topics.indexer_request)
-                    .payload(&self.write_buf);
-            self.kafka_producer
-                .send(record)
-                .map_err(|(err, _)| err)
-                .context(anyhow!(
-                    "failed to send to topic {}",
-                    self.topics.indexer_request
-                ))?;
-            self.write_buf.clear();
+                .map(|()| "success".to_string())
+                .unwrap_or_else(|err| err.to_string()),
+            response_time_ms: client_request.response_time_ms as u32,
+            request_bytes: client_request.request_bytes,
+            response_bytes: client_request.response_bytes,
+            total_fees_usd,
+            indexer_queries,
+        };

-            if matches!(&indexer_request.receipt, Receipt::TAP(_)) {
-                IndexerFeesProtobuf {
-                    signer: self.tap_signer.to_vec(),
-                    receiver: indexer_request.indexer.to_vec(),
-                    fee_grt: indexer_request.receipt.grt_value() as f64 * 1e-18,
-                }
-                .encode(&mut self.write_buf)
-                .unwrap();
-                let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
-                    rdkafka::producer::BaseRecord::to(self.topics.indexer_fees)
-                        .payload(&self.write_buf);
-                self.kafka_producer
-                    .send(record)
-                    .map_err(|(err, _)| err)
-                    .context(anyhow!(
-                        "failed to send to topic {}",
-                        self.topics.indexer_fees
-                    ))?;
-                self.write_buf.clear();
-            }
+        client_query_msg.encode(&mut self.write_buf).unwrap();
+        let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
+            rdkafka::producer::BaseRecord::to(self.topics.queries).payload(&self.write_buf);
+        self.kafka_producer
+            .send(record)
+            .map_err(|(err, _)| err)
+            .context(anyhow!("failed to send to topic {}", self.topics.queries))?;
+        self.write_buf.clear();

+        for indexer_request in client_request.indexer_requests {
             if let Some((original_response, attestation)) = indexer_request
                 .result
                 .ok()
@@ -249,36 +172,80 @@
                 .encode(&mut self.write_buf)
                 .unwrap();
                 let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
-                    rdkafka::producer::BaseRecord::to(self.topics.attestation)
+                    rdkafka::producer::BaseRecord::to(self.topics.attestations)
                         .payload(&self.write_buf);
                 self.kafka_producer
                     .send(record)
                     .map_err(|(err, _)| err)
                     .context(anyhow!(
                         "failed to send to topic {}",
-                        self.topics.attestation
+                        self.topics.attestations
                     ))?;
                 self.write_buf.clear();
             }
         }

-        serde_json::to_writer(&mut self.write_buf, &client_request_payload).unwrap();
-        let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
-            rdkafka::producer::BaseRecord::to(self.topics.client_request).payload(&self.write_buf);
-        self.kafka_producer
-            .send(record)
-            .map_err(|(err, _)| err)
-            .context(anyhow!(
-                "failed to send to topic {}",
-                self.topics.client_request
-            ))?;
-        self.write_buf.clear();

         Ok(())
     }
 }

-#[derive(Clone, PartialEq, prost::Message)]
+#[derive(prost::Message)]
+pub struct ClientQueryProtobuf {
+    #[prost(string, tag = "1")]
+    gateway_id: String,
+    // 20 bytes
+    #[prost(bytes, tag = "2")]
+    receipt_signer: Vec<u8>,
+    #[prost(string, tag = "3")]
+    query_id: String,
+    #[prost(string, tag = "4")]
+    api_key: String,
+    #[prost(string, tag = "5")]
+    result: String,
+    #[prost(uint32, tag = "6")]
+    response_time_ms: u32,
+    #[prost(uint32, tag = "7")]
+    request_bytes: u32,
+    #[prost(uint32, optional, tag = "8")]
+    response_bytes: Option<u32>,
+    #[prost(double, tag = "9")]
+    total_fees_usd: f64,
+    #[prost(message, repeated, tag = "10")]
+    indexer_queries: Vec<IndexerQueryProtobuf>,
+}
+
+#[derive(prost::Message)]
+pub struct IndexerQueryProtobuf {
+    /// 20 bytes
+    #[prost(bytes, tag = "1")]
+    indexer: Vec<u8>,
+    /// 32 bytes
+    #[prost(bytes, tag = "2")]
+    deployment: Vec<u8>,
+    /// 20 bytes
+    #[prost(bytes, tag = "3")]
+    allocation: Vec<u8>,
+    #[prost(string, tag = "4")]
+    indexed_chain: String,
+    #[prost(string, tag = "5")]
+    url: String,
+    #[prost(double, tag = "6")]
+    fee_grt: f64,
+    #[prost(uint32, tag = "7")]
+    response_time_ms: u32,
+    #[prost(uint32, tag = "8")]
+    seconds_behind: u32,
+    #[prost(string, tag = "9")]
+    result: String,
+    #[prost(string, tag = "10")]
+    indexer_errors: String,
+    #[prost(uint64, tag = "11")]
+    blocks_behind: u64,
+    #[prost(bool, tag = "12")]
+    legacy_scalar: bool,
+}
+
+#[derive(prost::Message)]
 pub struct AttestationProtobuf {
     #[prost(string, optional, tag = "1")]
     request: Option<String>,
@@ -300,15 +267,3 @@ pub struct AttestationProtobuf {
#[prost(bytes, tag = "7")]
signature: Vec<u8>,
}

-#[derive(Clone, PartialEq, prost::Message)]
-pub struct IndexerFeesProtobuf {
-    /// 20 bytes (address)
-    #[prost(bytes, tag = "1")]
-    signer: Vec<u8>,
-    /// 20 bytes (address)
-    #[prost(bytes, tag = "2")]
-    receiver: Vec<u8>,
-    #[prost(double, tag = "3")]
-    fee_grt: f64,
-}
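Note that dropping `IndexerFeesProtobuf` loses no information: each embedded `IndexerQueryProtobuf` carries the receiver (`indexer`) and `fee_grt`, the enclosing `ClientQueryProtobuf` carries the `receipt_signer`, and `legacy_scalar` marks legacy Scalar receipts. A hedged sketch of how an aggregator (e.g. titorelli) might rebuild the old (signer, receiver, fee) records from the new topic; the mirror structs and helper are hypothetical, and it assumes `Receipt` has only the `Legacy` and `TAP` variants:

// Hypothetical consumer-side mirrors; tags must match src/reports.rs.
#[derive(prost::Message)]
struct ClientQuery {
    /// 20-byte signer address (tag 2 of `ClientQueryProtobuf`)
    #[prost(bytes, tag = "2")]
    receipt_signer: Vec<u8>,
    #[prost(message, repeated, tag = "10")]
    indexer_queries: Vec<IndexerQuery>,
}

#[derive(prost::Message)]
struct IndexerQuery {
    /// 20-byte indexer address (tag 1 of `IndexerQueryProtobuf`)
    #[prost(bytes, tag = "1")]
    indexer: Vec<u8>,
    #[prost(double, tag = "6")]
    fee_grt: f64,
    #[prost(bool, tag = "12")]
    legacy_scalar: bool,
}

/// Rebuild the (signer, receiver, fee_grt) tuples previously published to
/// `gateway_indexer_fees` (TAP receipts only, matching the old behavior).
fn indexer_fees(report: &ClientQuery) -> impl Iterator<Item = (&[u8], &[u8], f64)> {
    report
        .indexer_queries
        .iter()
        .filter(|q| !q.legacy_scalar) // non-legacy == TAP, by the assumption above
        .map(|q| (report.receipt_signer.as_slice(), q.indexer.as_slice(), q.fee_grt))
}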
