Skip to content

Commit

Permalink
feat: update client fees aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 13, 2024
1 parent 6278089 commit 1b7e430
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 15 deletions.
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ aggregations into the following topics:
syntax = "proto3";
message ClientFeesHourly {
message ClientFees {
string api_key = 1;
string deployment = 2;
double fees_grt = 3;
double fees_usd = 4;
string gateway_id = 1;
string user_id = 2;
string api_key = 3;
string deployment = 4;
double fees_grt = 5;
double fees_usd = 6;
uint32 query_count = 7;
float success_rate = 8;
uint32 avg_response_time_ms = 9;
}
// start timestamp for aggregation, in unix milliseconds
int64 timestamp = 1;
Expand All @@ -36,7 +41,7 @@ aggregations into the following topics:
bytes receiver = 2;
double fees_grt = 3;
}
/// start timestamp for aggregation, in unix milliseconds
// start timestamp for aggregation, in unix milliseconds
int64 timestamp = 1;
repeated IndexerFees aggregations = 2;
}
Expand Down
36 changes: 29 additions & 7 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,18 @@ async fn handle_source_msg(

if let Some(deployment) = data.indexer_queries.first().map(|i| &i.deployment) {
let key = ClientFeesKey {
gateway_id: data.gateway_id.clone(),
api_key: data.api_key.clone(),
deployment: deployment_cid(deployment),
};
let value = agg.client_fees.entry(key).or_default();
value.fees_grt += data.indexer_queries.iter().map(|i| i.fee_grt).sum::<f64>();
value.fees_usd += data.total_fees_usd;
value.total_response_time_ms += data.response_time_ms as f64;
match data.result.as_str() {
"success" => value.success_count += 1,
_ => value.failure_count += 1,
};
}

for indexer_query in &data.indexer_queries {
Expand Down Expand Up @@ -326,11 +332,19 @@ async fn record_aggregations(
timestamp,
aggregations: client_fees
.into_iter()
.map(|(k, v)| ClientFeesProtobuf {
api_key: k.api_key,
deployment: k.deployment,
fees_grt: v.fees_grt,
fees_usd: v.fees_usd,
.map(|(k, v)| {
let query_count = (v.success_count + v.failure_count).max(1) as f64;
ClientFeesProtobuf {
gateway_id: k.gateway_id,
user_id: v.user_id,
api_key: k.api_key,
deployment: k.deployment,
fees_grt: v.fees_grt,
fees_usd: v.fees_usd,
query_count: v.success_count + v.failure_count,
success_rate: (v.success_count as f64 / query_count) as f32,
avg_response_time_ms: (v.total_response_time_ms / query_count) as u32,
}
})
.collect(),
}
Expand Down Expand Up @@ -451,14 +465,19 @@ struct Aggregations {

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ClientFeesKey {
gateway_id: String,
api_key: String,
deployment: String,
}

#[derive(Debug, Default)]
struct ClientFeesValue {
user_id: String,
fees_grt: f64,
fees_usd: f64,
success_count: u32,
failure_count: u32,
total_response_time_ms: f64,
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
Expand All @@ -473,6 +492,9 @@ pub fn legacy_messages(
) -> (serde_json::Value, Vec<serde_json::Value>) {
let address_hex = |bytes| format!("{:?}", Address::from_slice(bytes).unwrap());
let first_indexer_query = client_query.indexer_queries.first();
let user_id = client_query
.user_id
.unwrap_or("0x0000000000000000000000000000000000000000".into());
let client_request_payload = serde_json::json!({
"gateway_id": address_hex(&client_query.receipt_signer),
"query_id": &client_query.query_id,
Expand All @@ -481,7 +503,7 @@ pub fn legacy_messages(
"graph_env": &client_query.gateway_id,
"timestamp": timestamp,
"api_key": &client_query.api_key,
"user": "0x0000000000000000000000000000000000000000",
"user": &user_id,
"deployment": first_indexer_query.map(|i| deployment_cid(&i.deployment)).unwrap_or_default(),
"indexed_chain": first_indexer_query.map(|i| i.indexed_chain.clone()).unwrap_or_default(),
"network": first_indexer_query.map(|i| i.indexed_chain.clone()).unwrap_or_default(),
Expand Down Expand Up @@ -526,7 +548,7 @@ pub fn legacy_messages(
"graph_env": &client_query.gateway_id,
"timestamp": timestamp,
"api_key": &client_query.api_key,
"user_address": "0x0000000000000000000000000000000000000000",
"user_address": &user_id,
"deployment": deployment_cid(&i.deployment),
"network": &i.indexed_chain,
"indexed_chain": &i.indexed_chain,
Expand Down
19 changes: 16 additions & 3 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub struct ClientQueryProtobuf {
pub query_id: String,
#[prost(string, tag = "4")]
pub api_key: String,
#[prost(string, optional, tag = "11")]
pub user_id: Option<String>,
#[prost(string, tag = "5")]
pub result: String,
#[prost(uint32, tag = "6")]
Expand All @@ -22,6 +24,7 @@ pub struct ClientQueryProtobuf {
#[prost(message, repeated, tag = "10")]
pub indexer_queries: Vec<IndexerQueryProtobuf>,
}

#[derive(prost::Message)]
pub struct IndexerQueryProtobuf {
/// 20 bytes (address)
Expand Down Expand Up @@ -65,13 +68,23 @@ pub struct ClientFeesHourlyProtobuf {
#[derive(prost::Message)]
pub struct ClientFeesProtobuf {
#[prost(string, tag = "1")]
pub api_key: String,
pub gateway_id: String,
#[prost(string, tag = "2")]
pub user_id: String,
#[prost(string, tag = "3")]
pub api_key: String,
#[prost(string, tag = "4")]
pub deployment: String,
#[prost(double, tag = "3")]
#[prost(double, tag = "5")]
pub fees_grt: f64,
#[prost(double, tag = "4")]
#[prost(double, tag = "6")]
pub fees_usd: f64,
#[prost(uint32, tag = "7")]
pub query_count: u32,
#[prost(float, tag = "8")]
pub success_rate: f32,
#[prost(uint32, tag = "9")]
pub avg_response_time_ms: u32,
}

#[derive(prost::Message)]
Expand Down

0 comments on commit 1b7e430

Please sign in to comment.