diff --git a/core/gsb-api/src/api.rs b/core/gsb-api/src/api.rs index e3da028133..0e49749a4e 100644 --- a/core/gsb-api/src/api.rs +++ b/core/gsb-api/src/api.rs @@ -1,3 +1,4 @@ +#![allow(non_snake_case)] use crate::model::{ GsbApiError, ServiceListenResponse, ServicePath, ServiceRequest, ServiceResponse, }; @@ -367,8 +368,7 @@ mod tests { verify_delete_service(&mut api, &service_addr).await; } - - #[test_case(r#"{}"#, Frame::Close(Some(CloseReason { + #[test_case(r#"{}"#, Frame::Close(Some(CloseReason { code: CloseCode::Policy, description: Some("Failed to read response. Err: Missing root map. Err: Empty map".to_string()) })); "Close when empty" diff --git a/core/vpn/Cargo.toml b/core/vpn/Cargo.toml index 6083b50ef1..3063418fda 100644 --- a/core/vpn/Cargo.toml +++ b/core/vpn/Cargo.toml @@ -40,6 +40,10 @@ tokio-stream = "0.1.6" uuid = { version = "0.8", features = ["v4"] } [features] +# Add log for outgoing and incoming packets for VPN RAW +# It includes transport bypassing smalltcp stack +trace-raw-packets = [] +# Trace for VPN packets packet-trace-enable = ["ya-packet-trace/enable"] default = [] diff --git a/core/vpn/src/lib.rs b/core/vpn/src/lib.rs index 3ab72bec36..e5c159e5da 100644 --- a/core/vpn/src/lib.rs +++ b/core/vpn/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(clippy::unit_arg)] mod message; mod network; mod requestor; diff --git a/core/vpn/src/message.rs b/core/vpn/src/message.rs index bf6b83a999..7123d0999e 100644 --- a/core/vpn/src/message.rs +++ b/core/vpn/src/message.rs @@ -1,6 +1,7 @@ use crate::Result; use actix::{Message, Recipient}; use futures::channel::mpsc; +use std::net::IpAddr; use ya_client_model::net::*; use ya_utils_networking::vpn::{ stack::{ @@ -42,24 +43,38 @@ pub struct RemoveNode { pub struct GetConnections; #[derive(Message)] -#[rtype(result = "Result")] -pub struct Connect { +#[rtype(result = "Result")] +pub struct ConnectTcp { pub protocol: Protocol, pub address: String, pub port: u16, } +#[derive(Debug, Clone, Eq, Hash, PartialEq)] +pub struct RawSocketDesc { + pub src_addr: IpAddr, + pub dst_addr: IpAddr, + pub dst_id: String, +} + +#[derive(Debug, Message)] +#[rtype(result = "Result")] +pub struct ConnectRaw { + pub raw_socket_desc: RawSocketDesc, +} + #[derive(Debug, Message)] #[rtype(result = "Result<()>")] -pub struct Disconnect { +pub struct DisconnectTcp { pub desc: SocketDesc, pub reason: DisconnectReason, } -impl Disconnect { - pub fn new(desc: SocketDesc, reason: DisconnectReason) -> Self { - Self { desc, reason } - } +#[derive(Debug, Message)] +#[rtype(result = "Result<()>")] +pub struct DisconnectRaw { + pub raw_socket_desc: RawSocketDesc, + pub reason: DisconnectReason, } #[derive(Message)] @@ -78,12 +93,17 @@ pub struct Shutdown; pub struct DataSent; #[derive(Debug)] -pub struct UserConnection { +pub struct UserTcpConnection { pub vpn: Recipient, pub rx: mpsc::Receiver>, pub stack_connection: Connection, } +#[derive(Debug)] +pub struct UserRawConnection { + pub rx: mpsc::Receiver>, +} + #[derive(Clone, Debug)] pub enum DisconnectReason { SinkClosed, diff --git a/core/vpn/src/network.rs b/core/vpn/src/network.rs index a2a55e7248..f450a4e9f0 100644 --- a/core/vpn/src/network.rs +++ b/core/vpn/src/network.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeSet, HashMap}; use std::net::IpAddr; use std::rc::Rc; use std::str::FromStr; +use std::time::Duration; use actix::prelude::*; use futures::channel::oneshot::Canceled; @@ -209,11 +210,19 @@ impl VpnSupervisor { } } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct RawConnectionMeta { + pub local: IpAddr, + pub remote: IpAddr, + pub remote_id: String, +} + pub struct Vpn { node_id: String, vpn: Network>, stack_network: net::Network, - connections: HashMap, + connections_tcp: HashMap, + connections_raw: HashMap, } impl Vpn { @@ -226,7 +235,8 @@ impl Vpn { node_id: node_id.to_string(), vpn, stack_network, - connections: Default::default(), + connections_tcp: Default::default(), + connections_raw: Default::default(), } } } @@ -411,39 +421,38 @@ impl Handler for Vpn { } } -impl Handler for Vpn { - type Result = ActorResponse>; +impl Handler for Vpn { + type Result = ActorResponse>; - fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: ConnectTcp, _: &mut Self::Context) -> Self::Result { let remote = match to_ip(&msg.address) { Ok(ip) => IpEndpoint::new(ip.into(), msg.port), Err(err) => return ActorResponse::reply(Err(err)), }; - let vpn_id = self.vpn.id(); - log::info!("VPN {vpn_id}: connecting to {remote:?}"); + let vpn_id = self.vpn.id().clone(); + log::info!("VPN {}: connecting (tcp) to {:?}", vpn_id, remote); - let id = self.vpn.id().clone(); let network = self.stack_network.clone(); let fut = async move { network.connect(remote, TCP_CONN_TIMEOUT).await } .into_actor(self) .map(move |result, this, ctx| { let stack_connection = result?; - log::info!("VPN {id}: connected to {remote:?}"); + log::info!("VPN {}: connected (tcp) to {:?}", vpn_id, remote); let vpn = ctx.address().recipient(); let (tx, rx) = mpsc::channel(1); - this.connections.insert( + this.connections_tcp.insert( stack_connection.meta.into(), - InternalConnection { + InternalTcpConnection { stack_connection, ingress_tx: tx, }, ); - Ok(UserConnection { + Ok(UserTcpConnection { vpn, rx, stack_connection, @@ -454,11 +463,42 @@ impl Handler for Vpn { } } -impl Handler for Vpn { - type Result = ::Result; +impl Handler for Vpn { + type Result = ActorResponse>; + + fn handle(&mut self, msg: ConnectRaw, _: &mut Self::Context) -> Self::Result { + //todo: nicer checks without converting to string and back + let raw_socket_desc = msg.raw_socket_desc; + let remote = match to_ip(&raw_socket_desc.dst_addr.to_string()) { + Ok(ip) => ip, + Err(err) => return ActorResponse::reply(Err(err)), + }; + let local = match to_ip(&raw_socket_desc.src_addr.to_string()) { + Ok(ip) => ip, + Err(err) => return ActorResponse::reply(Err(err)), + }; + + log::info!( + "VPN {}: connecting (raw) from {} to {}", + self.vpn.id(), + local, + remote + ); + + let (tx, rx) = mpsc::channel(1); + + self.connections_raw + .insert(raw_socket_desc, InternalRawConnection { src_tx: tx }); + + ActorResponse::reply(Ok(UserRawConnection { rx })) + } +} + +impl Handler for Vpn { + type Result = ::Result; - fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) -> Self::Result { - match self.connections.remove(&msg.desc) { + fn handle(&mut self, msg: DisconnectTcp, _: &mut Self::Context) -> Self::Result { + match self.connections_tcp.remove(&msg.desc) { Some(mut connection) => { log::info!( "Dropping connection to {:?}: {:?}", @@ -482,12 +522,35 @@ impl Handler for Vpn { } } +impl Handler for Vpn { + type Result = ::Result; + + fn handle(&mut self, msg: DisconnectRaw, _: &mut Self::Context) -> Self::Result { + match self.connections_raw.remove(&msg.raw_socket_desc) { + Some(mut _connection) => { + log::info!("Dropping raw connection {:?}", msg.raw_socket_desc); + Ok(()) + } + None => { + log::error!( + "Cannot disconnect, no raw connection found: {:?}", + msg.raw_socket_desc + ); + Err(Error::ConnectionError(format!( + "Cannot disconnect, no raw connection found: {:?}", + msg.raw_socket_desc + ))) + } + } + } +} + /// Handle egress packet from the user impl Handler for Vpn { type Result = ActorResponse>; fn handle(&mut self, pkt: Packet, ctx: &mut Self::Context) -> Self::Result { - match self.connections.get(&pkt.meta.into()).cloned() { + match self.connections_tcp.get(&pkt.meta.into()).cloned() { Some(connection) => { // packet tracing is also done when the packet data is no longer available, // so we have to make a temporary copy. This incurs no runtime overhead on builds @@ -517,10 +580,10 @@ impl Handler for Vpn { e ); - ctx.address().do_send(Disconnect::new( - connection.stack_connection.meta.into(), - DisconnectReason::ConnectionError, - )); + ctx.address().do_send(DisconnectTcp { + desc: connection.stack_connection.meta.into(), + reason: DisconnectReason::ConnectionError, + }); } })); ActorResponse::reply(Ok(())) @@ -535,22 +598,93 @@ impl Handler for Vpn { /// Handle ingress packet from the network impl Handler> for Vpn { - type Result = as Message>::Result; + type Result = ActorResponse as Message>::Result>; fn handle(&mut self, msg: RpcEnvelope, _: &mut Self::Context) -> Self::Result { self.stack_network.receive(msg.into_inner().0); self.stack_network.poll(); - Ok(()) + ActorResponse::reply(Ok(())) } } impl Handler for Vpn { - type Result = std::result::Result, ya_service_bus::Error>; + type Result = ActorResponse, ya_service_bus::Error>>; fn handle(&mut self, msg: RpcRawCall, _: &mut Self::Context) -> Self::Result { + #[cfg(feature = "trace-raw-packets")] + let packet_no = { + use std::sync::atomic::{AtomicU64, Ordering}; + static PACKET_NO: AtomicU64 = AtomicU64::new(0); + let packet_no = PACKET_NO.fetch_add(1, Ordering::Relaxed); + log::info!("Get raw call message from {} {}", msg.caller, packet_no); + packet_no + }; + + if !self.connections_raw.is_empty() { + let connection_raw = self + .connections_raw + .iter() + .find(|(raw_sock, _)| raw_sock.dst_id == msg.caller); + + if let Some((raw_socket_desc, connection)) = connection_raw { + let payload = msg.body; + #[cfg(feature = "trace-raw-packets")] + log::info!("VPN: sending raw packet to connection.src_tx {}", packet_no); + + //Forward packet into raw connection (VpnRawSocket) + //look for impl StreamHandler> for VpnRawSocket + let raw_socket_desc: RawSocketDesc = (*raw_socket_desc).clone(); + let mut src_tx = connection.src_tx.clone(); + let fut = async move { + tokio::time::timeout(Duration::from_millis(300), src_tx.send(payload)).await + } + .into_actor(self) + .map(move |res, self2, ctx| { + { + let res = match res { + Ok(res) => res, + Err(_) => { + log::warn!("VPN: timeout on sent to raw endpoint"); + return Err(ya_service_bus::error::Error::RemoteError( + self2.node_id.clone(), + "VPN: timeout on sent to raw endpoint".into(), + )); + } + }; + #[cfg(feature = "trace-raw-packets")] + log::info!( + "VPN: raw packet has been sent to connection.src_tx {}", + packet_no + ); + res.map_err(|e| { + log::error!("VPN {}: cannot sent into raw endpoint: {e}", e); + ctx.address().do_send(DisconnectRaw { + raw_socket_desc, + reason: DisconnectReason::SinkClosed, + }); + + ya_service_bus::error::Error::RemoteError( + self2.node_id.clone(), + format!("VPN: cannot sent into raw endpoint: {e}"), + ) + }) + } + .map(|_| Vec::new()) + }); + return ActorResponse::r#async(fut); + } + #[cfg(feature = "trace-raw-packets")] + log::info!( + "VPN {}: cannot find RAW connection, passing to stack", + self.vpn.id() + ); + } + + //Default behavior - forward packet into stack self.stack_network.receive(msg.body); self.stack_network.poll(); - Ok(Vec::new()) + + ActorResponse::reply(Ok(Vec::new())) } } @@ -576,23 +710,27 @@ impl Handler for Vpn { desc.protocol, desc.remote, ); - ctx.address() - .do_send(Disconnect::new(desc, DisconnectReason::SocketClosed)); + ctx.address().do_send(DisconnectTcp { + desc, + reason: DisconnectReason::SocketClosed, + }); ActorResponse::reply(Ok(())) } IngressEvent::Packet { payload, desc, .. } => { ya_packet_trace::packet_trace!("Vpn::Tx::Handler", { &payload }); - if let Some(mut connection) = self.connections.get(&desc).cloned() { + if let Some(mut connection) = self.connections_tcp.get(&desc).cloned() { log::debug!("[vpn] ingress proxy: send to {:?}", desc.local); let fut = async move { connection.ingress_tx.send(payload).await } .into_actor(self) .map(move |res, _, ctx| { res.map_err(|e| { - ctx.address() - .do_send(Disconnect::new(desc, DisconnectReason::SinkClosed)); + ctx.address().do_send(DisconnectTcp { + desc, + reason: DisconnectReason::SinkClosed, + }); Error::Other(e.to_string()) }) @@ -661,11 +799,16 @@ impl Handler for Vpn { } #[derive(Debug, Clone)] -struct InternalConnection { +struct InternalTcpConnection { pub stack_connection: stack::Connection, pub ingress_tx: mpsc::Sender>, } +#[derive(Debug, Clone)] +struct InternalRawConnection { + pub src_tx: mpsc::Sender>, +} + async fn vpn_ingress_handler(rx: IngressReceiver, addr: Addr, vpn_id: String) { let mut rx = UnboundedReceiverStream::new(rx); while let Some(event) = rx.next().await { @@ -681,7 +824,7 @@ async fn vpn_ingress_handler(rx: IngressReceiver, addr: Addr, vpn_id: Strin e ); - addr.do_send(Disconnect { + addr.do_send(DisconnectTcp { desc: match event { IngressEvent::InboundConnection { desc } => desc, IngressEvent::Disconnected { desc } => desc, @@ -711,7 +854,7 @@ async fn vpn_egress_handler(rx: EgressReceiver, addr: Addr, vpn_id: String) ); if let Some((desc, ..)) = event.desc { - addr.do_send(Disconnect { + addr.do_send(DisconnectTcp { desc, reason: DisconnectReason::ConnectionError, }); diff --git a/core/vpn/src/requestor.rs b/core/vpn/src/requestor.rs index ff04a7513a..ae269eb8c8 100644 --- a/core/vpn/src/requestor.rs +++ b/core/vpn/src/requestor.rs @@ -1,18 +1,21 @@ #![allow(clippy::let_unit_value)] use crate::message::*; -use crate::network::VpnSupervisor; +use crate::network::{Vpn, VpnSupervisor}; use actix::prelude::*; use actix_web::{web, HttpRequest, HttpResponse, Responder, ResponseError}; use actix_web_actors::ws; +use actix_web_actors::ws::WsResponseBuilder; use futures::channel::mpsc; use futures::lock::Mutex; +use futures::FutureExt; use futures::StreamExt; use serde::{Deserialize, Serialize}; +use std::net::IpAddr; use std::sync::Arc; use std::time::{Duration, Instant}; use ya_client_model::net::*; -use ya_client_model::ErrorMessage; +use ya_client_model::{ErrorMessage, NodeId}; use ya_service_api_web::middleware::Identity; use ya_utils_networking::vpn::stack::connection::ConnectionMeta; use ya_utils_networking::vpn::{Error as VpnError, Protocol}; @@ -51,6 +54,7 @@ fn vpn_web_scope(path: &str) -> actix_web::Scope { .service(add_node) .service(remove_node) .service(connect_tcp) + .service(connect_raw) } /// Retrieves existing virtual private networks. @@ -208,13 +212,14 @@ async fn connect_tcp( stream: web::Payload, identity: Identity, ) -> Result { + log::warn!("connect_tcp called {:?}", path); let path = path.into_inner(); let vpn = { let supervisor = vpn_sup.lock().await; supervisor.get_network(&identity.identity, &path.net_id)? }; let conn = vpn - .send(Connect { + .send(ConnectTcp { protocol: Protocol::Tcp, address: path.ip.to_string(), port: path.port, @@ -236,7 +241,7 @@ pub struct VpnWebSocket { } impl VpnWebSocket { - pub fn new(network_id: String, conn: UserConnection) -> Self { + pub fn new(network_id: String, conn: UserTcpConnection) -> Self { VpnWebSocket { network_id, heartbeat: Instant::now(), @@ -334,6 +339,210 @@ impl Handler for VpnWebSocket { } } +/// Initiates a new RAW connection via WebSockets to the destination address. +#[actix_web::get("/net/{net_id}/raw/from/{src}/to/{dst}")] +async fn connect_raw( + vpn_sup: web::Data>>, + path: web::Path<(String, IpAddr, IpAddr)>, + req: HttpRequest, + stream: web::Payload, + identity: Identity, +) -> Result { + let (net_id, src, dst_ip) = path.into_inner(); + log::info!("vpn {net_id} connection from {src} to {dst_ip}"); + let vpn = { + let supervisor = vpn_sup.lock().await; + supervisor.get_network(&identity.identity, &net_id) + }?; + + let nodes = vpn.send(GetNodes).await??; + let dst_ip_str = dst_ip.to_string(); + let dst_node = match nodes.into_iter().find(|n| n.ip == dst_ip_str) { + Some(n) => n, + None => { + return Err(ApiError::Vpn(VpnError::ConnectionError( + "destination address not found".to_string(), + ))) + } + }; + + let raw_socket_desc = RawSocketDesc { + dst_addr: dst_ip, + src_addr: src, + dst_id: dst_node.id.clone(), + }; + + let conn = vpn + .send(ConnectRaw { + raw_socket_desc: raw_socket_desc.clone(), + }) + .await??; + + let (_addr, response) = WsResponseBuilder::new( + VpnRawSocket { + node_id: identity.identity.to_string(), + dst_node, + network_id: net_id, + raw_conn_desc: raw_socket_desc, + heartbeat: Instant::now(), + vpn_rx: Some(conn.rx), + vpn_service: vpn, + }, + &req, + stream, + ) + .start_with_addr()?; + + Ok(response) +} + +pub struct VpnRawSocket { + node_id: String, + dst_node: Node, + network_id: String, + raw_conn_desc: RawSocketDesc, + heartbeat: Instant, + vpn_rx: Option>>, + vpn_service: Addr, +} + +impl VpnRawSocket { + fn forward(&self, data: Vec, ctx: &mut ::Context) { + use ya_net::*; + + let dst_node_id: NodeId = self.dst_node.id.parse().unwrap(); + let current_node_id = self.node_id.clone(); + + #[cfg(feature = "trace-raw-packets")] + use std::sync::atomic::{AtomicU64, Ordering}; + #[cfg(feature = "trace-raw-packets")] + static PACKET_NO: AtomicU64 = AtomicU64::new(0); + #[cfg(feature = "trace-raw-packets")] + let packet_no = PACKET_NO.fetch_add(1, Ordering::Relaxed); + + #[cfg(feature = "trace-raw-packets")] + log::info!( + "VPN WebSocket: VPN {} forwarding packet to {}", + packet_no, + dst_node_id + ); + let vpn_node = dst_node_id.service_udp(&format!("/public/vpn/{}/raw", self.network_id)); + + ctx.wait( + async move { + let _res = match vpn_node.push_raw_as(¤t_node_id, data).await { + Ok(_) => Ok(()), + Err(e) => { + log::error!("failed to send packet {:?}", e); + Err(anyhow::anyhow!("failed to send packet {:?}", e)) + } + }; + #[cfg(feature = "trace-raw-packets")] + log::info!("Pushed message to {}", packet_no); + + Ok::<_, anyhow::Error>(()) + } + .then(|v| match v { + Err(e) => fut::ready(log::error!("failed to send packet {:?}", e)), + Ok(()) => fut::ready(()), + }) + .into_actor(self), + ); + } +} + +impl Actor for VpnRawSocket { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + if Instant::now().duration_since(act.heartbeat) > CLIENT_TIMEOUT { + log::warn!("VPN WebSocket: VPN {} connection timed out", act.network_id); + ctx.stop(); + } else { + ctx.ping(b""); + } + }); + + ctx.add_stream(self.vpn_rx.take().unwrap()); + } + + fn stopped(&mut self, _: &mut Self::Context) { + log::info!("VPN RawSocket: VPN {} connection stopped", self.network_id); + self.vpn_service.do_send(DisconnectRaw { + raw_socket_desc: self.raw_conn_desc.clone(), + reason: DisconnectReason::SocketClosed, + }); + } +} + +impl StreamHandler> for VpnRawSocket { + fn handle(&mut self, data: Vec, ctx: &mut Self::Context) { + ctx.binary(data) + } + + fn finished(&mut self, ctx: &mut Self::Context) { + log::info!("VPN RawSocket: UserRawConnection stream closed"); + ctx.stop(); + } +} + +impl StreamHandler> for VpnRawSocket { + fn handle(&mut self, msg: WsResult, ctx: &mut Self::Context) { + self.heartbeat = Instant::now(); + match msg { + Ok(ws::Message::Text(_)) => { + log::warn!("VPN RawSocket: Received text message, not supported"); + } + Ok(ws::Message::Binary(bytes)) => self.forward(bytes.to_vec(), ctx), + Ok(ws::Message::Ping(msg)) => { + ctx.pong(&msg); + } + Ok(ws::Message::Pong(_)) => {} + Ok(ws::Message::Close(reason)) => { + log::warn!("Received message close, close reason: {:?}", reason); + ctx.close(reason); + ctx.stop(); + } + Ok(ws::Message::Continuation(_)) => { + log::warn!( + "VPN RawSocket: VPN {} connection error: continuation not supported", + self.network_id + ); + } + Ok(ws::Message::Nop) => { + log::warn!( + "VPN RawSocket: VPN {} connection error: nop not supported", + self.network_id + ); + } + Err(e) => { + log::error!( + "VPN RawSocket: VPN {} connection error: {:?}", + self.network_id, + e + ); + ctx.stop(); + } + } + } + + fn finished(&mut self, ctx: &mut Self::Context) { + log::info!("VPN RawSocket: Websocket stream closed"); + ctx.stop(); + } +} + +impl Handler for VpnRawSocket { + type Result = ::Result; + + fn handle(&mut self, _: Shutdown, ctx: &mut Self::Context) -> Self::Result { + log::info!("VPN RawSocket: VPN {} is shutting down", self.network_id); + ctx.stop(); + Ok(()) + } +} + #[derive(thiserror::Error, Debug)] enum ApiError { #[error("VPN communication error: {0:?}")] diff --git a/exe-unit/src/network.rs b/exe-unit/src/network.rs index 64dd188e42..2f9f10ef33 100644 --- a/exe-unit/src/network.rs +++ b/exe-unit/src/network.rs @@ -1,4 +1,3 @@ -use std::convert::TryFrom; use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::path::Path; @@ -20,7 +19,7 @@ use ya_runtime_api::deploy::ContainerEndpoint; use ya_runtime_api::server::Network; use ya_service_bus::{typed, typed::Endpoint as GsbEndpoint}; use ya_utils_networking::vpn::common::DEFAULT_MAX_FRAME_SIZE; -use ya_utils_networking::vpn::{network::DuoEndpoint, Error as NetError}; +use ya_utils_networking::vpn::network::DuoEndpoint; use crate::error::Error; use crate::state::DeploymentNetwork; @@ -332,24 +331,12 @@ impl RemoteEndpoint { } } -impl<'a> TryFrom<&'a DeploymentNetwork> for Network { - type Error = Error; - - fn try_from(net: &'a DeploymentNetwork) -> Result { - let ip = net.network.addr(); - let mask = net.network.netmask(); - let gateway = net - .network - .hosts() - .find(|ip_| ip_ != &ip) - .ok_or(NetError::NetAddrTaken(ip))?; - - Ok(Network { - addr: ip.to_string(), - gateway: gateway.to_string(), - mask: mask.to_string(), - if_addr: net.node_ip.to_string(), - }) +fn network_to_runtime_command(net: &DeploymentNetwork) -> Network { + Network { + addr: net.network.addr().to_string(), + gateway: net.gateway.map(|g| g.to_string()).unwrap_or_default(), + mask: net.network.netmask().to_string(), + if_addr: net.node_ip.to_string(), } } diff --git a/exe-unit/src/network/vpn.rs b/exe-unit/src/network/vpn.rs index c466bd676c..0018ba0121 100644 --- a/exe-unit/src/network/vpn.rs +++ b/exe-unit/src/network/vpn.rs @@ -1,4 +1,4 @@ -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; use actix::prelude::*; use futures::{future, FutureExt}; @@ -7,17 +7,17 @@ use ya_client_model::NodeId; use ya_core_model::activity::{self, RpcMessageError, VpnControl, VpnPacket}; use ya_core_model::identity; use ya_runtime_api::deploy::ContainerEndpoint; -use ya_runtime_api::server::{CreateNetwork, NetworkInterface, RuntimeService}; +use ya_runtime_api::server::{CreateNetwork, Network, NetworkInterface, RuntimeService}; use ya_service_bus::typed::Endpoint as GsbEndpoint; use ya_service_bus::{actix_rpc, typed, RpcEndpoint, RpcEnvelope, RpcRawCall}; use ya_utils_networking::vpn::network::DuoEndpoint; -use ya_utils_networking::vpn::{common::ntoh, Error as NetError, PeekPacket}; +use ya_utils_networking::vpn::{common::ntoh, Error as NetError, EtherField, PeekPacket}; use ya_utils_networking::vpn::{ArpField, ArpPacket, EtherFrame, EtherType, IpPacket, Networks}; use crate::acl::Acl; use crate::error::Error; use crate::message::Shutdown; -use crate::network::{self, Endpoint}; +use crate::network::{self, network_to_runtime_command, Endpoint}; use crate::state::Deployment; pub(crate) async fn start_vpn( @@ -39,18 +39,20 @@ pub(crate) async fn start_vpn( .ok_or_else(|| Error::Other("no default identity set".to_string()))? .node_id; - let networks = deployment + let network_commands = deployment .networks .values() - .map(TryFrom::try_from) - .collect::>()?; + .map(network_to_runtime_command) + .collect::>(); + let create_network = CreateNetwork { + networks: network_commands, + hosts: deployment.hosts.clone(), + interface: NetworkInterface::Vpn as i32, + }; + log::info!("Creating VPN network: {:#?}", create_network); let response = service - .create_network(CreateNetwork { - networks, - hosts: deployment.hosts.clone(), - interface: NetworkInterface::Vpn as i32, - }) + .create_network(create_network) .await .map_err(|e| Error::Other(format!("initialization error: {:?}", e)))?; @@ -153,6 +155,7 @@ impl Vpn { } fn handle_ip( + dst_mac: [u8; 6], frame: EtherFrame, networks: &Networks>, default_id: &str, @@ -173,7 +176,24 @@ impl Vpn { let ip = ip_pkt.dst_address(); match networks.endpoint(ip) { Some(endpoint) => Self::forward_frame(endpoint, default_id, frame), - None => log::debug!("[vpn] no endpoint for {ip:?}"), + None => { + //yagna local network mac address assignment + if dst_mac[0] == 0xA0 && dst_mac[1] == 0x13 { + //last four bytes should be ip address (our convention of assigning mac addresses) + match networks.endpoint(&dst_mac[2..6]) { + Some(endpoint) => Self::forward_frame(endpoint, default_id, frame), + None => { + log::debug!( + "[vpn] endpoint not found {:?} or {:?}", + &ip, + &dst_mac[2..6] + ) + } + } + } else { + log::debug!("[vpn] mac address not recognized {dst_mac:?}") + } + } } } } @@ -280,10 +300,17 @@ impl StreamHandler>> for Vpn { ya_packet_trace::try_extract_from_ip_frame(&packet) }); + if packet.len() < 14 { + log::debug!("[vpn] packet too short (egress)"); + return; + } + let dst_mac: [u8; 6] = packet[EtherField::DST_MAC].try_into().unwrap(); match EtherFrame::try_from(packet) { Ok(frame) => match &frame { EtherFrame::Arp(_) => Self::handle_arp(frame, &self.networks, &self.default_id), - EtherFrame::Ip(_) => Self::handle_ip(frame, &self.networks, &self.default_id), + EtherFrame::Ip(_) => { + Self::handle_ip(dst_mac, frame, &self.networks, &self.default_id) + } frame => log::debug!("[vpn] unimplemented EtherType: {}", frame), }, Err(err) => { diff --git a/exe-unit/src/state.rs b/exe-unit/src/state.rs index 5729a1416f..a213f54850 100644 --- a/exe-unit/src/state.rs +++ b/exe-unit/src/state.rs @@ -392,6 +392,7 @@ pub(crate) struct Deployment { pub(crate) struct DeploymentNetwork { pub network: IpNet, pub node_ip: IpAddr, + pub gateway: Option, pub nodes: HashMap, } @@ -408,12 +409,14 @@ impl Deployment { let network = to_net(&net.ip, net.mask)?; let node_ip = IpAddr::from_str(&net.node_ip)?; let nodes = Self::map_nodes(net.nodes)?; + let gateway = net.gateway.map(|ip| IpAddr::from_str(&ip)).transpose()?; Ok(( id, DeploymentNetwork { network, node_ip, nodes, + gateway, }, )) })