Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove some yield calls from staged ledger application #16054

Draft
wants to merge 1 commit into
base: compatible
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/app/cli/src/init/test_ledger_application.ml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ let apply_txs ~action_elements ~event_elements ~constraint_constants
in
Ledger.unsafe_preload_accounts_from_parent ledger accounts_accessed ;
let start = Time.now () in
match%map
match
Staged_ledger.Test_helpers.update_coinbase_stack_and_get_data_impl
~first_partition_slots ~is_new_stack:(not no_new_stack)
~no_second_partition:(not has_second_partition) ~constraint_constants
Expand Down Expand Up @@ -222,17 +222,17 @@ let test ~privkey_path ~ledger_path ?prev_block_path ~first_partition_slots
printf !"Init root %s\n%!" (Ledger_hash.to_base58_check init_root) ;
Deferred.List.fold (List.init rounds ~f:ident) ~init:(init_ledger, [])
~f:(fun (ledger, ledgers) i ->
let%bind () =
let%map () =
if tracing && i = 1 then Mina_tracing.start "." else Deferred.unit
in
List.hd (List.drop ledgers (max_depth - 1))
|> Option.iter ~f:drop_old_ledger ;
apply ~action_elements:0 ~event_elements:0 ~num_txs:num_txs_per_round ~i
ledger
>>| mask_handler ledger
>>| Fn.flip Tuple2.create (ledger :: ledgers) )
|> mask_handler ledger
|> Fn.flip Tuple2.create (ledger :: ledgers) )
>>| fst
>>= apply ~num_txs:num_txs_final
>>| apply ~num_txs:num_txs_final
~action_elements:genesis_constants.max_action_elements
~event_elements:genesis_constants.max_event_elements ~i:rounds
>>| stop_tracing
191 changes: 99 additions & 92 deletions src/lib/staged_ledger/staged_ledger.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ let transaction_application_scheduler_batch_size = 10
let option lab =
Option.value_map ~default:(Or_error.error_string lab) ~f:(fun x -> Ok x)

let yield_result = Fn.compose (Deferred.map ~f:Result.return) Scheduler.yield

let yield_result_every ~n =
Fn.compose
(Deferred.map ~f:Result.return)
(Staged.unstage @@ Scheduler.yield_every ~n)

module Pre_statement = struct
type t =
{ partially_applied_transaction : Ledger.Transaction_partially_applied.t
Expand Down Expand Up @@ -630,9 +623,8 @@ module T = struct
; block_global_slot = global_slot
}

