diff --git a/crates/node/migrations/20231128120351_transactions-table.sql b/crates/node/migrations/20231128120351_transactions-table.sql index fa9f7599..85ccc0fe 100644 --- a/crates/node/migrations/20231128120351_transactions-table.sql +++ b/crates/node/migrations/20231128120351_transactions-table.sql @@ -12,6 +12,7 @@ DROP TYPE IF EXISTS transaction_kind; CREATE TYPE transaction_kind AS ENUM ('empty','transfer', 'stake', 'unstake', 'deploy', 'run', 'proof', 'proofkey', 'verification', 'cancel'); CREATE TABLE transaction ( + author VARCHAR(130) NOT NULL, hash VARCHAR(64) PRIMARY KEY NOT NULL, kind transaction_kind NOT NULL, nonce NUMERIC NOT NULL, diff --git a/crates/node/migrations/20240121151959_acl_whitelist.sql b/crates/node/migrations/20240121151959_acl_whitelist.sql new file mode 100644 index 00000000..c4ac4867 --- /dev/null +++ b/crates/node/migrations/20240121151959_acl_whitelist.sql @@ -0,0 +1,6 @@ + +DROP TABLE IF EXISTS acl_whitelist; + +CREATE TABLE acl_whitelist ( + key VARCHAR(130) PRIMARY KEY NOT NULL +); diff --git a/crates/node/src/cli.rs b/crates/node/src/cli.rs index 664f59b1..065a7fe7 100644 --- a/crates/node/src/cli.rs +++ b/crates/node/src/cli.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, path::PathBuf}; -use clap::{Args, Parser, Subcommand, ValueEnum}; +use clap::{Args, Parser, Subcommand}; #[derive(Debug, Args)] pub struct Config { @@ -114,28 +114,25 @@ pub struct NodeKeyOptions { pub node_key_file: PathBuf, } -#[derive(Clone, Debug, ValueEnum)] -pub enum ACLTarget { - All, - Txs, - P2p, -} - -#[derive(Debug, Args)] -pub struct PeerACLOptions { - target: ACLTarget, -} - #[derive(Debug, Subcommand)] pub enum PeerCommand { Whitelist { - #[command(flatten)] - whitelist: PeerACLOptions, + #[arg( + long, + long_help = "Database URL", + env = "GEVULOT_DB_URL", + default_value = "postgres://gevulot:gevulot@localhost/gevulot" + )] + db_url: String, }, - Deny { - #[command(flatten)] - deny: PeerACLOptions, + #[arg( + long, + long_help = "Database URL", + env = "GEVULOT_DB_URL", + default_value = "postgres://gevulot:gevulot@localhost/gevulot" + )] + db_url: String, }, } @@ -147,6 +144,18 @@ pub enum GenerateCommand { }, } +#[derive(Debug, Subcommand)] +pub enum ShowCommand { + PublicKey { + #[arg( + long, + long_help = "Key filename", + default_value_os_t = PathBuf::from("/var/lib/gevulot/node.key"), + )] + key_file: PathBuf, + }, +} + #[allow(clippy::large_enum_variant)] #[derive(Debug, Subcommand)] pub enum Command { @@ -180,6 +189,12 @@ pub enum Command { #[command(flatten)] config: Config, }, + + /// Show information. + Show { + #[command(subcommand)] + op: ShowCommand, + }, } #[derive(Debug, Parser)] diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index c3e1e69f..f6d4ebf9 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -13,10 +13,10 @@ use std::{ use asset_manager::AssetManager; use async_trait::async_trait; use clap::Parser; -use cli::{Cli, Command, Config, GenerateCommand, NodeKeyOptions, PeerCommand}; +use cli::{Cli, Command, Config, GenerateCommand, NodeKeyOptions, PeerCommand, ShowCommand}; use eyre::Result; use gevulot_node::types; -use libsecp256k1::SecretKey; +use libsecp256k1::{PublicKey, SecretKey}; use pea2pea::Pea2Pea; use rand::{rngs::StdRng, SeedableRng}; use sqlx::postgres::PgPoolOptions; @@ -38,7 +38,7 @@ mod vmm; mod workflow; use mempool::Mempool; -use storage::Database; +use storage::{database::entity, Database}; fn start_logger(default_level: LevelFilter) { let filter = match EnvFilter::try_from_default_env() { @@ -76,14 +76,27 @@ async fn main() -> Result<()> { sqlx::migrate!().run(&pool).await.map_err(|e| e.into()) } Command::Peer { peer, op } => match op { - PeerCommand::Whitelist { whitelist } => { - todo!("implement peer whitelisting"); + PeerCommand::Whitelist { db_url } => { + let db = storage::Database::new(&db_url).await?; + let key = entity::PublicKey::try_from(peer.as_str())?; + db.acl_whitelist(&key).await } - PeerCommand::Deny { deny } => { - todo!("implement peer denying"); + PeerCommand::Deny { db_url } => { + let db = storage::Database::new(&db_url).await?; + let key = entity::PublicKey::try_from(peer.as_str())?; + db.acl_deny(&key).await } }, Command::Run { config } => run(Arc::new(config)).await, + Command::Show { op } => match op { + ShowCommand::PublicKey { key_file } => { + let bs = std::fs::read(key_file)?; + let key = SecretKey::parse(bs.as_slice().try_into()?)?; + let public_key = PublicKey::from_secret_key(&key); + println!("{}", hex::encode(public_key.serialize())); + Ok(()) + } + }, } } @@ -140,22 +153,20 @@ impl workflow::TransactionStore for storage::Database { } } -struct AuthenticatingTxHandler { +struct P2PTxHandler { mempool: Arc>, database: Arc, } -impl AuthenticatingTxHandler { +impl P2PTxHandler { pub fn new(mempool: Arc>, database: Arc) -> Self { Self { mempool, database } } } #[async_trait::async_trait] -impl networking::p2p::TxHandler for AuthenticatingTxHandler { +impl networking::p2p::TxHandler for P2PTxHandler { async fn recv_tx(&self, tx: Transaction) -> Result<()> { - // TODO: Authenticate tx by signature. - // The transaction was received from P2P network so we can consider it // propagated at this point. let mut tx = tx; @@ -166,6 +177,14 @@ impl networking::p2p::TxHandler for AuthenticatingTxHandler { } } +#[async_trait::async_trait] +impl mempool::AclWhitelist for Database { + async fn contains(&self, key: &PublicKey) -> Result { + let key = entity::PublicKey(*key); + self.acl_whitelist_has(&key).await + } +} + async fn run(config: Arc) -> Result<()> { let database = Arc::new(Database::new(&config.db_url).await?); let file_storage = Arc::new(storage::File::new(&config.data_directory)); @@ -180,10 +199,10 @@ async fn run(config: Arc) -> Result<()> { ); let mempool = Arc::new(RwLock::new( - Mempool::new(database.clone(), Some(p2p.clone())).await?, + Mempool::new(database.clone(), database.clone(), Some(p2p.clone())).await?, )); - p2p.register_tx_handler(Arc::new(AuthenticatingTxHandler::new( + p2p.register_tx_handler(Arc::new(P2PTxHandler::new( mempool.clone(), database.clone(), ))) diff --git a/crates/node/src/mempool/mod.rs b/crates/node/src/mempool/mod.rs index e8cf46f9..e8ff0e41 100644 --- a/crates/node/src/mempool/mod.rs +++ b/crates/node/src/mempool/mod.rs @@ -1,9 +1,11 @@ use async_trait::async_trait; use bytes::Bytes; use eyre::Result; +use libsecp256k1::PublicKey; use pea2pea::protocols::Writing; use std::collections::VecDeque; use std::sync::Arc; +use thiserror::Error; use tokio::sync::RwLock; use crate::{ @@ -18,9 +20,22 @@ pub trait Storage: Send + Sync { async fn fill_deque(&self, deque: &mut VecDeque) -> Result<()>; } +#[async_trait] +pub trait AclWhitelist: Send + Sync { + async fn contains(&self, key: &PublicKey) -> Result; +} + +#[allow(clippy::enum_variant_names)] +#[derive(Error, Debug)] +pub enum MempoolError { + #[error("permission denied")] + PermissionDenied, +} + #[derive(Clone)] pub struct Mempool { storage: Arc, + acl_whitelist: Arc, // TODO: This should be refactored to PubSub channel abstraction later on. tx_chan: Option>, deque: VecDeque, @@ -29,6 +44,7 @@ pub struct Mempool { impl Mempool { pub async fn new( storage: Arc, + acl_whitelist: Arc, tx_chan: Option>, ) -> Result { let mut deque = VecDeque::new(); @@ -36,6 +52,7 @@ impl Mempool { Ok(Self { storage, + acl_whitelist, tx_chan, deque, }) @@ -54,6 +71,11 @@ impl Mempool { // First validate transaction. tx.validate()?; + // Secondly verify that author is whitelisted. + if !self.acl_whitelist.contains(&tx.author).await? { + return Err(MempoolError::PermissionDenied.into()); + } + let mut tx = tx; self.storage.set(&tx).await?; diff --git a/crates/node/src/networking/p2p.rs b/crates/node/src/networking/p2p.rs index 797e8238..11edaa2d 100644 --- a/crates/node/src/networking/p2p.rs +++ b/crates/node/src/networking/p2p.rs @@ -177,8 +177,6 @@ mod tests { #[tokio::test] async fn test_two_peers() { - start_logger(LevelFilter::ERROR); - let (tx1, mut rx1) = mpsc::channel(1); let (tx2, mut rx2) = mpsc::channel(1); let (sink1, sink2) = ( diff --git a/crates/node/src/rpc_server/mod.rs b/crates/node/src/rpc_server/mod.rs index 9716b962..6cbc8c53 100644 --- a/crates/node/src/rpc_server/mod.rs +++ b/crates/node/src/rpc_server/mod.rs @@ -73,8 +73,6 @@ impl RpcServer { async fn send_transaction(params: Params<'static>, ctx: Arc) -> RpcResponse<()> { tracing::info!("JSON-RPC: send_transaction()"); - dbg!(¶ms); - // Real logic let tx: Transaction = match params.one() { Ok(tx) => tx, @@ -138,10 +136,21 @@ mod tests { core::{client::ClientT, params::ArrayParams}, http_client::HttpClientBuilder, }; + use libsecp256k1::{PublicKey, SecretKey}; use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan, EnvFilter}; + use crate::mempool; + use super::*; + struct AlwaysGrantAclWhitelist; + #[async_trait::async_trait] + impl mempool::AclWhitelist for AlwaysGrantAclWhitelist { + async fn contains(&self, key: &PublicKey) -> Result { + Ok(true) + } + } + #[ignore] #[tokio::test] async fn test_send_transaction() { @@ -153,12 +162,15 @@ mod tests { .build(url) .expect("http client"); - let tx = Transaction::default(); + let key = SecretKey::default(); + let mut tx = Transaction::default(); + tx.sign(&key); + let mut params = ArrayParams::new(); params.insert(&tx).expect("rpc params"); let resp = rpc_client - .request::("sendTransaction", params) + .request::, ArrayParams>("sendTransaction", params) .await .expect("rpc request"); @@ -196,7 +208,11 @@ mod tests { }); let db = Arc::new(Database::new(&cfg.db_url).await.unwrap()); - let mempool = Arc::new(RwLock::new(Mempool::new(db.clone(), None).await.unwrap())); + let mempool = Arc::new(RwLock::new( + Mempool::new(db.clone(), Arc::new(AlwaysGrantAclWhitelist {}), None) + .await + .unwrap(), + )); let asset_manager = Arc::new(AssetManager::new(cfg.clone(), db.clone())); RpcServer::run(cfg.clone(), db.clone(), mempool, asset_manager) diff --git a/crates/node/src/storage/database/postgres.rs b/crates/node/src/storage/database/postgres.rs index 50ab9891..0940d63e 100644 --- a/crates/node/src/storage/database/postgres.rs +++ b/crates/node/src/storage/database/postgres.rs @@ -375,7 +375,8 @@ impl Database { let mut db_tx = self.pool.begin().await?; sqlx::query( - "INSERT INTO transaction ( hash, kind, nonce, signature, propagated ) VALUES ( $1, $2, $3, $4, $5 ) ON CONFLICT (hash) DO UPDATE SET propagated = $5") + "INSERT INTO transaction ( author, hash, kind, nonce, signature, propagated ) VALUES ( $1, $2, $3, $4, $5, $6 ) ON CONFLICT (hash) DO UPDATE SET propagated = $6") + .bind(entity.author) .bind(entity.hash) .bind(entity.kind) .bind(entity.nonce) @@ -498,6 +499,32 @@ impl Database { db_tx.commit().await.map_err(|e| e.into()) } + pub async fn acl_whitelist_has(&self, key: &entity::PublicKey) -> Result { + let res: Option = sqlx::query("SELECT 1 FROM acl_whitelist WHERE key = $1") + .bind(key) + .map(|row: sqlx::postgres::PgRow| row.get(0)) + .fetch_optional(&self.pool) + .await?; + + Ok(res.is_some()) + } + + pub async fn acl_whitelist(&self, key: &entity::PublicKey) -> Result<()> { + sqlx::query("INSERT INTO acl_whitelist ( key ) VALUES ( $1 ) ON CONFLICT (key) DO NOTHING") + .bind(key) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn acl_deny(&self, key: &entity::PublicKey) -> Result<()> { + sqlx::query("DELETE FROM acl_whitelist WHERE key = $1") + .bind(key) + .execute(&self.pool) + .await?; + Ok(()) + } + // Delete is mainly for test cases. async fn delete_transaction(&self, tx_hash: &Hash) -> Result<()> { let mut db_tx = self.pool.begin().await?; diff --git a/crates/node/src/storage/mod.rs b/crates/node/src/storage/mod.rs index df430083..af53df79 100644 --- a/crates/node/src/storage/mod.rs +++ b/crates/node/src/storage/mod.rs @@ -1,4 +1,4 @@ -mod database; +pub(crate) mod database; mod file; pub use database::postgres::Database;