diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 219a0d6ab..85792a1d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,8 @@ jobs: - uses: julia-actions/cache@v1 - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 + env: + JULIA_NUM_THREADS: 4 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v4 with: diff --git a/docs/src/apireference.md b/docs/src/apireference.md index b43e867c2..248e80e94 100644 --- a/docs/src/apireference.md +++ b/docs/src/apireference.md @@ -68,6 +68,7 @@ SDDP.SimulatorSamplingScheme ```@docs SDDP.AbstractParallelScheme SDDP.Serial +SDDP.Threaded SDDP.Asynchronous ``` diff --git a/src/algorithm.jl b/src/algorithm.jl index 9f7b46555..758d9ca8b 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -3,6 +3,16 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +macro _timeit_threadsafe(timer, label, block) + return esc(quote + if Threads.threadid() == 1 + TimerOutputs.@timeit $timer $label $block + else + $block + end + end) +end + # to_nodal_form is an internal helper function so users can pass arguments like: # risk_measure = SDDP.Expectation(), # risk_measure = Dict(1=>Expectation(), 2=>WorstCase()) @@ -101,6 +111,8 @@ struct Options{T} forward_pass_callback::Any post_iteration_callback::Any last_log_iteration::Ref{Int} + # For threading + lock::ReentrantLock # Internal function: users should never construct this themselves. function Options( model::PolicyGraph{T}, @@ -144,6 +156,7 @@ struct Options{T} forward_pass_callback, post_iteration_callback, Ref{Int}(0), # last_log_iteration + ReentrantLock(), ) end end @@ -387,6 +400,7 @@ function solve_subproblem( scenario_path::Vector{Tuple{T,S}}; duality_handler::Union{Nothing,AbstractDualityHandler}, ) where {T,S} + lock(node.lock) # LOCK-ID-005 _initialize_solver(node; throw_error = false) # Parameterize the model. First, fix the value of the incoming state # variables. Then parameterize the model depending on `noise`. Finally, @@ -423,12 +437,13 @@ function solve_subproblem( end state = get_outgoing_state(node) stage_objective = stage_objective_value(node.stage_objective) - TimerOutputs.@timeit model.timer_output "get_dual_solution" begin + @_timeit_threadsafe model.timer_output "get_dual_solution" begin objective, dual_values = get_dual_solution(node, duality_handler) end if node.post_optimize_hook !== nothing node.post_optimize_hook(pre_optimize_ret) end + unlock(node.lock) # LOCK-ID-005 return ( state = state, duals = dual_values, @@ -505,10 +520,6 @@ function backward_pass( objective_states::Vector{NTuple{N,Float64}}, belief_states::Vector{Tuple{Int,Dict{T,Float64}}}, ) where {T,NoiseType,N} - TimerOutputs.@timeit model.timer_output "prepare_backward_pass" begin - restore_duality = - prepare_backward_pass(model, options.duality_handler, options) - end # TODO(odow): improve storage type. cuts = Dict{T,Vector{Any}}(index => Any[] for index in keys(model.nodes)) for index in length(scenario_path):-1:1 @@ -533,6 +544,7 @@ function backward_pass( options.backward_sampling_scheme, scenario_path[1:index], options.duality_handler, + options, ) end # We need to refine our estimate at all nodes in the partition. @@ -573,6 +585,7 @@ function backward_pass( options.backward_sampling_scheme, scenario_path[1:index], options.duality_handler, + options, ) new_cuts = refine_bellman_function( model, @@ -613,9 +626,6 @@ function backward_pass( end end end - TimerOutputs.@timeit model.timer_output "prepare_backward_pass" begin - restore_duality() - end return cuts end @@ -652,6 +662,7 @@ function solve_all_children( backward_sampling_scheme::AbstractBackwardSamplingScheme, scenario_path, duality_handler::Union{Nothing,AbstractDualityHandler}, + options, ) where {T} length_scenario_path = length(scenario_path) for child in node.children @@ -659,6 +670,14 @@ function solve_all_children( continue end child_node = model[child.term] + lock(child_node.lock) # LOCK-ID-004 + @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin + restore_duality = prepare_backward_pass( + child_node, + options.duality_handler, + options, + ) + end for noise in sample_backward_noise_terms_with_state( backward_sampling_scheme, child_node, @@ -695,7 +714,7 @@ function solve_all_children( noise.term, ) end - TimerOutputs.@timeit model.timer_output "solve_subproblem" begin + @_timeit_threadsafe model.timer_output "solve_subproblem" begin subproblem_results = solve_subproblem( model, child_node, @@ -715,6 +734,10 @@ function solve_all_children( length(items.duals) end end + @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin + restore_duality() + end + unlock(child_node.lock) # LOCK-ID-004 end if length(scenario_path) == length_scenario_path # No-op. There weren't any children to solve. @@ -752,6 +775,7 @@ function calculate_bound( continue end node = model[child.term] + lock(node.lock) # LOCK-ID-006 for noise in node.noise_terms if node.objective_state !== nothing update_objective_state( @@ -783,6 +807,7 @@ function calculate_bound( push!(probabilities, child.probability * noise.probability) push!(noise_supports, noise.term) end + unlock(node.lock) # LOCK-ID-006 end # Now compute the risk-adjusted probability measure: risk_adjusted_probability = similar(probabilities) @@ -812,11 +837,11 @@ end function iteration(model::PolicyGraph{T}, options::Options) where {T} model.ext[:numerical_issue] = false - TimerOutputs.@timeit model.timer_output "forward_pass" begin + @_timeit_threadsafe model.timer_output "forward_pass" begin forward_trajectory = forward_pass(model, options, options.forward_pass) options.forward_pass_callback(forward_trajectory) end - TimerOutputs.@timeit model.timer_output "backward_pass" begin + @_timeit_threadsafe model.timer_output "backward_pass" begin cuts = backward_pass( model, options, @@ -826,26 +851,31 @@ function iteration(model::PolicyGraph{T}, options::Options) where {T} forward_trajectory.belief_states, ) end - TimerOutputs.@timeit model.timer_output "calculate_bound" begin + @_timeit_threadsafe model.timer_output "calculate_bound" begin bound = calculate_bound(model) end - push!( - options.log, - Log( - length(options.log) + 1, - bound, - forward_trajectory.cumulative_value, - time() - options.start_time, - Distributed.myid(), - model.ext[:total_solves], - duality_log_key(options.duality_handler), - model.ext[:numerical_issue], - ), - ) + lock(options.lock) + try + push!( + options.log, + Log( + length(options.log) + 1, + bound, + forward_trajectory.cumulative_value, + time() - options.start_time, + max(Threads.threadid(), Distributed.myid()), + model.ext[:total_solves], + duality_log_key(options.duality_handler), + model.ext[:numerical_issue], + ), + ) + finally + unlock(options.lock) + end has_converged, status = convergence_test(model, options.log, options.stopping_rules) return IterationResult( - Distributed.myid(), + max(Threads.threadid(), Distributed.myid()), bound, forward_trajectory.cumulative_value, has_converged, @@ -1130,6 +1160,11 @@ function train( finally # And close the dashboard callback if necessary. dashboard_callback(nothing, true) + for node in values(model.nodes) + if islocked(node.lock) + unlock(node.lock) + end + end end training_results = TrainingResults(status, log) model.most_recent_training_results = training_results @@ -1177,6 +1212,7 @@ function _simulate( objective_states = NTuple{N,Float64}[] for (depth, (node_index, noise)) in enumerate(scenario_path) node = model[node_index] + lock(node.lock) # LOCK-ID-002 # Objective state interpolation. objective_state_vector = update_objective_state( node.objective_state, @@ -1253,6 +1289,7 @@ function _simulate( push!(simulation, store) # Set outgoing state as the incoming state for the next node. incoming_state = copy(subproblem_results.state) + unlock(node.lock) # LOCK-ID-002 end return simulation end diff --git a/src/plugins/bellman_functions.jl b/src/plugins/bellman_functions.jl index 5c6997edd..8a38cc3b4 100644 --- a/src/plugins/bellman_functions.jl +++ b/src/plugins/bellman_functions.jl @@ -410,6 +410,7 @@ function refine_bellman_function( nominal_probability::Vector{Float64}, objective_realizations::Vector{Float64}, ) where {T} + lock(node.lock) # LOCK-ID-003 # Sanity checks. @assert length(dual_variables) == length(noise_supports) == @@ -426,8 +427,8 @@ function refine_bellman_function( model.objective_sense == MOI.MIN_SENSE, ) # The meat of the function. - if bellman_function.cut_type == SINGLE_CUT - return _add_average_cut( + ret = if bellman_function.cut_type == SINGLE_CUT + _add_average_cut( node, outgoing_state, risk_adjusted_probability, @@ -438,7 +439,7 @@ function refine_bellman_function( else # Add a multi-cut @assert bellman_function.cut_type == MULTI_CUT _add_locals_if_necessary(node, bellman_function, length(dual_variables)) - return _add_multi_cut( + _add_multi_cut( node, outgoing_state, risk_adjusted_probability, @@ -447,6 +448,8 @@ function refine_bellman_function( offset, ) end + unlock(node.lock) # LOCK-ID-003 + return ret end function _add_average_cut( diff --git a/src/plugins/duality_handlers.jl b/src/plugins/duality_handlers.jl index df32682c2..0f0d9fd99 100644 --- a/src/plugins/duality_handlers.jl +++ b/src/plugins/duality_handlers.jl @@ -61,24 +61,6 @@ SDDiP(args...; kwargs...) = _deprecate_integrality_handler() ContinuousRelaxation(args...; kwargs...) = _deprecate_integrality_handler() -function prepare_backward_pass( - model::PolicyGraph, - duality_handler::AbstractDualityHandler, - options::Options, -) - undo = Function[] - for (_, node) in model.nodes - push!(undo, prepare_backward_pass(node, duality_handler, options)) - end - function undo_relax() - for f in undo - f() - end - return - end - return undo_relax -end - function get_dual_solution(node::Node, ::Nothing) return JuMP.objective_value(node.subproblem), Dict{Symbol,Float64}() end @@ -351,8 +333,10 @@ focus more on the more-recent rewards. mutable struct BanditDuality <: AbstractDualityHandler arms::Vector{_BanditArm} last_arm_index::Int + logs_seen::Int + function BanditDuality(args::AbstractDualityHandler...) - return new(_BanditArm[_BanditArm(arg, Float64[]) for arg in args], 1) + return new(_BanditArm[_BanditArm(arg, Float64[]) for arg in args], 1, 1) end end @@ -404,15 +388,16 @@ function _update_rewards(handler::BanditDuality, log::Vector{Log}) end function prepare_backward_pass( - model::PolicyGraph, + node::Node, handler::BanditDuality, options::Options, ) - if length(options.log) > 1 + if length(options.log) > handler.logs_seen _update_rewards(handler, options.log) + handler.logs_seen = length(options.log) end arm = _choose_best_arm(handler) - return prepare_backward_pass(model, arm.handler, options) + return prepare_backward_pass(node, arm.handler, options) end function get_dual_solution(node::Node, handler::BanditDuality) diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index a62db0f5c..c4582f12b 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -27,7 +27,7 @@ function forward_pass( ) where {T} # First up, sample a scenario. Note that if a cycle is detected, this will # return the cycle node as well. - TimerOutputs.@timeit model.timer_output "sample_scenario" begin + @_timeit_threadsafe model.timer_output "sample_scenario" begin scenario_path, terminated_due_to_cycle = sample_scenario(model, options.sampling_scheme) end @@ -51,6 +51,7 @@ function forward_pass( # Iterate down the scenario. for (depth, (node_index, noise)) in enumerate(scenario_path) node = model[node_index] + lock(node.lock) # LOCK-ID-001 # Objective state interpolation. objective_state_vector = update_objective_state( node.objective_state, @@ -94,7 +95,7 @@ function forward_pass( end # ===== End: starting state for infinite horizon ===== # Solve the subproblem, note that `duality_handler = nothing`. - TimerOutputs.@timeit model.timer_output "solve_subproblem" begin + @_timeit_threadsafe model.timer_output "solve_subproblem" begin subproblem_results = solve_subproblem( model, node, @@ -112,6 +113,7 @@ function forward_pass( # Add the outgoing state variable to the list of states we have sampled # on this forward pass. push!(sampled_states, incoming_state_value) + unlock(node.lock) # LOCK-ID-001 end if terminated_due_to_cycle # We terminated due to a cycle. Here is the list of possible starting diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index 9957b0675..5f5d41f2e 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -327,3 +327,68 @@ function _simulate( end return end + +""" + Threaded() + +Run SDDP in multi-threaded mode. + +Use `julia --threads N` to start Julia with `N` threads. In most cases, you +should pick `N` to be the number of physical cores on your machine. + +!!! danger + This plug-in is experimental, and parts of SDDP.jl may not be threadsafe. If + you encounter any problems or crashes, please open a GitHub issue. + +## Example + +```julia +SDDP.train(model; parallel_scheme = SDDP.Threaded()) +SDDP.simulate(model; parallel_scheme = SDDP.Threaded()) +``` +""" +struct Threaded <: AbstractParallelScheme end + +Base.show(io::IO, ::Threaded) = print(io, "Threaded()") + +interrupt(::Threaded) = nothing + +function master_loop( + ::Threaded, + model::PolicyGraph{T}, + options::Options, +) where {T} + _initialize_solver(model; throw_error = false) + convergence_lock = ReentrantLock() + keep_iterating, status = true, nothing + @sync for _ in 1:Threads.nthreads() + Threads.@spawn while keep_iterating + result = iteration(model, options) + options.post_iteration_callback(result) + lock(() -> log_iteration(options), options.lock) + if result.has_converged + lock(convergence_lock) do + keep_iterating = false + status = result.status + return + end + end + end + end + return status +end + +function _simulate( + model::PolicyGraph, + ::Threaded, + number_replications::Int, + variables::Vector{Symbol}; + kwargs..., +) + _initialize_solver(model; throw_error = false) + ret = Vector{Vector{Dict{Symbol,Any}}}(undef, number_replications) + @sync for i in 1:number_replications + Threads.@spawn ret[i] = _simulate(model, variables; kwargs...) + end + return ret +end diff --git a/src/user_interface.jl b/src/user_interface.jl index 7a7afc2b3..f9019d009 100644 --- a/src/user_interface.jl +++ b/src/user_interface.jl @@ -661,6 +661,8 @@ mutable struct Node{T} # An extension dictionary. This is a useful place for packages that extend # SDDP.jl to stash things. ext::Dict{Symbol,Any} + # Lock for threading + lock::ReentrantLock end function Base.show(io::IO, node::Node) @@ -990,6 +992,7 @@ function PolicyGraph( direct_mode ? nothing : optimizer, # The extension dictionary. Dict{Symbol,Any}(), + ReentrantLock(), ) subproblem.ext[:sddp_policy_graph] = policy_graph policy_graph.nodes[node_index] = subproblem.ext[:sddp_node] = node diff --git a/test/plugins/duality_handlers.jl b/test/plugins/duality_handlers.jl index e6494266a..2153d310c 100644 --- a/test/plugins/duality_handlers.jl +++ b/test/plugins/duality_handlers.jl @@ -20,6 +20,24 @@ function runtests() return end +function SDDP.prepare_backward_pass( + model::SDDP.PolicyGraph, + duality_handler::SDDP.AbstractDualityHandler, + options::SDDP.Options, +) + undo = Function[] + for (_, node) in model.nodes + push!(undo, SDDP.prepare_backward_pass(node, duality_handler, options)) + end + function undo_relax() + for f in undo + f() + end + return + end + return undo_relax +end + # Single-stage model helps set up a node and subproblem to test dual # calculations function easy_single_stage(duality_handler) @@ -410,7 +428,7 @@ function test_BanditDuality_show() return end -function test_BanditDuality_show() +function test_BanditDuality_eval() model = SDDP.LinearPolicyGraph( stages = 2, lower_bound = -100.0, diff --git a/test/plugins/threaded.jl b/test/plugins/threaded.jl new file mode 100644 index 000000000..5af855d77 --- /dev/null +++ b/test/plugins/threaded.jl @@ -0,0 +1,66 @@ +# Copyright (c) 2017-23, Oscar Dowson and SDDP.jl contributors. +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +function test_threaded() + # We should test that JULIA_NUM_THREADS is set in CI jobs + if get(ENV, "CI", "false") == "true" + num_threads = get(ENV, "JULIA_NUM_THREADS", "0") + @test parse(Int, num_threads) == Threads.nthreads() + @test Threads.nthreads() > 1 + end + c_eta, c_pt = [0.8, 0.5], [2, 5, 8, 11, 14] + df_demand = rand(10:10:60, 24) + model = SDDP.LinearPolicyGraph(; + stages = 24, + sense = :Min, + lower_bound = 0.0, + optimizer = HiGHS.Optimizer, + ) do subproblem, t + @variable( + subproblem, + 0 <= x_volume[1:2] <= 8, + SDDP.State, + initial_value = 1, + ) + @variable(subproblem, u_u[1:2] >= 0) + @variable(subproblem, u_v[1:2] >= 0) + @variable(subproblem, 0 <= u_x[1:5] <= 5, Int) + @variable(subproblem, u_slack >= 0) + @variable(subproblem, w_noise) + @constraint( + subproblem, + [j in 1:2], + x_volume[j].out == x_volume[j].in + c_eta[j] * u_v[j] - u_u[j], + ) + @constraint( + subproblem, + sum(u_x) + sum(u_u) - sum(u_v) + u_slack == df_demand[t] + w_noise, + ) + SDDP.parameterize(subproblem, [-2.5, -1.5, -0.5, 0.5, 1.5, 2.5]) do w + return JuMP.fix(w_noise, w) + end + @stageobjective(subproblem, c_pt' * u_x + 35 * u_slack) + return + end + SDDP.train(model; iteration_limit = 100, parallel_scheme = SDDP.Threaded()) + thread_ids_seen = + Set{Int}(log.pid for log in model.most_recent_training_results.log) + min_threads = Threads.nthreads() == 1 ? 1 : 2 + @test min_threads <= length(thread_ids_seen) <= Threads.nthreads() + recorder = Dict{Symbol,Function}(:thread_id => sp -> Threads.threadid()) + simulations = SDDP.simulate( + model, + 100; + parallel_scheme = SDDP.Threaded(), + custom_recorders = recorder, + ) + thread_ids_seen = + Set{Int}(data[:thread_id] for sim in simulations for data in sim) + min_threads = Threads.nthreads() > 1 ? 1 : 2 + @test min_threads <= length(thread_ids_seen) <= Threads.nthreads() + return +end + +test_threaded()