diff --git a/README.md b/README.md index 3d236b9..beab285 100644 --- a/README.md +++ b/README.md @@ -13,10 +13,15 @@ aggregations into the following topics: syntax = "proto3"; message ClientFeesHourly { message ClientFees { - string api_key = 1; - string deployment = 2; - double fees_grt = 3; - double fees_usd = 4; + string gateway_id = 1; + string user_id = 2; + string api_key = 3; + string deployment = 4; + double fees_grt = 5; + double fees_usd = 6; + uint32 query_count = 7; + float success_rate = 8; + uint32 avg_response_time_ms = 9; } // start timestamp for aggregation, in unix milliseconds int64 timestamp = 1; @@ -36,7 +41,7 @@ aggregations into the following topics: bytes receiver = 2; double fees_grt = 3; } - /// start timestamp for aggregation, in unix milliseconds + // start timestamp for aggregation, in unix milliseconds int64 timestamp = 1; repeated IndexerFees aggregations = 2; } diff --git a/src/bin/main.rs b/src/bin/main.rs index fbf3465..ce97f18 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -189,12 +189,18 @@ async fn handle_source_msg( if let Some(deployment) = data.indexer_queries.first().map(|i| &i.deployment) { let key = ClientFeesKey { + gateway_id: data.gateway_id.clone(), api_key: data.api_key.clone(), deployment: deployment_cid(deployment), }; let value = agg.client_fees.entry(key).or_default(); value.fees_grt += data.indexer_queries.iter().map(|i| i.fee_grt).sum::(); value.fees_usd += data.total_fees_usd; + value.total_response_time_ms += data.response_time_ms as f64; + match data.result.as_str() { + "success" => value.success_count += 1, + _ => value.failure_count += 1, + }; } for indexer_query in &data.indexer_queries { @@ -326,11 +332,19 @@ async fn record_aggregations( timestamp, aggregations: client_fees .into_iter() - .map(|(k, v)| ClientFeesProtobuf { - api_key: k.api_key, - deployment: k.deployment, - fees_grt: v.fees_grt, - fees_usd: v.fees_usd, + .map(|(k, v)| { + let query_count = (v.success_count + v.failure_count).max(1) as f64; + ClientFeesProtobuf { + gateway_id: k.gateway_id, + user_id: v.user_id, + api_key: k.api_key, + deployment: k.deployment, + fees_grt: v.fees_grt, + fees_usd: v.fees_usd, + query_count: v.success_count + v.failure_count, + success_rate: (v.success_count as f64 / query_count) as f32, + avg_response_time_ms: (v.total_response_time_ms / query_count) as u32, + } }) .collect(), } @@ -451,14 +465,19 @@ struct Aggregations { #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] struct ClientFeesKey { + gateway_id: String, api_key: String, deployment: String, } #[derive(Debug, Default)] struct ClientFeesValue { + user_id: String, fees_grt: f64, fees_usd: f64, + success_count: u32, + failure_count: u32, + total_response_time_ms: f64, } #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] @@ -473,6 +492,9 @@ pub fn legacy_messages( ) -> (serde_json::Value, Vec) { let address_hex = |bytes| format!("{:?}", Address::from_slice(bytes).unwrap()); let first_indexer_query = client_query.indexer_queries.first(); + let user_id = client_query + .user_id + .unwrap_or("0x0000000000000000000000000000000000000000".into()); let client_request_payload = serde_json::json!({ "gateway_id": address_hex(&client_query.receipt_signer), "query_id": &client_query.query_id, @@ -481,7 +503,7 @@ pub fn legacy_messages( "graph_env": &client_query.gateway_id, "timestamp": timestamp, "api_key": &client_query.api_key, - "user": "0x0000000000000000000000000000000000000000", + "user": &user_id, "deployment": first_indexer_query.map(|i| deployment_cid(&i.deployment)).unwrap_or_default(), "indexed_chain": first_indexer_query.map(|i| i.indexed_chain.clone()).unwrap_or_default(), "network": first_indexer_query.map(|i| i.indexed_chain.clone()).unwrap_or_default(), @@ -526,7 +548,7 @@ pub fn legacy_messages( "graph_env": &client_query.gateway_id, "timestamp": timestamp, "api_key": &client_query.api_key, - "user_address": "0x0000000000000000000000000000000000000000", + "user_address": &user_id, "deployment": deployment_cid(&i.deployment), "network": &i.indexed_chain, "indexed_chain": &i.indexed_chain, diff --git a/src/messages.rs b/src/messages.rs index c79d687..fd29e7a 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -9,6 +9,8 @@ pub struct ClientQueryProtobuf { pub query_id: String, #[prost(string, tag = "4")] pub api_key: String, + #[prost(string, optional, tag = "11")] + pub user_id: Option, #[prost(string, tag = "5")] pub result: String, #[prost(uint32, tag = "6")] @@ -22,6 +24,7 @@ pub struct ClientQueryProtobuf { #[prost(message, repeated, tag = "10")] pub indexer_queries: Vec, } + #[derive(prost::Message)] pub struct IndexerQueryProtobuf { /// 20 bytes (address) @@ -65,13 +68,23 @@ pub struct ClientFeesHourlyProtobuf { #[derive(prost::Message)] pub struct ClientFeesProtobuf { #[prost(string, tag = "1")] - pub api_key: String, + pub gateway_id: String, #[prost(string, tag = "2")] + pub user_id: String, + #[prost(string, tag = "3")] + pub api_key: String, + #[prost(string, tag = "4")] pub deployment: String, - #[prost(double, tag = "3")] + #[prost(double, tag = "5")] pub fees_grt: f64, - #[prost(double, tag = "4")] + #[prost(double, tag = "6")] pub fees_usd: f64, + #[prost(uint32, tag = "7")] + pub query_count: u32, + #[prost(float, tag = "8")] + pub success_rate: f32, + #[prost(uint32, tag = "9")] + pub avg_response_time_ms: u32, } #[derive(prost::Message)]