From 5ac4f6cf53450d4653ed5ce886a9b448d1d3c9c7 Mon Sep 17 00:00:00 2001 From: mrsobakin <68982655+mrsobakin@users.noreply.github.com> Date: Tue, 16 Jul 2024 09:38:50 +0500 Subject: [PATCH] Rewrite to use custom udp routing & dispatch, bump to v1.0.0 - Remove `udp_stream` dependency - Rewrite to use custom udp addresses translation and dispatch system. - Add `first` parameter to obfuscation - Rename `servers.proxy` -> `servers.relay` - Rename `servers.downstream` -> `servers.upstream` --- Cargo.toml | 3 +- Dockerfile | 2 +- PKGBUILD | 2 +- README.md | 21 ++-- config/config.toml | 17 +-- src/config.rs | 51 +++++++++ src/main.rs | 258 ++++++++++++++++++++++++++------------------- 7 files changed, 221 insertions(+), 133 deletions(-) create mode 100644 src/config.rs diff --git a/Cargo.toml b/Cargo.toml index 3fdcb7d..920a1eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dpimyass" -version = "0.3.0" +version = "1.0.0" edition = "2021" [dependencies] @@ -8,4 +8,3 @@ tokio = { version = "1", features = ["full"] } serde = { version = "1", features = ["derive"] } serde_with = "3.5.0" toml = "0.8" -udp-stream = "0.0.10" diff --git a/Dockerfile b/Dockerfile index 3b7b844..77bf7f4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM rustlang/rust:nightly-slim AS builder +FROM rust:1.79.0-slim AS builder WORKDIR /build COPY . . RUN rustup target add x86_64-unknown-linux-musl diff --git a/PKGBUILD b/PKGBUILD index aedb026..a349238 100644 --- a/PKGBUILD +++ b/PKGBUILD @@ -1,5 +1,5 @@ pkgname=dpimyass -pkgver=0.3.0 +pkgver=1.0.0 pkgrel=1 pkgdesc="Simple UDP proxy for bypassing the DPI" arch=('x86_64') diff --git a/README.md b/README.md index 54f9eed..bba0e0d 100644 --- a/README.md +++ b/README.md @@ -49,28 +49,29 @@ DPIMyAss uses a TOML configuration file to specify its settings. 
Below is an exa name = "Example bridge" key = [239, 42, 13, 69] -[servers.proxy] +[servers.relay] address = "0.0.0.0:1337" -buffer = 16384 +buffer = 65536 timeout = 60 -[servers.downstream] +[servers.upstream] address = "example.com:1337" -buffer = 16384 +buffer = 65536 timeout = 60 [[servers]] name = "Another bridge" key = [4, 5, 11] +first = 64 # Obfuscate only the first 64 bytes -[servers.proxy] +[servers.relay] address = "0.0.0.0:1338" -buffer = 16384 +buffer = 65536 timeout = 120 -[servers.downstream] +[servers.upstream] address = "endpoint2.exmaple.com:443" -buffer = 16384 +buffer = 65536 timeout = 120 ``` @@ -79,9 +80,9 @@ You might encounter a problem when trying to use VPN over DPIMyAss hosted on the ### Wireguard-specific solution -If your downstream address falls inside the ips listed in wireguard's `AllowedIPs`, the packets DPIMyAss sends will be routed over VPN too, and thus they will be stuck in a network loop. +If your upstream address falls inside the ips listed in wireguard's `AllowedIPs`, the packets DPIMyAss sends will be routed over VPN too, and thus they will be stuck in a network loop. -The simplest way to fix this is to exclude your downstream endpoint ip address from the wireguard's `AllowedIPs`. This can be done with any wireguard allowed ips calculator, for example with [this one](https://www.procustodibus.com/blog/2021/03/wireguard-allowedips-calculator/). +The simplest way to fix this is to exclude your upstream endpoint ip address from the wireguard's `AllowedIPs`. This can be done with any wireguard allowed ips calculator, for example with [this one](https://www.procustodibus.com/blog/2021/03/wireguard-allowedips-calculator/). 
### Windows diff --git a/config/config.toml b/config/config.toml index 83d8bdf..73f834c 100644 --- a/config/config.toml +++ b/config/config.toml @@ -2,26 +2,27 @@ name = "Example bridge" key = [239, 42, 13, 69] -[servers.proxy] +[servers.relay] address = "0.0.0.0:1337" -buffer = 16384 +buffer = 65536 timeout = 60 -[servers.downstream] +[servers.upstream] address = "example.com:1337" -buffer = 16384 +buffer = 65536 timeout = 60 [[servers]] name = "Another bridge" key = [4, 5, 11] +first = 64 # Obfuscate only the first 64 bytes -[servers.proxy] +[servers.relay] address = "0.0.0.0:1338" -buffer = 16384 +buffer = 65536 timeout = 120 -[servers.downstream] +[servers.upstream] address = "endpoint2.exmaple.com:443" -buffer = 16384 +buffer = 65536 timeout = 120 diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..3b6fbba --- /dev/null +++ b/src/config.rs @@ -0,0 +1,51 @@ +use std::{ + net::SocketAddr, + net::ToSocketAddrs, + time::Duration +}; + +use serde::Deserialize; +use serde::de::Deserializer; +use serde_with::DurationSeconds; +use serde_with::serde_as; + + +fn resolve_address<'de, D>(de: D) -> Result +where D: Deserializer<'de> { + let addr = ::deserialize(de)?; + + addr.to_socket_addrs() + .map_err(serde::de::Error::custom)? 
+ .next() + .ok_or(serde::de::Error::custom("No address")) +} + +#[derive(Deserialize, Debug)] +pub struct Config { + pub servers: Vec +} + +#[derive(Deserialize, Debug)] +pub struct ServerConfig { + pub name: String, + #[serde(flatten)] + pub obfs: ObfsConfig, + pub relay: EndpointConfig, + pub upstream: EndpointConfig, +} + +#[derive(Deserialize, Debug)] +pub struct ObfsConfig { + pub key: Vec, + pub first: Option, +} + +#[serde_as] +#[derive(Deserialize, Debug)] +pub struct EndpointConfig { + #[serde(deserialize_with = "resolve_address")] + pub address: SocketAddr, + pub buffer: usize, + #[serde_as(as = "DurationSeconds")] + pub timeout: Duration, +} diff --git a/src/main.rs b/src/main.rs index 0c0ddfa..9ae821e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,162 +1,198 @@ -#![feature(trait_alias)] +mod config; +use std::collections::HashMap; +use std::net::{Ipv4Addr, IpAddr}; +use std::sync::{Arc, Weak}; use std::{ error::Error, net::SocketAddr, - net::ToSocketAddrs, - time::Duration }; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - io::{AsyncRead, AsyncWrite}, - time::timeout, -}; -use serde::Deserialize; -use serde::de::Deserializer; -use serde_with::DurationSeconds; -use serde_with::serde_as; -use udp_stream::{UdpListener, UdpStream}; +use tokio::net::UdpSocket; +use tokio::sync::Mutex; +use tokio::sync::RwLock; +use tokio::sync::RwLockWriteGuard; +use tokio::task::JoinSet; +use tokio::time::timeout; -fn resolve_address<'de, D>(de: D) -> Result -where D: Deserializer<'de> { - let addr = ::deserialize(de)?; +use crate::config::*; - addr.to_socket_addrs() - .map_err(serde::de::Error::custom)? 
- .next() - .ok_or(serde::de::Error::custom("No address")) -} -#[derive(Deserialize, Debug)] -struct Config { - servers: Vec -} +const LOCAL: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); -#[derive(Deserialize, Debug)] -struct ServerConfig { - name: String, - #[serde(flatten)] - obfs: ObfsConfig, - proxy: EndpointConfig, - downstream: EndpointConfig, -} -#[derive(Deserialize, Debug)] -struct ObfsConfig { - key: Vec, +fn xor_obfuscate(data: &mut [u8], cfg: &ObfsConfig) { + let iter = data.iter_mut() + .zip(cfg.key.iter().cycle()); + + if let Some(first) = cfg.first { + iter.take(first) + .for_each(|(di, ki)| *di ^= ki); + } else { + iter.for_each(|(di, ki)| *di ^= ki); + } } -#[serde_as] -#[derive(Deserialize, Debug)] -struct EndpointConfig { - #[serde(deserialize_with = "resolve_address")] - address: SocketAddr, - buffer: usize, - #[serde_as(as = "DurationSeconds")] - timeout: Duration, + +struct ServerHandler { + config: ServerConfig, + socket: UdpSocket, + // TODO: clean up hashmap after connections time out. + // As of now, entries will forever stay in memory. + // It's not *so* bad, but it's still a memory leak. 
+ upstreams: RwLock>>>, + upgrade_sem: Mutex<()>, } +impl ServerHandler { + async fn open_upstream(&'static self, downstream_addr: SocketAddr) -> std::io::Result> { + let socket = UdpSocket::bind(LOCAL).await?; + timeout(self.config.upstream.timeout, socket.connect(self.config.upstream.address)).await??; -fn xor_obfuscate(data: &mut [u8], cfg: &ObfsConfig) { - for (di, ki) in data.iter_mut().zip(cfg.key.iter().cycle()) { - *di ^= ki - } -} + let socket = Arc::new(socket); + tokio::spawn(self.forward_loop(socket.clone(), downstream_addr)); -trait Stream = AsyncWrite + AsyncRead + Unpin; + Ok(socket) + } -struct StreamAndInfo { - stream: S, - buffer: usize, - timeout: Duration -} + async fn forward_loop(&'static self, upstream: Arc, downstream_addr: SocketAddr) -> Option<()> { + println!("[{}] New connection from {downstream_addr}", self.config.name); + + loop { + let mut buf = Vec::with_capacity(self.config.upstream.buffer); + + match timeout(self.config.upstream.timeout, upstream.recv_buf(&mut buf)).await { + Ok(Err(err)) => { + println!("[{}] Error while listening on {downstream_addr}: {err}", self.config.name); + return Some(()); + }, + Err(_) => { + println!("[{}] Connection timed out ({downstream_addr})", self.config.name); + return Some(()); + }, + _ => {}, + } -impl StreamAndInfo { - fn new(stream: S, config: &EndpointConfig) -> Self { - StreamAndInfo { - stream, - buffer: config.buffer, - timeout: config.timeout, + tokio::spawn(async move { + xor_obfuscate(&mut buf, &self.config.obfs); + let _ = timeout(self.config.relay.timeout, self.socket.send_to(&buf, downstream_addr)).await; + }); } } -} + async fn get_upstream_for(&'static self, downstream_addr: SocketAddr) -> std::io::Result> { + let map_rlock = self.upstreams.read().await; -async fn forward_loop(mut upstream: StreamAndInfo, mut downstream: StreamAndInfo, cfg: &ObfsConfig) -> Result<(), std::io::Error> { - let mut upbuf = vec![0u8; upstream.buffer]; - let mut downbuf = vec![0u8; downstream.buffer]; + 
 if let Some(slot) = map_rlock.get(&downstream_addr) { + let mut slot_lock = slot.lock().await; - loop { - tokio::select! { - n = timeout(upstream.timeout, upstream.stream.read(&mut upbuf)) => { - let n = n??; - xor_obfuscate(&mut upbuf[0..n], cfg); - downstream.stream.write_all(&upbuf[0..n]).await?; - }, - n = timeout(downstream.timeout, downstream.stream.read(&mut downbuf)) => { - let n = n??; - xor_obfuscate(&mut downbuf[0..n], cfg); - upstream.stream.write_all(&downbuf[0..n]).await? + return match slot_lock.upgrade() { + Some(udp) => Ok(udp), + None => { + let udp = self.open_upstream(downstream_addr).await?; + *slot_lock = Arc::downgrade(&udp); + Ok(udp) + } } }; - } - #[allow(unreachable_code)] - Ok::<(), std::io::Error>(()) -} + // Safely upgrade rlock to wlock + let mut map_wlock = { + let lock = self.upgrade_sem.lock().await; + drop(map_rlock); + let map_wlock = self.upstreams.write().await; + drop(lock); + map_wlock + }; + + // TODO: find a way to get slot directly, without hashing + // multiple times and having a window between locks. + map_wlock.insert(downstream_addr, Weak::default().into()); + let slot = RwLockWriteGuard::downgrade_map(map_wlock, |map| { + map.get(&downstream_addr).expect("unreachable") + }); -async fn server_loop(config: &'static ServerConfig) -> Result<(), Box> { - let listener = UdpListener::bind(config.proxy.address).await?; + let mut slot_lock = slot.lock().await; - println!("[{}] Listening on {:?}, downstream {:?}", config.name, config.proxy.address, config.downstream.address); + // Check whether slot is still empty (weak), + // as it might have been initialized in between + // outer lock downgrading and inner lock acquiring. 
+ if let Some(udp) = slot_lock.upgrade() { + return Ok(udp); + } - loop { - let (upstream, addr) = listener.accept().await?; - let upstream = StreamAndInfo::new(upstream, &config.proxy); + let udp = self.open_upstream(downstream_addr).await?; + *slot_lock = Arc::downgrade(&udp); + Ok(udp) + } - println!("[{}] New incoming connection from {addr:?}", config.name); + pub async fn listen(self) { + let sself = Box::leak(Box::new(self)); - tokio::spawn(async move { - let downstream = StreamAndInfo::new(UdpStream::connect(config.downstream.address).await?, &config.downstream); + loop { + let mut buf = Vec::with_capacity(sself.config.relay.buffer); + let Ok((_, from)) = sself.socket.recv_buf_from(&mut buf).await else { continue }; - if let Err(e) = forward_loop(upstream, downstream, &config.obfs).await { - println!("[{}] Error: {e:?} ({addr:?})", config.name); - } + let sself_ref: &_ = sself; + tokio::spawn(async move { + xor_obfuscate(&mut buf, &sself_ref.config.obfs); - Ok::<(), std::io::Error>(()) - }); + let upstream = match sself_ref.get_upstream_for(from).await { + Ok(u) => u, + Err(err) => { + println!("[{}] Error opening upstream: {err}", sself_ref.config.name); + return; + } + }; + + match timeout(sself_ref.config.upstream.timeout, upstream.send(&buf)).await { + Ok(Err(err)) => println!("[{}] Error sending packet: {err}", sself_ref.config.name), + Err(_) => println!("[{}] Timeout sending packet", sself_ref.config.name), + _ => {}, + }; + }); + } + } + + pub async fn bind(config: ServerConfig) -> std::io::Result { + let socket = UdpSocket::bind(config.relay.address).await?; + println!("[{}] Listening on {:?}", config.name, socket.local_addr().unwrap()); + + Ok(Self { + config, + socket, + upstreams: Default::default(), + upgrade_sem: Default::default(), + }) } } + #[tokio::main] async fn main() -> Result<(), Box> { - let config: &Config = Box::leak({ + let config: Config = { let config_file = std::env::args().nth(1).unwrap_or_else(|| "config.toml".to_owned()); let 
s = std::fs::read_to_string(config_file)?; toml::from_str(&s)? - }); + }; - let mut handles = vec![]; + let mut join_set = JoinSet::new(); - for server_config in config.servers.iter() { - handles.push(tokio::spawn(async { - loop { - match server_loop(server_config).await { - Err(err) => { - println!("[{}] encountered a loop-wise error: {err}", server_config.name); - } - _ => () - } - } - })); - } + for server_config in config.servers { + let name = server_config.name.clone(); - for handle in handles { - let _ = handle.await; + match ServerHandler::bind(server_config).await { + Ok(handler) => join_set.spawn(handler.listen()), + Err(err) => { + println!("[{}] Failed to bind: {}", name, err); + continue; + }, + }; } + while let Some(_) = join_set.join_next().await {} + Ok(()) }