From 4f8e915d04608e8e827483bc777a5870b1291e9d Mon Sep 17 00:00:00 2001 From: rem1-dev <150667999+rem1-dev@users.noreply.github.com> Date: Tue, 13 Feb 2024 09:53:49 +0100 Subject: [PATCH] Add optional structured logs of RPC related events --- src/config.rs | 36 ++++++++++++++++++++++++++ src/electrum/server.rs | 58 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index cec2b3f0d..d3529e9e5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,6 +39,7 @@ pub struct Config { pub utxos_limit: usize, pub electrum_txs_limit: usize, pub electrum_banner: String, + pub rpc_logging: Option, #[cfg(feature = "liquid")] pub parent_network: BNetwork, @@ -65,6 +66,10 @@ fn str_to_socketaddr(address: &str, what: &str) -> SocketAddr { impl Config { pub fn from_args() -> Config { let network_help = format!("Select network type ({})", Network::names().join(", ")); + let rpc_logging_help = format!( + "Select RPC logging option ({})", + RpcLogging::options().join(", ") + ); let args = App::new("Electrum Rust Server") .version(crate_version!()) @@ -181,6 +186,11 @@ impl Config { .long("electrum-banner") .help("Welcome banner for the Electrum server, shown in the console to clients.") .takes_value(true) + ).arg( + Arg::with_name("rpc_logging") + .long("rpc-logging") + .help(&rpc_logging_help) + .takes_value(true), ); #[cfg(unix)] @@ -381,6 +391,9 @@ impl Config { electrum_rpc_addr, electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize), electrum_banner, + rpc_logging: m + .value_of("rpc_logging") + .map(|option| RpcLogging::from(option)), http_addr, http_socket_file, monitoring_addr, @@ -420,6 +433,29 @@ impl Config { } } +#[derive(Debug, Clone)] +pub enum RpcLogging { + Full, + NoParams, +} + +impl RpcLogging { + pub fn options() -> Vec { + return vec!["full".to_string(), "no-params".to_string()]; + } +} + +impl From<&str> for RpcLogging { + fn from(option: &str) -> Self { + match option { + "full" => RpcLogging::Full, + "no-params" => RpcLogging::NoParams, + + _ => panic!("unsupported RPC logging option: {:?}", option), + } + } +} + pub fn get_network_subdir(network: Network) -> Option<&'static str> { match network { #[cfg(not(feature = "liquid"))] diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 939893b75..8bb355aad 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -18,7 +18,7 @@ use bitcoin::consensus::encode::serialize_hex; use elements::encode::serialize_hex; use crate::chain::Txid; -use crate::config::Config; +use crate::config::{Config, RpcLogging}; use crate::electrum::{get_electrum_height, ProtocolVersion}; use crate::errors::*; use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; @@ -101,6 +101,7 @@ struct Connection { txs_limit: usize, #[cfg(feature = "electrum-discovery")] discovery: Option>, + rpc_logging: Option, } impl Connection { @@ -111,6 +112,7 @@ impl Connection { stats: Arc, txs_limit: usize, #[cfg(feature = "electrum-discovery")] discovery: Option>, + rpc_logging: Option, ) -> Connection { Connection { query, @@ -123,6 +125,7 @@ impl Connection { txs_limit, #[cfg(feature = "electrum-discovery")] discovery, + rpc_logging, } } @@ -490,6 +493,27 @@ impl Connection { Ok(result) } + fn log_rpc_event(&self, entries: &Vec<(&str, Value)>) { + if let Some(_) = self.rpc_logging { + let mut log = json!({}); + + if let Some(log_map) = log.as_object_mut() { + entries.into_iter().for_each(|e| { + log_map.insert(e.0.to_string(), e.1.clone()); + }); + log_map.insert( + "source".to_string(), + json!({ + "ip": self.addr.ip().to_string(), + "port": self.addr.port(), + }), + ); + } + + info!("{}", log); + } + } + fn send_values(&mut self, values: &[Value]) -> Result<()> { for value in values { let line = value.to_string() + "\n"; @@ -507,6 +531,7 @@ impl Connection { trace!("RPC {:?}", msg); match msg { Message::Request(line) => { + let method_info: String; let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?; let reply = match ( cmd.get("method"), @@ -517,9 +542,30 @@ impl Connection { Some(&Value::String(ref method)), &Value::Array(ref params), Some(ref id), - ) => self.handle_command(method, params, id)?, + ) => { + let mut log_entries = + vec![("event", json!("rpc request")), ("method", json!(method))]; + + if let Some(RpcLogging::Full) = self.rpc_logging { + log_entries.push(("params", json!(params))); + } + + self.log_rpc_event(&log_entries); + method_info = method.clone(); + + self.handle_command(method, params, id)? + } _ => bail!("invalid command: {}", cmd), }; + + let line = reply.to_string() + "\n"; + + self.log_rpc_event(&vec![ + ("event", json!("rpc response")), + ("payload_size", json!(line.as_bytes().len())), + ("method", json!(method_info)), + ]); + self.send_values(&[reply])? } Message::PeriodicUpdate => { @@ -563,6 +609,9 @@ impl Connection { pub fn run(mut self) { self.stats.clients.inc(); + + self.log_rpc_event(&vec![("event", json!("connection established"))]); + let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); let tx = self.chan.sender(); let child = spawn_thread("reader", || Connection::handle_requests(reader, tx)); @@ -579,6 +628,9 @@ impl Connection { .sub(self.status_hashes.len() as i64); debug!("[{}] shutting down connection", self.addr); + + self.log_rpc_event(&vec![("event", json!("connection closed"))]); + let _ = self.stream.shutdown(Shutdown::Both); if let Err(err) = child.join().expect("receiver panicked") { error!("[{}] receiver failed: {}", self.addr, err); @@ -741,6 +793,7 @@ impl RPC { let garbage_sender = garbage_sender.clone(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); + let rpc_logging = config.rpc_logging.clone(); let spawned = spawn_thread("peer", move || { info!("[{}] connected peer", addr); @@ -752,6 +805,7 @@ impl RPC { txs_limit, #[cfg(feature = "electrum-discovery")] discovery, + rpc_logging, ); senders.lock().unwrap().push(conn.chan.sender()); conn.run();