Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] using buffered pipe instead of synchronous pipe for archiver #14768

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/app/archive/lib/processor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3980,22 +3980,30 @@ let setup_server ~metrics_server_port ~constraint_constants ~logger
let where_to_listen =
Async.Tcp.Where_to_listen.bind_to All_addresses (On_port server_port)
in
let reader, writer = Strict_pipe.create ~name:"archive" Synchronous in
let reader, writer =
Strict_pipe.create ~name:"archive"
(Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
in
let precomputed_block_reader, precomputed_block_writer =
Strict_pipe.create ~name:"precomputed_archive_block" Synchronous
Strict_pipe.create ~name:"precomputed_archive_block"
(Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
in
let extensional_block_reader, extensional_block_writer =
Strict_pipe.create ~name:"extensional_archive_block" Synchronous
Strict_pipe.create ~name:"extensional_archive_block"
(Buffered (`Capacity 10, `Overflow (Drop_head ignore)))
in
let implementations =
[ Async.Rpc.Rpc.implement Archive_rpc.t (fun () archive_diff ->
Strict_pipe.Writer.write writer archive_diff )
Strict_pipe.Writer.write writer archive_diff ;
Deferred.unit )
; Async.Rpc.Rpc.implement Archive_rpc.precomputed_block
(fun () precomputed_block ->
Strict_pipe.Writer.write precomputed_block_writer precomputed_block )
Strict_pipe.Writer.write precomputed_block_writer precomputed_block ;
Deferred.unit )
; Async.Rpc.Rpc.implement Archive_rpc.extensional_block
(fun () extensional_block ->
Strict_pipe.Writer.write extensional_block_writer extensional_block )
Strict_pipe.Writer.write extensional_block_writer extensional_block ;
Deferred.unit )
]
in
match Caqti_async.connect_pool ~max_size:30 postgres_address with
Expand Down