Skip to content

Commit

Permalink
aggregate client fees
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 6, 2024
1 parent ca61e47 commit cd5263f
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 15 deletions.
88 changes: 73 additions & 15 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use serde::Deserialize;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use titorelli::{
kafka::{assign_partitions, fetch_partition_ids, latest_messages},
messages::{ClientQueryProtobuf, IndexerFeesHourlyProtobuf, IndexerFeesProtobuf},
messages::{
ClientFeesHourlyProtobuf, ClientFeesProtobuf, ClientQueryProtobuf,
IndexerFeesHourlyProtobuf, IndexerFeesProtobuf,
},
print_unix_millis,
};
use tokio::{sync::mpsc, task::JoinHandle};
Expand Down Expand Up @@ -182,6 +185,18 @@ async fn handle_source_msg(
data,
} => {
if aggregation_timestamp >= start_timestamp {
let agg = aggregations.entry(aggregation_timestamp).or_default();

if let Some(deployment) = data.indexer_queries.first().map(|i| &i.deployment) {
let key = ClientFeesKey {
api_key: data.api_key.clone(),
deployment: deployment_cid(deployment),
};
let value = agg.client_fees.entry(key).or_default();
value.fees_grt += data.indexer_queries.iter().map(|i| i.fee_grt).sum::<f64>();
value.fees_usd += data.total_fees_usd;
}

for indexer_query in &data.indexer_queries {
if indexer_query.legacy_scalar.unwrap_or(false) {
continue;
Expand All @@ -190,7 +205,6 @@ async fn handle_source_msg(
signer: Address::from_slice(&data.receipt_signer)?,
receiver: Address::from_slice(&indexer_query.indexer)?,
};
let agg = aggregations.entry(aggregation_timestamp).or_default();
*agg.indexer_fees.entry(key).or_default() += indexer_query.fee_grt;
}
}
Expand Down Expand Up @@ -218,7 +232,11 @@ async fn handle_source_msg(
}

async fn latest_sink_timestamp(consumer: &StreamConsumer) -> anyhow::Result<Option<i64>> {
let latest_messages = latest_messages(consumer, &["gateway_indexer_fees_hourly"]).await?;
let latest_messages = latest_messages(
consumer,
&["gateway_client_fees_hourly", "gateway_indexer_fees_hourly"],
)
.await?;
let timestamp = latest_messages
.into_iter()
.map(|msg| -> anyhow::Result<i64> {
Expand Down Expand Up @@ -290,7 +308,34 @@ async fn record_aggregations(
timestamp: i64,
aggregations: Aggregations,
) -> anyhow::Result<()> {
let Aggregations { indexer_fees } = aggregations;
let record_key = timestamp.to_be_bytes();
let Aggregations {
client_fees,
indexer_fees,
} = aggregations;

let record_payload = ClientFeesHourlyProtobuf {
timestamp,
aggregations: client_fees
.into_iter()
.map(|(k, v)| ClientFeesProtobuf {
api_key: k.api_key,
deployment: k.deployment,
fees_grt: v.fees_grt,
fees_usd: v.fees_usd,
})
.collect(),
}
.encode_to_vec();
let record = rdkafka::producer::FutureRecord::to("gateway_client_fees_hourly")
.key(&record_key)
.payload(&record_payload);
producer
.send(record, Duration::from_secs(30))
.await
.map_err(|(err, _)| err)
.context("send aggregation record")?;

let record_payload = IndexerFeesHourlyProtobuf {
timestamp,
aggregations: indexer_fees
Expand All @@ -303,9 +348,8 @@ async fn record_aggregations(
.collect(),
}
.encode_to_vec();
let key = timestamp.to_be_bytes();
let record = rdkafka::producer::FutureRecord::to("gateway_indexer_fees_hourly")
.key(&key)
.key(&record_key)
.payload(&record_payload);
producer
.send(record, Duration::from_secs(30))
Expand Down Expand Up @@ -342,6 +386,13 @@ impl std::fmt::Debug for Address {
}
}

fn deployment_cid(bytes: &[u8]) -> String {
let mut buf = [0_u8; 34];
buf[0..2].copy_from_slice(&[0x12, 0x20]);
buf[2..].copy_from_slice(bytes);
bs58::encode(buf).into_string()
}

#[derive(Debug)]
enum SourceMsg {
Flush {
Expand Down Expand Up @@ -386,10 +437,23 @@ impl SourceMsg {

#[derive(Debug, Default)]
struct Aggregations {
client_fees: BTreeMap<ClientFeesKey, ClientFeesValue>,
indexer_fees: BTreeMap<IndexerFeesKey, f64>,
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ClientFeesKey {
api_key: String,
deployment: String,
}

#[derive(Debug, Default)]
struct ClientFeesValue {
fees_grt: f64,
fees_usd: f64,
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct IndexerFeesKey {
signer: Address,
receiver: Address,
Expand All @@ -400,12 +464,6 @@ pub fn legacy_messages(
client_query: ClientQueryProtobuf,
) -> (serde_json::Value, Vec<serde_json::Value>) {
let address_hex = |bytes| format!("{:?}", Address::from_slice(bytes).unwrap());
let deployment_str = |bytes| {
let mut buf = [0_u8; 34];
buf[0..2].copy_from_slice(&[0x12, 0x20]);
buf[2..].copy_from_slice(bytes);
bs58::encode(buf).into_string()
};
let first_indexer_query = client_query.indexer_queries.first();
let client_request_payload = serde_json::json!({
"gateway_id": address_hex(&client_query.receipt_signer),
Expand All @@ -416,7 +474,7 @@ pub fn legacy_messages(
"timestamp": timestamp,
"api_key": &client_query.api_key,
"user": "0x0000000000000000000000000000000000000000",
"deployment": first_indexer_query.map(|i| deployment_str(&i.deployment)).unwrap_or_default(),
"deployment": first_indexer_query.map(|i| deployment_cid(&i.deployment)).unwrap_or_default(),
"indexed_chain": first_indexer_query.map(|i| i.indexed_chain.clone()).unwrap_or_default(),
"network": first_indexer_query.map(|i| i.indexed_chain.clone()).unwrap_or_default(),
"response_time_ms": client_query.response_time_ms,
Expand Down Expand Up @@ -461,7 +519,7 @@ pub fn legacy_messages(
"timestamp": timestamp,
"api_key": &client_query.api_key,
"user_address": "0x0000000000000000000000000000000000000000",
"deployment": deployment_str(&i.deployment),
"deployment": deployment_cid(&i.deployment),
"network": &i.indexed_chain,
"indexed_chain": &i.indexed_chain,
"indexer": address_hex(&i.indexer),
Expand Down
21 changes: 21 additions & 0 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ pub struct IndexerQueryProtobuf {
pub legacy_scalar: Option<bool>,
}

#[derive(prost::Message)]
pub struct ClientFeesHourlyProtobuf {
/// start timestamp for aggregation, in unix milliseconds
#[prost(int64, tag = "1")]
pub timestamp: i64,
#[prost(message, repeated, tag = "2")]
pub aggregations: Vec<ClientFeesProtobuf>,
}

#[derive(prost::Message)]
pub struct ClientFeesProtobuf {
#[prost(string, tag = "1")]
pub api_key: String,
#[prost(string, tag = "2")]
pub deployment: String,
#[prost(double, tag = "3")]
pub fees_grt: f64,
#[prost(double, tag = "4")]
pub fees_usd: f64,
}

#[derive(prost::Message)]
pub struct IndexerFeesHourlyProtobuf {
/// start timestamp for aggregation, in unix milliseconds
Expand Down

0 comments on commit cd5263f

Please sign in to comment.