kafka topic migration #967

Merged (1 commit) on Nov 5, 2024
README.md (1 addition, 3 deletions)

@@ -81,10 +81,8 @@ respond to the request, then this process is repeated until all available indexe

The gateway exports data into the following kafka topics:

-- client requests (`gateway_client_query_results`)
-- indexer requests (`gateway_indexer_attempts`)
+- queries (`gateway_queries`)
- attestations (`gateway_attestations`)
-- indexer fees (TAP only) (`gateway_indexer_fees`)

Optionally, the [titorelli](https://github.com/edgeandnode/titorelli/) system can do aggregations
over these topics. For now, this is limited to creating `gateway_indexer_fees_hourly` to improve
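With this change the gateway stops producing `gateway_client_query_results`, `gateway_indexer_attempts`, and `gateway_indexer_fees`; everything a downstream consumer needs now rides along in each `gateway_queries` message. Below is a minimal consumer-side sketch (not part of this PR) of recovering per-indexer fees from that topic. The struct and function names here are hypothetical; the mirror structs declare only the fields needed, with prost tags matching the definitions added in src/reports.rs further down.

```rust
use prost::Message;

// Consumer-side mirror of the gateway's ClientQueryProtobuf (see
// src/reports.rs below). Only the fields needed here are declared;
// protobuf permits this as long as the tags match the producer's.
#[derive(prost::Message)]
struct ClientQuery {
    /// GRT fees converted to USD by the gateway (tag 9 in ClientQueryProtobuf).
    #[prost(double, tag = "9")]
    total_fees_usd: f64,
    #[prost(message, repeated, tag = "10")]
    indexer_queries: Vec<IndexerQuery>,
}

#[derive(prost::Message)]
struct IndexerQuery {
    /// 20-byte indexer address (tag 1 in IndexerQueryProtobuf).
    #[prost(bytes, tag = "1")]
    indexer: Vec<u8>,
    #[prost(double, tag = "6")]
    fee_grt: f64,
}

/// Decode one `gateway_queries` record payload into (indexer address, GRT fee) pairs.
fn indexer_fees(payload: &[u8]) -> Result<Vec<(Vec<u8>, f64)>, prost::DecodeError> {
    let msg = ClientQuery::decode(payload)?;
    Ok(msg
        .indexer_queries
        .into_iter()
        .map(|q| (q.indexer, q.fee_grt))
        .collect())
}
```

Declaring only the needed fields is safe here: prost skips unknown tags during decode, so the mirror structs stay decoupled from the gateway's full message definitions.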
src/main.rs (2 additions, 4 deletions)

@@ -143,10 +143,8 @@ async fn main() {
conf.graph_env_id,
conf.query_fees_target,
reports::Topics {
-client_request: "gateway_client_query_results",
-indexer_request: "gateway_indexer_attempts",
-attestation: "gateway_attestations",
-indexer_fees: "gateway_indexer_fees",
+queries: "gateway_queries",
+attestations: "gateway_attestations",
},
conf.kafka,
)
src/reports.rs (117 additions, 162 deletions)

@@ -1,12 +1,11 @@
use anyhow::{anyhow, Context};
use ordered_float::NotNan;
use prost::Message;
-use serde_json::json;
use thegraph_core::{Address, AllocationId, DeploymentId, IndexerId};
use tokio::sync::mpsc;
use toolshed::concat_bytes;

-use crate::{errors, indexer_client::IndexerResponse, receipts::Receipt, time::unix_timestamp};
+use crate::{errors, indexer_client::IndexerResponse, receipts::Receipt};

pub struct ClientRequest {
pub id: String,
@@ -47,10 +46,8 @@ pub struct Reporter {
}

pub struct Topics {
-pub client_request: &'static str,
-pub indexer_request: &'static str,
-pub attestation: &'static str,
-pub indexer_fees: &'static str,
+pub queries: &'static str,
+pub attestations: &'static str,
}

impl Reporter {
@@ -87,7 +84,38 @@ impl Reporter {
}

fn report(&mut self, client_request: ClientRequest) -> anyhow::Result<()> {
-let timestamp = unix_timestamp();
+let indexer_queries = client_request
+.indexer_requests
+.iter()
+.map(|indexer_request| IndexerQueryProtobuf {
+indexer: indexer_request.indexer.to_vec(),
+deployment: indexer_request.deployment.as_ref().to_vec(),
+allocation: indexer_request.receipt.allocation().to_vec(),
+indexed_chain: indexer_request.subgraph_chain.clone(),
+url: indexer_request.url.clone(),
+fee_grt: indexer_request.receipt.grt_value() as f64 * 1e-18,
+response_time_ms: indexer_request.response_time_ms as u32,
+seconds_behind: indexer_request.seconds_behind,
+result: indexer_request
+.result
+.as_ref()
+.map(|_| "success".to_string())
+.unwrap_or_else(|err| err.to_string()),
+indexer_errors: indexer_request
+.result
+.as_ref()
+.map(|r| {
+r.errors
+.iter()
+.map(|err| err.as_str())
+.collect::<Vec<&str>>()
+.join("; ")
+})
+.unwrap_or_default(),
+blocks_behind: indexer_request.blocks_behind,
+legacy_scalar: matches!(&indexer_request.receipt, Receipt::Legacy(_, _)),
+})
+.collect();

let total_fees_grt: f64 = client_request
.indexer_requests
@@ -96,137 +124,32 @@
.sum();
let total_fees_usd: f64 = total_fees_grt / *client_request.grt_per_usd;

-let (legacy_status_message, legacy_status_code): (String, u32) =
-match &client_request.result {
-Ok(_) => ("200 OK".to_string(), 0),
-Err(err) => match err {
-errors::Error::BlockNotFound(_) => ("Unresolved block".to_string(), 604610595),
-errors::Error::Internal(_) => ("Internal error".to_string(), 816601499),
-errors::Error::Auth(_) => ("Invalid API key".to_string(), 888904173),
-errors::Error::BadQuery(_) => ("Invalid query".to_string(), 595700117),
-errors::Error::NoIndexers => (
-"No indexers found for subgraph deployment".to_string(),
-1621366907,
-),
-errors::Error::BadIndexers(_) => (
-"No suitable indexer found for subgraph deployment".to_string(),
-510359393,
-),
-errors::Error::SubgraphNotFound(_) => (err.to_string(), 2599148187),
-},
-};

-let indexed_chain = client_request
-.indexer_requests
-.first()
-.map(|i| i.subgraph_chain.as_str())
-.unwrap_or("");
-let gateway_id = format!("{:?}", self.tap_signer);
-let client_request_payload = json!({
-"gateway_id": &gateway_id,
-"query_id": &client_request.id,
-"ray_id": &client_request.id,
-"network_chain": &self.graph_env,
-"graph_env": &self.graph_env,
-"timestamp": timestamp,
-"api_key": &client_request.api_key,
-"user": &client_request.user_address,
-"deployment": client_request.indexer_requests.first().map(|i| i.deployment.to_string()).unwrap_or_default(),
-"indexed_chain": indexed_chain,
-"network": indexed_chain,
-"response_time_ms": client_request.response_time_ms,
-"request_bytes": client_request.request_bytes,
-"response_bytes": client_request.response_bytes,
-"budget": self.budget.to_string(),
-"query_count": 1,
-"fee": total_fees_grt as f32,
-"fee_usd": total_fees_usd as f32,
-"status": legacy_status_message,
-"status_code": legacy_status_code,
-});

-for indexer_request in client_request.indexer_requests {
-let indexer_errors = indexer_request
+let client_query_msg = ClientQueryProtobuf {
+gateway_id: self.graph_env.clone(),
+receipt_signer: self.tap_signer.to_vec(),
+query_id: client_request.id,
+api_key: client_request.api_key,
+result: client_request
.result
.as_ref()
-.map(|r| {
-r.errors
-.iter()
-.map(|err| err.as_str())
-.collect::<Vec<&str>>()
-.join("; ")
-})
-.unwrap_or_default();
-let legacy_status_code: u32 = {
-let (prefix, data) = match &indexer_request.result {
-Ok(_) => (0x0, 200_u32.to_be()),
-Err(errors::IndexerError::Internal(_)) => (0x1, 0x0),
-Err(errors::IndexerError::Unavailable(_)) => (0x2, 0x0),
-Err(errors::IndexerError::Timeout) => (0x3, 0x0),
-Err(errors::IndexerError::BadResponse(_)) => (0x4, 0x0),
-};
-(prefix << 28) | (data & (u32::MAX >> 4))
-};

-let indexer_request_payload = json!({
-"gateway_id": &gateway_id,
-"query_id": &client_request.id,
-"ray_id": &client_request.id,
-"network_chain": &self.graph_env,
-"graph_env": &self.graph_env,
-"timestamp": timestamp,
-"api_key": &client_request.api_key,
-"user_address": &client_request.user_address,
-"deployment": &indexer_request.deployment,
-"network": &indexer_request.subgraph_chain,
-"indexed_chain": &indexer_request.subgraph_chain,
-"indexer": &indexer_request.indexer,
-"url": &indexer_request.url,
-"fee": (indexer_request.receipt.grt_value() as f64 * 1e-18) as f32,
-"legacy_scalar": matches!(&indexer_request.receipt, Receipt::Legacy(_, _)),
-"utility": 1.0,
-"seconds_behind": indexer_request.seconds_behind,
-"blocks_behind": indexer_request.blocks_behind,
-"response_time_ms": indexer_request.response_time_ms,
-"allocation": &indexer_request.receipt.allocation(),
-"indexer_errors": indexer_errors,
-"status": indexer_request.result.as_ref().map(|_| "200 OK".into()).unwrap_or_else(|err| err.to_string()),
-"status_code": legacy_status_code,
-});
-serde_json::to_writer(&mut self.write_buf, &indexer_request_payload).unwrap();
-let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
-rdkafka::producer::BaseRecord::to(self.topics.indexer_request)
-.payload(&self.write_buf);
-self.kafka_producer
-.send(record)
-.map_err(|(err, _)| err)
-.context(anyhow!(
-"failed to send to topic {}",
-self.topics.indexer_request
-))?;
-self.write_buf.clear();
+.map(|()| "success".to_string())
+.unwrap_or_else(|err| err.to_string()),
+response_time_ms: client_request.response_time_ms as u32,
+request_bytes: client_request.request_bytes,
+response_bytes: client_request.response_bytes,
+total_fees_usd,
+indexer_queries,
+};

-if matches!(&indexer_request.receipt, Receipt::TAP(_)) {
-IndexerFeesProtobuf {
-signer: self.tap_signer.to_vec(),
-receiver: indexer_request.indexer.to_vec(),
-fee_grt: indexer_request.receipt.grt_value() as f64 * 1e-18,
-}
-.encode(&mut self.write_buf)
-.unwrap();
-let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
-rdkafka::producer::BaseRecord::to(self.topics.indexer_fees)
-.payload(&self.write_buf);
-self.kafka_producer
-.send(record)
-.map_err(|(err, _)| err)
-.context(anyhow!(
-"failed to send to topic {}",
-self.topics.indexer_fees
-))?;
-self.write_buf.clear();
-}
+client_query_msg.encode(&mut self.write_buf).unwrap();
+let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
+rdkafka::producer::BaseRecord::to(self.topics.queries).payload(&self.write_buf);
+self.kafka_producer
+.send(record)
+.map_err(|(err, _)| err)
+.context(anyhow!("failed to send to topic {}", self.topics.queries))?;
+self.write_buf.clear();

+for indexer_request in client_request.indexer_requests {
if let Some((original_response, attestation)) = indexer_request
.result
.ok()
@@ -249,36 +172,80 @@
.encode(&mut self.write_buf)
.unwrap();
let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
-rdkafka::producer::BaseRecord::to(self.topics.attestation)
+rdkafka::producer::BaseRecord::to(self.topics.attestations)
.payload(&self.write_buf);
self.kafka_producer
.send(record)
.map_err(|(err, _)| err)
.context(anyhow!(
"failed to send to topic {}",
-self.topics.attestation
+self.topics.attestations
))?;
self.write_buf.clear();
}
}

-serde_json::to_writer(&mut self.write_buf, &client_request_payload).unwrap();
-let record: rdkafka::producer::BaseRecord<(), [u8], ()> =
-rdkafka::producer::BaseRecord::to(self.topics.client_request).payload(&self.write_buf);
-self.kafka_producer
-.send(record)
-.map_err(|(err, _)| err)
-.context(anyhow!(
-"failed to send to topic {}",
-self.topics.client_request
-))?;
-self.write_buf.clear();

Ok(())
}
}

-#[derive(Clone, PartialEq, prost::Message)]
+#[derive(prost::Message)]
+pub struct ClientQueryProtobuf {
+#[prost(string, tag = "1")]
+gateway_id: String,
+// 20 bytes
+#[prost(bytes, tag = "2")]
+receipt_signer: Vec<u8>,
+#[prost(string, tag = "3")]
+query_id: String,
+#[prost(string, tag = "4")]
+api_key: String,
+#[prost(string, tag = "5")]
+result: String,
+#[prost(uint32, tag = "6")]
+response_time_ms: u32,
+#[prost(uint32, tag = "7")]
+request_bytes: u32,
+#[prost(uint32, optional, tag = "8")]
+response_bytes: Option<u32>,
+#[prost(double, tag = "9")]
+total_fees_usd: f64,
+#[prost(message, repeated, tag = "10")]
+indexer_queries: Vec<IndexerQueryProtobuf>,
+}

+#[derive(prost::Message)]
+pub struct IndexerQueryProtobuf {
+/// 20 bytes
+#[prost(bytes, tag = "1")]
+indexer: Vec<u8>,
+/// 32 bytes
+#[prost(bytes, tag = "2")]
+deployment: Vec<u8>,
+/// 20 bytes
+#[prost(bytes, tag = "3")]
+allocation: Vec<u8>,
+#[prost(string, tag = "4")]
+indexed_chain: String,
+#[prost(string, tag = "5")]
+url: String,
+#[prost(double, tag = "6")]
+fee_grt: f64,
+#[prost(uint32, tag = "7")]
+response_time_ms: u32,
+#[prost(uint32, tag = "8")]
+seconds_behind: u32,
+#[prost(string, tag = "9")]
+result: String,
+#[prost(string, tag = "10")]
+indexer_errors: String,
+#[prost(uint64, tag = "11")]
+blocks_behind: u64,
+#[prost(bool, tag = "12")]
+legacy_scalar: bool,
+}

+#[derive(prost::Message)]
pub struct AttestationProtobuf {
#[prost(string, optional, tag = "1")]
request: Option<String>,
@@ -300,15 +267,3 @@ pub struct AttestationProtobuf {
#[prost(bytes, tag = "7")]
signature: Vec<u8>,
}

-#[derive(Clone, PartialEq, prost::Message)]
-pub struct IndexerFeesProtobuf {
-/// 20 bytes (address)
-#[prost(bytes, tag = "1")]
-signer: Vec<u8>,
-/// 20 bytes (address)
-#[prost(bytes, tag = "2")]
-receiver: Vec<u8>,
-#[prost(double, tag = "3")]
-fee_grt: f64,
-}
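The wire format of `gateway_queries` is now defined entirely by the prost annotations above. A round-trip sketch follows; it is illustrative only, not a test shipped in this PR, and it relies solely on the `Default` impls that the `prost::Message` derive already generates. It checks that the optional `response_bytes` and the repeated `indexer_queries` fields survive encoding.

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;

    // Hypothetical round-trip check: encode a ClientQueryProtobuf the way
    // `report` does, then decode it back from the buffer. Field values are
    // arbitrary; f64 comparisons are exact because protobuf doubles are
    // bit-preserving.
    #[test]
    fn client_query_roundtrip() {
        let msg = ClientQueryProtobuf {
            gateway_id: "mainnet".to_string(),
            query_id: "test-query".to_string(),
            response_bytes: None, // optional field omitted on the wire
            total_fees_usd: 0.25,
            indexer_queries: vec![IndexerQueryProtobuf {
                fee_grt: 1.5,
                ..Default::default()
            }],
            ..Default::default()
        };

        let mut buf = Vec::new();
        msg.encode(&mut buf).unwrap();
        let decoded = ClientQueryProtobuf::decode(buf.as_slice()).unwrap();

        assert_eq!(decoded.gateway_id, "mainnet");
        assert_eq!(decoded.response_bytes, None);
        assert_eq!(decoded.total_fees_usd, 0.25);
        assert_eq!(decoded.indexer_queries.len(), 1);
        assert_eq!(decoded.indexer_queries[0].fee_grt, 1.5);
    }
}
```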