Skip to content

Commit

Permalink
g3proxy: block websocket upgrade request early
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Oct 10, 2024
1 parent 7059228 commit 945c64b
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 6 deletions.
44 changes: 38 additions & 6 deletions g3proxy/src/inspect/http/v1/upgrade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,34 @@ where
}
}

pub(super) async fn forward_icap<CW, UR, UW>(
async fn check_blocked<CW>(&mut self, clt_w: &mut CW) -> ServerTaskResult<()>
where
CW: AsyncWrite + Unpin,
{
if self.ctx.websocket_inspect_policy().is_block() {
let rsp = HttpProxyClientResponse::forbidden(self.req.version);
self.should_close = true;
if rsp.reply_err_to_request(clt_w).await.is_ok() {
self.http_notes.rsp_status = rsp.status();
}
Err(ServerTaskError::InternalAdapterError(anyhow!(
"websocket blocked by inspection policy"
)))
} else {
Ok(())
}
}

pub(super) async fn forward_original<CW, UR, UW>(
&mut self,
rsp_io: &mut HttpResponseIo<CW, UR, UW>,
reqmod_client: &IcapReqmodClient,
) -> Option<(HttpUpgradeToken, UpstreamAddr)>
where
CW: AsyncWrite + Unpin,
UR: AsyncRead + Unpin,
UW: AsyncWrite + Unpin,
{
match self.do_forward(rsp_io, reqmod_client).await {
match self.do_forward_original(rsp_io).await {
Ok(v) => {
intercept_log!(self, &v, "ok");
v
Expand All @@ -174,16 +191,30 @@ where
}
}

pub(super) async fn forward_original<CW, UR, UW>(
pub(super) async fn do_forward_original<CW, UR, UW>(
&mut self,
rsp_io: &mut HttpResponseIo<CW, UR, UW>,
) -> ServerTaskResult<Option<(HttpUpgradeToken, UpstreamAddr)>>
where
CW: AsyncWrite + Unpin,
UR: AsyncRead + Unpin,
UW: AsyncWrite + Unpin,
{
self.check_blocked(&mut rsp_io.clt_w).await?;
self.send_request(None, rsp_io).await
}

pub(super) async fn forward_icap<CW, UR, UW>(
&mut self,
rsp_io: &mut HttpResponseIo<CW, UR, UW>,
reqmod_client: &IcapReqmodClient,
) -> Option<(HttpUpgradeToken, UpstreamAddr)>
where
CW: AsyncWrite + Unpin,
UR: AsyncRead + Unpin,
UW: AsyncWrite + Unpin,
{
match self.send_request(None, rsp_io).await {
match self.do_forward_icap(rsp_io, reqmod_client).await {
Ok(v) => {
intercept_log!(self, &v, "ok");
v
Expand All @@ -198,7 +229,7 @@ where
}
}

async fn do_forward<CW, UR, UW>(
async fn do_forward_icap<CW, UR, UW>(
&mut self,
rsp_io: &mut HttpResponseIo<CW, UR, UW>,
reqmod_client: &IcapReqmodClient,
Expand All @@ -208,6 +239,7 @@ where
UR: AsyncRead + Unpin,
UW: AsyncWrite + Unpin,
{
self.check_blocked(&mut rsp_io.clt_w).await?;
match reqmod_client
.h1_adapter(
self.ctx.server_config.limited_copy_config(),
Expand Down
19 changes: 19 additions & 0 deletions g3proxy/src/inspect/http/v2/connect/extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ where
}
}

fn reply_forbidden(&mut self, mut clt_send_rsp: SendResponse<Bytes>) {
if let Ok(rsp) = Response::builder()
.status(StatusCode::FORBIDDEN)
.version(Version::HTTP_2)
.body(())
{
let rsp_status = rsp.status().as_u16();
if clt_send_rsp.send_response(rsp, true).is_ok() {
self.http_notes.rsp_status = rsp_status;
}
}
}

pub(crate) async fn into_running(
mut self,
clt_req: Request<RecvStream>,
Expand Down Expand Up @@ -162,6 +175,12 @@ where
}
};

if self.ctx.websocket_inspect_policy().is_block() {
self.reply_forbidden(clt_send_rsp);
intercept_log!(self, "websocket blocked by inspection policy");
return;
}

let mut ws_notes = WebSocketNotes::new(clt_req.uri().clone());
for (name, value) in clt_req.headers() {
ws_notes.append_request_header(name, value);
Expand Down
7 changes: 7 additions & 0 deletions lib/g3-dpi/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ pub enum ProtocolInspectPolicy {
Block,
}

impl ProtocolInspectPolicy {
#[inline]
pub fn is_block(&self) -> bool {
matches!(self, ProtocolInspectPolicy::Block)
}
}

impl FromStr for ProtocolInspectPolicy {
type Err = ();

Expand Down

0 comments on commit 945c64b

Please sign in to comment.