Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): subscribe to token updates (metadata etc.) #2990

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/torii/grpc/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ enum ComparisonOperator {
}

message Token {
string token_id = 1;
string contract_address = 2;
string name = 3;
string symbol = 4;
Expand Down
8 changes: 8 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ service World {
// Update token balance subscription
rpc UpdateTokenBalancesSubscription (UpdateTokenBalancesSubscriptionRequest) returns (google.protobuf.Empty);

// Subscribe to token updates.
rpc SubscribeTokens (RetrieveTokensRequest) returns (stream SubscribeTokensResponse);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

Expand Down Expand Up @@ -85,6 +88,11 @@ message RetrieveTokensResponse {
repeated types.Token tokens = 1;
}

// A response containing token updates
message SubscribeTokensResponse {
types.Token token = 1;
}

// A request to retrieve token balances
message RetrieveTokenBalancesRequest {
// The account addresses to retrieve balances for
Expand Down
12 changes: 5 additions & 7 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,7 @@ use crate::proto::types::member_value::ValueType;
use crate::proto::types::LogicalOperator;
use crate::proto::world::world_server::WorldServer;
use crate::proto::world::{
RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest,
RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
SubscribeTokenBalancesResponse, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse, SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -1248,6 +1243,8 @@ type RetrieveEntitiesStreamingResponseStream =
Pin<Box<dyn Stream<Item = Result<RetrieveEntitiesStreamingResponse, Status>> + Send>>;
type SubscribeTokenBalancesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokenBalancesResponse, Status>> + Send>>;
type SubscribeTokensResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokensResponse, Status>> + Send>>;

#[tonic::async_trait]
impl proto::world::world_server::World for DojoWorld {
Expand All @@ -1258,7 +1255,8 @@ impl proto::world::world_server::World for DojoWorld {
type SubscribeIndexerStream = SubscribeIndexerResponseStream;
type RetrieveEntitiesStreamingStream = RetrieveEntitiesStreamingResponseStream;
type SubscribeTokenBalancesStream = SubscribeTokenBalancesResponseStream;

type SubscribeTokensStream = SubscribeTokensResponseStream;

async fn world_metadata(
&self,
_request: Request<WorldMetadataRequest>,
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod event_message;
pub mod indexer;
pub mod model_diff;
pub mod token_balance;
pub mod token;

pub(crate) fn match_entity_keys(
id: Felt,
Expand Down
179 changes: 179 additions & 0 deletions crates/torii/grpc/src/server/subscriptions/token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use rand::Rng;
use starknet_crypto::Felt;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_sqlite::error::{Error, ParseError};
use torii_sqlite::simple_broker::SimpleBroker;
use torii_sqlite::types::Token;
use tracing::{error, trace};

use crate::proto;
use crate::proto::world::{SubscribeTokenBalancesResponse, SubscribeTokensResponse};

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::balance";

#[derive(Debug)]
pub struct TokenSubscriber {
/// Contract addresses that the subscriber is interested in
/// If empty, subscriber receives updates for all contracts
pub contract_addresses: HashSet<Felt>,
/// The channel to send the response back to the subscriber.
pub sender: Sender<Result<SubscribeTokenBalancesResponse, tonic::Status>>,
}

#[derive(Debug, Default)]
pub struct TokenManager {
subscribers: RwLock<HashMap<u64, TokenSubscriber>>,
}

impl TokenManager {
pub async fn add_subscriber(
&self,
contract_addresses: Vec<Felt>,
) -> Result<Receiver<Result<SubscribeTokenBalancesResponse, tonic::Status>>, Error> {
let subscription_id = rand::thread_rng().gen::<u64>();
let (sender, receiver) = channel(1);

// Send initial empty response
let _ = sender
.send(Ok(SubscribeTokenBalancesResponse { subscription_id, balance: None }))
.await;

self.subscribers.write().await.insert(
subscription_id,
TokenSubscriber {
contract_addresses: contract_addresses.into_iter().collect(),
sender,
},
);

Ok(receiver)
}

pub async fn update_subscriber(
&self,
id: u64,
contract_addresses: Vec<Felt>,
) {
let sender = {
let subscribers = self.subscribers.read().await;
if let Some(subscriber) = subscribers.get(&id) {
subscriber.sender.clone()
} else {
return; // Subscriber not found, exit early
}
};

self.subscribers.write().await.insert(
id,
TokenSubscriber {
contract_addresses: contract_addresses.into_iter().collect(),
sender,
},
);
}

pub(super) async fn remove_subscriber(&self, id: u64) {
self.subscribers.write().await.remove(&id);
}
}

#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
simple_broker: Pin<Box<dyn Stream<Item = Token> + Send>>,
balance_sender: UnboundedSender<Token>,
}

impl Service {
pub fn new(subs_manager: Arc<TokenManager>) -> Self {
let (balance_sender, balance_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<Token>::subscribe()),
balance_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, balance_receiver));

service
}

async fn publish_updates(
subs: Arc<TokenManager>,
mut balance_receiver: UnboundedReceiver<Token>,
) {
while let Some(balance) = balance_receiver.recv().await {
if let Err(e) = Self::process_balance_update(&subs, &balance).await {
error!(target = LOG_TARGET, error = %e, "Processing balance update.");
}
}
}

async fn process_balance_update(
subs: &Arc<TokenManager>,
balance: &Token,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();

for (idx, sub) in subs.subscribers.read().await.iter() {
let contract_address =
Felt::from_str(&balance.contract_address).map_err(ParseError::FromStr)?;

// Skip if contract address filter doesn't match
if !sub.contract_addresses.is_empty()
&& !sub.contract_addresses.contains(&contract_address)
{
continue;
}


let resp = SubscribeTokensResponse {
token: Some(proto::types::Token {
id: balance.id.clone(),
contract_address: balance.contract_address.clone(),
name: balance.name.clone(),
symbol: balance.symbol.clone(),
decimals: balance.decimals,
metadata: balance.metadata.clone(),
}),
};

if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
}
}

for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing balance stream.");
subs.remove_subscriber(id).await
}

Ok(())
}
}

impl Future for Service {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.balance_sender.send(balance) {
error!(target = LOG_TARGET, error = %e, "Sending balance update to processor.");
}
}

Poll::Pending
}
}
Loading