From bfd9535062ca047a8e186df3cffeef83a705f5e0 Mon Sep 17 00:00:00 2001 From: Yuwei Ba Date: Mon, 30 Sep 2024 17:09:11 +1000 Subject: [PATCH] refactor(proxy): some code refactorings (#614) * move group folders * some refactors * up * clippy --- clash/tests/data/config/tproxy.yaml | 17 + clash_lib/src/app/api/handlers/config.rs | 6 +- clash_lib/src/app/inbound/manager.rs | 17 + clash_lib/src/app/inbound/network_listener.rs | 6 +- clash_lib/src/app/profile/mod.rs | 4 +- clash_lib/src/config/def.rs | 1 - clash_lib/src/config/internal/config.rs | 2 +- clash_lib/src/lib.rs | 431 ++++++++---------- .../src/proxy/{ => group}/fallback/mod.rs | 13 +- .../proxy/{ => group}/loadbalance/helpers.rs | 0 .../src/proxy/{ => group}/loadbalance/mod.rs | 13 +- clash_lib/src/proxy/group/mod.rs | 5 + clash_lib/src/proxy/{ => group}/relay/mod.rs | 19 +- .../src/proxy/{ => group}/selector/mod.rs | 15 +- .../src/proxy/{ => group}/urltest/mod.rs | 13 +- clash_lib/src/proxy/mod.rs | 8 +- clash_lib/src/proxy/tproxy/mod.rs | 40 ++ 17 files changed, 311 insertions(+), 299 deletions(-) create mode 100644 clash/tests/data/config/tproxy.yaml rename clash_lib/src/proxy/{ => group}/fallback/mod.rs (93%) rename clash_lib/src/proxy/{ => group}/loadbalance/helpers.rs (100%) rename clash_lib/src/proxy/{ => group}/loadbalance/mod.rs (94%) create mode 100644 clash_lib/src/proxy/group/mod.rs rename clash_lib/src/proxy/{ => group}/relay/mod.rs (96%) rename clash_lib/src/proxy/{ => group}/selector/mod.rs (95%) rename clash_lib/src/proxy/{ => group}/urltest/mod.rs (95%) create mode 100644 clash_lib/src/proxy/tproxy/mod.rs diff --git a/clash/tests/data/config/tproxy.yaml b/clash/tests/data/config/tproxy.yaml new file mode 100644 index 00000000..52d26e3f --- /dev/null +++ b/clash/tests/data/config/tproxy.yaml @@ -0,0 +1,17 @@ +--- +port: 8888 +socks-port: 8889 +mixed-port: 8899 +tproxy-port: 8900 + +mode: rule +log-level: debug + + +proxies: + - name: "tor" + type: tor + +rules: + - MATCH, tor +... diff --git a/clash_lib/src/app/api/handlers/config.rs b/clash_lib/src/app/api/handlers/config.rs index 6927c98c..7166061c 100644 --- a/clash_lib/src/app/api/handlers/config.rs +++ b/clash_lib/src/app/api/handlers/config.rs @@ -220,13 +220,11 @@ async fn patch_configs( inbound_manager.rebuild_listeners(ports); - if let Some(h) = global_state.inbound_listener_handle.take() { - h.abort() - } + global_state.inbound_listener_handle.abort(); let r = inbound_manager.get_runner().unwrap(); - global_state.inbound_listener_handle = Some(tokio::spawn(r)); + global_state.inbound_listener_handle = tokio::spawn(r); } if let Some(mode) = payload.mode { diff --git a/clash_lib/src/app/inbound/manager.rs b/clash_lib/src/app/inbound/manager.rs index de5491bd..ab9e785f 100644 --- a/clash_lib/src/app/inbound/manager.rs +++ b/clash_lib/src/app/inbound/manager.rs @@ -101,6 +101,9 @@ impl InboundManager { ListenerType::Mixed => { ports.mixed_port = Some(x.port); } + ListenerType::Tproxy => { + ports.tproxy_port = Some(x.port); + } }); ports @@ -150,6 +153,20 @@ impl InboundManager { ); } + if let Some(tproxy_port) = ports.tproxy_port { + network_listeners.insert( + ListenerType::Tproxy, + NetworkInboundListener { + name: "TProxy".to_string(), + bind_addr: self.bind_address.clone(), + port: tproxy_port, + listener_type: ListenerType::Tproxy, + dispatcher: self.dispatcher.clone(), + authenticator: self.authenticator.clone(), + }, + ); + } + self.network_listeners = network_listeners; } } diff --git a/clash_lib/src/app/inbound/network_listener.rs b/clash_lib/src/app/inbound/network_listener.rs index 729021fa..31bc7b17 100644 --- a/clash_lib/src/app/inbound/network_listener.rs +++ b/clash_lib/src/app/inbound/network_listener.rs @@ -2,7 +2,7 @@ use crate::{ common::auth::ThreadSafeAuthenticator, config::internal::config::BindAddress, }; -use crate::proxy::{http, mixed, socks, AnyInboundListener}; +use crate::proxy::{http, mixed, socks, tproxy, AnyInboundListener}; use crate::{proxy::utils::Interface, Dispatcher, Error, Runner}; use futures::FutureExt; @@ -19,6 +19,7 @@ pub enum ListenerType { Http, Socks5, Mixed, + Tproxy, } pub struct NetworkInboundListener { @@ -122,6 +123,9 @@ impl NetworkInboundListener { self.dispatcher.clone(), self.authenticator.clone(), )), + ListenerType::Tproxy => { + Arc::new(tproxy::Listener::new((ip, self.port).into())) + } }; if listener.handle_tcp() { diff --git a/clash_lib/src/app/profile/mod.rs b/clash_lib/src/app/profile/mod.rs index 4dd2c7df..c458069b 100644 --- a/clash_lib/src/app/profile/mod.rs +++ b/clash_lib/src/app/profile/mod.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use serde::{Deserialize, Serialize}; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; #[derive(Serialize, Deserialize, Debug, Clone)] struct Db { @@ -119,7 +119,7 @@ impl CacheFile { } }, Err(e) => { - error!("failed to read cache file: {}, initializing a new one", e); + warn!("failed to read cache file: {}, initializing a new one", e); Db { selected: HashMap::new(), ip_to_host: HashMap::new(), diff --git a/clash_lib/src/config/def.rs b/clash_lib/src/config/def.rs index 641f78c3..1b7e9fcd 100644 --- a/clash_lib/src/config/def.rs +++ b/clash_lib/src/config/def.rs @@ -247,7 +247,6 @@ pub struct Config { /// The redir port #[doc(hidden)] pub redir_port: Option, - #[doc(hidden)] pub tproxy_port: Option, /// The HTTP/SOCKS5 mixed proxy port /// # Example diff --git a/clash_lib/src/config/internal/config.rs b/clash_lib/src/config/internal/config.rs index 1bb484db..8f273d4e 100644 --- a/clash_lib/src/config/internal/config.rs +++ b/clash_lib/src/config/internal/config.rs @@ -355,7 +355,7 @@ pub struct Inbound { pub bind_address: BindAddress, } -#[derive(Serialize, Deserialize, Default)] +#[derive(Serialize, Deserialize, Default, Clone)] pub struct Controller { pub external_controller: Option, pub external_ui: Option, diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index ef2aeae0..1d559e22 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -14,7 +14,11 @@ use crate::{ internal::{proxy::OutboundProxy, InternalConfig}, }, }; -use app::{dispatcher::StatisticsManager, dns::SystemResolver, profile}; +use app::{ + dispatcher::StatisticsManager, + dns::{SystemResolver, ThreadSafeDNSResolver}, + profile, +}; use common::{auth, http::new_http_client, mmdb}; use config::def::LogLevel; use once_cell::sync::OnceCell; @@ -95,7 +99,9 @@ impl Config { pub struct GlobalState { log_level: LogLevel, - inbound_listener_handle: Option>>, + // must be Some otherwise we'll refuse to start + inbound_listener_handle: JoinHandle>, + tunnel_listener_handle: Option>>, api_listener_handle: Option>>, dns_listener_handle: Option>>, @@ -170,15 +176,156 @@ async fn start_async(opts: Options) -> Result<(), Error> { let cwd = PathBuf::from(cwd); - debug!("initializing cache store"); - let cache_store = profile::ThreadSafeCacheFile::new( - cwd.join("cache.db").as_path().to_str().unwrap(), - config.profile.store_selected, + // things we need to clone before consuming config + let controller_cfg = config.general.controller.clone(); + let log_level = config.general.log_level; + + let components = create_components(cwd.clone(), config).await?; + + let inbound_runner = components.inbound_manager.lock().await.get_runner()?; + let inbound_listener_handle = tokio::spawn(inbound_runner); + + let tun_runner_handle = components.tun_runner.map(tokio::spawn); + let dns_listener_handle = components.dns_listener.map(tokio::spawn); + + let (reload_tx, mut reload_rx) = mpsc::channel(1); + + let global_state = Arc::new(Mutex::new(GlobalState { + log_level, + inbound_listener_handle, + tunnel_listener_handle: tun_runner_handle, + dns_listener_handle, + reload_tx, + api_listener_handle: None, + cwd: cwd.to_string_lossy().to_string(), + })); + + let api_runner = app::api::get_api_runner( + controller_cfg, + log_tx.clone(), + components.inbound_manager, + components.dispatcher, + global_state.clone(), + components.dns_resolver, + components.outbound_manager, + components.statistics_manager, + components.cache_store, + components.router, + cwd.to_string_lossy().to_string(), ); + if let Some(r) = api_runner { + let api_listener_handle = tokio::spawn(r); + global_state.lock().await.api_listener_handle = Some(api_listener_handle); + } - debug!("initializing dns resolver"); + runners.push(Box::pin(async move { + shutdown_rx.recv().await; + info!("receiving shutdown signal"); + Ok(()) + })); + + tasks.push(Box::pin(async move { + futures::future::select_all(runners).await.0 + })); + + tasks.push(Box::pin(async move { + let _ = tokio::signal::ctrl_c().await; + Ok(()) + })); + + tasks.push(Box::pin(async move { + while let Some((config, done)) = reload_rx.recv().await { + info!("reloading config"); + let config = match config.try_parse() { + Ok(c) => c, + Err(e) => { + error!("failed to reload config: {}", e); + continue; + } + }; + + let controller_cfg = config.general.controller.clone(); + + let new_componenets = create_components(cwd.clone(), config).await?; + + done.send(()).unwrap(); + + debug!("stopping listeners"); + let mut g = global_state.lock().await; + g.inbound_listener_handle.abort(); + if let Some(h) = g.tunnel_listener_handle.take() { + h.abort(); + } + if let Some(h) = g.dns_listener_handle.take() { + h.abort(); + } + if let Some(h) = g.api_listener_handle.take() { + h.abort(); + } + + debug!("reloading inbound listener"); + let inbound_listener_handle = new_componenets + .inbound_manager + .lock() + .await + .get_runner() + .map(tokio::spawn)?; + + debug!("reloading tun runner"); + let tun_runner_handle = new_componenets.tun_runner.map(tokio::spawn); + + debug!("reloading dns listener"); + let dns_listener_handle = new_componenets.dns_listener.map(tokio::spawn); + + debug!("reloading api listener"); + let api_listener_handle = app::api::get_api_runner( + controller_cfg, + log_tx.clone(), + new_componenets.inbound_manager, + new_componenets.dispatcher, + global_state.clone(), + new_componenets.dns_resolver, + new_componenets.outbound_manager, + new_componenets.statistics_manager, + new_componenets.cache_store, + new_componenets.router, + cwd.to_string_lossy().to_string(), + ) + .map(tokio::spawn); + + g.inbound_listener_handle = inbound_listener_handle; + g.tunnel_listener_handle = tun_runner_handle; + g.dns_listener_handle = dns_listener_handle; + g.api_listener_handle = api_listener_handle; + } + Ok(()) + })); + + futures::future::select_all(tasks).await.0.map_err(|x| { + error!("runtime error: {}, shutting down", x); + x + }) +} + +struct RuntimeComponents { + cache_store: profile::ThreadSafeCacheFile, + dns_resolver: ThreadSafeDNSResolver, + outbound_manager: Arc, + router: Arc, + dispatcher: Arc, + statistics_manager: Arc, + inbound_manager: Arc>, + + tun_runner: Option, + dns_listener: Option, +} + +async fn create_components( + cwd: PathBuf, + config: InternalConfig, +) -> Result { let system_resolver = Arc::new( - SystemResolver::new(config.general.ipv6 && config.dns.ipv6) + SystemResolver::new(config.dns.ipv6) .map_err(|x| Error::DNSError(x.to_string()))?, ); let client = new_http_client(system_resolver.clone()) @@ -194,6 +341,24 @@ async fn start_async(opts: Options) -> Result<(), Error> { .await?, ); + let client = new_http_client(system_resolver) + .map_err(|x| Error::DNSError(x.to_string()))?; + let geodata = Arc::new( + geodata::GeoData::new( + cwd.join(&config.general.geosite), + config.general.geosite_download_url, + client, + ) + .await?, + ); + + debug!("initializing cache store"); + let cache_store = profile::ThreadSafeCacheFile::new( + cwd.join("cache.db").as_path().to_str().unwrap(), + config.profile.store_selected, + ); + + debug!("initializing dns resolver"); let dns_resolver = dns::new_resolver( &config.dns, Some(cache_store.clone()), @@ -230,17 +395,6 @@ async fn start_async(opts: Options) -> Result<(), Error> { ); debug!("initializing router"); - let client = new_http_client(system_resolver) - .map_err(|x| Error::DNSError(x.to_string()))?; - let geodata = Arc::new( - geodata::GeoData::new( - cwd.join(&config.general.geosite), - config.general.geosite_download_url, - client, - ) - .await?, - ); - let router = Arc::new( Router::new( config.rules, @@ -255,6 +409,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { let statistics_manager = StatisticsManager::new(); + debug!("initializing dispatcher"); let dispatcher = Arc::new(Dispatcher::new( outbound_manager.clone(), router.clone(), @@ -263,6 +418,7 @@ async fn start_async(opts: Options) -> Result<(), Error> { statistics_manager.clone(), )); + debug!("initializing authenticator"); let authenticator = Arc::new(auth::PlainAuthenticator::new(config.users)); debug!("initializing inbound manager"); @@ -272,241 +428,24 @@ async fn start_async(opts: Options) -> Result<(), Error> { authenticator, )?)); - let inbound_runner = inbound_manager.lock().await.get_runner()?; - let inbound_listener_handle = tokio::spawn(inbound_runner); - + debug!("initializing tun runner"); let tun_runner = get_tun_runner(config.tun, dispatcher.clone(), dns_resolver.clone())?; - let tun_runner_handle = tun_runner.map(tokio::spawn); debug!("initializing dns listener"); - let dns_listener_handle = - dns::get_dns_listener(config.dns.listen, dns_resolver.clone(), &cwd) - .await - .map(tokio::spawn); - - let (reload_tx, mut reload_rx) = mpsc::channel(1); - - let global_state = Arc::new(Mutex::new(GlobalState { - log_level: config.general.log_level, - inbound_listener_handle: Some(inbound_listener_handle), - tunnel_listener_handle: tun_runner_handle, - dns_listener_handle, - reload_tx, - api_listener_handle: None, - cwd: cwd.to_string_lossy().to_string(), - })); + let dns_listener = + dns::get_dns_listener(config.dns.listen, dns_resolver.clone(), &cwd).await; - let api_runner = app::api::get_api_runner( - config.general.controller, - log_tx.clone(), - inbound_manager.clone(), - dispatcher, - global_state.clone(), + Ok(RuntimeComponents { + cache_store, dns_resolver, outbound_manager, - statistics_manager, - cache_store, router, - cwd.to_string_lossy().to_string(), - ); - if let Some(r) = api_runner { - let api_listener_handle = tokio::spawn(r); - global_state.lock().await.api_listener_handle = Some(api_listener_handle); - } - - runners.push(Box::pin(async move { - shutdown_rx.recv().await; - info!("receiving shutdown signal"); - Ok(()) - })); - - tasks.push(Box::pin(async move { - futures::future::select_all(runners).await.0 - })); - - tasks.push(Box::pin(async move { - let _ = tokio::signal::ctrl_c().await; - Ok(()) - })); - - tasks.push(Box::pin(async move { - while let Some((config, done)) = reload_rx.recv().await { - info!("reloading config"); - let config = match config.try_parse() { - Ok(c) => c, - Err(e) => { - error!("failed to reload config: {}", e); - continue; - } - }; - - debug!("reloading dns resolver"); - let system_resolver = Arc::new( - SystemResolver::new(config.dns.ipv6) - .map_err(|x| Error::DNSError(x.to_string()))?, - ); - let client = new_http_client(system_resolver.clone()) - .map_err(|x| Error::DNSError(x.to_string()))?; - - debug!("reloading mmdb"); - let mmdb = Arc::new( - mmdb::Mmdb::new( - cwd.join(&config.general.mmdb), - config.general.mmdb_download_url, - client, - ) - .await?, - ); - - let client = new_http_client(system_resolver) - .map_err(|x| Error::DNSError(x.to_string()))?; - let geodata = Arc::new( - geodata::GeoData::new( - cwd.join(&config.general.geosite), - config.general.geosite_download_url, - client, - ) - .await?, - ); - - debug!("reloading cache store"); - let cache_store = profile::ThreadSafeCacheFile::new( - cwd.join("cache.db").as_path().to_str().unwrap(), - config.profile.store_selected, - ); - - let dns_resolver = dns::new_resolver( - &config.dns, - Some(cache_store.clone()), - Some(mmdb.clone()), - ) - .await; - - debug!("reloading outbound manager"); - let outbound_manager = Arc::new( - OutboundManager::new( - config - .proxies - .into_values() - .filter_map(|x| match x { - OutboundProxy::ProxyServer(s) => Some(s), - _ => None, - }) - .collect(), - config - .proxy_groups - .into_values() - .filter_map(|x| match x { - OutboundProxy::ProxyGroup(g) => Some(g), - _ => None, - }) - .collect(), - config.proxy_providers, - config.proxy_names, - dns_resolver.clone(), - cache_store.clone(), - cwd.to_string_lossy().to_string(), - ) - .await?, - ); - - debug!("reloading router"); - let router = Arc::new( - Router::new( - config.rules, - config.rule_providers, - dns_resolver.clone(), - mmdb, - geodata, - cwd.to_string_lossy().to_string(), - ) - .await, - ); - - let statistics_manager = StatisticsManager::new(); - - let dispatcher = Arc::new(Dispatcher::new( - outbound_manager.clone(), - router.clone(), - dns_resolver.clone(), - config.general.mode, - statistics_manager.clone(), - )); - - let authenticator = - Arc::new(auth::PlainAuthenticator::new(config.users)); - - debug!("reloading inbound manager"); - let inbound_manager = Arc::new(Mutex::new(InboundManager::new( - config.general.inbound, - dispatcher.clone(), - authenticator, - )?)); - - done.send(()).unwrap(); - - debug!("stopping listeners"); - let mut g = global_state.lock().await; - if let Some(h) = g.inbound_listener_handle.take() { - h.abort(); - } - if let Some(h) = g.tunnel_listener_handle.take() { - h.abort(); - } - if let Some(h) = g.dns_listener_handle.take() { - h.abort(); - } - if let Some(h) = g.api_listener_handle.take() { - h.abort(); - } - - let inbound_listener_handle = inbound_manager - .lock() - .await - .get_runner() - .map(tokio::spawn)?; - - let tun_runner_handle = get_tun_runner( - config.tun, - dispatcher.clone(), - dns_resolver.clone(), - )? - .map(tokio::spawn); - - debug!("reloading dns listener"); - let dns_listener_handle = - dns::get_dns_listener(config.dns.listen, dns_resolver.clone(), &cwd) - .await - .map(tokio::spawn); - - debug!("reloading api listener"); - let api_listener_handle = app::api::get_api_runner( - config.general.controller, - log_tx.clone(), - inbound_manager.clone(), - dispatcher, - global_state.clone(), - dns_resolver, - outbound_manager, - statistics_manager, - cache_store, - router, - cwd.to_string_lossy().to_string(), - ) - .map(tokio::spawn); - - g.inbound_listener_handle = Some(inbound_listener_handle); - g.tunnel_listener_handle = tun_runner_handle; - g.dns_listener_handle = dns_listener_handle; - g.api_listener_handle = api_listener_handle; - } - Ok(()) - })); - - futures::future::select_all(tasks).await.0.map_err(|x| { - error!("runtime error: {}, shutting down", x); - x + dispatcher, + statistics_manager, + inbound_manager, + tun_runner, + dns_listener, }) } diff --git a/clash_lib/src/proxy/fallback/mod.rs b/clash_lib/src/proxy/group/fallback/mod.rs similarity index 93% rename from clash_lib/src/proxy/fallback/mod.rs rename to clash_lib/src/proxy/group/fallback/mod.rs index ba31406c..ebdd92fd 100644 --- a/clash_lib/src/proxy/fallback/mod.rs +++ b/clash_lib/src/proxy/group/fallback/mod.rs @@ -11,18 +11,17 @@ use crate::{ providers::proxy_provider::ThreadSafeProxyProvider, ProxyManager, }, }, + proxy::{ + utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, + AnyOutboundHandler, ConnectorType, DialWithConnector, HandlerCommonOptions, + OutboundHandler, OutboundType, + }, session::Session, }; -use super::{ - utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, - AnyOutboundHandler, ConnectorType, DialWithConnector, OutboundHandler, - OutboundType, -}; - #[derive(Default, Clone)] pub struct HandlerOptions { - pub common_opts: super::options::HandlerCommonOptions, + pub common_opts: HandlerCommonOptions, pub name: String, pub udp: bool, } diff --git a/clash_lib/src/proxy/loadbalance/helpers.rs b/clash_lib/src/proxy/group/loadbalance/helpers.rs similarity index 100% rename from clash_lib/src/proxy/loadbalance/helpers.rs rename to clash_lib/src/proxy/group/loadbalance/helpers.rs diff --git a/clash_lib/src/proxy/loadbalance/mod.rs b/clash_lib/src/proxy/group/loadbalance/mod.rs similarity index 94% rename from clash_lib/src/proxy/loadbalance/mod.rs rename to clash_lib/src/proxy/group/loadbalance/mod.rs index 3f4e7d87..1766d04d 100644 --- a/clash_lib/src/proxy/loadbalance/mod.rs +++ b/clash_lib/src/proxy/group/loadbalance/mod.rs @@ -13,20 +13,19 @@ use crate::{ remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, config::internal::proxy::LoadBalanceStrategy, + proxy::{ + utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, + AnyOutboundHandler, ConnectorType, DialWithConnector, HandlerCommonOptions, + OutboundHandler, OutboundType, + }, session::Session, }; use self::helpers::{strategy_consistent_hashring, strategy_rr, StrategyFn}; -use super::{ - utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, - AnyOutboundHandler, ConnectorType, DialWithConnector, OutboundHandler, - OutboundType, -}; - #[derive(Default, Clone)] pub struct HandlerOptions { - pub common_opts: super::options::HandlerCommonOptions, + pub common_opts: HandlerCommonOptions, pub name: String, pub udp: bool, pub strategy: LoadBalanceStrategy, diff --git a/clash_lib/src/proxy/group/mod.rs b/clash_lib/src/proxy/group/mod.rs new file mode 100644 index 00000000..12c632b7 --- /dev/null +++ b/clash_lib/src/proxy/group/mod.rs @@ -0,0 +1,5 @@ +pub mod fallback; +pub mod loadbalance; +pub mod relay; +pub mod selector; +pub mod urltest; diff --git a/clash_lib/src/proxy/relay/mod.rs b/clash_lib/src/proxy/group/relay/mod.rs similarity index 96% rename from clash_lib/src/proxy/relay/mod.rs rename to clash_lib/src/proxy/group/relay/mod.rs index bb428762..ac781b9f 100644 --- a/clash_lib/src/proxy/relay/mod.rs +++ b/clash_lib/src/proxy/group/relay/mod.rs @@ -15,21 +15,20 @@ use crate::{ remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, common::errors::new_io_error, - session::Session, -}; - -use super::{ - utils::{ - provider_helper::get_proxies_from_providers, DirectConnector, - ProxyConnector, RemoteConnector, + proxy::{ + utils::{ + provider_helper::get_proxies_from_providers, DirectConnector, + ProxyConnector, RemoteConnector, + }, + AnyOutboundHandler, ConnectorType, DialWithConnector, HandlerCommonOptions, + OutboundHandler, OutboundType, }, - AnyOutboundHandler, ConnectorType, DialWithConnector, OutboundHandler, - OutboundType, + session::Session, }; #[derive(Default)] pub struct HandlerOptions { - pub common_opts: super::options::HandlerCommonOptions, + pub common_opts: HandlerCommonOptions, pub name: String, } diff --git a/clash_lib/src/proxy/selector/mod.rs b/clash_lib/src/proxy/group/selector/mod.rs similarity index 95% rename from clash_lib/src/proxy/selector/mod.rs rename to clash_lib/src/proxy/group/selector/mod.rs index b20f3f47..63454b89 100644 --- a/clash_lib/src/proxy/selector/mod.rs +++ b/clash_lib/src/proxy/group/selector/mod.rs @@ -11,16 +11,15 @@ use crate::{ dns::ThreadSafeDNSResolver, remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider, }, + proxy::{ + utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, + AnyOutboundHandler, ConnectorType, DialWithConnector, HandlerCommonOptions, + OutboundHandler, OutboundType, + }, session::Session, Error, }; -use super::{ - utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, - AnyOutboundHandler, ConnectorType, DialWithConnector, OutboundHandler, - OutboundType, -}; - #[async_trait] pub trait SelectorControl { async fn select(&mut self, name: &str) -> Result<(), Error>; @@ -35,7 +34,7 @@ struct HandlerInner { #[derive(Default, Clone)] pub struct HandlerOptions { - pub common_opts: super::options::HandlerCommonOptions, + pub common_opts: HandlerCommonOptions, pub name: String, pub udp: bool, } @@ -213,8 +212,8 @@ mod tests { use tokio::sync::{Mutex, RwLock}; use crate::proxy::{ + group::selector::ThreadSafeSelectorControl, mocks::{MockDummyOutboundHandler, MockDummyProxyProvider}, - selector::ThreadSafeSelectorControl, }; #[tokio::test] diff --git a/clash_lib/src/proxy/urltest/mod.rs b/clash_lib/src/proxy/group/urltest/mod.rs similarity index 95% rename from clash_lib/src/proxy/urltest/mod.rs rename to clash_lib/src/proxy/group/urltest/mod.rs index 84039744..f2e2d2c2 100644 --- a/clash_lib/src/proxy/urltest/mod.rs +++ b/clash_lib/src/proxy/group/urltest/mod.rs @@ -13,18 +13,17 @@ use crate::{ providers::proxy_provider::ThreadSafeProxyProvider, ProxyManager, }, }, + proxy::{ + utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, + AnyOutboundHandler, ConnectorType, DialWithConnector, HandlerCommonOptions, + OutboundHandler, OutboundType, + }, session::Session, }; -use super::{ - utils::{provider_helper::get_proxies_from_providers, RemoteConnector}, - AnyOutboundHandler, ConnectorType, DialWithConnector, OutboundHandler, - OutboundType, -}; - #[derive(Default)] pub struct HandlerOptions { - pub common_opts: super::options::HandlerCommonOptions, + pub common_opts: HandlerCommonOptions, pub name: String, pub udp: bool, } diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index 22bee285..2ad38df5 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -28,6 +28,7 @@ pub mod reject; pub mod http; pub mod mixed; +pub mod tproxy; pub(crate) mod datagram; @@ -45,11 +46,8 @@ pub mod utils; pub mod vmess; pub mod wg; -pub mod fallback; -pub mod loadbalance; -pub mod relay; -pub mod selector; -pub mod urltest; +pub mod group; +pub use group::{fallback, loadbalance, relay, selector, urltest}; mod common; mod options; diff --git a/clash_lib/src/proxy/tproxy/mod.rs b/clash_lib/src/proxy/tproxy/mod.rs new file mode 100644 index 00000000..fd00c9c6 --- /dev/null +++ b/clash_lib/src/proxy/tproxy/mod.rs @@ -0,0 +1,40 @@ +use crate::proxy::InboundListener; +use async_trait::async_trait; +use std::net::SocketAddr; + +use tracing::warn; + +pub struct Listener { + addr: SocketAddr, +} + +impl Drop for Listener { + fn drop(&mut self) { + warn!("Tproxy inbound listener on {} stopped", self.addr); + } +} + +impl Listener { + pub fn new(addr: SocketAddr) -> Self { + Self { addr } + } +} + +#[async_trait] +impl InboundListener for Listener { + fn handle_tcp(&self) -> bool { + false + } + + fn handle_udp(&self) -> bool { + false + } + + async fn listen_tcp(&self) -> std::io::Result<()> { + unimplemented!("don't listen to me :)") + } + + async fn listen_udp(&self) -> std::io::Result<()> { + unimplemented!("don't listen to me :)") + } +}