From 952c0db86db9f998d8c8803d0021238094d858cc Mon Sep 17 00:00:00 2001 From: dev0 Date: Fri, 25 Aug 2023 02:37:44 +1000 Subject: [PATCH] add logs api --- .gitignore | 3 ++ Cargo.Bazel.lock | 21 ++++++++++-- Cargo.lock | 4 +++ README.md | 5 +-- clash_lib/Cargo.toml | 3 +- clash_lib/src/app/api/handlers/hello.rs | 9 +++++ clash_lib/src/app/api/handlers/log.rs | 30 +++++++++++++++++ clash_lib/src/app/api/handlers/mod.rs | 2 ++ clash_lib/src/app/api/mod.rs | 22 ++++++++++--- clash_lib/src/app/dns/mod.rs | 1 + clash_lib/src/app/dns/resolver.rs | 15 ++++++--- clash_lib/src/app/dns/system.rs | 44 +++++++++++++++++++++++++ clash_lib/src/app/logging.rs | 37 ++++++++++++++++++++- clash_lib/src/config/def.rs | 3 ++ clash_lib/src/config/internal/proxy.rs | 1 + clash_lib/src/lib.rs | 13 +++++--- clash_lib/src/proxy/relay/mod.rs | 8 ----- scripts/logs.py | 11 +++++++ scripts/requirements.txt | 3 +- 19 files changed, 207 insertions(+), 28 deletions(-) create mode 100644 clash_lib/src/app/api/handlers/hello.rs create mode 100644 clash_lib/src/app/api/handlers/log.rs create mode 100644 clash_lib/src/app/api/handlers/mod.rs create mode 100644 clash_lib/src/app/dns/system.rs create mode 100644 scripts/logs.py diff --git a/.gitignore b/.gitignore index 1d83a004..a8f0121a 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ venv/ /bazel-* # Ignore outputs generated during Bazel bootstrapping. /output/ + +# don't check in this real config +ignore.yaml diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index e9fded9f..e90f860e 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "4c7d42924b2314ce069523e5915170a57d8303ede8bae1fcc0f5cbcfcb256e1a", + "checksum": "eb54538b6c466464ff68b5bebe68dc857d719e9b327fcc10cc20a4e90d556649", "crates": { "addr2line 0.20.0": { "name": "addr2line", @@ -1294,7 +1294,8 @@ "original-uri", "query", "tokio", - "tower-log" + "tower-log", + "ws" ], "selects": {} }, @@ -1308,6 +1309,10 @@ "id": "axum-core 0.3.4", "target": "axum_core" }, + { + "id": "base64 0.21.2", + "target": "base64" + }, { "id": "bitflags 1.3.2", "target": "bitflags" @@ -1372,6 +1377,10 @@ "id": "serde_urlencoded 0.7.1", "target": "serde_urlencoded" }, + { + "id": "sha1 0.10.5", + "target": "sha1" + }, { "id": "sync_wrapper 0.1.2", "target": "sync_wrapper" @@ -1380,6 +1389,10 @@ "id": "tokio 1.29.1", "target": "tokio" }, + { + "id": "tokio-tungstenite 0.20.0", + "target": "tokio_tungstenite" + }, { "id": "tower 0.4.13", "target": "tower" @@ -3431,6 +3444,10 @@ "id": "trust-dns-proto 0.22.0", "target": "trust_dns_proto" }, + { + "id": "trust-dns-resolver 0.22.0", + "target": "trust_dns_resolver" + }, { "id": "url 2.4.0", "target": "url" diff --git a/Cargo.lock b/Cargo.lock index ca4023a0..4eec7bf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -260,6 +260,7 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", + "base64 0.21.2", "bitflags 1.3.2", "bytes", "futures-util", @@ -277,8 +278,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -668,6 +671,7 @@ dependencies = [ "tracing-subscriber", "trust-dns-client", "trust-dns-proto 0.22.0", + "trust-dns-resolver", "url", "uuid", "webpki-roots", diff --git a/README.md b/README.md index a921d6e3..f154c0a2 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ Just a toy for fun, don't use please :+| TODOs -- [ ] proxy rules (relay, select, ...) +- [x] proxy rules (relay, select, ...) - [ ] dashboard - [ ] DNS server -- [ ] trojan proto \ No newline at end of file +- [ ] trojan proto +- [ ] hc and fetchers \ No newline at end of file diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index 983a9f7c..28e3da0d 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -45,13 +45,14 @@ md-5 = "0.10.5" chacha20poly1305 = "0.10" aes-gcm = "0.10" filetime = "0.2" -axum = "0.6.20" +axum = { version = "0.6.20", features = ["ws"] } serde = { version = "1.0", features=["derive"] } serde_yaml = "0.9" trust-dns-client = "0.22" +trust-dns-resolver = "0.22.0" trust-dns-proto = { version = "0.22", features = ["dns-over-rustls", "dns-over-https-rustls"]} # DoH rustls = { version = "0.20", features=["dangerous_configuration"] } diff --git a/clash_lib/src/app/api/handlers/hello.rs b/clash_lib/src/app/api/handlers/hello.rs new file mode 100644 index 00000000..52eb71c0 --- /dev/null +++ b/clash_lib/src/app/api/handlers/hello.rs @@ -0,0 +1,9 @@ +use std::collections::HashMap; + +use axum::response::IntoResponse; + +pub async fn handle() -> axum::response::Response { + let mut val = HashMap::new(); + val.insert("hello".to_owned(), "clash-rs".to_owned()); + axum::response::Json(val).into_response() +} diff --git a/clash_lib/src/app/api/handlers/log.rs b/clash_lib/src/app/api/handlers/log.rs new file mode 100644 index 00000000..8851ed32 --- /dev/null +++ b/clash_lib/src/app/api/handlers/log.rs @@ -0,0 +1,30 @@ +use std::{net::SocketAddr, sync::Arc}; + +use axum::{ + extract::{ws::Message, ConnectInfo, State, WebSocketUpgrade}, + response::IntoResponse, +}; + +use tracing::{debug, warn}; + +use crate::app::api::AppState; + +pub async fn handle( + ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo, + State(state): State>, +) -> impl IntoResponse { + debug!("ws connect from {}", addr); + ws.on_failed_upgrade(|e| { + warn!("ws upgrade error: {}", e); + }) + .on_upgrade(move |mut socket| async move { + let mut rx = state.log_source_tx.subscribe(); + while let Ok(msg) = rx.recv().await { + if let Err(e) = socket.send(Message::Text(msg)).await { + warn!("ws send error: {}", e); + break; + } + } + }) +} diff --git a/clash_lib/src/app/api/handlers/mod.rs b/clash_lib/src/app/api/handlers/mod.rs new file mode 100644 index 00000000..1d9422cb --- /dev/null +++ b/clash_lib/src/app/api/handlers/mod.rs @@ -0,0 +1,2 @@ +pub mod hello; +pub mod log; diff --git a/clash_lib/src/app/api/mod.rs b/clash_lib/src/app/api/mod.rs index 4516c61c..fa0f025a 100644 --- a/clash_lib/src/app/api/mod.rs +++ b/clash_lib/src/app/api/mod.rs @@ -1,20 +1,32 @@ +use std::{net::SocketAddr, sync::Arc}; + use axum::{routing::get, Router}; + +use tokio::sync::broadcast::Sender; use tracing::info; use crate::{config::internal::config::Controller, Runner}; -async fn root() -> &'static str { - "Hello, World!" +mod handlers; + +pub struct AppState { + log_source_tx: Sender, } -pub fn get_api_runner(controller_cfg: Controller) -> Option { +pub fn get_api_runner(controller_cfg: Controller, log_source: Sender) -> Option { if let Some(bind_addr) = controller_cfg.external_controller { + let app_state = AppState { + log_source_tx: log_source, + }; let addr = bind_addr.parse().unwrap(); let runner = async move { info!("Starting API server at {}", addr); - let app = Router::new().route("/", get(root)); + let app = Router::new() + .route("/", get(handlers::hello::handle)) + .route("/logs", get(handlers::log::handle)) + .with_state(Arc::new(app_state)); axum::Server::bind(&addr) - .serve(app.into_make_service()) + .serve(app.into_make_service_with_connect_info::()) .await .unwrap(); }; diff --git a/clash_lib/src/app/dns/mod.rs b/clash_lib/src/app/dns/mod.rs index f4a97eb4..ba42aefb 100644 --- a/clash_lib/src/app/dns/mod.rs +++ b/clash_lib/src/app/dns/mod.rs @@ -14,6 +14,7 @@ mod fakeip; mod filters; mod helper; pub mod resolver; +mod system; use crate::dns::dns_client::DNSNetMode; diff --git a/clash_lib/src/app/dns/resolver.rs b/clash_lib/src/app/dns/resolver.rs index 0e5eab46..aacf540e 100644 --- a/clash_lib/src/app/dns/resolver.rs +++ b/clash_lib/src/app/dns/resolver.rs @@ -11,15 +11,16 @@ use trust_dns_proto::{op, rr}; #[cfg(test)] use mockall::automock; +use crate::app::ThreadSafeDNSResolver; use crate::dns::helper::make_clients; use crate::dns::ThreadSafeDNSClient; use crate::{common::trie, Error}; +use super::system::SystemResolver; use super::{ filters::{DomainFilter, FallbackDomainFilter, FallbackIPFilter, GeoIPFilter, IPNetFilter}, Config, }; -use super::{DNSNetMode, NameServer}; static TTL: Duration = Duration::from_secs(60); @@ -267,7 +268,10 @@ impl ClashResolver for Resolver { impl Resolver { /// For testing purpose + #[cfg(test)] pub async fn new_default() -> Self { + use super::{DNSNetMode, NameServer}; + Resolver { ipv6: false, hosts: None, @@ -288,7 +292,11 @@ impl Resolver { } } - pub async fn new(cfg: Config) -> Self { + pub async fn new(cfg: Config) -> ThreadSafeDNSResolver { + if !cfg.enable { + return Arc::new(SystemResolver::new().expect("failed to create system resolver")); + } + let default_resolver = Arc::new(Resolver { ipv6: false, hosts: None, @@ -356,7 +364,7 @@ impl Resolver { }, }; - r + Arc::new(r) } pub async fn batch_exchange( @@ -365,7 +373,6 @@ impl Resolver { ) -> anyhow::Result { let mut queries = Vec::new(); for c in clients { - // TODO: how to use .map() queries.push( async move { c.lock() diff --git a/clash_lib/src/app/dns/system.rs b/clash_lib/src/app/dns/system.rs new file mode 100644 index 00000000..deeece60 --- /dev/null +++ b/clash_lib/src/app/dns/system.rs @@ -0,0 +1,44 @@ +use async_trait::async_trait; +use rand::seq::IteratorRandom; +use trust_dns_resolver::TokioAsyncResolver; + +use super::ClashResolver; + +pub(crate) struct SystemResolver { + resolver: TokioAsyncResolver, +} + +impl SystemResolver { + pub fn new() -> anyhow::Result { + let resolver = TokioAsyncResolver::tokio_from_system_conf()?; + Ok(Self { resolver }) + } +} + +#[async_trait] +impl ClashResolver for SystemResolver { + async fn resolve(&self, host: &str) -> anyhow::Result> { + let response = self.resolver.lookup_ip(host).await?; + Ok(response.iter().choose(&mut rand::thread_rng())) + } + async fn resolve_v4(&self, host: &str) -> anyhow::Result> { + let response = self.resolver.lookup_ip(host).await?; + Ok(response + .iter() + .filter_map(|ip| match ip { + std::net::IpAddr::V4(ip) => Some(ip), + _ => None, + }) + .choose(&mut rand::thread_rng())) + } + async fn resolve_v6(&self, host: &str) -> anyhow::Result> { + let response = self.resolver.lookup_ip(host).await?; + Ok(response + .iter() + .filter_map(|ip| match ip { + std::net::IpAddr::V6(ip) => Some(ip), + _ => None, + }) + .choose(&mut rand::thread_rng())) + } +} diff --git a/clash_lib/src/app/logging.rs b/clash_lib/src/app/logging.rs index f2c9e405..bcfa9568 100644 --- a/clash_lib/src/app/logging.rs +++ b/clash_lib/src/app/logging.rs @@ -1,6 +1,9 @@ use crate::def::LogLevel; +use tokio::sync::broadcast::Sender; +use tracing::debug; use tracing_subscriber::filter::Targets; use tracing_subscriber::prelude::*; +use tracing_subscriber::Layer; use tracing_subscriber::{filter, EnvFilter}; impl From for filter::LevelFilter { @@ -15,7 +18,38 @@ impl From for filter::LevelFilter { } } -pub fn setup_logging(level: LogLevel) -> anyhow::Result<()> { +pub struct EventCollector(Vec>); + +impl EventCollector { + pub fn new(recivers: Vec>) -> Self { + Self(recivers) + } +} + +impl Layer for EventCollector +where + S: tracing::Subscriber, +{ + fn on_event( + &self, + event: &tracing::Event<'_>, + _ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + let mut msg = vec![]; + msg.push(format!("{}", event.metadata().level())); + msg.push(format!("{}", event.metadata().target())); + msg.push(format!("{}", event.metadata().name())); + for field in event.fields() { + msg.push(format!("{}", field.name())); + } + + for tx in &self.0 { + _ = tx.send(msg.join("")); + } + } +} + +pub fn setup_logging(level: LogLevel, collector: EventCollector) -> anyhow::Result<()> { let filter = EnvFilter::builder() .with_default_directive(filter::LevelFilter::from(level).into()) .from_env_lossy(); @@ -23,6 +57,7 @@ pub fn setup_logging(level: LogLevel) -> anyhow::Result<()> { let subscriber = tracing_subscriber::registry() .with(filter) .with(Targets::new().with_target("clash", level)) + .with(collector) .with( tracing_subscriber::fmt::Layer::new() .with_ansi(atty::is(atty::Stream::Stdout)) diff --git a/clash_lib/src/config/def.rs b/clash_lib/src/config/def.rs index 4073c091..1f25b060 100644 --- a/clash_lib/src/config/def.rs +++ b/clash_lib/src/config/def.rs @@ -11,9 +11,12 @@ use serde_yaml::Value; #[derive(Serialize, Deserialize, Default, Copy, Clone)] #[serde(rename_all = "lowercase")] pub enum RunMode { + #[serde(alias = "Global")] Global, #[default] + #[serde(alias = "Rule")] Rule, + #[serde(alias = "Direct")] Direct, } diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 02cb8df1..242410e7 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -156,6 +156,7 @@ pub struct OutboundVmess { pub skip_cert_verify: Option, pub server_name: Option, pub network: Option, + #[serde(alias = "ws-opts")] pub ws_opts: Option, } diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index e362a007..3c467315 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -15,7 +15,7 @@ use std::io; use std::sync::{Arc, Once}; use thiserror::Error; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{broadcast, mpsc, RwLock}; mod app; mod common; @@ -100,15 +100,20 @@ async fn start_async(opts: Options) -> Result<(), Error> { Config::Str(s) => s.as_str().parse::()?.try_into()?, }; + let (log_tx, _) = broadcast::channel(100); + + let log_collector = app::logging::EventCollector::new(vec![log_tx.clone()]); + static ONCE: Once = Once::new(); ONCE.call_once(|| { - app::logging::setup_logging(config.general.log_level).expect("failed to setup logging"); + app::logging::setup_logging(config.general.log_level, log_collector) + .expect("failed to setup logging"); }); let mut tasks = Vec::::new(); let mut runners = Vec::new(); - let default_dns_resolver = Arc::new(dns::Resolver::new(config.dns).await); + let default_dns_resolver = dns::Resolver::new(config.dns).await; let outbound_manager = Arc::new(RwLock::new( OutboundManager::new( @@ -155,7 +160,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let mut inbound_runners = inbound_manager.get_runners()?; runners.append(&mut inbound_runners); - let api_runner = app::api::get_api_runner(config.general.controller); + let api_runner = app::api::get_api_runner(config.general.controller, log_tx); if let Some(r) = api_runner { runners.push(r); } diff --git a/clash_lib/src/proxy/relay/mod.rs b/clash_lib/src/proxy/relay/mod.rs index aa8d5325..f9e7199c 100644 --- a/clash_lib/src/proxy/relay/mod.rs +++ b/clash_lib/src/proxy/relay/mod.rs @@ -2,7 +2,6 @@ use std::{io, sync::Arc}; use async_trait::async_trait; use futures::stream::{self, StreamExt}; -use tracing::{debug, error}; use crate::{ app::{ @@ -103,13 +102,6 @@ impl OutboundHandler for Handler { let mut next_sess = sess.clone(); for i in 1..proxies.len() { let proxy = proxies[i].clone(); - error!( - "relay {} -> {} -> {} -> {}", - first.name(), - proxy.name(), - proxy.remote_addr().await.unwrap(), - proxies.len() - ); next_sess.destination = proxy.remote_addr().await.expect("must have remote addr"); s = first.proxy_stream(s, &next_sess, resolver.clone()).await?; diff --git a/scripts/logs.py b/scripts/logs.py new file mode 100644 index 00000000..db35883d --- /dev/null +++ b/scripts/logs.py @@ -0,0 +1,11 @@ +from websockets.sync.client import connect + + +def hello(): + with connect("ws://127.1:6170/logs") as websocket: + while True: + message = websocket.recv() + print(f"Received: {message}") + + +hello() diff --git a/scripts/requirements.txt b/scripts/requirements.txt index 224e6641..de79ada5 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -1 +1,2 @@ -PySocks \ No newline at end of file +PySocks +websockets \ No newline at end of file