Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admin price #67

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions docs/discovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,29 @@ In the diagram below, keep in mind that it is possible for IPFS files (schema fi

```mermaid
graph LR
I[Indexer] -->|post schema| IPFS[IPFS Gateway]
I -->|post schema <br> manage availability| I
C[Client] -.->|schema| IPFS
C -->|determine Bundle hash| C
C -.->|schema| I
C -.->|availability| I
C -->|paid query| I
I[Indexer] -->|1. register| H[File service contract/subgraph]
I -->|2. publish schema| IPFS[IPFS Gateway]
C[Client] -.->|3. get URLs| H
C -.->|4. query file status| I
C -.->|5. query schema| IPFS
C -.->|7. paid request| I
I -->|8. verify payment, respond| C
C -->|9. verify content| C
```
With an explorer, we can imagine a future discovery user interface along the lines of
With an explorer/gateway, we can imagine a future discovery user interface along the lines of

```mermaid
graph LR
I[Indexer] -->|manage availability| I
I[Indexer] -->|post schema| IPFS[IPFS Gateway]
E[Explorer] -.->|availability| I
E -.->|query scehma| IPFS
C[Client] -.->|select Bundle| E
C -->|authorize| E
E -->|paid query| I
E -->|respond| C
I[Indexer] -->|1. register| H[File service contract/subgraph]
I -->|2. publish schema| IPFS[IPFS Gateway]
E[Explorer] -.->|3. get URLs| H
E[Explorer] -.->|4. query file status| I
E -.->|5. query schema| IPFS
E -.->|6. human-readable view| E
C[Client] -.->|7. request service| E
E -->|8. paid request| I
I -->|9. verify payment, respond| E
E -->|10. verify content, respond| C
```

## On-chain approach (alternative)
Expand Down
18 changes: 18 additions & 0 deletions docs/onchain_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,21 @@ You can expect logs as follows
INFO file_exchange: Transaction result, result: Ok((0xe37b9ee6d657ab5700e8a964a8fcc8b39cdefd73, Some(TransactionReceipt { transaction_hash: 0xd5c7c4d3dbd4aa8f845f87f8225aef91e927fe7cd5a1cd02085b0d30a59f4743, transaction_index: 1, block_hash: Some(0xcb46a88b2a37648a38165ca3740248b9a2a41e01f3b56f65f59b33f5cbf00fd0), block_number: Some(5738566), from: 0xe9a1cabd57700b17945fd81feefba82340d9568f, to: Some(0x865365c425f3a593ffe698d9c4e6707d14d51e08), cumulative_gas_used: 345329, gas_used: Some(345329), contract_address: None, logs: [...], status: Some(1), root: None, logs_bloom: ..., transaction_type: Some(2), effective_gas_price: Some(100000000), other: OtherFields { inner: {"gasUsedForL1": String("0x28a70"), "l1BlockNumber": String("0x4d09a3")} } })))
at file-exchange/src/main.rs:88
```

### Payments

A generic digram with direct payments between provider and consumer

```mermaid

graph LR
C[Client] -->|1. approve GRT spending| G[TheGraph Token contract]
C -->|2. deposit Escrow accounts| E[Escrow Contract]
C -->|3. send receitps| I[Indexer/receiver]
I -.->|4. verify receipts| E
I -.->|5. aggregate voucher| E
I -->|6. redeem payment| E

```

Later, we can add a gateway for payment abstraction.
187 changes: 181 additions & 6 deletions file-service/src/admin.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::collections::HashMap;
use std::sync::Arc;

use async_graphql::{Context, EmptySubscription, Object, Schema};
use async_graphql::{Context, EmptySubscription, MergedObject, Object, Schema};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{extract::State, routing::get, Router, Server};
use http::HeaderMap;
use tokio::sync::Mutex;

use crate::file_server::status::{GraphQlBundle, StatusQuery};
use crate::file_server::util::graphql_playground;
use crate::file_server::{FileServiceError, ServerContext};
use crate::file_server::{
cost::{GraphQlCostModel, PriceQuery},
status::{GraphQlBundle, StatusQuery},
util::graphql_playground,
FileServiceError, ServerContext,
};
use file_exchange::{
errors::{Error, ServerError},
manifest::{
Expand All @@ -21,6 +24,7 @@ use file_exchange::{
pub struct AdminState {
pub client: IpfsClient,
pub bundles: Arc<Mutex<HashMap<String, LocalBundle>>>,
pub prices: Arc<Mutex<HashMap<String, f64>>>,
pub admin_auth_token: Option<String>,
pub admin_schema: AdminSchema,
}
Expand All @@ -36,10 +40,21 @@ impl AdminContext {
}
}

pub type AdminSchema = Schema<StatusQuery, StatusMutation, EmptySubscription>;
#[derive(MergedObject, Default)]
pub struct MergedQuery(StatusQuery, PriceQuery);

#[derive(MergedObject, Default)]
pub struct MergedMutation(StatusMutation, PriceMutation);

pub type AdminSchema = Schema<MergedQuery, MergedMutation, EmptySubscription>;

pub async fn build_schema() -> AdminSchema {
Schema::build(StatusQuery, StatusMutation, EmptySubscription).finish()
Schema::build(
MergedQuery(StatusQuery, PriceQuery),
MergedMutation(StatusMutation, PriceMutation),
EmptySubscription,
)
.finish()
}

fn get_token_from_headers(headers: &HeaderMap) -> Option<String> {
Expand Down Expand Up @@ -68,6 +83,7 @@ pub fn serve_admin(context: ServerContext) {
AdminState {
client: context.state.client.clone(),
bundles: context.state.bundles.clone(),
prices: context.state.prices.clone(),
admin_auth_token: context.state.admin_auth_token.clone(),
admin_schema: build_schema().await,
}
Expand Down Expand Up @@ -267,3 +283,162 @@ impl StatusMutation {
removed_bundles
}
}

#[derive(Default)]
pub struct PriceMutation;

#[Object]
impl PriceMutation {
// Set price for a deployment
async fn set_price(
&self,
ctx: &Context<'_>,
deployment: String,
price_per_byte: f64,
) -> Result<GraphQlCostModel, anyhow::Error> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(anyhow::anyhow!(format!(
"Failed to authenticate: {:#?} (admin: {:#?}",
ctx.data_opt::<String>(),
ctx.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
)));
}

ctx.data_unchecked::<AdminContext>()
.state
.prices
.lock()
.await
.insert(deployment.clone(), price_per_byte);

Ok(GraphQlCostModel {
deployment,
price_per_byte,
})
}

// Add multiple bundles
async fn set_prices(
&self,
ctx: &Context<'_>,
deployments: Vec<String>,
prices: Vec<f64>,
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(anyhow::anyhow!("Failed to authenticate"));
}
let price_ref = ctx.data_unchecked::<AdminContext>().state.prices.clone();
let prices = deployments
.iter()
.zip(prices)
.map(|(deployment, price)| {
let price_ref = price_ref.clone();

async move {
price_ref
.clone()
.lock()
.await
.insert(deployment.clone(), price);

Ok::<_, anyhow::Error>(GraphQlCostModel {
deployment: deployment.to_string(),
price_per_byte: price,
})
}
})
.collect::<Vec<_>>();

// Since collect() gathers futures, we need to resolve them. You can use `try_join_all` for this.
let resolved_prices: Result<Vec<GraphQlCostModel>, _> =
futures::future::try_join_all(prices).await;

Ok(resolved_prices.unwrap_or_default())
}

async fn remove_price(
&self,
ctx: &Context<'_>,
deployment: String,
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(anyhow::anyhow!("Failed to authenticate"));
}

let bundle = ctx
.data_unchecked::<AdminContext>()
.state
.prices
.lock()
.await
.remove(&deployment)
.map(|price| GraphQlCostModel {
deployment,
price_per_byte: price,
});

Ok(bundle)
}

async fn remove_prices(
&self,
ctx: &Context<'_>,
deployments: Vec<String>,
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
if ctx.data_opt::<String>()
!= ctx
.data_unchecked::<AdminContext>()
.state
.admin_auth_token
.as_ref()
{
return Err(anyhow::anyhow!("Failed to authenticate"));
}

let prices = deployments
.iter()
.map(|deployment| async move {
ctx.data_unchecked::<AdminContext>()
.state
.prices
.lock()
.await
.remove(deployment)
.map(|price| GraphQlCostModel {
deployment: deployment.to_string(),
price_per_byte: price,
})
.ok_or(anyhow::anyhow!(format!(
"Deployment not found: {}",
deployment
)))
})
.collect::<Vec<_>>();

let removed_prices: Result<Vec<GraphQlCostModel>, _> =
futures::future::try_join_all(prices).await;

removed_prices
}
}
8 changes: 4 additions & 4 deletions file-service/src/file_server/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ pub struct GraphQlCostModel {
}

#[derive(Default)]
pub struct Query;
pub struct PriceQuery;

#[Object]
impl Query {
impl PriceQuery {
/// Provide an array of cost model to the queried deployment whether it is served or not
async fn cost_models(
&self,
Expand Down Expand Up @@ -70,10 +70,10 @@ impl Query {
}
}

pub type CostSchema = Schema<Query, EmptyMutation, EmptySubscription>;
pub type CostSchema = Schema<PriceQuery, EmptyMutation, EmptySubscription>;

pub async fn build_schema() -> CostSchema {
Schema::build(Query, EmptyMutation, EmptySubscription).finish()
Schema::build(PriceQuery, EmptyMutation, EmptySubscription).finish()
}

pub async fn cost(State(context): State<ServerContext>, req: GraphQLRequest) -> GraphQLResponse {
Expand Down
4 changes: 3 additions & 1 deletion file-service/src/file_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub struct ServerState {
pub client: IpfsClient,
pub operator_public_key: String,
pub bundles: Arc<Mutex<HashMap<String, LocalBundle>>>, // Keyed by IPFS hash, valued by Bundle and Local path
pub admin_auth_token: Option<String>, // Add bearer prefix
pub prices: Arc<Mutex<HashMap<String, f64>>>, // Keyed by IPFS hash, valued by price per byte
pub admin_auth_token: Option<String>, // Add bearer prefix
pub config: Config,
pub database: PgPool,
pub cost_schema: crate::file_server::cost::CostSchema,
Expand Down Expand Up @@ -116,6 +117,7 @@ pub async fn initialize_server_context(config: Config) -> Result<ServerContext,
config: config.clone(),
client: client.clone(),
bundles: Arc::new(Mutex::new(HashMap::new())),
prices: Arc::new(Mutex::new(HashMap::new())),
admin_auth_token,
operator_public_key: public_key(&config.common.indexer.operator_mnemonic)
.expect("Failed to initiate with operator wallet"),
Expand Down
Loading