diff --git a/src/app/cli/src/init/test_ledger_application.ml b/src/app/cli/src/init/test_ledger_application.ml index 7adddbef204..8946166fa0a 100644 --- a/src/app/cli/src/init/test_ledger_application.ml +++ b/src/app/cli/src/init/test_ledger_application.ml @@ -149,7 +149,7 @@ let apply_txs ~action_elements ~event_elements ~constraint_constants in Ledger.unsafe_preload_accounts_from_parent ledger accounts_accessed ; let start = Time.now () in - match%map + match Staged_ledger.Test_helpers.update_coinbase_stack_and_get_data_impl ~first_partition_slots ~is_new_stack:(not no_new_stack) ~no_second_partition:(not has_second_partition) ~constraint_constants @@ -222,17 +222,17 @@ let test ~privkey_path ~ledger_path ?prev_block_path ~first_partition_slots printf !"Init root %s\n%!" (Ledger_hash.to_base58_check init_root) ; Deferred.List.fold (List.init rounds ~f:ident) ~init:(init_ledger, []) ~f:(fun (ledger, ledgers) i -> - let%bind () = + let%map () = if tracing && i = 1 then Mina_tracing.start "." else Deferred.unit in List.hd (List.drop ledgers (max_depth - 1)) |> Option.iter ~f:drop_old_ledger ; apply ~action_elements:0 ~event_elements:0 ~num_txs:num_txs_per_round ~i ledger - >>| mask_handler ledger - >>| Fn.flip Tuple2.create (ledger :: ledgers) ) + |> mask_handler ledger + |> Fn.flip Tuple2.create (ledger :: ledgers) ) >>| fst - >>= apply ~num_txs:num_txs_final + >>| apply ~num_txs:num_txs_final ~action_elements:genesis_constants.max_action_elements ~event_elements:genesis_constants.max_event_elements ~i:rounds >>| stop_tracing diff --git a/src/lib/staged_ledger/staged_ledger.ml b/src/lib/staged_ledger/staged_ledger.ml index af202350850..47fae2d82c0 100644 --- a/src/lib/staged_ledger/staged_ledger.ml +++ b/src/lib/staged_ledger/staged_ledger.ml @@ -15,13 +15,6 @@ let transaction_application_scheduler_batch_size = 10 let option lab = Option.value_map ~default:(Or_error.error_string lab) ~f:(fun x -> Ok x) -let yield_result = Fn.compose (Deferred.map ~f:Result.return) Scheduler.yield - -let yield_result_every ~n = - Fn.compose - (Deferred.map ~f:Result.return) - (Staged.unstage @@ Scheduler.yield_every ~n) - module Pre_statement = struct type t = { partially_applied_transaction : Ledger.Transaction_partially_applied.t @@ -630,9 +623,8 @@ module T = struct ; block_global_slot = global_slot } - let apply_transactions_first_pass ~yield ~constraint_constants ~global_slot - ledger init_pending_coinbase_stack_state ts current_state_view = - let open Deferred.Result.Let_syntax in + let apply_transactions_first_pass ~constraint_constants ~global_slot ledger + init_pending_coinbase_stack_state ts current_state_view = let apply pending_coinbase_stack_state txn = match List.find (Transaction.public_keys txn.With_status.data) ~f:(fun pk -> @@ -644,34 +636,33 @@ module T = struct apply_single_transaction_first_pass ~constraint_constants ~global_slot ledger pending_coinbase_stack_state txn current_state_view in - let%map res_rev, pending_coinbase_stack_state = - Mina_stdlib.Deferred.Result.List.fold ts - ~init:([], init_pending_coinbase_stack_state) - ~f:(fun (acc, pending_coinbase_stack_state) t -> - let%bind pre_witness, pending_coinbase_stack_state' = - Deferred.return (apply pending_coinbase_stack_state t) - in - let%map () = yield () in - (pre_witness :: acc, pending_coinbase_stack_state') ) + let%map.Result res_rev, pending_coinbase_stack_state = + List.fold ts + ~init:(Result.Ok ([], init_pending_coinbase_stack_state)) + ~f:(fun res t -> + Result.bind res ~f:(fun (acc, pending_coinbase_stack_state) -> + let%map.Result pre_witness, pending_coinbase_stack_state' = + apply pending_coinbase_stack_state t + in + (pre_witness :: acc, pending_coinbase_stack_state') ) ) in (List.rev res_rev, pending_coinbase_stack_state.pc.target) - let apply_transactions_second_pass ~constraint_constants ~yield ~global_slot - ledger state_and_body_hash pre_stmts = - let open Deferred.Result.Let_syntax in + let apply_transactions_second_pass ~constraint_constants ~global_slot ledger + state_and_body_hash pre_stmts = let connecting_ledger = Ledger.merkle_root ledger in - Mina_stdlib.Deferred.Result.List.map pre_stmts ~f:(fun pre_stmt -> - let%bind result = + List.fold_left pre_stmts ~init:(Result.Ok []) ~f:(fun res pre_stmt -> + let%bind.Result acc = res in + let%map.Result el = apply_single_transaction_second_pass ~constraint_constants ~connecting_ledger ~global_slot ledger state_and_body_hash pre_stmt - |> Deferred.return in - let%map () = yield () in - result ) + el :: acc ) + |> Result.map ~f:List.rev let update_ledger_and_get_statements ~constraint_constants ~global_slot ledger current_stack tss current_state_view state_and_body_hash = - let open Deferred.Result.Let_syntax in + let open Result.Let_syntax in let state_body_hash = snd state_and_body_hash in let ts, ts_opt = tss in let apply_first_pass working_stack ts = @@ -686,12 +677,7 @@ module T = struct apply_transactions_first_pass ~constraint_constants ~global_slot ledger init_pending_coinbase_stack_state ts current_state_view in - let yield = - yield_result_every ~n:transaction_application_scheduler_batch_size - in - let%bind pre_stmts1, updated_stack1 = - apply_first_pass ~yield current_stack ts - in + let%bind pre_stmts1, updated_stack1 = apply_first_pass current_stack ts in let%bind pre_stmts2, updated_stack2 = match ts_opt with | None -> @@ -700,12 +686,12 @@ module T = struct let current_stack2 = Pending_coinbase.Stack.create_with current_stack in - apply_first_pass ~yield current_stack2 ts + apply_first_pass current_stack2 ts in let first_pass_ledger_end = Ledger.merkle_root ledger in let%map txns_with_witnesses = - apply_transactions_second_pass ~constraint_constants ~yield ~global_slot - ledger state_and_body_hash (pre_stmts1 @ pre_stmts2) + apply_transactions_second_pass ~constraint_constants ~global_slot ledger + state_and_body_hash (pre_stmts1 @ pre_stmts2) in (txns_with_witnesses, updated_stack1, updated_stack2, first_pass_ledger_end) @@ -824,7 +810,7 @@ module T = struct ~global_slot ~first_partition_slots:slots ~no_second_partition ~is_new_stack ledger pending_coinbase_collection transactions current_state_view state_and_body_hash = - let open Deferred.Result.Let_syntax in + let open Result.Let_syntax in let coinbase_exists txns = List.fold_until ~init:false txns ~f:(fun acc t -> @@ -841,7 +827,6 @@ module T = struct 2.create data for enqueuing onto the scan state *) let%bind working_stack = working_stack pending_coinbase_collection ~is_new_stack - |> Deferred.return in [%log internal] "Update_ledger_and_get_statements" ~metadata:[ ("partition", `String "single") ] ; @@ -874,7 +859,6 @@ module T = struct let coinbase_in_first_partition = coinbase_exists txns_for_partition1 in let%bind working_stack1 = working_stack pending_coinbase_collection ~is_new_stack:false - |> Deferred.return in let txns_for_partition2 = List.drop transactions slots in [%log internal] "Update_ledger_and_get_statements" @@ -935,13 +919,12 @@ module T = struct state_and_body_hash else ( [%log internal] "Update_coinbase_stack_done" ; - Deferred.return - (Ok - ( false - , [] - , Pending_coinbase.Update.Action.Update_none - , `Update_none - , `First_pass_ledger_end (Ledger.merkle_root ledger) ) ) ) + Ok + ( false + , [] + , Pending_coinbase.Update.Action.Update_none + , `Update_none + , `First_pass_ledger_end (Ledger.merkle_root ledger) ) ) (*update the pending_coinbase tree with the updated/new stack and delete the oldest stack if a proof was emitted*) let update_pending_coinbase_collection ~depth pending_coinbase_collection @@ -1006,10 +989,10 @@ module T = struct (Pre_diff_info.Error.Coinbase_error "More than two coinbase parts") ) - let apply_diff ?(skip_verification = false) ~logger ~constraint_constants - ~global_slot t pre_diff_info ~current_state_view ~state_and_body_hash - ~log_prefix ~zkapp_cmd_limit_hardcap = - let open Deferred.Result.Let_syntax in + let apply_diff_impl ~logger ~constraint_constants ~global_slot t pre_diff_info + ~current_state_view ~state_and_body_hash ~log_prefix + ~zkapp_cmd_limit_hardcap = + let open Result.Let_syntax in let max_throughput = Int.pow 2 t.constraint_constants.transaction_capacity_log_2 in @@ -1031,7 +1014,7 @@ module T = struct Ledger.unsafe_preload_accounts_from_parent new_ledger accounts_accessed ; let%bind () = (* Check number of zkApps in a block does not exceed hardcap *) - O1trace.thread "zkapp_hardcap_check" (fun () -> + O1trace.sync_thread "zkapp_hardcap_check" (fun () -> let is_zkapp : Transaction.t With_status.t -> bool = function | { With_status.data = Transaction.Command (Mina_base.User_command.Zkapp_command _) @@ -1043,10 +1026,10 @@ module T = struct in let zk_app_count = List.count ~f:is_zkapp transactions in if zk_app_count > zkapp_cmd_limit_hardcap then - Deferred.Result.fail + Result.fail (Staged_ledger_error.ZkApps_exceed_limit (zk_app_count, zkapp_cmd_limit_hardcap) ) - else Deferred.Result.return () ) + else Result.return () ) in [%log internal] "Update_coinbase_stack" ~metadata: @@ -1063,7 +1046,7 @@ module T = struct , stack_update_in_snark , stack_update , `First_pass_ledger_end first_pass_ledger_end ) = - O1trace.thread "update_coinbase_stack_start_time" (fun () -> + O1trace.sync_thread "update_coinbase_stack_start_time" (fun () -> update_coinbase_stack_and_get_data ~logger ~constraint_constants ~global_slot t.scan_state new_ledger t.pending_coinbase_collection transactions current_state_view state_and_body_hash ) @@ -1079,26 +1062,26 @@ module T = struct ; ("free_space", `Int (Scan_state.free_space t.scan_state)) ] ; let%bind () = - O1trace.thread "check_for_sufficient_snark_work" (fun () -> + O1trace.sync_thread "check_for_sufficient_snark_work" (fun () -> let required = List.length required_pairs in if work_count < required && List.length data > Scan_state.free_space t.scan_state - required + work_count then - Deferred.Result.fail + Result.fail (Staged_ledger_error.Insufficient_work (sprintf !"Insufficient number of transaction snark work (slots \ occupying: %d) required %d, got %d" slots required work_count ) ) - else Deferred.Result.return () ) + else Result.return () ) in [%log internal] "Check_zero_fee_excess" ; - let%bind () = Deferred.return (check_zero_fee_excess t.scan_state data) in + let%bind () = check_zero_fee_excess t.scan_state data in [%log internal] "Fill_work_and_enqueue_transactions" ; - let%bind res_opt, scan_state' = - O1trace.thread "fill_work_and_enqueue_transactions" (fun () -> + let%bind ledger_proof, scan_state' = + O1trace.sync_thread "fill_work_and_enqueue_transactions" (fun () -> let r = Scan_state.fill_work_and_enqueue_transactions t.scan_state ~logger data works @@ -1121,29 +1104,68 @@ module T = struct ] !"$prefix: Unexpected error when applying diff data $data to \ the scan_state $scan_state: $error" ) ; - Deferred.return (to_staged_ledger_or_error r) ) + to_staged_ledger_or_error r ) in - let%bind () = yield_result () in [%log internal] "Update_pending_coinbase_collection" ; let%bind updated_pending_coinbase_collection' = - O1trace.thread "update_pending_coinbase_collection" (fun () -> + O1trace.sync_thread "update_pending_coinbase_collection" (fun () -> update_pending_coinbase_collection ~depth:t.constraint_constants.pending_coinbase_depth t.pending_coinbase_collection stack_update ~is_new_stack - ~ledger_proof:res_opt - |> Deferred.return ) + ~ledger_proof ) in - let%bind () = yield_result () in - let%bind coinbase_amount = - Deferred.return (coinbase_for_blockchain_snark coinbases) - in - let%bind latest_pending_coinbase_stack = + let%bind coinbase_amount = coinbase_for_blockchain_snark coinbases in + let%map latest_pending_coinbase_stack = Pending_coinbase.latest_stack ~is_new_stack:false updated_pending_coinbase_collection' - |> to_staged_ledger_or_error |> Deferred.return + |> to_staged_ledger_or_error in - let%bind () = yield_result () in - let%map () = + let new_staged_ledger = + { scan_state = scan_state' + ; ledger = new_ledger + ; constraint_constants = t.constraint_constants + ; pending_coinbase_collection = updated_pending_coinbase_collection' + } + in + let debug_log_metadata = + [ ("user_command_count", `Int commands_count) + ; ("coinbase_count", `Int (List.length coinbases)) + ; ("spots_available", `Int spots_available) + ; ("proof_bundles_waiting", `Int proofs_waiting) + ; ("work_count", `Int (List.length works)) + ; ("prefix", `String log_prefix) + ] + in + ( is_new_stack + , data + , stack_update_in_snark + , first_pass_ledger_end + , ledger_proof + , scan_state' + , coinbase_amount + , latest_pending_coinbase_stack + , new_staged_ledger + , debug_log_metadata ) + + let apply_diff ?(skip_verification = false) ~logger ~constraint_constants + ~global_slot t pre_diff_info ~current_state_view ~state_and_body_hash + ~log_prefix ~zkapp_cmd_limit_hardcap = + let%bind.Deferred.Result ( is_new_stack + , data + , stack_update_in_snark + , first_pass_ledger_end + , ledger_proof + , scan_state' + , coinbase_amount + , latest_pending_coinbase_stack + , new_staged_ledger + , debug_log_metadata ) = + apply_diff_impl ~logger ~constraint_constants ~global_slot t pre_diff_info + ~current_state_view ~state_and_body_hash ~log_prefix + ~zkapp_cmd_limit_hardcap + |> Deferred.return + in + let%map.Deferred.Result () = if skip_verification || List.is_empty data then Deferred.return (Ok ()) else ( [%log internal] "Verify_scan_state_after_apply" ; @@ -1153,37 +1175,22 @@ module T = struct ~first_pass_ledger_end ~second_pass_ledger_end: (Frozen_ledger_hash.of_ledger_hash - (Ledger.merkle_root new_ledger) ) + (Ledger.merkle_root new_staged_ledger.ledger) ) ~pending_coinbase_stack:latest_pending_coinbase_stack scan_state' >>| to_staged_ledger_or_error) ) ) in - [%log debug] - ~metadata: - [ ("user_command_count", `Int commands_count) - ; ("coinbase_count", `Int (List.length coinbases)) - ; ("spots_available", `Int spots_available) - ; ("proof_bundles_waiting", `Int proofs_waiting) - ; ("work_count", `Int (List.length works)) - ; ("prefix", `String log_prefix) - ] + [%log debug] ~metadata:debug_log_metadata "$prefix: apply_diff block info: No of transactions \ included:$user_command_count\n\ \ Coinbase parts:$coinbase_count Spots\n\ \ available:$spots_available Pending work in the \ scan-state:$proof_bundles_waiting Work included:$work_count" ; - let new_staged_ledger = - { scan_state = scan_state' - ; ledger = new_ledger - ; constraint_constants = t.constraint_constants - ; pending_coinbase_collection = updated_pending_coinbase_collection' - } - in [%log internal] "Hash_new_staged_ledger" ; let staged_ledger_hash = hash new_staged_ledger in [%log internal] "Hash_new_staged_ledger_done" ; ( `Hash_after_applying staged_ledger_hash - , `Ledger_proof res_opt + , `Ledger_proof ledger_proof , `Staged_ledger new_staged_ledger , `Pending_coinbase_update ( is_new_stack diff --git a/src/lib/staged_ledger/staged_ledger.mli b/src/lib/staged_ledger/staged_ledger.mli index 805314908cb..ede4d681eb7 100644 --- a/src/lib/staged_ledger/staged_ledger.mli +++ b/src/lib/staged_ledger/staged_ledger.mli @@ -379,5 +379,5 @@ module Test_helpers : sig * Pending_coinbase.Stack_versioned.t ] * [> `First_pass_ledger_end of Frozen_ledger_hash.t ] , Staged_ledger_error.t ) - Deferred.Result.t + Result.t end diff --git a/src/lib/transaction_snark_scan_state/transaction_snark_scan_state.ml b/src/lib/transaction_snark_scan_state/transaction_snark_scan_state.ml index 47418185c32..720bab23fd9 100644 --- a/src/lib/transaction_snark_scan_state/transaction_snark_scan_state.ml +++ b/src/lib/transaction_snark_scan_state/transaction_snark_scan_state.ml @@ -594,7 +594,7 @@ struct () in match%map - O1trace.sync_thread "validate_transaction_snark_scan_state" (fun () -> + O1trace.thread "validate_transaction_snark_scan_state" (fun () -> scan_statement t ~constraint_constants ~statement_check ~verifier ) with | Error (`Error e) ->