-
Notifications
You must be signed in to change notification settings - Fork 547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
batch insertion events in archive database #14779
Merged
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
9ecf29e
add timing logs to various functions
79d9cda
add more logs to various transactions
a9347f5
add more logs for zkapp commands
4be459c
add more logs to update body
d9f961c
add more logs
55433d3
add idx for element_ids
153bfe4
add an index for zkapp_events
c64cba9
batch insertion of events and actions
42cb2f6
Merge branch 'log/add-timing-log-to-archive-processor' of github.com:…
2fb21f6
add comments
6eeefb1
Merge branch 'rampup' of github.com:MinaProtocol/mina into fix/archiv…
49874ea
remove unnecessary logs
6e5f053
refactor sql code into `mina_caqti`
8734f96
clean up
9fb47bb
add sql file to patch the database
db92754
add tracing of processor.add_block
22583bf
Merge branch 'rampup' into fix/archive-db-slowness
ghost-not-in-the-shell 5213421
Merge branch 'rampup' into fix/archive-db-slowness
deepthiskumar 7e72362
Merge branch 'rampup' into fix/archive-db-slowness
ghost-not-in-the-shell 49e407d
Merge branch 'rampup' into fix/archive-db-slowness
ghost-not-in-the-shell File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
ALTER TABLE zkapp_field_array ADD CONSTRAINT zkapp_field_array_element_ids_key UNIQUE (element_ids); | ||
ALTER TABLE zkapp_events ADD CONSTRAINT zkapp_events_element_ids_key UNIQUE (element_ids); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1417,20 +1417,83 @@ module Zkapp_events = struct | |
|
||
let table_name = "zkapp_events" | ||
|
||
module Field_array_map = Map.Make (struct | ||
type t = int array [@@deriving sexp] | ||
|
||
let compare = Array.compare Int.compare | ||
end) | ||
|
||
(* Account_update.Body.Events'.t is defined as `field array list`, | ||
which is ismorphic to a list of list of fields. | ||
|
||
We are batching the insertion of field and field_array to optimize | ||
the speed of archiving max-cost zkapps. | ||
|
||
1. we flatten the list of list of fields to get all the field elements | ||
2. insert all the field elements in one query | ||
3. construct a map "M" from `field_id` to `field` by querying against the zkapp_field table | ||
4. use "M" and the list of list of fields to compute the list of list of field_ids | ||
5. insert all list of `list of field_ids` in one query | ||
6. construct a map "M'" from `field_array_id` to `field_id array` by querying against | ||
the zkapp_field_array table | ||
7. use "M'" and the list of list of field_ids to compute the list of field_array_ids | ||
8. insert the list of field_arrays | ||
*) | ||
let add_if_doesn't_exist (module Conn : CONNECTION) | ||
(events : Account_update.Body.Events'.t) = | ||
let open Deferred.Result.Let_syntax in | ||
let%bind (element_ids : int array) = | ||
Mina_caqti.deferred_result_list_map events | ||
~f:(Zkapp_field_array.add_if_doesn't_exist (module Conn)) | ||
>>| Array.of_list | ||
let%bind field_array_id_list = | ||
if not @@ List.is_empty events then | ||
let field_list_list = | ||
List.map events ~f:(fun field_array -> | ||
Array.map field_array ~f:Pickles.Backend.Tick.Field.to_string | ||
|> Array.to_list ) | ||
in | ||
let fields = field_list_list |> List.concat in | ||
let%bind field_id_list_list = | ||
if not @@ List.is_empty fields then | ||
let%map field_map = | ||
Mina_caqti.insert_multi_into_col ~table_name:"zkapp_field" | ||
~col:("field", Caqti_type.string) | ||
(module Conn) | ||
fields | ||
>>| String.Map.of_alist_exn | ||
in | ||
let field_id_list_list = | ||
List.map field_list_list ~f:(List.map ~f:(Map.find_exn field_map)) | ||
in | ||
field_id_list_list | ||
else | ||
(* if there's no fields, then we must have some list of empty lists *) | ||
return @@ List.map field_list_list ~f:(fun _ -> []) | ||
in | ||
(* this conversion should be done by caqti using `typ`, FIX this in the future *) | ||
let field_array_list = | ||
List.map field_id_list_list ~f:(fun id_list -> | ||
List.map id_list ~f:Int.to_string | ||
|> String.concat ~sep:", " |> sprintf "{%s}" ) | ||
in | ||
let%map field_array_map = | ||
Mina_caqti.insert_multi_into_col ~table_name:"zkapp_field_array" | ||
~col:("element_ids", Mina_caqti.array_int_typ) | ||
(module Conn) | ||
field_array_list | ||
>>| Field_array_map.of_alist_exn | ||
in | ||
let field_array_id_list = | ||
List.map field_id_list_list ~f:(fun field_id_list -> | ||
Map.find_exn field_array_map (Array.of_list field_id_list) ) | ||
|> Array.of_list | ||
in | ||
field_array_id_list | ||
else return @@ Array.of_list [] | ||
in | ||
Mina_caqti.select_insert_into_cols ~select:("id", Caqti_type.int) | ||
~table_name | ||
~cols:([ "element_ids" ], Mina_caqti.array_int_typ) | ||
~tannot:(function "element_ids" -> Some "int[]" | _ -> None) | ||
(module Conn) | ||
element_ids | ||
field_array_id_list | ||
|
||
let load (module Conn : CONNECTION) id = | ||
Conn.find | ||
|
@@ -1491,14 +1554,17 @@ module Zkapp_account_update_body = struct | |
Account_identifiers.add_if_doesn't_exist (module Conn) account_identifier | ||
in | ||
let%bind update_id = | ||
Zkapp_updates.add_if_doesn't_exist (module Conn) body.update | ||
Metrics.time ~label:"zkapp_updates.add" | ||
@@ fun () -> Zkapp_updates.add_if_doesn't_exist (module Conn) body.update | ||
in | ||
let increment_nonce = body.increment_nonce in | ||
let%bind events_id = | ||
Zkapp_events.add_if_doesn't_exist (module Conn) body.events | ||
Metrics.time ~label:"Zkapp_events.add" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need all of these |
||
@@ fun () -> Zkapp_events.add_if_doesn't_exist (module Conn) body.events | ||
in | ||
let%bind actions_id = | ||
Zkapp_events.add_if_doesn't_exist (module Conn) body.actions | ||
Metrics.time ~label:"Zkapp_actions.add" | ||
@@ fun () -> Zkapp_events.add_if_doesn't_exist (module Conn) body.actions | ||
in | ||
let%bind call_data_id = | ||
Zkapp_field.add_if_doesn't_exist (module Conn) body.call_data | ||
|
@@ -1600,6 +1666,8 @@ module Zkapp_account_update = struct | |
(account_update : Account_update.Simple.t) = | ||
let open Deferred.Result.Let_syntax in | ||
let%bind body_id = | ||
Metrics.time ~label:"Zkapp_account_update_body.add" | ||
@@ fun () -> | ||
Zkapp_account_update_body.add_if_doesn't_exist | ||
(module Conn) | ||
account_update.body | ||
|
@@ -1897,11 +1965,15 @@ module User_command = struct | |
let open Deferred.Result.Let_syntax in | ||
let zkapp_command = Zkapp_command.to_simple ps in | ||
let%bind zkapp_fee_payer_body_id = | ||
Metrics.time ~label:"Zkapp_fee_payer_body.add" | ||
@@ fun () -> | ||
Zkapp_fee_payer_body.add_if_doesn't_exist | ||
(module Conn) | ||
zkapp_command.fee_payer.body | ||
in | ||
let%bind zkapp_account_updates_ids = | ||
Metrics.time ~label:"Zkapp_account_update.add" | ||
@@ fun () -> | ||
Mina_caqti.deferred_result_list_map zkapp_command.account_updates | ||
~f:(Zkapp_account_update.add_if_doesn't_exist (module Conn)) | ||
>>| Array.of_list | ||
|
@@ -2909,6 +2981,8 @@ module Block = struct | |
(failed_str, Some display) | ||
in | ||
let%bind _seq_no = | ||
Metrics.time ~label:"adding_transactions" | ||
@@ fun () -> | ||
Mina_caqti.deferred_result_list_fold transactions ~init:0 | ||
~f:(fun sequence_no -> function | ||
| { Mina_base.With_status.status | ||
|
@@ -2934,6 +3008,9 @@ module Block = struct | |
let status, failure_reasons = | ||
failure_reasons user_command.status | ||
in | ||
Metrics.time | ||
~label:"block_and_zkapp_command.add_if_doesn't_exist" | ||
@@ fun () -> | ||
Block_and_zkapp_command.add_if_doesn't_exist | ||
(module Conn) | ||
~block_id ~zkapp_command_id:id ~sequence_no ~status | ||
|
@@ -3473,48 +3550,58 @@ module Block = struct | |
then | ||
(* a new block, allows marking some pending blocks as canonical *) | ||
let%bind subchain_blocks = | ||
get_subchain | ||
(module Conn) | ||
~start_block_id:highest_canonical_block_id ~end_block_id:block_id | ||
Metrics.time ~label:"get_subchain (> canonical_height + k)" | ||
(fun () -> | ||
get_subchain | ||
(module Conn) | ||
~start_block_id:highest_canonical_block_id | ||
~end_block_id:block_id ) | ||
in | ||
let block_height_less_k_int64 = Int64.( - ) block.height k_int64 in | ||
(* mark canonical, orphaned blocks in subchain at least k behind the new block *) | ||
let canonical_blocks = | ||
List.filter subchain_blocks ~f:(fun subchain_block -> | ||
Int64.( <= ) subchain_block.height block_height_less_k_int64 ) | ||
in | ||
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:() | ||
~f:(fun () block -> | ||
let%bind () = | ||
mark_as_canonical (module Conn) ~state_hash:block.state_hash | ||
in | ||
mark_as_orphaned | ||
(module Conn) | ||
~state_hash:block.state_hash ~height:block.height ) | ||
Metrics.time ~label:"mark_as_canonical (> canonical_height + k)" | ||
(fun () -> | ||
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:() | ||
~f:(fun () block -> | ||
let%bind () = | ||
mark_as_canonical (module Conn) ~state_hash:block.state_hash | ||
in | ||
mark_as_orphaned | ||
(module Conn) | ||
~state_hash:block.state_hash ~height:block.height ) ) | ||
else if Int64.( < ) block.height greatest_canonical_height then | ||
(* a missing block added in the middle of canonical chain *) | ||
let%bind canonical_block_above_id, _above_height = | ||
get_nearest_canonical_block_above (module Conn) block.height | ||
Metrics.time ~label:"get_nearest_canonical_block_above" (fun () -> | ||
get_nearest_canonical_block_above (module Conn) block.height ) | ||
in | ||
let%bind canonical_block_below_id, _below_height = | ||
get_nearest_canonical_block_below (module Conn) block.height | ||
Metrics.time ~label:"get_neareast_canonical_block_below" (fun () -> | ||
get_nearest_canonical_block_below (module Conn) block.height ) | ||
in | ||
(* we can always find this chain: the genesis block should be marked as canonical, and we've found a | ||
canonical block above this one *) | ||
let%bind canonical_blocks = | ||
get_subchain | ||
(module Conn) | ||
~start_block_id:canonical_block_below_id | ||
~end_block_id:canonical_block_above_id | ||
Metrics.time ~label:"get_subchain (< canonical_height)" (fun () -> | ||
get_subchain | ||
(module Conn) | ||
~start_block_id:canonical_block_below_id | ||
~end_block_id:canonical_block_above_id ) | ||
in | ||
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:() | ||
~f:(fun () block -> | ||
let%bind () = | ||
mark_as_canonical (module Conn) ~state_hash:block.state_hash | ||
in | ||
mark_as_orphaned | ||
(module Conn) | ||
~state_hash:block.state_hash ~height:block.height ) | ||
Metrics.time ~label:"mark_as_canonical (< canonical_height)" | ||
(fun () -> | ||
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:() | ||
~f:(fun () block -> | ||
let%bind () = | ||
mark_as_canonical (module Conn) ~state_hash:block.state_hash | ||
in | ||
mark_as_orphaned | ||
(module Conn) | ||
~state_hash:block.state_hash ~height:block.height ) ) | ||
else | ||
(* a block at or above highest canonical block, not high enough to mark any blocks as canonical *) | ||
Deferred.Result.return () | ||
|
@@ -3644,7 +3731,12 @@ let add_block_aux ?(retries = 3) ~logger ~pool ~add_block ~hash | |
[%log info] "Attempting to add block data for $state_hash" | ||
~metadata: | ||
[ ("state_hash", Mina_base.State_hash.to_yojson state_hash) ] ; | ||
let%bind block_id = add_block (module Conn : CONNECTION) block in | ||
let%bind block_id = | ||
O1trace.thread "archive_processor.add_block" | ||
@@ fun () -> | ||
Metrics.time ~label:"add_block" | ||
@@ fun () -> add_block (module Conn : CONNECTION) block | ||
in | ||
(* if an existing block has a parent hash that's for the block just added, | ||
set its parent id | ||
*) | ||
|
@@ -3654,7 +3746,10 @@ let add_block_aux ?(retries = 3) ~logger ~pool ~add_block ~hash | |
~parent_hash:(hash block) ~parent_id:block_id | ||
in | ||
(* update chain status for existing blocks *) | ||
let%bind () = Block.update_chain_status (module Conn) ~block_id in | ||
let%bind () = | ||
Metrics.time ~label:"update_chain_status" (fun () -> | ||
Block.update_chain_status (module Conn) ~block_id ) | ||
in | ||
let%bind () = | ||
match delete_older_than with | ||
| Some num_blocks -> | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
field_id_list_list
is not needed at allThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a let binding here to make the code more readable. Just to give a name to the result of that computation.