Skip to content

Commit

Permalink
Child_processes: ensure stderr/stdout get flushed
Browse files Browse the repository at this point in the history
  • Loading branch information
emberian committed Dec 13, 2023
1 parent dad360b commit 2d386c0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 28 deletions.
6 changes: 1 addition & 5 deletions src/lib/child_processes/child_processes.ml
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,7 @@ let start_custom :
Deferred.Or_error.try_with ~here:[%here] (fun () -> Process.wait process)
in
[%log trace] "child process %s died" name ;
don't_wait_for
(let%bind () = after (Time.Span.of_sec 1.) in
let%bind () = Writer.close @@ Process.stdin process in
let%bind () = Reader.close @@ Process.stdout process in
Reader.close @@ Process.stderr process ) ;
don't_wait_for (Writer.close @@ Process.stdin process) ;
let%bind () = Sys.remove lock_path in
Ivar.fill terminated_ivar termination_status ;
let log_bad_termination () =
Expand Down
57 changes: 34 additions & 23 deletions src/lib/mina_net2/libp2p_helper.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type t =
{ process : Child_processes.t
; logger : Logger.t
; mutable finished : bool
; stderr_finished : unit Ivar.t
; outstanding_requests :
Libp2p_ipc.rpc_response_body Or_error.t Ivar.t
Libp2p_ipc.Sequence_number.Table.t
Expand All @@ -141,13 +142,15 @@ let handle_libp2p_helper_termination t ~pids ~killed result =
~metadata:
[ ("exit_status", `String (Unix.Exit_or_signal.to_string_hum e)) ] ;
t.finished <- true ;
let%map () = Ivar.read t.stderr_finished in
raise Libp2p_helper_died_unexpectedly
| Error err ->
[%log' fatal t.logger]
!"Child processes library could not track libp2p_helper process: $err"
~metadata:[ ("err", Error_json.error_to_yojson err) ] ;
t.finished <- true ;
let%map () = Deferred.ignore_m (Child_processes.kill t.process) in
let%bind () = Deferred.ignore_m (Child_processes.kill t.process) in
let%map () = Ivar.read t.stderr_finished in
raise Libp2p_helper_died_unexpectedly
| Ok (Ok ()) ->
[%log' error t.logger]
Expand Down Expand Up @@ -249,33 +252,39 @@ let spawn ?(allow_multiple_instances = false) ~logger ~pids ~conf_dir
{ process
; logger
; finished = false
; stderr_finished = Ivar.create ()
; outstanding_requests = Libp2p_ipc.Sequence_number.Table.create ()
}
in
termination_handler := handle_libp2p_helper_termination t ~pids ;
O1trace.background_thread "handle_libp2p_helper_subprocess_logs"
(fun () ->
Child_processes.stderr process
|> Strict_pipe.Reader.iter ~f:(fun line ->
Mina_metrics.(
Counter.inc_one Mina_metrics.Network.ipc_logs_received_total) ;
let record_result =
try
Some
(Go_log.record_of_yojson @@ Yojson.Safe.from_string line)
with Yojson.Json_error _error -> None
in
( match record_result with
| Some (Ok record) ->
record |> Go_log.record_to_message |> Logger.raw logger
| Some (Error error) ->
[%log error]
"failed to parse record over libp2p_helper stderr: \
$error"
~metadata:[ ("error", `String error) ]
| None ->
Core.print_endline line ) ;
Deferred.unit ) ) ;
let%map () =
Child_processes.stderr process
|> Strict_pipe.Reader.iter ~f:(fun line ->
Mina_metrics.(
Counter.inc_one
Mina_metrics.Network.ipc_logs_received_total) ;
let record_result =
try
Some
( Go_log.record_of_yojson
@@ Yojson.Safe.from_string line )
with Yojson.Json_error _error -> None
in
( match record_result with
| Some (Ok record) ->
record |> Go_log.record_to_message |> Logger.raw logger
| Some (Error error) ->
[%log error]
"failed to parse record over libp2p_helper stderr: \
$error"
~metadata:[ ("error", `String error) ]
| None ->
Core.print_endline line ) ;
Deferred.unit )
in
Ivar.fill t.stderr_finished () ) ;
O1trace.background_thread "handle_libp2p_ipc_incoming" (fun () ->
Child_processes.stdout process
|> Libp2p_ipc.read_incoming_messages
Expand All @@ -284,7 +293,9 @@ let spawn ?(allow_multiple_instances = false) ~logger ~pids ~conf_dir
let msg =
Libp2p_ipc.Reader.DaemonInterface.Message.get msg
in
handle_incoming_message t msg ~handle_push_message
if not t.finished then
handle_incoming_message t msg ~handle_push_message
else Deferred.unit
| Error error ->
[%log error]
"failed to parse IPC message over libp2p_helper stdout: \
Expand Down

0 comments on commit 2d386c0

Please sign in to comment.