diff --git a/WORKSPACE b/WORKSPACE index b65f366d..69577648 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -1,38 +1,16 @@ workspace(name = "clash-rs") -load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") - -http_archive( - name = "bazel_skylib", - sha256 = "66ffd9315665bfaafc96b52278f57c7e2dd09f5ede279ea6d39b2be471e7e3aa", - urls = [ - "https://mirror.bazel.build/github.com/bazelbuild/bazel-skylib/releases/download/1.4.2/bazel-skylib-1.4.2.tar.gz", - "https://github.com/bazelbuild/bazel-skylib/releases/download/1.4.2/bazel-skylib-1.4.2.tar.gz", - ], -) - -load("@bazel_skylib//:workspace.bzl", "bazel_skylib_workspace") - -bazel_skylib_workspace() - -http_archive( - name = "rules_python", - sha256 = "84aec9e21cc56fbc7f1335035a71c850d1b9b5cc6ff497306f84cced9a769841", - strip_prefix = "rules_python-0.23.1", - url = "https://github.com/bazelbuild/rules_python/releases/download/0.23.1/rules_python-0.23.1.tar.gz", -) - -load("@rules_python//python:repositories.bzl", "py_repositories") - -py_repositories() +#load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") # To find additional information on this release or newer ones visit: # https://github.com/bazelbuild/rules_rust/releases -#http_archive( -#name = "rules_rust", -#sha256 = "0c2ff9f58bbd6f2a4fc4fbea3a34e85fe848e7e4317357095551a18b2405a01c", -#urls = ["https://github.com/bazelbuild/rules_rust/releases/download/0.25.0/rules_rust-v0.25.0.tar.gz"], -#) + +# http_archive( +# name = "rules_rust", +# sha256 = "9d04e658878d23f4b00163a72da3db03ddb451273eb347df7d7c50838d698f49", +# urls = ["https://github.com/bazelbuild/rules_rust/releases/download/0.26.0/rules_rust-v0.26.0.tar.gz"], +# ) + load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") git_repository( diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index 4049f4c1..20a777ce 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -122,7 +122,7 @@ impl OutboundManager { Error::InvalidConfig(format!("invalid provider config: {}", x)) })?; - providers.push(Arc::new(provider)); + providers.push(Arc::new(Mutex::new(provider))); } if let Some(provider_names) = &proto.use_provider { @@ -181,13 +181,13 @@ impl OutboundManager { let provider = ProxySetProvider::new( name.clone(), Duration::from_secs(http.interval), - Arc::new(Mutex::new(vehicle)), + Arc::new(vehicle), hc, proxy_manager.clone(), ) .map_err(|x| Error::InvalidConfig(format!("invalid provider config: {}", x)))?; - provider_registry.insert(name, Arc::new(provider)); + provider_registry.insert(name, Arc::new(Mutex::new(provider))); } OutboundProxyProvider::File(file) => { let vehicle = file_vehicle::Vehicle::new(&file.path); @@ -203,13 +203,13 @@ impl OutboundManager { let provider = ProxySetProvider::new( name.clone(), Duration::from_secs(file.interval.unwrap_or_default()), - Arc::new(Mutex::new(vehicle)), + Arc::new(vehicle), hc, proxy_manager.clone(), ) .map_err(|x| Error::InvalidConfig(format!("invalid provider config: {}", x)))?; - provider_registry.insert(name, Arc::new(provider)); + provider_registry.insert(name, Arc::new(Mutex::new(provider))); } } } diff --git a/clash_lib/src/app/proxy_manager/mod.rs b/clash_lib/src/app/proxy_manager/mod.rs index 857a2020..3b2ef005 100644 --- a/clash_lib/src/app/proxy_manager/mod.rs +++ b/clash_lib/src/app/proxy_manager/mod.rs @@ -162,7 +162,7 @@ impl ProxyManager { }; let result = tester.await; - self.report_alive(&name, result.is_ok()); + self.report_alive(&name, result.is_ok()).await; let ins = DelayHistory { time: SystemTime::now(), delay: result.as_ref().map(|x| x.0).unwrap_or(0), @@ -192,7 +192,7 @@ mod tests { }; #[tokio::test] - async fn test_proxy_manager() { + async fn test_proxy_manager_alive() { let mut mock_resolver = MockClashResolver::new(); mock_resolver .expect_resolve() @@ -227,7 +227,7 @@ mod tests { assert!(manager.last_delay(PROXY_DIRECT).await > 0); assert!(manager.delay_history(PROXY_DIRECT).await.len() > 0); - manager.report_alive(PROXY_DIRECT, false); + manager.report_alive(PROXY_DIRECT, false).await; assert!(!manager.alive(PROXY_DIRECT).await); for _ in 0..10 { diff --git a/clash_lib/src/app/proxy_manager/providers/fether.rs b/clash_lib/src/app/proxy_manager/providers/fether.rs index 2aa5b81b..312880b3 100644 --- a/clash_lib/src/app/proxy_manager/providers/fether.rs +++ b/clash_lib/src/app/proxy_manager/providers/fether.rs @@ -71,18 +71,15 @@ where self.name.as_str() } - pub async fn vehicle_type(&self) -> super::ProviderVehicleType { - self.vehicle.lock().await.typ() + pub fn vehicle_type(&self) -> super::ProviderVehicleType { + self.vehicle.typ() } pub async fn initial(&mut self) -> anyhow::Result { let mut is_local = false; let mut immediately_update = false; - let vehicle_path = { - let l = self.vehicle.lock().await; - l.path().to_owned() - }; + let vehicle_path = self.vehicle.path().to_owned(); let mut inner = self.inner.lock().await; @@ -97,7 +94,7 @@ where > self.interval; content } - Err(_) => self.vehicle.lock().await.read().await?, + Err(_) => self.vehicle.read().await?, }; let proxies = match (self.parser.lock().await)(&content) { @@ -106,19 +103,19 @@ where if !is_local { return Err(e); } - let content = self.vehicle.lock().await.read().await?; + let content = self.vehicle.read().await?; (self.parser.lock().await)(&content)? } }; - if self.vehicle_type().await != ProviderVehicleType::File && !is_local { - let p = self.vehicle.lock().await.path().to_owned(); + if self.vehicle_type() != ProviderVehicleType::File && !is_local { + let p = self.vehicle.path().to_owned(); let path = Path::new(p.as_str()); let prefix = path.parent().unwrap(); if !prefix.exists() { fs::create_dir_all(prefix)?; } - fs::write(self.vehicle.lock().await.path(), &content)?; + fs::write(self.vehicle.path(), &content)?; } inner.hash = utils::md5(&content)[..16] @@ -149,7 +146,7 @@ where parser: Arc>, ) -> anyhow::Result<(T, bool)> { let mut this = inner.lock().await; - let content = vehicle.lock().await.read().await?; + let content = vehicle.read().await?; let proxies = (parser.lock().await)(&content)?; let now = SystemTime::now(); @@ -159,21 +156,21 @@ where if hash == this.hash { this.updated_at = now; - filetime::set_file_times(vehicle.lock().await.path(), now.into(), now.into())?; + filetime::set_file_times(vehicle.path(), now.into(), now.into())?; return Ok((proxies, false)); } let proxies = (parser.lock().await)(&content)?; - if vehicle.lock().await.typ() != ProviderVehicleType::File { - let p = vehicle.lock().await.path().to_owned(); + if vehicle.typ() != ProviderVehicleType::File { + let p = vehicle.path().to_owned(); let path = Path::new(p.as_str()); let prefix = path.parent().unwrap(); if !prefix.exists() { fs::create_dir_all(prefix)?; } - fs::write(vehicle.lock().await.path(), &content)?; + fs::write(vehicle.path(), &content)?; return Ok((proxies, false)); } @@ -244,7 +241,7 @@ where mod tests { use std::{path::Path, sync::Arc, time::Duration}; - use tokio::{sync::Mutex, time::sleep}; + use tokio::time::sleep; use crate::app::proxy_manager::providers::{MockProviderVehicle, ProviderVehicleType}; @@ -283,7 +280,7 @@ mod tests { let mut f = Fetcher::new( "test_fetcher".to_string(), Duration::from_secs(1), - Arc::new(Mutex::new(mock_vehicle)), + Arc::new(mock_vehicle), parser, Some(updater), ); diff --git a/clash_lib/src/app/proxy_manager/providers/mod.rs b/clash_lib/src/app/proxy_manager/providers/mod.rs index f70249f6..fc0db6a2 100644 --- a/clash_lib/src/app/proxy_manager/providers/mod.rs +++ b/clash_lib/src/app/proxy_manager/providers/mod.rs @@ -2,7 +2,6 @@ use async_trait::async_trait; use std::fmt::{Display, Formatter}; use std::io; use std::sync::Arc; -use tokio::sync::Mutex; pub mod fether; pub mod file_vehicle; @@ -32,7 +31,7 @@ impl Display for ProviderVehicleType { } } -pub type ThreadSafeProviderVehicle = Arc>; +pub type ThreadSafeProviderVehicle = Arc; #[cfg_attr(test, automock)] #[async_trait] @@ -58,9 +57,9 @@ impl Display for ProviderType { /// either Proxy or Rule provider #[async_trait] pub trait Provider { - async fn name(&self) -> &str; - async fn vehicle_type(&self) -> ProviderVehicleType; - async fn typ(&self) -> ProviderType; + fn name(&self) -> &str; + fn vehicle_type(&self) -> ProviderVehicleType; + fn typ(&self) -> ProviderType; async fn initialize(&mut self) -> io::Result<()>; async fn update(&self) -> io::Result<()>; } diff --git a/clash_lib/src/app/proxy_manager/providers/plain_provider.rs b/clash_lib/src/app/proxy_manager/providers/plain_provider.rs index db8c0a53..dd942798 100644 --- a/clash_lib/src/app/proxy_manager/providers/plain_provider.rs +++ b/clash_lib/src/app/proxy_manager/providers/plain_provider.rs @@ -3,11 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use tokio::sync::Mutex; -use crate::{ - app::proxy_manager::{healthcheck::HealthCheck, ProxyManager}, - proxy::AnyOutboundHandler, - Error, -}; +use crate::{app::proxy_manager::ProxyManager, proxy::AnyOutboundHandler, Error}; use super::{proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehicleType}; @@ -40,13 +36,13 @@ impl PlainProvider { #[async_trait] impl Provider for PlainProvider { - async fn name(&self) -> &str { + fn name(&self) -> &str { &self.name } - async fn vehicle_type(&self) -> ProviderVehicleType { + fn vehicle_type(&self) -> ProviderVehicleType { ProviderVehicleType::Compatible } - async fn typ(&self) -> ProviderType { + fn typ(&self) -> ProviderType { ProviderType::Proxy } async fn initialize(&mut self) -> std::io::Result<()> { diff --git a/clash_lib/src/app/proxy_manager/providers/proxy_provider.rs b/clash_lib/src/app/proxy_manager/providers/proxy_provider.rs index 8f84ac61..a03a014d 100644 --- a/clash_lib/src/app/proxy_manager/providers/proxy_provider.rs +++ b/clash_lib/src/app/proxy_manager/providers/proxy_provider.rs @@ -1,12 +1,13 @@ use std::sync::Arc; use async_trait::async_trait; +use tokio::sync::Mutex; use crate::proxy::AnyOutboundHandler; use super::Provider; -pub type ThreadSafeProxyProvider = Arc; +pub type ThreadSafeProxyProvider = Arc>; #[async_trait] pub trait ProxyProvider: Provider { diff --git a/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs b/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs index 97a1e60a..85fe1ae7 100644 --- a/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs +++ b/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs @@ -103,15 +103,15 @@ impl ProxySetProvider { #[async_trait] impl Provider for ProxySetProvider { - async fn name(&self) -> &str { + fn name(&self) -> &str { self.fetcher.name() } - async fn vehicle_type(&self) -> ProviderVehicleType { - self.fetcher.vehicle_type().await + fn vehicle_type(&self) -> ProviderVehicleType { + self.fetcher.vehicle_type() } - async fn typ(&self) -> ProviderType { + fn typ(&self) -> ProviderType { ProviderType::Proxy } @@ -202,7 +202,7 @@ proxies: .expect_typ() .return_const(ProviderVehicleType::File); - let vehicle = Arc::new(Mutex::new(mock_vehicle)); + let vehicle = Arc::new(mock_vehicle); let mock_resolver = MockClashResolver::new(); diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 503abc18..0b22dd32 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -240,7 +240,7 @@ pub enum LoadBalanceStrategy { RoundRobin, } -#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[derive(serde::Serialize, serde::Deserialize, Debug, Default)] pub struct OutboundGroupSelect { pub name: String, diff --git a/clash_lib/src/lib.rs b/clash_lib/src/lib.rs index d44bee09..6da28bb6 100644 --- a/clash_lib/src/lib.rs +++ b/clash_lib/src/lib.rs @@ -37,6 +37,8 @@ pub enum Error { DNSError(String), #[error("crypto error: {0}")] Crypto(String), + #[error("operation error: {0}")] + Operation(String), } pub type Runner = futures::future::BoxFuture<'static, ()>; diff --git a/clash_lib/src/proxy/direct/mod.rs b/clash_lib/src/proxy/direct/mod.rs index 0bdacb52..917a4081 100644 --- a/clash_lib/src/proxy/direct/mod.rs +++ b/clash_lib/src/proxy/direct/mod.rs @@ -25,11 +25,11 @@ impl OutboundHandler for Handler { OutboundProxy::ProxyServer(OutboundProxyProtocol::Direct) } - fn remote_addr(&self) -> Option { + async fn remote_addr(&self) -> Option { None } - fn support_udp(&self) -> bool { + async fn support_udp(&self) -> bool { true } diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index 36312513..48e5caf9 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -27,6 +27,7 @@ pub mod converters; // proxy groups pub mod relay; +pub mod selector; mod transport; @@ -113,10 +114,10 @@ pub trait OutboundHandler: Sync + Send + Unpin { fn proto(&self) -> OutboundProxy; /// The proxy remote address - fn remote_addr(&self) -> Option; + async fn remote_addr(&self) -> Option; /// whether the outbound handler support UDP - fn support_udp(&self) -> bool; + async fn support_udp(&self) -> bool; /// connect to remote target via TCP async fn connect_stream( diff --git a/clash_lib/src/proxy/reject/mod.rs b/clash_lib/src/proxy/reject/mod.rs index 110644eb..b130a28c 100644 --- a/clash_lib/src/proxy/reject/mod.rs +++ b/clash_lib/src/proxy/reject/mod.rs @@ -24,11 +24,11 @@ impl OutboundHandler for Handler { OutboundProxy::ProxyServer(OutboundProxyProtocol::Reject) } - fn remote_addr(&self) -> Option { + async fn remote_addr(&self) -> Option { None } - fn support_udp(&self) -> bool { + async fn support_udp(&self) -> bool { false } diff --git a/clash_lib/src/proxy/relay/mod.rs b/clash_lib/src/proxy/relay/mod.rs index 0f0737a4..3eb38216 100644 --- a/clash_lib/src/proxy/relay/mod.rs +++ b/clash_lib/src/proxy/relay/mod.rs @@ -1,6 +1,7 @@ use std::{io, sync::Arc}; use async_trait::async_trait; +use futures::stream::{self, StreamExt}; use crate::{ app::{ @@ -11,7 +12,10 @@ use crate::{ session::{Session, SocksAddr}, }; -use super::{AnyOutboundDatagram, AnyOutboundHandler, AnyStream, CommonOption, OutboundHandler}; +use super::{ + utils::provider_helper::get_proxies_from_providers, AnyOutboundDatagram, AnyOutboundHandler, + AnyStream, CommonOption, OutboundHandler, +}; #[derive(Default)] pub struct HandlerOptions { @@ -32,12 +36,8 @@ impl Handler { Arc::new(Self { opts, providers }) } - async fn get_proxies(&self) -> Vec { - futures::future::join_all(self.providers.iter().map(|x| x.proxies())) - .await - .into_iter() - .flatten() - .collect::>() + async fn get_proxies(&self, touch: bool) -> Vec { + get_proxies_from_providers(&self.providers, touch).await } } @@ -55,12 +55,12 @@ impl OutboundHandler for Handler { ) } - fn remote_addr(&self) -> Option { + async fn remote_addr(&self) -> Option { None } - fn support_udp(&self) -> bool { - todo!("support_udp is not implemented yet") + async fn support_udp(&self) -> bool { + false } async fn connect_stream( @@ -68,15 +68,15 @@ impl OutboundHandler for Handler { sess: &Session, resolver: ThreadSafeDNSResolver, ) -> io::Result { - let proxies: Vec = self - .get_proxies() - .await - .into_iter() - .filter(|x| match x.remote_addr() { - Some(_) => true, - None => false, + let proxies: Vec = stream::iter(self.get_proxies(true).await) + .filter_map(|x| async { + match x.remote_addr().await { + Some(_) => Some(x), + None => None, + } }) - .collect(); + .collect() + .await; match proxies.len() { 0 => Err(new_io_error("no proxy available")), @@ -91,7 +91,7 @@ impl OutboundHandler for Handler { let mut sess = sess.clone(); for i in 1..proxies.len() - 1 { let proxy = proxies[i].clone(); - sess.destination = proxy.remote_addr().expect("must have remote addr"); + sess.destination = proxy.remote_addr().await.expect("must have remote addr"); s = proxy.proxy_stream(s, &sess, resolver.clone()).await?; } Ok(s) diff --git a/clash_lib/src/proxy/selector/mod.rs b/clash_lib/src/proxy/selector/mod.rs new file mode 100644 index 00000000..b03a3bb5 --- /dev/null +++ b/clash_lib/src/proxy/selector/mod.rs @@ -0,0 +1,126 @@ +use std::io; + +use async_trait::async_trait; + +use crate::{ + app::{ + proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider, ThreadSafeDNSResolver, + }, + config::internal::proxy::{OutboundGroupSelect, OutboundProxy}, + session::{Session, SocksAddr}, + Error, +}; + +use super::{ + utils::provider_helper::get_proxies_from_providers, AnyOutboundDatagram, AnyOutboundHandler, + AnyStream, CommonOption, OutboundHandler, +}; + +#[async_trait] +pub trait SelectorControl { + async fn select(&mut self, name: &str) -> Result<(), Error>; + fn current(&self) -> &str; +} + +pub struct HandlerOptions { + pub name: String, + pub udp: bool, + + pub common_option: CommonOption, +} + +pub struct Handler { + opts: HandlerOptions, + current: String, + providers: Vec, +} + +impl Handler { + pub async fn new(opts: HandlerOptions, providers: Vec) -> Self { + let current = providers.first().unwrap().lock().await.name().to_owned(); + Self { + opts, + current, + providers, + } + } + + async fn selected_proxy(&self, touch: bool) -> AnyOutboundHandler { + let proxies = get_proxies_from_providers(&self.providers, touch).await; + proxies + .into_iter() + .find(|x| x.name() == self.current) + .unwrap() + } +} + +#[async_trait] +impl SelectorControl for Handler { + async fn select(&mut self, name: &str) -> Result<(), Error> { + let proxies = get_proxies_from_providers(&self.providers, true).await; + if proxies.iter().any(|x| x.name() == name) { + self.current = name.to_owned(); + Ok(()) + } else { + Err(Error::Operation(format!("proxy {} not found", name))) + } + } + + fn current(&self) -> &str { + &self.current + } +} + +#[async_trait] +impl OutboundHandler for Handler { + fn name(&self) -> &str { + &self.opts.name + } + + fn proto(&self) -> OutboundProxy { + OutboundProxy::ProxyGroup( + crate::config::internal::proxy::OutboundGroupProtocol::Select( + OutboundGroupSelect::default(), + ), + ) + } + + async fn remote_addr(&self) -> Option { + self.selected_proxy(false).await.remote_addr().await + } + + async fn support_udp(&self) -> bool { + self.opts.udp && self.selected_proxy(false).await.support_udp().await + } + + async fn connect_stream( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> io::Result { + self.selected_proxy(true) + .await + .connect_stream(sess, resolver) + .await + } + + async fn proxy_stream( + &self, + s: AnyStream, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> io::Result { + unimplemented!("proxy_stream not implemented") + } + + async fn connect_datagram( + &self, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> io::Result { + self.selected_proxy(true) + .await + .connect_datagram(sess, resolver) + .await + } +} diff --git a/clash_lib/src/proxy/shadowsocks/mod.rs b/clash_lib/src/proxy/shadowsocks/mod.rs index 98a9ed15..a4f478ba 100644 --- a/clash_lib/src/proxy/shadowsocks/mod.rs +++ b/clash_lib/src/proxy/shadowsocks/mod.rs @@ -159,11 +159,11 @@ impl OutboundHandler for Handler { OutboundProxy::ProxyServer(OutboundProxyProtocol::Ss(Default::default())) } - fn remote_addr(&self) -> Option { + async fn remote_addr(&self) -> Option { Some(SocksAddr::Domain(self.opts.server.clone(), self.opts.port)) } - fn support_udp(&self) -> bool { + async fn support_udp(&self) -> bool { true } diff --git a/clash_lib/src/proxy/utils/mod.rs b/clash_lib/src/proxy/utils/mod.rs index b07bfd31..50f1e343 100644 --- a/clash_lib/src/proxy/utils/mod.rs +++ b/clash_lib/src/proxy/utils/mod.rs @@ -1,5 +1,6 @@ use std::net::{IpAddr, SocketAddr}; +pub mod provider_helper; mod socket_helpers; pub use socket_helpers::*; diff --git a/clash_lib/src/proxy/utils/provider_helper.rs b/clash_lib/src/proxy/utils/provider_helper.rs new file mode 100644 index 00000000..aa3a9d5f --- /dev/null +++ b/clash_lib/src/proxy/utils/provider_helper.rs @@ -0,0 +1,27 @@ +use crate::{ + app::proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider, + proxy::AnyOutboundHandler, +}; + +pub async fn get_proxies_from_providers( + providers: &Vec, + touch: bool, +) -> Vec { + let mut proxies = vec![]; + for provider in providers { + if touch { + provider.lock().await.touch(); + } + + let mut proxies_from_provider = provider + .lock() + .await + .proxies() + .await + .iter() + .map(|x| x.clone()) + .collect::>(); + proxies.append(&mut proxies_from_provider); + } + proxies +}