From 14f71e52f3b6752ccae39f71e986fbcf90f90715 Mon Sep 17 00:00:00 2001 From: Eloi DEMOLIS Date: Fri, 25 Aug 2023 00:27:57 +0200 Subject: [PATCH] PoC: pass proxied endpoints to proxy functions through trait Signed-off-by: Eloi DEMOLIS --- lib/src/protocol/mux/h1.rs | 12 +++++++-- lib/src/protocol/mux/h2.rs | 12 ++++++--- lib/src/protocol/mux/mod.rs | 51 +++++++++++++++++++++++++++++-------- 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/lib/src/protocol/mux/h1.rs b/lib/src/protocol/mux/h1.rs index 283d7562d..b002701e6 100644 --- a/lib/src/protocol/mux/h1.rs +++ b/lib/src/protocol/mux/h1.rs @@ -6,6 +6,8 @@ use crate::{ Readiness, }; +use super::UpdateReadiness; + pub struct ConnectionH1 { pub position: Position, pub readiness: Readiness, @@ -15,7 +17,10 @@ pub struct ConnectionH1 { } impl ConnectionH1 { - pub fn readable(&mut self, context: &mut Context) -> MuxResult { + pub fn readable(&mut self, context: &mut Context, endpoint: E) -> MuxResult + where + E: UpdateReadiness, + { println!("======= MUX H1 READABLE"); let stream = &mut context.streams[self.stream]; let kawa = stream.rbuffer(self.position); @@ -42,7 +47,10 @@ impl ConnectionH1 { } MuxResult::Continue } - pub fn writable(&mut self, context: &mut Context) -> MuxResult { + pub fn writable(&mut self, context: &mut Context, endpoint: E) -> MuxResult + where + E: UpdateReadiness, + { println!("======= MUX H1 WRITABLE"); let stream = &mut context.streams[self.stream]; let kawa = stream.wbuffer(self.position); diff --git a/lib/src/protocol/mux/h2.rs b/lib/src/protocol/mux/h2.rs index 78f469399..951e4e2d8 100644 --- a/lib/src/protocol/mux/h2.rs +++ b/lib/src/protocol/mux/h2.rs @@ -12,7 +12,7 @@ use crate::{ Readiness, }; -use super::GenericHttpStream; +use super::{GenericHttpStream, UpdateReadiness}; #[derive(Debug)] pub enum H2State { @@ -66,7 +66,10 @@ pub enum H2StreamId { } impl ConnectionH2 { - pub fn readable(&mut self, context: &mut Context) -> MuxResult { + pub fn readable(&mut self, context: &mut Context, endpoint: E) -> MuxResult + where + E: UpdateReadiness, + { println!("======= MUX H2 READABLE"); let (stream_id, kawa) = if let Some((stream_id, amount)) = self.expect { let kawa = match stream_id { @@ -230,7 +233,10 @@ impl ConnectionH2 { MuxResult::Continue } - pub fn writable(&mut self, context: &mut Context) -> MuxResult { + pub fn writable(&mut self, context: &mut Context, endpoint: E) -> MuxResult + where + E: UpdateReadiness, + { println!("======= MUX H2 WRITABLE"); match (&self.state, &self.position) { (H2State::ClientPreface, Position::Client) => todo!("Send PRI + client Settings"), diff --git a/lib/src/protocol/mux/mod.rs b/lib/src/protocol/mux/mod.rs index 425ba476a..246568cf3 100644 --- a/lib/src/protocol/mux/mod.rs +++ b/lib/src/protocol/mux/mod.rs @@ -56,6 +56,11 @@ pub enum Connection { H2(ConnectionH2), } +pub trait UpdateReadiness { + fn readiness(&self, token: Token) -> &Readiness; + fn readiness_mut(&mut self, token: Token) -> &mut Readiness; +} + impl Connection { pub fn new_h1_server(front_stream: Front) -> Connection { Connection::H1(ConnectionH1 { @@ -143,20 +148,46 @@ impl Connection { Connection::H2(c) => c.socket.socket_ref(), } } - fn readable(&mut self, context: &mut Context) -> MuxResult { + fn readable(&mut self, context: &mut Context, endpoint: E) -> MuxResult + where + E: UpdateReadiness, + { match self { - Connection::H1(c) => c.readable(context), - Connection::H2(c) => c.readable(context), + Connection::H1(c) => c.readable(context, endpoint), + Connection::H2(c) => c.readable(context, endpoint), } } - fn writable(&mut self, context: &mut Context) -> MuxResult { + fn writable(&mut self, context: &mut Context, endpoint: E) -> MuxResult + where + E: UpdateReadiness, + { match self { - Connection::H1(c) => c.writable(context), - Connection::H2(c) => c.writable(context), + Connection::H1(c) => c.writable(context, endpoint), + Connection::H2(c) => c.writable(context, endpoint), } } } +struct EndpointServer<'a>(&'a mut Connection); +struct EndpointClient<'a>(&'a mut Router); + +impl<'a> UpdateReadiness for EndpointServer<'a> { + fn readiness(&self, _token: Token) -> &Readiness { + self.0.readiness() + } + fn readiness_mut(&mut self, _token: Token) -> &mut Readiness { + self.0.readiness_mut() + } +} +impl<'a> UpdateReadiness for EndpointClient<'a> { + fn readiness(&self, token: Token) -> &Readiness { + self.0.backends.get(&token).unwrap().readiness() + } + fn readiness_mut(&mut self, token: Token) -> &mut Readiness { + self.0.backends.get_mut(&token).unwrap().readiness_mut() + } +} + pub struct Stream { // pub request_id: Ulid, pub window: i32, @@ -459,7 +490,7 @@ impl SessionState for Mux { let mut dirty = false; if self.frontend.readiness().filter_interest().is_readable() { - match self.frontend.readable(context) { + match self.frontend.readable(context, EndpointClient(&mut self.router)) { MuxResult::Continue => (), MuxResult::CloseSession => return SessionResult::Close, MuxResult::Close(_) => todo!(), @@ -483,7 +514,7 @@ impl SessionState for Mux { for (_, backend) in self.router.backends.iter_mut() { if backend.readiness().filter_interest().is_writable() { - match backend.writable(context) { + match backend.writable(context, EndpointServer(&mut self.frontend)) { MuxResult::Continue => (), MuxResult::CloseSession => return SessionResult::Close, MuxResult::Close(_) => todo!(), @@ -493,7 +524,7 @@ impl SessionState for Mux { } if backend.readiness().filter_interest().is_readable() { - match backend.readable(context) { + match backend.readable(context, EndpointServer(&mut self.frontend)) { MuxResult::Continue => (), MuxResult::CloseSession => return SessionResult::Close, MuxResult::Close(_) => todo!(), @@ -504,7 +535,7 @@ impl SessionState for Mux { } if self.frontend.readiness().filter_interest().is_writable() { - match self.frontend.writable(context) { + match self.frontend.writable(context, EndpointClient(&mut self.router)) { MuxResult::Continue => (), MuxResult::CloseSession => return SessionResult::Close, MuxResult::Close(_) => todo!(),