From 4f56ea79c833288f602fab221d9824c6e3c1742e Mon Sep 17 00:00:00 2001 From: odow Date: Tue, 23 Jul 2024 15:58:45 +1200 Subject: [PATCH 01/13] WIP: add Threaded parallel scheme --- src/algorithm.jl | 65 +++++++++++++++++++++++---------- src/plugins/forward_passes.jl | 6 ++- src/plugins/parallel_schemes.jl | 49 +++++++++++++++++++++++++ src/user_interface.jl | 3 ++ 4 files changed, 101 insertions(+), 22 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index 9f7b46555..952e9216e 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -3,6 +3,16 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +macro _timeit_threadsafe(timer, label, block) + return esc(quote + if Threads.threadid() == 1 + TimerOutputs.@timeit $timer $label $block + else + $block + end + end) +end + # to_nodal_form is an internal helper function so users can pass arguments like: # risk_measure = SDDP.Expectation(), # risk_measure = Dict(1=>Expectation(), 2=>WorstCase()) @@ -101,6 +111,8 @@ struct Options{T} forward_pass_callback::Any post_iteration_callback::Any last_log_iteration::Ref{Int} + # For threading + lock::ReentrantLock # Internal function: users should never construct this themselves. function Options( model::PolicyGraph{T}, @@ -144,6 +156,7 @@ struct Options{T} forward_pass_callback, post_iteration_callback, Ref{Int}(0), # last_log_iteration + ReentrantLock(), ) end end @@ -423,7 +436,7 @@ function solve_subproblem( end state = get_outgoing_state(node) stage_objective = stage_objective_value(node.stage_objective) - TimerOutputs.@timeit model.timer_output "get_dual_solution" begin + @_timeit_threadsafe model.timer_output "get_dual_solution" begin objective, dual_values = get_dual_solution(node, duality_handler) end if node.post_optimize_hook !== nothing @@ -505,7 +518,7 @@ function backward_pass( objective_states::Vector{NTuple{N,Float64}}, belief_states::Vector{Tuple{Int,Dict{T,Float64}}}, ) where {T,NoiseType,N} - TimerOutputs.@timeit model.timer_output "prepare_backward_pass" begin + @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin restore_duality = prepare_backward_pass(model, options.duality_handler, options) end @@ -613,7 +626,7 @@ function backward_pass( end end end - TimerOutputs.@timeit model.timer_output "prepare_backward_pass" begin + @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin restore_duality() end return cuts @@ -695,7 +708,7 @@ function solve_all_children( noise.term, ) end - TimerOutputs.@timeit model.timer_output "solve_subproblem" begin + @_timeit_threadsafe model.timer_output "solve_subproblem" begin subproblem_results = solve_subproblem( model, child_node, @@ -812,11 +825,11 @@ end function iteration(model::PolicyGraph{T}, options::Options) where {T} model.ext[:numerical_issue] = false - TimerOutputs.@timeit model.timer_output "forward_pass" begin + @_timeit_threadsafe model.timer_output "forward_pass" begin forward_trajectory = forward_pass(model, options, options.forward_pass) options.forward_pass_callback(forward_trajectory) end - TimerOutputs.@timeit model.timer_output "backward_pass" begin + @_timeit_threadsafe model.timer_output "backward_pass" begin cuts = backward_pass( model, options, @@ -826,22 +839,27 @@ function iteration(model::PolicyGraph{T}, options::Options) where {T} forward_trajectory.belief_states, ) end - TimerOutputs.@timeit model.timer_output "calculate_bound" begin + @_timeit_threadsafe model.timer_output "calculate_bound" begin bound = calculate_bound(model) end - push!( - options.log, - Log( - length(options.log) + 1, - bound, - forward_trajectory.cumulative_value, - time() - options.start_time, - Distributed.myid(), - model.ext[:total_solves], - duality_log_key(options.duality_handler), - model.ext[:numerical_issue], - ), - ) + lock(options.lock) + try + push!( + options.log, + Log( + length(options.log) + 1, + bound, + forward_trajectory.cumulative_value, + time() - options.start_time, + Distributed.myid(), + model.ext[:total_solves], + duality_log_key(options.duality_handler), + model.ext[:numerical_issue], + ), + ) + finally + unlock(options.lock) + end has_converged, status = convergence_test(model, options.log, options.stopping_rules) return IterationResult( @@ -1130,6 +1148,11 @@ function train( finally # And close the dashboard callback if necessary. dashboard_callback(nothing, true) + for node in values(model.nodes) + if islocked(node.lock) + unlock(node.lock) + end + end end training_results = TrainingResults(status, log) model.most_recent_training_results = training_results @@ -1177,6 +1200,7 @@ function _simulate( objective_states = NTuple{N,Float64}[] for (depth, (node_index, noise)) in enumerate(scenario_path) node = model[node_index] + lock(node.lock) # LOCK-ID-002 # Objective state interpolation. objective_state_vector = update_objective_state( node.objective_state, @@ -1253,6 +1277,7 @@ function _simulate( push!(simulation, store) # Set outgoing state as the incoming state for the next node. incoming_state = copy(subproblem_results.state) + unlock(node.lock) # LOCK-ID-002 end return simulation end diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index a62db0f5c..c4582f12b 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -27,7 +27,7 @@ function forward_pass( ) where {T} # First up, sample a scenario. Note that if a cycle is detected, this will # return the cycle node as well. - TimerOutputs.@timeit model.timer_output "sample_scenario" begin + @_timeit_threadsafe model.timer_output "sample_scenario" begin scenario_path, terminated_due_to_cycle = sample_scenario(model, options.sampling_scheme) end @@ -51,6 +51,7 @@ function forward_pass( # Iterate down the scenario. for (depth, (node_index, noise)) in enumerate(scenario_path) node = model[node_index] + lock(node.lock) # LOCK-ID-001 # Objective state interpolation. objective_state_vector = update_objective_state( node.objective_state, @@ -94,7 +95,7 @@ function forward_pass( end # ===== End: starting state for infinite horizon ===== # Solve the subproblem, note that `duality_handler = nothing`. - TimerOutputs.@timeit model.timer_output "solve_subproblem" begin + @_timeit_threadsafe model.timer_output "solve_subproblem" begin subproblem_results = solve_subproblem( model, node, @@ -112,6 +113,7 @@ function forward_pass( # Add the outgoing state variable to the list of states we have sampled # on this forward pass. push!(sampled_states, incoming_state_value) + unlock(node.lock) # LOCK-ID-001 end if terminated_due_to_cycle # We terminated due to a cycle. Here is the list of possible starting diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index 9957b0675..9dfd3614a 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -327,3 +327,52 @@ function _simulate( end return end + +""" + Threaded() + +Run SDDP in threaded mode. +""" +struct Threaded <: AbstractParallelScheme end + +Base.show(io::IO, ::Threaded) = print(io, "Threaded()") + +interrupt(::Threaded) = nothing + +function master_loop( + ::Threaded, + model::PolicyGraph{T}, + options::Options, +) where {T} + _initialize_solver(model; throw_error = false) + while true + result = iteration(model, options) + lock(options.lock) do + options.post_iteration_callback(result) + log_iteration(options) + end + if result.has_converged + return result.status + end + end + return +end + +function _simulate( + model::PolicyGraph, + ::Threaded, + number_replications::Int, + variables::Vector{Symbol}; + kwargs..., +) + _initialize_solver(model; throw_error = false) + ret = Vector{Dict{Symbol,Any}}[] + ret_lock = ReentrantLock() + Threads.@threads for _ in 1:number_replications + simulation = _simulate(model, variables; kwargs...) + lock(ret_lock) do + push!(ret, simulation) + end + end + return ret +end diff --git a/src/user_interface.jl b/src/user_interface.jl index 7a7afc2b3..f9019d009 100644 --- a/src/user_interface.jl +++ b/src/user_interface.jl @@ -661,6 +661,8 @@ mutable struct Node{T} # An extension dictionary. This is a useful place for packages that extend # SDDP.jl to stash things. ext::Dict{Symbol,Any} + # Lock for threading + lock::ReentrantLock end function Base.show(io::IO, node::Node) @@ -990,6 +992,7 @@ function PolicyGraph( direct_mode ? nothing : optimizer, # The extension dictionary. Dict{Symbol,Any}(), + ReentrantLock(), ) subproblem.ext[:sddp_policy_graph] = policy_graph policy_graph.nodes[node_index] = subproblem.ext[:sddp_node] = node From 142e2653c6f8e84aa6b898136a8161991e13247f Mon Sep 17 00:00:00 2001 From: odow Date: Tue, 23 Jul 2024 16:47:13 +1200 Subject: [PATCH 02/13] Update --- src/algorithm.jl | 4 ++-- src/plugins/parallel_schemes.jl | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index 952e9216e..f37450b0a 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -851,7 +851,7 @@ function iteration(model::PolicyGraph{T}, options::Options) where {T} bound, forward_trajectory.cumulative_value, time() - options.start_time, - Distributed.myid(), + max(Threads.threadid(), Distributed.myid()), model.ext[:total_solves], duality_log_key(options.duality_handler), model.ext[:numerical_issue], @@ -863,7 +863,7 @@ function iteration(model::PolicyGraph{T}, options::Options) where {T} has_converged, status = convergence_test(model, options.log, options.stopping_rules) return IterationResult( - Distributed.myid(), + max(Threads.threadid(), Distributed.myid()), bound, forward_trajectory.cumulative_value, has_converged, diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index 9dfd3614a..3b659f3c2 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -350,6 +350,7 @@ function master_loop( lock(options.lock) do options.post_iteration_callback(result) log_iteration(options) + return end if result.has_converged return result.status @@ -372,6 +373,7 @@ function _simulate( simulation = _simulate(model, variables; kwargs...) lock(ret_lock) do push!(ret, simulation) + return end end return ret From 5e02b23368a77850d76707c25c7cb674e7008cae Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 09:43:03 +1200 Subject: [PATCH 03/13] Update --- src/algorithm.jl | 6 ++++++ src/plugins/bellman_functions.jl | 9 ++++++--- src/plugins/parallel_schemes.jl | 34 ++++++++++++++++---------------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index f37450b0a..ec2f925d6 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -400,6 +400,7 @@ function solve_subproblem( scenario_path::Vector{Tuple{T,S}}; duality_handler::Union{Nothing,AbstractDualityHandler}, ) where {T,S} + lock(node.lock) # LOCK-ID-005 _initialize_solver(node; throw_error = false) # Parameterize the model. First, fix the value of the incoming state # variables. Then parameterize the model depending on `noise`. Finally, @@ -442,6 +443,7 @@ function solve_subproblem( if node.post_optimize_hook !== nothing node.post_optimize_hook(pre_optimize_ret) end + unlock(node.lock) # LOCK-ID-005 return ( state = state, duals = dual_values, @@ -672,6 +674,7 @@ function solve_all_children( continue end child_node = model[child.term] + lock(child_node.lock) # LOCK-ID-004 for noise in sample_backward_noise_terms_with_state( backward_sampling_scheme, child_node, @@ -728,6 +731,7 @@ function solve_all_children( length(items.duals) end end + unlock(child_node.lock) # LOCK-ID-004 end if length(scenario_path) == length_scenario_path # No-op. There weren't any children to solve. @@ -765,6 +769,7 @@ function calculate_bound( continue end node = model[child.term] + lock(node.lock) # LOCK-ID-006 for noise in node.noise_terms if node.objective_state !== nothing update_objective_state( @@ -796,6 +801,7 @@ function calculate_bound( push!(probabilities, child.probability * noise.probability) push!(noise_supports, noise.term) end + unlock(node.lock) # LOCK-ID-006 end # Now compute the risk-adjusted probability measure: risk_adjusted_probability = similar(probabilities) diff --git a/src/plugins/bellman_functions.jl b/src/plugins/bellman_functions.jl index 5c6997edd..8a38cc3b4 100644 --- a/src/plugins/bellman_functions.jl +++ b/src/plugins/bellman_functions.jl @@ -410,6 +410,7 @@ function refine_bellman_function( nominal_probability::Vector{Float64}, objective_realizations::Vector{Float64}, ) where {T} + lock(node.lock) # LOCK-ID-003 # Sanity checks. @assert length(dual_variables) == length(noise_supports) == @@ -426,8 +427,8 @@ function refine_bellman_function( model.objective_sense == MOI.MIN_SENSE, ) # The meat of the function. - if bellman_function.cut_type == SINGLE_CUT - return _add_average_cut( + ret = if bellman_function.cut_type == SINGLE_CUT + _add_average_cut( node, outgoing_state, risk_adjusted_probability, @@ -438,7 +439,7 @@ function refine_bellman_function( else # Add a multi-cut @assert bellman_function.cut_type == MULTI_CUT _add_locals_if_necessary(node, bellman_function, length(dual_variables)) - return _add_multi_cut( + _add_multi_cut( node, outgoing_state, risk_adjusted_probability, @@ -447,6 +448,8 @@ function refine_bellman_function( offset, ) end + unlock(node.lock) # LOCK-ID-003 + return ret end function _add_average_cut( diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index 3b659f3c2..a064150b0 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -345,18 +345,23 @@ function master_loop( options::Options, ) where {T} _initialize_solver(model; throw_error = false) - while true - result = iteration(model, options) - lock(options.lock) do + convergence_lock = ReentrantLock() + keep_iterating, status = true, nothing + @sync for _ in 1:Threads.nthreads() + Threads.@spawn while keep_iterating + result = iteration(model, options) options.post_iteration_callback(result) - log_iteration(options) - return - end - if result.has_converged - return result.status + lock(() -> log_iteration(options), options.lock) + if result.has_converged + lock(convergence_lock) do + keep_iterating = false + status = result.status + return + end + end end end - return + return status end function _simulate( @@ -367,14 +372,9 @@ function _simulate( kwargs..., ) _initialize_solver(model; throw_error = false) - ret = Vector{Dict{Symbol,Any}}[] - ret_lock = ReentrantLock() - Threads.@threads for _ in 1:number_replications - simulation = _simulate(model, variables; kwargs...) - lock(ret_lock) do - push!(ret, simulation) - return - end + ret = Vector{Vector{Dict{Symbol,Any}}}(undef, number_replications) + @sync for i in 1:number_replications + Threads.@spawn ret[i] = _simulate(model, variables; kwargs...) end return ret end From 7de5c48f6443f6bf2b51eddfa4e8f2e4471468bf Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 09:47:50 +1200 Subject: [PATCH 04/13] Update docs --- docs/src/apireference.md | 1 + src/plugins/parallel_schemes.jl | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/docs/src/apireference.md b/docs/src/apireference.md index b43e867c2..248e80e94 100644 --- a/docs/src/apireference.md +++ b/docs/src/apireference.md @@ -68,6 +68,7 @@ SDDP.SimulatorSamplingScheme ```@docs SDDP.AbstractParallelScheme SDDP.Serial +SDDP.Threaded SDDP.Asynchronous ``` diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index a064150b0..5f5d41f2e 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -331,7 +331,21 @@ end """ Threaded() -Run SDDP in threaded mode. +Run SDDP in multi-threaded mode. + +Use `julia --threads N` to start Julia with `N` threads. In most cases, you +should pick `N` to be the number of physical cores on your machine. + +!!! danger + This plug-in is experimental, and parts of SDDP.jl may not be threadsafe. If + you encounter any problems or crashes, please open a GitHub issue. + +## Example + +```julia +SDDP.train(model; parallel_scheme = SDDP.Threaded()) +SDDP.simulate(model; parallel_scheme = SDDP.Threaded()) +``` """ struct Threaded <: AbstractParallelScheme end From 27d575a1ca406de521015c0a9ade8d262ea9fb92 Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 10:16:25 +1200 Subject: [PATCH 05/13] Add test --- .github/workflows/ci.yml | 2 ++ test/plugins/threaded.jl | 77 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 test/plugins/threaded.jl diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 219a0d6ab..85792a1d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,8 @@ jobs: - uses: julia-actions/cache@v1 - uses: julia-actions/julia-buildpkg@v1 - uses: julia-actions/julia-runtest@v1 + env: + JULIA_NUM_THREADS: 4 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v4 with: diff --git a/test/plugins/threaded.jl b/test/plugins/threaded.jl new file mode 100644 index 000000000..9cab0f50c --- /dev/null +++ b/test/plugins/threaded.jl @@ -0,0 +1,77 @@ +# Copyright (c) 2017-23, Oscar Dowson and SDDP.jl contributors. +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +function test_threaded() + num_periods = 24 + num_thermal = 5 + num_battery = 2 + c_capacity = 14 + c_VOLL = 35 + c_eta = [0.8, 0.5] + c_pt = [2, 5, 8, 11, 14] + c_q = fill(c_capacity, 5) + df_demand = rand(10:10:60, num_periods) + model = SDDP.LinearPolicyGraph(; + stages = num_periods, + sense = :Min, + lower_bound = 0.0, + optimizer = HiGHS.Optimizer, + ) do subproblem, t + @variables(subproblem, begin + 0 <= x_volume[1:num_battery] <= 8, SDDP.State, (initial_value = 1) + u_u[1:num_battery] >= 0 + u_v[1:num_battery] >= 0 + 0 <= u_x[i in 1:num_thermal] <= c_q[i] + u_slack >= 0 + w_noise + end) + @constraints(subproblem, begin + battery[j in 1:num_battery], + x_volume[j].out == x_volume[j].in + c_eta[j] * u_v[j] - u_u[j] + sum(u_x) + sum(u_u) - sum(u_v) + u_slack == df_demand[t] + w_noise + end) + O = [-2.5, -1.5, -0.5, 0.5, 1.5, 2.5] + SDDP.parameterize(subproblem, O) do omega + JuMP.fix(w_noise, omega) + return + end + @stageobjective(subproblem, c_pt' * u_x + u_slack * c_VOLL) + return + end + SDDP.train( + model; + iteration_limit = 100, + parallel_scheme = SDDP.Threaded(), + ) + thread_ids_seen = + Set{Int}(log.pid for log in model.most_recent_training_results.log) + if Threads.nthreads() == 1 + @test length(thread_ids_seen) == 1 + else + @test length(thread_ids_seen) > 1 + end + simulations = SDDP.simulate( + model, + 100; + parallel_scheme = SDDP.Threaded(), + custom_recorders = Dict{Symbol, Function}( + :thread_id => sp -> Threads.threadid(), + ), + ) + thread_ids_seen = Set{Int}() + for sim in simulations + thread_ids = unique(data[:thread_id] for data in sim) + @test length(thread_ids) == 1 + push!(thread_ids_seen, only(thread_ids)) + end + if Threads.nthreads() == 1 + @test length(thread_ids_seen) == 1 + else + @test length(thread_ids_seen) > 1 + end + return +end + +test_threaded() From 614c299fa61a72752b9864d92037bc3cb4138e61 Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 10:46:31 +1200 Subject: [PATCH 06/13] Update --- test/plugins/threaded.jl | 87 ++++++++++++++++------------------------ 1 file changed, 35 insertions(+), 52 deletions(-) diff --git a/test/plugins/threaded.jl b/test/plugins/threaded.jl index 9cab0f50c..45b8ddaf0 100644 --- a/test/plugins/threaded.jl +++ b/test/plugins/threaded.jl @@ -4,73 +4,56 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. function test_threaded() - num_periods = 24 - num_thermal = 5 - num_battery = 2 - c_capacity = 14 - c_VOLL = 35 - c_eta = [0.8, 0.5] - c_pt = [2, 5, 8, 11, 14] - c_q = fill(c_capacity, 5) - df_demand = rand(10:10:60, num_periods) + c_eta, c_pt = [0.8, 0.5], [2, 5, 8, 11, 14] + df_demand = rand(10:10:60, 24) model = SDDP.LinearPolicyGraph(; - stages = num_periods, + stages = 24, sense = :Min, lower_bound = 0.0, optimizer = HiGHS.Optimizer, ) do subproblem, t - @variables(subproblem, begin - 0 <= x_volume[1:num_battery] <= 8, SDDP.State, (initial_value = 1) - u_u[1:num_battery] >= 0 - u_v[1:num_battery] >= 0 - 0 <= u_x[i in 1:num_thermal] <= c_q[i] - u_slack >= 0 - w_noise - end) - @constraints(subproblem, begin - battery[j in 1:num_battery], - x_volume[j].out == x_volume[j].in + c_eta[j] * u_v[j] - u_u[j] - sum(u_x) + sum(u_u) - sum(u_v) + u_slack == df_demand[t] + w_noise - end) - O = [-2.5, -1.5, -0.5, 0.5, 1.5, 2.5] - SDDP.parameterize(subproblem, O) do omega - JuMP.fix(w_noise, omega) - return + @variable( + subproblem, + 0 <= x_volume[1:2] <= 8, + SDDP.State, + initial_value = 1, + ) + @variable(subproblem, u_u[1:2] >= 0) + @variable(subproblem, u_v[1:2] >= 0) + @variable(subproblem, 0 <= u_x[1:5] <= 5) + @variable(subproblem, u_slack >= 0) + @variable(subproblem, w_noise) + @constraint( + subproblem, + [j in 1:2], + x_volume[j].out == x_volume[j].in + c_eta[j] * u_v[j] - u_u[j], + ) + @constraint( + subproblem, + sum(u_x) + sum(u_u) - sum(u_v) + u_slack == df_demand[t] + w_noise, + ) + SDDP.parameterize(subproblem, [-2.5, -1.5, -0.5, 0.5, 1.5, 2.5]) do w + return JuMP.fix(w_noise, w) end - @stageobjective(subproblem, c_pt' * u_x + u_slack * c_VOLL) + @stageobjective(subproblem, c_pt' * u_x + 35 * u_slack) return end - SDDP.train( - model; - iteration_limit = 100, - parallel_scheme = SDDP.Threaded(), - ) + SDDP.train(model; iteration_limit = 100, parallel_scheme = SDDP.Threaded()) thread_ids_seen = Set{Int}(log.pid for log in model.most_recent_training_results.log) - if Threads.nthreads() == 1 - @test length(thread_ids_seen) == 1 - else - @test length(thread_ids_seen) > 1 - end + min_threads = Threads.nthreads() > 1 ? 1 : 2 + @test min_threads <= length(thread_ids_seen) <= Threads.nthreads() + recorder = Dict{Symbol,Function}(:thread_id => sp -> Threads.threadid()) simulations = SDDP.simulate( model, 100; parallel_scheme = SDDP.Threaded(), - custom_recorders = Dict{Symbol, Function}( - :thread_id => sp -> Threads.threadid(), - ), + custom_recorders = recorder, ) - thread_ids_seen = Set{Int}() - for sim in simulations - thread_ids = unique(data[:thread_id] for data in sim) - @test length(thread_ids) == 1 - push!(thread_ids_seen, only(thread_ids)) - end - if Threads.nthreads() == 1 - @test length(thread_ids_seen) == 1 - else - @test length(thread_ids_seen) > 1 - end + thread_ids_seen = + Set{Int}(data[:thread_id] for sim in simulations, data in sim) + min_threads = Threads.nthreads() > 1 ? 1 : 2 + @test min_threads <= length(thread_ids_seen) <= Threads.nthreads() return end From 5852e1bea31476497d83f5a826899fbca66b279f Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 10:47:04 +1200 Subject: [PATCH 07/13] Update --- test/plugins/threaded.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugins/threaded.jl b/test/plugins/threaded.jl index 45b8ddaf0..ab5b00642 100644 --- a/test/plugins/threaded.jl +++ b/test/plugins/threaded.jl @@ -41,7 +41,7 @@ function test_threaded() SDDP.train(model; iteration_limit = 100, parallel_scheme = SDDP.Threaded()) thread_ids_seen = Set{Int}(log.pid for log in model.most_recent_training_results.log) - min_threads = Threads.nthreads() > 1 ? 1 : 2 + min_threads = Threads.nthreads() == 1 ? 1 : 2 @test min_threads <= length(thread_ids_seen) <= Threads.nthreads() recorder = Dict{Symbol,Function}(:thread_id => sp -> Threads.threadid()) simulations = SDDP.simulate( From 547f9b4209d337b24d2924cb057033a6cfa04845 Mon Sep 17 00:00:00 2001 From: Oscar Dowson Date: Wed, 24 Jul 2024 11:02:16 +1200 Subject: [PATCH 08/13] Update test/plugins/threaded.jl --- test/plugins/threaded.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugins/threaded.jl b/test/plugins/threaded.jl index ab5b00642..f10840157 100644 --- a/test/plugins/threaded.jl +++ b/test/plugins/threaded.jl @@ -51,7 +51,7 @@ function test_threaded() custom_recorders = recorder, ) thread_ids_seen = - Set{Int}(data[:thread_id] for sim in simulations, data in sim) + Set{Int}(data[:thread_id] for sim in simulations for data in sim) min_threads = Threads.nthreads() > 1 ? 1 : 2 @test min_threads <= length(thread_ids_seen) <= Threads.nthreads() return From 75e96feecc5f7912ce8bd209b5476a9711bd36f8 Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 14:09:46 +1200 Subject: [PATCH 09/13] Update --- src/algorithm.jl | 17 ++++++++++------- src/plugins/duality_handlers.jl | 29 +++++++---------------------- test/plugins/threaded.jl | 5 +++++ 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index ec2f925d6..a0a0cef8a 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -520,10 +520,6 @@ function backward_pass( objective_states::Vector{NTuple{N,Float64}}, belief_states::Vector{Tuple{Int,Dict{T,Float64}}}, ) where {T,NoiseType,N} - @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin - restore_duality = - prepare_backward_pass(model, options.duality_handler, options) - end # TODO(odow): improve storage type. cuts = Dict{T,Vector{Any}}(index => Any[] for index in keys(model.nodes)) for index in length(scenario_path):-1:1 @@ -548,6 +544,7 @@ function backward_pass( options.backward_sampling_scheme, scenario_path[1:index], options.duality_handler, + options, ) end # We need to refine our estimate at all nodes in the partition. @@ -588,6 +585,7 @@ function backward_pass( options.backward_sampling_scheme, scenario_path[1:index], options.duality_handler, + options, ) new_cuts = refine_bellman_function( model, @@ -628,9 +626,6 @@ function backward_pass( end end end - @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin - restore_duality() - end return cuts end @@ -667,6 +662,7 @@ function solve_all_children( backward_sampling_scheme::AbstractBackwardSamplingScheme, scenario_path, duality_handler::Union{Nothing,AbstractDualityHandler}, + options, ) where {T} length_scenario_path = length(scenario_path) for child in node.children @@ -675,6 +671,10 @@ function solve_all_children( end child_node = model[child.term] lock(child_node.lock) # LOCK-ID-004 + @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin + restore_duality = + prepare_backward_pass(node, options.duality_handler, options) + end for noise in sample_backward_noise_terms_with_state( backward_sampling_scheme, child_node, @@ -731,6 +731,9 @@ function solve_all_children( length(items.duals) end end + @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin + restore_duality() + end unlock(child_node.lock) # LOCK-ID-004 end if length(scenario_path) == length_scenario_path diff --git a/src/plugins/duality_handlers.jl b/src/plugins/duality_handlers.jl index df32682c2..80d7ab1a5 100644 --- a/src/plugins/duality_handlers.jl +++ b/src/plugins/duality_handlers.jl @@ -61,24 +61,6 @@ SDDiP(args...; kwargs...) = _deprecate_integrality_handler() ContinuousRelaxation(args...; kwargs...) = _deprecate_integrality_handler() -function prepare_backward_pass( - model::PolicyGraph, - duality_handler::AbstractDualityHandler, - options::Options, -) - undo = Function[] - for (_, node) in model.nodes - push!(undo, prepare_backward_pass(node, duality_handler, options)) - end - function undo_relax() - for f in undo - f() - end - return - end - return undo_relax -end - function get_dual_solution(node::Node, ::Nothing) return JuMP.objective_value(node.subproblem), Dict{Symbol,Float64}() end @@ -351,8 +333,10 @@ focus more on the more-recent rewards. mutable struct BanditDuality <: AbstractDualityHandler arms::Vector{_BanditArm} last_arm_index::Int + logs_seen::Int + function BanditDuality(args::AbstractDualityHandler...) - return new(_BanditArm[_BanditArm(arg, Float64[]) for arg in args], 1) + return new(_BanditArm[_BanditArm(arg, Float64[]) for arg in args], 1, 0) end end @@ -404,15 +388,16 @@ function _update_rewards(handler::BanditDuality, log::Vector{Log}) end function prepare_backward_pass( - model::PolicyGraph, + node::Node, handler::BanditDuality, options::Options, ) - if length(options.log) > 1 + if length(options.log) > handler.logs_seen _update_rewards(handler, options.log) + handler.logs_seen = length(options.log) end arm = _choose_best_arm(handler) - return prepare_backward_pass(model, arm.handler, options) + return prepare_backward_pass(node, arm.handler, options) end function get_dual_solution(node::Node, handler::BanditDuality) diff --git a/test/plugins/threaded.jl b/test/plugins/threaded.jl index f10840157..822e3c227 100644 --- a/test/plugins/threaded.jl +++ b/test/plugins/threaded.jl @@ -4,6 +4,11 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. function test_threaded() + # We should test that JULIA_NUM_THREADS is set in CI jobs + if get(ENV, "CI", "false") == "true" + @test get(ENV, "JULIA_NUM_THREADS", 0) == Threads.nthreads() + @test Threads.nthreads() > 1 + end c_eta, c_pt = [0.8, 0.5], [2, 5, 8, 11, 14] df_demand = rand(10:10:60, 24) model = SDDP.LinearPolicyGraph(; From 9854f3e4c94481607d1fd6379a82555fef2d9ad2 Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 14:21:53 +1200 Subject: [PATCH 10/13] Update --- src/algorithm.jl | 7 +++++-- test/plugins/threaded.jl | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index a0a0cef8a..758d9ca8b 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -672,8 +672,11 @@ function solve_all_children( child_node = model[child.term] lock(child_node.lock) # LOCK-ID-004 @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin - restore_duality = - prepare_backward_pass(node, options.duality_handler, options) + restore_duality = prepare_backward_pass( + child_node, + options.duality_handler, + options, + ) end for noise in sample_backward_noise_terms_with_state( backward_sampling_scheme, diff --git a/test/plugins/threaded.jl b/test/plugins/threaded.jl index 822e3c227..4ea04957f 100644 --- a/test/plugins/threaded.jl +++ b/test/plugins/threaded.jl @@ -25,7 +25,7 @@ function test_threaded() ) @variable(subproblem, u_u[1:2] >= 0) @variable(subproblem, u_v[1:2] >= 0) - @variable(subproblem, 0 <= u_x[1:5] <= 5) + @variable(subproblem, 0 <= u_x[1:5] <= 5, Int) @variable(subproblem, u_slack >= 0) @variable(subproblem, w_noise) @constraint( From 4d9b9f7d2576acc658ae572300b50af338e909ed Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 14:25:05 +1200 Subject: [PATCH 11/13] Update --- test/plugins/duality_handlers.jl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/plugins/duality_handlers.jl b/test/plugins/duality_handlers.jl index e6494266a..1e1b08f34 100644 --- a/test/plugins/duality_handlers.jl +++ b/test/plugins/duality_handlers.jl @@ -20,6 +20,24 @@ function runtests() return end +function SDDP.prepare_backward_pass( + model::SDDP.PolicyGraph, + duality_handler::SDDP.AbstractDualityHandler, + options::SDDP.Options, +) + undo = Function[] + for (_, node) in model.nodes + push!(undo, SDDP.prepare_backward_pass(node, duality_handler, options)) + end + function undo_relax() + for f in undo + f() + end + return + end + return undo_relax +end + # Single-stage model helps set up a node and subproblem to test dual # calculations function easy_single_stage(duality_handler) From 14cffcd5b8534aaeac29ed71d7542509f1280b95 Mon Sep 17 00:00:00 2001 From: Oscar Dowson Date: Wed, 24 Jul 2024 14:36:34 +1200 Subject: [PATCH 12/13] Update threaded.jl --- test/plugins/threaded.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/plugins/threaded.jl b/test/plugins/threaded.jl index 4ea04957f..5af855d77 100644 --- a/test/plugins/threaded.jl +++ b/test/plugins/threaded.jl @@ -6,7 +6,8 @@ function test_threaded() # We should test that JULIA_NUM_THREADS is set in CI jobs if get(ENV, "CI", "false") == "true" - @test get(ENV, "JULIA_NUM_THREADS", 0) == Threads.nthreads() + num_threads = get(ENV, "JULIA_NUM_THREADS", "0") + @test parse(Int, num_threads) == Threads.nthreads() @test Threads.nthreads() > 1 end c_eta, c_pt = [0.8, 0.5], [2, 5, 8, 11, 14] From f02dc91cfe9c8bc09a29a39a30432c3d405f859d Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 15:08:55 +1200 Subject: [PATCH 13/13] Update --- src/plugins/duality_handlers.jl | 2 +- test/plugins/duality_handlers.jl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plugins/duality_handlers.jl b/src/plugins/duality_handlers.jl index 80d7ab1a5..0f0d9fd99 100644 --- a/src/plugins/duality_handlers.jl +++ b/src/plugins/duality_handlers.jl @@ -336,7 +336,7 @@ mutable struct BanditDuality <: AbstractDualityHandler logs_seen::Int function BanditDuality(args::AbstractDualityHandler...) - return new(_BanditArm[_BanditArm(arg, Float64[]) for arg in args], 1, 0) + return new(_BanditArm[_BanditArm(arg, Float64[]) for arg in args], 1, 1) end end diff --git a/test/plugins/duality_handlers.jl b/test/plugins/duality_handlers.jl index 1e1b08f34..2153d310c 100644 --- a/test/plugins/duality_handlers.jl +++ b/test/plugins/duality_handlers.jl @@ -428,7 +428,7 @@ function test_BanditDuality_show() return end -function test_BanditDuality_show() +function test_BanditDuality_eval() model = SDDP.LinearPolicyGraph( stages = 2, lower_bound = -100.0,