Skip to content

Commit

Permalink
Eio.Net.connect: add options for bind and reuse_addr/port
Browse files Browse the repository at this point in the history
  • Loading branch information
art-w committed Mar 26, 2024
1 parent d3f243c commit 7fd9a62
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 44 deletions.
2 changes: 1 addition & 1 deletion lib_eio/mock/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module Impl = struct
Switch.on_release sw (fun () -> Eio.Resource.close socket);
socket

let connect t ~sw ?bind:_ addr =
let connect t ~sw ~options:_ addr =
traceln "%s: connect to %a" t.label Eio.Net.Sockaddr.pp addr;
let socket = Handler.run t.on_connect in
Switch.on_release sw (fun () -> Eio.Flow.close socket);
Expand Down
11 changes: 8 additions & 3 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ type 'tag ty = [`Network | `Platform of 'tag]
type 'a t = 'a r
constraint 'a = [> [> `Generic] ty]

type option = ..
type option += Bind of Sockaddr.stream
| Reuse_addr
| Reuse_port

module Pi = struct
module type STREAM_SOCKET = sig
type tag
Expand Down Expand Up @@ -235,7 +240,7 @@ module Pi = struct
type tag

val listen : t -> reuse_addr:bool -> reuse_port:bool -> backlog:int -> sw:Switch.t -> Sockaddr.stream -> tag listening_socket_ty r
val connect : t -> sw:Switch.t -> ?bind:Sockaddr.stream -> Sockaddr.stream -> tag stream_socket_ty r
val connect : t -> sw:Switch.t -> options:option list -> Sockaddr.stream -> tag stream_socket_ty r
val datagram_socket :
t
-> reuse_addr:bool
Expand Down Expand Up @@ -295,10 +300,10 @@ let listen (type tag) ?(reuse_addr=false) ?(reuse_port=false) ~backlog ~sw (t:[>
let module X = (val (Resource.get ops Pi.Network)) in
X.listen t ~reuse_addr ~reuse_port ~backlog ~sw

let connect (type tag) ~sw ?bind (t:[> tag ty] r) addr =
let connect (type tag) ~sw ?(options = []) (t:[> tag ty] r) addr =
let (Resource.T (t, ops)) = t in
let module X = (val (Resource.get ops Pi.Network)) in
try X.connect t ~sw ?bind addr
try X.connect t ~sw ~options addr
with Exn.Io _ as ex ->
let bt = Printexc.get_raw_backtrace () in
Exn.reraise_with_context ex bt "connecting to %a" Sockaddr.pp addr
Expand Down
13 changes: 8 additions & 5 deletions lib_eio/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,15 @@ type 'a t = 'a r

(** {2 Out-bound Connections} *)

val connect : sw:Switch.t -> ?bind:Sockaddr.stream -> [> 'tag ty] t -> Sockaddr.stream -> 'tag stream_socket_ty r
(** [connect ~sw t addr] is a new socket connected to remote address [addr].
type option = ..
type option += Bind of Sockaddr.stream
| Reuse_addr
| Reuse_port

The new socket will be closed when [sw] finishes, unless closed manually first.
val connect : sw:Switch.t -> ?options:option list -> [> 'tag ty] t -> Sockaddr.stream -> 'tag stream_socket_ty r
(** [connect ~sw t addr] is a new socket connected to remote address [addr].
@param bind Set the outbound client address. *)
The new socket will be closed when [sw] finishes, unless closed manually first. *)

val with_tcp_connect :
?timeout:Time.Timeout.t ->
Expand Down Expand Up @@ -348,7 +351,7 @@ module Pi : sig
t -> reuse_addr:bool -> reuse_port:bool -> backlog:int -> sw:Switch.t ->
Sockaddr.stream -> tag listening_socket_ty r

val connect : t -> sw:Switch.t -> ?bind:Sockaddr.stream -> Sockaddr.stream -> tag stream_socket_ty r
val connect : t -> sw:Switch.t -> options:option list -> Sockaddr.stream -> tag stream_socket_ty r

val datagram_socket :
t
Expand Down
12 changes: 12 additions & 0 deletions lib_eio/unix/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,15 @@ let socketpair_datagram ~sw ?(domain=Unix.PF_UNIX) ?(protocol=0) () =

let fd socket =
Option.get (Resource.fd_opt socket)

let apply_option fd = function
| Eio.Net.Bind bind_addr ->
Unix.bind fd (sockaddr_to_unix bind_addr)
| Eio.Net.Reuse_addr ->
Unix.setsockopt fd Unix.SO_REUSEADDR true
| Eio.Net.Reuse_port ->
Unix.setsockopt fd Unix.SO_REUSEPORT true
| _ ->
invalid_arg "Unknown Eio.Net.option"

let configure options fd = List.iter (apply_option fd) options
3 changes: 3 additions & 0 deletions lib_eio/unix/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ val socketpair_datagram :
val getnameinfo : Eio.Net.Sockaddr.t -> (string * string)
(** [getnameinfo sockaddr] returns domain name and service for [sockaddr]. *)

val configure : Eio.Net.option list -> Unix.file_descr -> unit
(** [configure options fd] prepare the socket with the chosen options. *)

type _ Effect.t +=
| Import_socket_stream :
Switch.t * bool * Unix.file_descr -> [`Unix_fd | stream_socket_ty] r Effect.t (** See {!import_socket_stream} *)
Expand Down
9 changes: 4 additions & 5 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,11 @@ let socket_domain_of = function
~v4:(fun _ -> Unix.PF_INET)
~v6:(fun _ -> Unix.PF_INET6)

let connect ~sw ?bind connect_addr =
let connect ~sw ~options connect_addr =
let addr = Eio_unix.Net.sockaddr_to_unix connect_addr in
let sock_unix = Unix.socket ~cloexec:true (socket_domain_of connect_addr) Unix.SOCK_STREAM 0 in
let sock = Fd.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in
let bind = Option.map Eio_unix.Net.sockaddr_to_unix bind in
Low_level.connect sock ?bind addr;
Low_level.connect sock ~options addr;
(flow sock :> _ Eio_unix.Net.stream_socket)

module Impl = struct
Expand Down Expand Up @@ -297,8 +296,8 @@ module Impl = struct
Unix.listen sock_unix backlog;
(listening_socket sock :> _ Eio.Net.listening_socket_ty r)

let connect () ~sw ?bind addr =
(connect ~sw ?bind addr :> [`Generic | `Unix] Eio.Net.stream_socket_ty r)
let connect () ~sw ~options addr =
(connect ~sw ~options addr :> [`Generic | `Unix] Eio.Net.stream_socket_ty r)

let datagram_socket () ~reuse_addr ~reuse_port ~sw saddr =
if reuse_addr then (
Expand Down
17 changes: 9 additions & 8 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,16 @@ let splice src ~dst ~len =
else if res = 0 then raise End_of_file
else raise @@ Err.wrap (Uring.error_of_errno res) "splice" ""

let try_bind fd = function
| None -> ()
| Some bind_addr ->
try Unix.bind fd bind_addr
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap_fs code name arg

let connect fd ?bind addr =
let apply_option fd = function
| Eio.Net.Bind bind_addr ->
let bind_addr = Eio_unix.Net.sockaddr_to_unix bind_addr in
(try Unix.bind fd bind_addr
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap_fs code name arg)
| _ -> invalid_arg "Unknown option"

let connect fd ~options addr =
Fd.use_exn "connect" fd @@ fun fd ->
try_bind fd bind;
List.iter (apply_option fd) options ;
let res = Sched.enter "connect" (enqueue_connect fd addr) in
if res < 0 then (
let ex =
Expand Down
5 changes: 3 additions & 2 deletions lib_eio_linux/low_level.mli
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ val splice : fd -> dst:fd -> len:int -> int
@raise End_of_file [src] is at the end of the file.
@raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *)

val connect : fd -> ?bind:Unix.sockaddr -> Unix.sockaddr -> unit
(** [connect fd addr] attempts to connect socket [fd] to [addr]. *)
val connect : fd -> options:Eio.Net.option list -> Unix.sockaddr -> unit
(** [connect fd ~options addr] attempts to connect socket [fd] to [addr]
after configuring the socket with [options]. *)

val await_readable : fd -> unit
(** [await_readable fd] blocks until [fd] is readable (or has an error). *)
Expand Down
4 changes: 2 additions & 2 deletions lib_eio_posix/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ let socket ~sw socket_domain socket_type protocol =
Unix.set_nonblock sock_unix;
Fd.of_unix ~sw ~blocking:false ~close_unix:true sock_unix

let connect fd ?bind addr =
let connect fd ~options addr =
try
Fd.use_exn "connect" fd @@ fun fd ->
Option.iter (Unix.bind fd) bind;
Eio_unix.Net.configure options fd ;
Unix.connect fd addr
with
| Unix.Unix_error ((EINTR | EAGAIN | EWOULDBLOCK | EINPROGRESS), _, _) ->
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_posix/low_level.mli
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ val read : fd -> bytes -> int -> int -> int
val write : fd -> bytes -> int -> int -> int

val socket : sw:Switch.t -> Unix.socket_domain -> Unix.socket_type -> int -> fd
val connect : fd -> ?bind:Unix.sockaddr -> Unix.sockaddr -> unit
val connect : fd -> options:Eio.Net.option list -> Unix.sockaddr -> unit
val accept : sw:Switch.t -> fd -> fd * Unix.sockaddr

val shutdown : fd -> Unix.shutdown_command -> unit
Expand Down
9 changes: 4 additions & 5 deletions lib_eio_posix/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr.
);
(listening_socket ~hook sock :> _ Eio.Net.listening_socket_ty r)

let connect ~sw ?bind connect_addr =
let connect ~sw ~options connect_addr =
let socket_type, addr =
match connect_addr with
| `Unix path -> Unix.SOCK_STREAM, Unix.ADDR_UNIX path
Expand All @@ -147,9 +147,8 @@ let connect ~sw ?bind connect_addr =
Unix.SOCK_STREAM, Unix.ADDR_INET (host, port)
in
let sock = Low_level.socket ~sw (socket_domain_of connect_addr) socket_type 0 in
let bind = Option.map Eio_unix.Net.sockaddr_to_unix bind in
try
Low_level.connect sock ?bind addr;
Low_level.connect sock ~options addr;
(Flow.of_fd sock :> _ Eio_unix.Net.stream_socket)
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)

Expand All @@ -175,8 +174,8 @@ module Impl = struct

let listen () = listen

let connect () ~sw ?bind addr =
let socket = connect ~sw ?bind addr in
let connect () ~sw ~options addr =
let socket = connect ~sw ~options addr in
(socket :> [`Generic | `Unix] Eio.Net.stream_socket_ty r)

let datagram_socket () ~reuse_addr ~reuse_port ~sw saddr =
Expand Down
10 changes: 2 additions & 8 deletions lib_eio_windows/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,10 @@ let socket ~sw socket_domain socket_type protocol =
Unix.set_nonblock sock_unix;
Fd.of_unix ~sw ~blocking:false ~close_unix:true sock_unix

let try_bind fd = function
| None -> ()
| Some bind_addr ->
try Unix.bind fd bind_addr
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap_fs code name arg

let connect fd ?bind addr =
let connect fd ~options addr =
try
Fd.use_exn "connect" fd @@ fun fd ->
try_bind fd bind ;
Eio_unix.Net.configure options fd ;
Unix.connect fd addr
with
| Unix.Unix_error ((EINTR | EAGAIN | EWOULDBLOCK | EINPROGRESS), _, _) ->
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_windows/low_level.mli
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ val read_cstruct : fd -> Cstruct.t -> int
val write : fd -> bytes -> int -> int -> int

val socket : sw:Switch.t -> Unix.socket_domain -> Unix.socket_type -> int -> fd
val connect : fd -> Unix.sockaddr -> unit
val connect : fd -> options:Eio.Net.option list -> Unix.sockaddr -> unit
val accept : sw:Switch.t -> fd -> fd * Unix.sockaddr

val shutdown : fd -> Unix.shutdown_command -> unit
Expand Down
5 changes: 2 additions & 3 deletions lib_eio_windows/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr.
);
(listening_socket ~hook sock :> _ Eio.Net.listening_socket_ty r)

let connect ~sw ?bind connect_addr =
let connect ~sw ~options connect_addr =
let socket_type, addr =
match connect_addr with
| `Unix path -> Unix.SOCK_STREAM, Unix.ADDR_UNIX path
Expand All @@ -151,9 +151,8 @@ let connect ~sw ?bind connect_addr =
Unix.SOCK_STREAM, Unix.ADDR_INET (host, port)
in
let sock = Low_level.socket ~sw (socket_domain_of connect_addr) socket_type 0 in
let bind = Option.map Eio_unix.Net.sockaddr_to_unix bind in
try
Low_level.connect sock ?bind addr;
Low_level.connect sock ~options addr;
(Flow.of_fd sock :> _ Eio_unix.Net.stream_socket)
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)

Expand Down

0 comments on commit 7fd9a62

Please sign in to comment.