let apply_transactions_first_pass ~yield ~constraint_constants ~global_slot
ledger init_pending_coinbase_stack_state ts current_state_view =
let open Deferred.Result.Let_syntax in
let apply_transactions_first_pass ~constraint_constants ~global_slot ledger
init_pending_coinbase_stack_state ts current_state_view =
let apply pending_coinbase_stack_state txn =
match
List.find (Transaction.public_keys txn.With_status.data) ~f:(fun pk ->
Expand All @@ -644,34 +636,33 @@ module T = struct
apply_single_transaction_first_pass ~constraint_constants ~global_slot
ledger pending_coinbase_stack_state txn current_state_view
in
let%map res_rev, pending_coinbase_stack_state =
Mina_stdlib.Deferred.Result.List.fold ts
~init:([], init_pending_coinbase_stack_state)
~f:(fun (acc, pending_coinbase_stack_state) t ->
let%bind pre_witness, pending_coinbase_stack_state' =
Deferred.return (apply pending_coinbase_stack_state t)
in
let%map () = yield () in
(pre_witness :: acc, pending_coinbase_stack_state') )
let%map.Result res_rev, pending_coinbase_stack_state =
List.fold ts
~init:(Result.Ok ([], init_pending_coinbase_stack_state))
~f:(fun res t ->
Result.bind res ~f:(fun (acc, pending_coinbase_stack_state) ->
let%map.Result pre_witness, pending_coinbase_stack_state' =
apply pending_coinbase_stack_state t
in
(pre_witness :: acc, pending_coinbase_stack_state') ) )
in
(List.rev res_rev, pending_coinbase_stack_state.pc.target)

let apply_transactions_second_pass ~constraint_constants ~yield ~global_slot
ledger state_and_body_hash pre_stmts =
let open Deferred.Result.Let_syntax in
let apply_transactions_second_pass ~constraint_constants ~global_slot ledger
state_and_body_hash pre_stmts =
let connecting_ledger = Ledger.merkle_root ledger in
Mina_stdlib.Deferred.Result.List.map pre_stmts ~f:(fun pre_stmt ->
let%bind result =
List.fold_left pre_stmts ~init:(Result.Ok []) ~f:(fun res pre_stmt ->
let%bind.Result acc = res in
let%map.Result el =
apply_single_transaction_second_pass ~constraint_constants
~connecting_ledger ~global_slot ledger state_and_body_hash pre_stmt
|> Deferred.return
in
let%map () = yield () in
result )
el :: acc )
|> Result.map ~f:List.rev

let update_ledger_and_get_statements ~constraint_constants ~global_slot ledger
current_stack tss current_state_view state_and_body_hash =
let open Deferred.Result.Let_syntax in
let open Result.Let_syntax in
let state_body_hash = snd state_and_body_hash in
let ts, ts_opt = tss in
let apply_first_pass working_stack ts =
Expand All @@ -686,12 +677,7 @@ module T = struct
apply_transactions_first_pass ~constraint_constants ~global_slot ledger
init_pending_coinbase_stack_state ts current_state_view
in
let yield =
yield_result_every ~n:transaction_application_scheduler_batch_size
in
let%bind pre_stmts1, updated_stack1 =
apply_first_pass ~yield current_stack ts
in
let%bind pre_stmts1, updated_stack1 = apply_first_pass current_stack ts in
let%bind pre_stmts2, updated_stack2 =
match ts_opt with
| None ->
Expand All @@ -700,12 +686,12 @@ module T = struct
let current_stack2 =
Pending_coinbase.Stack.create_with current_stack
in
apply_first_pass ~yield current_stack2 ts
apply_first_pass current_stack2 ts
in
let first_pass_ledger_end = Ledger.merkle_root ledger in
let%map txns_with_witnesses =
apply_transactions_second_pass ~constraint_constants ~yield ~global_slot
ledger state_and_body_hash (pre_stmts1 @ pre_stmts2)
apply_transactions_second_pass ~constraint_constants ~global_slot ledger
state_and_body_hash (pre_stmts1 @ pre_stmts2)
in
(txns_with_witnesses, updated_stack1, updated_stack2, first_pass_ledger_end)

Expand Down Expand Up @@ -824,7 +810,7 @@ module T = struct
~global_slot ~first_partition_slots:slots ~no_second_partition
~is_new_stack ledger pending_coinbase_collection transactions
current_state_view state_and_body_hash =
let open Deferred.Result.Let_syntax in
let open Result.Let_syntax in
let coinbase_exists txns =
List.fold_until ~init:false txns
~f:(fun acc t ->
Expand All @@ -841,7 +827,6 @@ module T = struct
2.create data for enqueuing onto the scan state *)
let%bind working_stack =
working_stack pending_coinbase_collection ~is_new_stack
|> Deferred.return
in
[%log internal] "Update_ledger_and_get_statements"
~metadata:[ ("partition", `String "single") ] ;
Expand Down Expand Up @@ -874,7 +859,6 @@ module T = struct
let coinbase_in_first_partition = coinbase_exists txns_for_partition1 in
let%bind working_stack1 =
working_stack pending_coinbase_collection ~is_new_stack:false
|> Deferred.return
in
let txns_for_partition2 = List.drop transactions slots in
[%log internal] "Update_ledger_and_get_statements"
Expand Down Expand Up @@ -935,13 +919,12 @@ module T = struct
state_and_body_hash
else (
[%log internal] "Update_coinbase_stack_done" ;
Deferred.return
(Ok
( false
, []
, Pending_coinbase.Update.Action.Update_none
, `Update_none
, `First_pass_ledger_end (Ledger.merkle_root ledger) ) ) )
Ok
( false
, []
, Pending_coinbase.Update.Action.Update_none
, `Update_none
, `First_pass_ledger_end (Ledger.merkle_root ledger) ) )

(*update the pending_coinbase tree with the updated/new stack and delete the oldest stack if a proof was emitted*)
let update_pending_coinbase_collection ~depth pending_coinbase_collection
Expand Down Expand Up @@ -1006,10 +989,10 @@ module T = struct
(Pre_diff_info.Error.Coinbase_error "More than two coinbase parts")
)

let apply_diff ?(skip_verification = false) ~logger ~constraint_constants
~global_slot t pre_diff_info ~current_state_view ~state_and_body_hash
~log_prefix ~zkapp_cmd_limit_hardcap =
let open Deferred.Result.Let_syntax in
let apply_diff_impl ~logger ~constraint_constants ~global_slot t pre_diff_info
~current_state_view ~state_and_body_hash ~log_prefix
~zkapp_cmd_limit_hardcap =
let open Result.Let_syntax in
let max_throughput =
Int.pow 2 t.constraint_constants.transaction_capacity_log_2
in
Expand All @@ -1031,7 +1014,7 @@ module T = struct
Ledger.unsafe_preload_accounts_from_parent new_ledger accounts_accessed ;
let%bind () =
(* Check number of zkApps in a block does not exceed hardcap *)
O1trace.thread "zkapp_hardcap_check" (fun () ->
O1trace.sync_thread "zkapp_hardcap_check" (fun () ->
let is_zkapp : Transaction.t With_status.t -> bool = function
| { With_status.data =
Transaction.Command (Mina_base.User_command.Zkapp_command _)
Expand All @@ -1043,10 +1026,10 @@ module T = struct
in
let zk_app_count = List.count ~f:is_zkapp transactions in
if zk_app_count > zkapp_cmd_limit_hardcap then
Deferred.Result.fail
Result.fail
(Staged_ledger_error.ZkApps_exceed_limit
(zk_app_count, zkapp_cmd_limit_hardcap) )
else Deferred.Result.return () )
else Result.return () )
in
[%log internal] "Update_coinbase_stack"
~metadata:
Expand All @@ -1063,7 +1046,7 @@ module T = struct
, stack_update_in_snark
, stack_update
, `First_pass_ledger_end first_pass_ledger_end ) =
O1trace.thread "update_coinbase_stack_start_time" (fun () ->
O1trace.sync_thread "update_coinbase_stack_start_time" (fun () ->
update_coinbase_stack_and_get_data ~logger ~constraint_constants
~global_slot t.scan_state new_ledger t.pending_coinbase_collection
transactions current_state_view state_and_body_hash )
Expand All @@ -1079,26 +1062,26 @@ module T = struct
; ("free_space", `Int (Scan_state.free_space t.scan_state))
] ;
let%bind () =
O1trace.thread "check_for_sufficient_snark_work" (fun () ->
O1trace.sync_thread "check_for_sufficient_snark_work" (fun () ->
let required = List.length required_pairs in
if
work_count < required
&& List.length data
> Scan_state.free_space t.scan_state - required + work_count
then
Deferred.Result.fail
Result.fail
(Staged_ledger_error.Insufficient_work
(sprintf
!"Insufficient number of transaction snark work (slots \
occupying: %d) required %d, got %d"
slots required work_count ) )
else Deferred.Result.return () )
else Result.return () )
in
[%log internal] "Check_zero_fee_excess" ;
let%bind () = Deferred.return (check_zero_fee_excess t.scan_state data) in
let%bind () = check_zero_fee_excess t.scan_state data in
[%log internal] "Fill_work_and_enqueue_transactions" ;
let%bind res_opt, scan_state' =
O1trace.thread "fill_work_and_enqueue_transactions" (fun () ->
let%bind ledger_proof, scan_state' =
O1trace.sync_thread "fill_work_and_enqueue_transactions" (fun () ->
let r =
Scan_state.fill_work_and_enqueue_transactions t.scan_state ~logger
data works
Expand All @@ -1121,29 +1104,68 @@ module T = struct
]
!"$prefix: Unexpected error when applying diff data $data to \
the scan_state $scan_state: $error" ) ;
Deferred.return (to_staged_ledger_or_error r) )
to_staged_ledger_or_error r )
in
let%bind () = yield_result () in
[%log internal] "Update_pending_coinbase_collection" ;
let%bind updated_pending_coinbase_collection' =
O1trace.thread "update_pending_coinbase_collection" (fun () ->
O1trace.sync_thread "update_pending_coinbase_collection" (fun () ->
update_pending_coinbase_collection
~depth:t.constraint_constants.pending_coinbase_depth
t.pending_coinbase_collection stack_update ~is_new_stack
~ledger_proof:res_opt
|> Deferred.return )
~ledger_proof )
in
let%bind () = yield_result () in
let%bind coinbase_amount =
Deferred.return (coinbase_for_blockchain_snark coinbases)
in
let%bind latest_pending_coinbase_stack =
let%bind coinbase_amount = coinbase_for_blockchain_snark coinbases in
let%map latest_pending_coinbase_stack =
Pending_coinbase.latest_stack ~is_new_stack:false
updated_pending_coinbase_collection'
|> to_staged_ledger_or_error |> Deferred.return
|> to_staged_ledger_or_error
in
let%bind () = yield_result () in
let%map () =
let new_staged_ledger =
{ scan_state = scan_state'
; ledger = new_ledger
; constraint_constants = t.constraint_constants
; pending_coinbase_collection = updated_pending_coinbase_collection'
}
in
let debug_log_metadata =
[ ("user_command_count", `Int commands_count)
; ("coinbase_count", `Int (List.length coinbases))
; ("spots_available", `Int spots_available)
; ("proof_bundles_waiting", `Int proofs_waiting)
; ("work_count", `Int (List.length works))
; ("prefix", `String log_prefix)
]
in
( is_new_stack
, data
, stack_update_in_snark
, first_pass_ledger_end
, ledger_proof
, scan_state'
, coinbase_amount
, latest_pending_coinbase_stack
, new_staged_ledger
, debug_log_metadata )

let apply_diff ?(skip_verification = false) ~logger ~constraint_constants
~global_slot t pre_diff_info ~current_state_view ~state_and_body_hash
~log_prefix ~zkapp_cmd_limit_hardcap =
let%bind.Deferred.Result ( is_new_stack
, data
, stack_update_in_snark
, first_pass_ledger_end
, ledger_proof
, scan_state'
, coinbase_amount
, latest_pending_coinbase_stack
, new_staged_ledger
, debug_log_metadata ) =
apply_diff_impl ~logger ~constraint_constants ~global_slot t pre_diff_info
~current_state_view ~state_and_body_hash ~log_prefix
~zkapp_cmd_limit_hardcap
|> Deferred.return
in
let%map.Deferred.Result () =
if skip_verification || List.is_empty data then Deferred.return (Ok ())
else (
[%log internal] "Verify_scan_state_after_apply" ;
Expand All @@ -1153,37 +1175,22 @@ module T = struct
~first_pass_ledger_end
~second_pass_ledger_end:
(Frozen_ledger_hash.of_ledger_hash
(Ledger.merkle_root new_ledger) )
(Ledger.merkle_root new_staged_ledger.ledger) )
~pending_coinbase_stack:latest_pending_coinbase_stack
scan_state'
>>| to_staged_ledger_or_error) ) )
in
[%log debug]
~metadata:
[ ("user_command_count", `Int commands_count)
; ("coinbase_count", `Int (List.length coinbases))
; ("spots_available", `Int spots_available)
; ("proof_bundles_waiting", `Int proofs_waiting)
; ("work_count", `Int (List.length works))
; ("prefix", `String log_prefix)
]
[%log debug] ~metadata:debug_log_metadata
"$prefix: apply_diff block info: No of transactions \
included:$user_command_count\n\
\ Coinbase parts:$coinbase_count Spots\n\
\ available:$spots_available Pending work in the \
scan-state:$proof_bundles_waiting Work included:$work_count" ;
let new_staged_ledger =
{ scan_state = scan_state'
; ledger = new_ledger
; constraint_constants = t.constraint_constants
; pending_coinbase_collection = updated_pending_coinbase_collection'
}
in
[%log internal] "Hash_new_staged_ledger" ;
let staged_ledger_hash = hash new_staged_ledger in
[%log internal] "Hash_new_staged_ledger_done" ;
( `Hash_after_applying staged_ledger_hash
, `Ledger_proof res_opt
, `Ledger_proof ledger_proof
, `Staged_ledger new_staged_ledger
, `Pending_coinbase_update
( is_new_stack
Expand Down
2 changes: 1 addition & 1 deletion src/lib/staged_ledger/staged_ledger.mli
Original file line number Diff line number Diff line change
Expand Up @@ -379,5 +379,5 @@ module Test_helpers : sig
* Pending_coinbase.Stack_versioned.t ]
* [> `First_pass_ledger_end of Frozen_ledger_hash.t ]
, Staged_ledger_error.t )
Deferred.Result.t
Result.t
end
Loading