From cb0e8fa368390d5531b6e2ddc6ea0d9e53d4fc3c Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Tue, 27 Aug 2024 11:44:07 -0700 Subject: [PATCH] adapt to incoming httpun / httpun-ws / h2 APIs (#207) * adapt to incoming httpun / httpun-ws / h2 APIs * make it build * bring back some pins * wip * wip * wip --- flake.lock | 14 +++++------ lib/body.ml | 54 +++++++++++++++++++++++------------------ lib/http_impl.ml | 14 +++++------ lib/http_server_impl.ml | 26 +++++++++++--------- lib/response.ml | 8 ++---- lib/ws.ml | 15 ++++++++---- piaf.opam | 8 +++++- piaf.opam.template | 8 +++++- 8 files changed, 84 insertions(+), 63 deletions(-) diff --git a/flake.lock b/flake.lock index 93b265e..9cbb229 100644 --- a/flake.lock +++ b/flake.lock @@ -41,11 +41,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1719276345, - "narHash": "sha256-rXdYFXVXSpYO5libenjqPogTEMZ7OCPaSoB/3nix5UQ=", + "lastModified": 1724782686, + "narHash": "sha256-rsfvlQ7nZETmkRaSZqQhHOd8b2Qfy3ciTS/7fJAIbBk=", "owner": "nix-ocaml", "repo": "nix-overlays", - "rev": "b7367d848bb29cda64468d064bd8f2f04fc63034", + "rev": "cc5455017564e67e97004761b4cc5422bcd7317b", "type": "github" }, "original": { @@ -56,17 +56,17 @@ }, "nixpkgs_2": { "locked": { - "lastModified": 1719233611, - "narHash": "sha256-YZO/PDjf9js7lQsSCLFMMFRCbgIKzasGA2y1NVRIQR8=", + "lastModified": 1724740600, + "narHash": "sha256-JP6fFxQyeWd5GTJrlwItIJJtT2VuOiwIzb/QEMNICwI=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "36e8d8a53b253019a3b325015be47196c278afb7", + "rev": "c32bd049ad54e6a8c1300164061a6e18de6781bc", "type": "github" }, "original": { "owner": "NixOS", "repo": "nixpkgs", - "rev": "36e8d8a53b253019a3b325015be47196c278afb7", + "rev": "c32bd049ad54e6a8c1300164061a6e18de6781bc", "type": "github" } }, diff --git a/lib/body.ml b/lib/body.ml index e1cbade..9a96700 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -321,7 +321,7 @@ module Raw = struct val write_string : t -> ?off:int -> ?len:int -> string -> unit val write_bigstring : t -> ?off:int -> ?len:int -> Bigstringaf.t -> unit val schedule_bigstring : t -> ?off:int -> ?len:int -> Bigstringaf.t -> unit - val flush : t -> (unit -> unit) -> unit + val flush : t -> ([ `Written | `Closed ] -> unit) -> unit val close : t -> unit val is_closed : t -> bool end @@ -438,36 +438,44 @@ module Raw = struct to_t reader ?on_eof ~body_length ~body_error:incomplete_body_error body let flush_and_close : - type a. (module Writer with type t = a) -> a -> (unit -> unit) -> unit - = - fun (module Writer) body f -> - Writer.close body; - Writer.flush body f - - let stream_write_body : type a. (module Writer with type t = a) -> a - -> Bigstringaf.t IOVec.t Stream.t + -> ([ `Written | `Closed ] -> unit) -> unit = - fun (module Writer) body stream -> - Stream.iter - ~f:(fun { IOVec.buffer; off; len } -> - (* If the peer left abruptly the connection will be shutdown. Avoid - * crashing the server with exceptions related to the writer being - * closed. *) - if not (Writer.is_closed body) - then ( + fun (module Writer) body f -> + Writer.close body; + Writer.flush body f + + exception Local + + let stream_write_body = + let stream_write_body : + type a. + (module Writer with type t = a) + -> a + -> Bigstringaf.t IOVec.t Stream.t + -> unit + = + fun (module Writer) body stream -> + Stream.iter + ~f:(fun { IOVec.buffer; off; len } -> + (* If the peer left abruptly the connection will be shutdown. Avoid + * crashing the server with exceptions related to the writer being + * closed. *) Writer.schedule_bigstring body ~off ~len buffer; let p, u = Promise.create () in - Writer.flush body (fun () -> - Promise.resolve u (); + Writer.flush body (fun reason -> + Promise.resolve u reason; Logs.debug (fun m -> m "Flushed output chunk of length %d" len)); - Promise.await p) - else ()) - stream; - flush_and_close (module Writer) body ignore + match Promise.await p with `Closed -> raise Local | `Written -> ()) + stream; + flush_and_close (module Writer) body ignore + in + + fun writer body stream -> + try stream_write_body writer body stream with Local -> () end (* Traversal *) diff --git a/lib/http_impl.ml b/lib/http_impl.ml index 21c68f3..f5448c4 100644 --- a/lib/http_impl.ml +++ b/lib/http_impl.ml @@ -82,9 +82,11 @@ let flush_and_close : type a. (module Body.Raw.Writer with type t = a) -> a -> unit = fun b request_body -> - Body.Raw.flush_and_close b request_body (fun () -> - Logs.info (fun m -> - m "Request body has been completely and successfully uploaded")) + Body.Raw.flush_and_close b request_body (function + | `Closed -> Logs.warn (fun m -> m "Request body not completely written") + | `Written -> + Logs.info (fun m -> + m "Request body has been completely and successfully uploaded")) let handle_response : sw:Switch.t @@ -215,12 +217,8 @@ let upgrade_connection : Logs.info (fun m -> m "Upgrading connection to the Websocket protocol"); let ws_conn = - let error_handler _wsd error = - Promise.resolve notify_error (error :> Error.client) - in Httpun_ws.Client_connection.create - ~error_handler - (Ws.Handler.websocket_handler ~sw ~notify_wsd) + (Ws.Handler.websocket_handler ~sw ~notify_wsd ~notify_error) in let result = Fiber.any diff --git a/lib/http_server_impl.ml b/lib/http_server_impl.ml index 4e2a0b4..c09a04b 100644 --- a/lib/http_server_impl.ml +++ b/lib/http_server_impl.ml @@ -69,18 +69,20 @@ let do_sendfile : (* Flush everything to the wire before calling `sendfile`, as we're gonna bypass the http/af runtime and write bytes to the file descriptor directly. *) - Http.Body.Writer.flush response_body (fun () -> - match - Posix.sendfile - (module Http.Body.Writer) - ~src_fd - ~dst_fd:fd - response_body - with - | Ok () -> Http.Body.Writer.close response_body - | Error exn -> - Http.Body.Writer.close response_body; - report_exn exn)) + Http.Body.Writer.flush response_body (function + | `Closed -> () + | `Written -> + (match + Posix.sendfile + (module Http.Body.Writer) + ~src_fd + ~dst_fd:fd + response_body + with + | Ok () -> Http.Body.Writer.close response_body + | Error exn -> + Http.Body.Writer.close response_body; + report_exn exn))) let handle_request : type reqd writer. diff --git a/lib/response.ml b/lib/response.ml index de1a655..126c13b 100644 --- a/lib/response.ml +++ b/lib/response.ml @@ -118,16 +118,12 @@ module Upgrade = struct | HTTP_1_1 -> let upgrade_handler = let wsd_received, notify_wsd = Promise.create () in + (* TODO(anmonteiro): not handling this error? *) let _error_received, notify_error = Promise.create () in fun ~sw upgrade -> - let error_handler _wsd error = - Promise.resolve notify_error (error :> Error.client) - in - let ws_conn = Httpun_ws.Server_connection.create_websocket - ~error_handler - (Ws.Handler.websocket_handler ~sw ~notify_wsd) + (Ws.Handler.websocket_handler ~sw ~notify_wsd ~notify_error) in Fiber.fork ~sw (fun () -> f (Promise.await wsd_received)); upgrade (Gluten.make (module Httpun_ws.Server_connection) ws_conn) diff --git a/lib/ws.ml b/lib/ws.ml index adcc352..0985751 100644 --- a/lib/ws.ml +++ b/lib/ws.ml @@ -113,7 +113,7 @@ end = struct end module Handler = struct - let websocket_handler ~sw ~notify_wsd wsd = + let websocket_handler ~sw ~notify_wsd ~notify_error wsd = let frameq = Queue.create () in let messages, push_to_messages = Stream.create 256 in Promise.resolve notify_wsd (Descriptor.create ~messages wsd); @@ -190,10 +190,15 @@ module Handler = struct | `Other _ -> failwith "Custom WebSocket frame types not yet supported") in - let eof () = - Logs.info (fun m -> m "Websocket connection EOF"); - Httpun_ws.Wsd.close wsd; - push_to_messages None + let eof ?error () = + match error with + | Some error -> + Httpun_ws.Wsd.close wsd; + Promise.resolve notify_error (error :> Error.client) + | None -> + Logs.info (fun m -> m "Websocket connection EOF"); + Httpun_ws.Wsd.close wsd; + push_to_messages None in { Httpun_ws.Websocket_connection.frame; eof } end diff --git a/piaf.opam b/piaf.opam index 474a318..71b8b83 100644 --- a/piaf.opam +++ b/piaf.opam @@ -43,6 +43,12 @@ build: [ ] dev-repo: "git+https://github.com/anmonteiro/piaf.git" pin-depends: [ - [ "eio-ssl.dev" "git+https://github.com/anmonteiro/eio-ssl.git#0.3.0" ] + [ "httpun-types.dev" "git+https://github.com/anmonteiro/httpun.git" ] + [ "httpun.dev" "git+https://github.com/anmonteiro/httpun.git" ] + [ "httpun-eio.dev" "git+https://github.com/anmonteiro/httpun.git" ] + [ "hpack.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ] + [ "h2.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ] + [ "h2-eio.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ] + [ "httpun-ws.dev" "git+https://github.com/anmonteiro/httpun-ws.git" ] [ "multipart_form.dev" "git+https://github.com/anmonteiro/multipart_form.git" ] ] diff --git a/piaf.opam.template b/piaf.opam.template index 7ece3e5..44dde5f 100644 --- a/piaf.opam.template +++ b/piaf.opam.template @@ -1,4 +1,10 @@ pin-depends: [ - [ "eio-ssl.dev" "git+https://github.com/anmonteiro/eio-ssl.git#0.3.0" ] + [ "httpun-types.dev" "git+https://github.com/anmonteiro/httpun.git" ] + [ "httpun.dev" "git+https://github.com/anmonteiro/httpun.git" ] + [ "httpun-eio.dev" "git+https://github.com/anmonteiro/httpun.git" ] + [ "hpack.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ] + [ "h2.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ] + [ "h2-eio.dev" "git+https://github.com/anmonteiro/ocaml-h2.git" ] + [ "httpun-ws.dev" "git+https://github.com/anmonteiro/httpun-ws.git" ] [ "multipart_form.dev" "git+https://github.com/anmonteiro/multipart_form.git" ] ]