Use explicitly added ApplyDeferred stages when computing automatically inserted sync points. (#16782)

# Objective

- The previous implementation of automatically inserting sync points did
not consider explicitly added sync points, so it could create redundant
sync points. For example, given the two chains:

```
A-B
C-D-E
```

If `A` and `B` needed a sync point, and `D` was an `ApplyDeferred`, an
additional sync point would be generated between `A` and `B`.

```
A-D2-B
C-D -E
```

This can result in the following system ordering:
```
A-D2-(B-C)-D-E
```
Where only `B` and `C` run in parallel. If we could reuse `D` as the
sync point, we would get the following ordering:
```
(A-C)-D-(B-E)
```
Now we have two more opportunities for parallelism!

## Solution

- In the first pass, we:
    - Compute the number of sync points before each node.
        - This was already happening, but now `ApplyDeferred` nodes also
          count as creating a sync point.
    - Pick an arbitrary explicit `ApplyDeferred` node for each "sync point
      index" where we can (some required sync points may be missing!).
- In the second pass, we:
    - For each edge, if two nodes have a different number of sync points
      before them, then there must be a sync point between them.
        - Look for an explicit `ApplyDeferred`. If one exists, use it as the
          sync point.
        - Otherwise, generate a new sync point.
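The two passes can be sketched on a toy graph. This is a simplified model, not Bevy's actual types: nodes are plain indices, the `plan_sync_points` helper is hypothetical, and it ignores `no_sync_edges` and run conditions.

```rust
use std::collections::HashMap;

// First pass: count sync points before each node, remembering an explicit
// ApplyDeferred for each distance when one exists. Second pass: an edge whose
// endpoints differ in distance crosses a sync boundary; reuse the explicit
// ApplyDeferred at that distance if possible.
fn plan_sync_points(
    topo: &[usize],
    edges: &[(usize, usize)],
    deferred: &[bool],      // does this system have deferred commands?
    explicit_sync: &[bool], // is this system an explicit ApplyDeferred?
) -> Vec<(usize, usize, Option<usize>)> {
    let mut distances: HashMap<usize, u32> = HashMap::new();
    let mut explicit_at_distance: HashMap<u32, usize> = HashMap::new();
    for &node in topo {
        let node_needs_sync = if explicit_sync[node] {
            let d = distances.get(&node).copied().unwrap_or(0);
            explicit_at_distance.insert(d, node);
            false // this node *is* a sync point; no extra sync needed after it
        } else {
            deferred[node]
        };
        for &(_, target) in edges.iter().filter(|&&(a, _)| a == node) {
            let weight = u32::from(node_needs_sync || explicit_sync[target]);
            let d = distances
                .get(&target)
                .copied()
                .unwrap_or(0)
                .max(distances.get(&node).copied().unwrap_or(0) + weight);
            distances.insert(target, d);
        }
    }
    // For each edge crossing a sync boundary, report which explicit sync
    // point (if any) can be reused; ApplyDeferred targets need no sync.
    let mut plan = Vec::new();
    for &(a, b) in edges {
        let (da, db) = (
            distances.get(&a).copied().unwrap_or(0),
            distances.get(&b).copied().unwrap_or(0),
        );
        if da != db && !explicit_sync[b] {
            plan.push((a, b, explicit_at_distance.get(&db).copied()));
        }
    }
    plan
}
```

On the example above (`A=0, B=1, C=2, D=3, E=4`, where `A` has deferred commands and `D` is an explicit `ApplyDeferred`), only the `A → B` edge needs a sync point, and the explicit `D` sitting at the same distance is the one to reuse, yielding the `(A-C)-D-(B-E)` ordering.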

I believe this should also gracefully handle changes to the
`ScheduleGraph`. Since automatically inserted sync points are inserted
as systems, they won't look any different to explicit sync points, so
they are also candidates for "reusing" sync points.

One thing this solution does not handle is "deduping" sync points. If
you add 10 sync points explicitly, there will be at least 10 sync
points. You could keep track of all the sync points at the same
"distance" and then hack apart the graph to dedup those, but that could
be a follow-up step (and it's more complicated since you have to worry
about transferring edges between nodes).
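This PR does not implement that dedup; purely as illustration, here is a sketch of what the follow-up might look like, under the simplifying assumption that the graph is an edge list of `usize` node ids (the `dedup_sync_points` helper is hypothetical):

```rust
use std::collections::HashMap;

// Collapse several sync nodes that sit at the same "distance" into one
// canonical node per distance, transferring all edges from the duplicates.
fn dedup_sync_points(
    edges: &mut Vec<(usize, usize)>,
    sync_distance: &HashMap<usize, u32>, // sync node -> distance from start
) -> HashMap<usize, usize> {
    // Pick the canonical sync node for each distance (lowest index, say).
    let mut canonical: HashMap<u32, usize> = HashMap::new();
    for (&node, &d) in sync_distance {
        let entry = canonical.entry(d).or_insert(node);
        *entry = (*entry).min(node);
    }
    // Redirect every edge endpoint from a duplicate to its canonical node.
    let remap: HashMap<usize, usize> = sync_distance
        .iter()
        .map(|(&node, &d)| (node, canonical[&d]))
        .collect();
    for (a, b) in edges.iter_mut() {
        if let Some(&c) = remap.get(a) { *a = c; }
        if let Some(&c) = remap.get(b) { *b = c; }
    }
    // Drop self-loops and duplicate edges created by the merge.
    edges.retain(|&(a, b)| a != b);
    edges.sort_unstable();
    edges.dedup();
    remap
}
```

For example, two sync nodes at the same distance collapse onto one, with their incoming and outgoing edges transferred to it.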

## Testing

- Added tests for the feature.
- The existing tests from all our crates still pass.

## Showcase

- Automatically inserted sync points can now reuse explicitly inserted
`ApplyDeferred` systems! Previously, Bevy would add new sync points
between systems, ignoring the explicitly added sync points. This would
reduce parallelism of systems in some situations. Now, the parallelism
has been improved!
andriyDev authored Feb 24, 2025
1 parent 7c7b1e9 commit 7f14581
Showing 2 changed files with 214 additions and 24 deletions.
114 changes: 91 additions & 23 deletions crates/bevy_ecs/src/schedule/auto_insert_apply_deferred.rs
@@ -80,41 +80,109 @@ impl ScheduleBuildPass for AutoInsertApplyDeferredPass {
         let mut sync_point_graph = dependency_flattened.clone();
         let topo = graph.topsort_graph(dependency_flattened, ReportCycles::Dependency)?;

+        fn set_has_conditions(graph: &ScheduleGraph, node: NodeId) -> bool {
+            !graph.set_conditions_at(node).is_empty()
+                || graph
+                    .hierarchy()
+                    .graph()
+                    .edges_directed(node, Direction::Incoming)
+                    .any(|(parent, _)| set_has_conditions(graph, parent))
+        }
+
+        fn system_has_conditions(graph: &ScheduleGraph, node: NodeId) -> bool {
+            assert!(node.is_system());
+            !graph.system_conditions[node.index()].is_empty()
+                || graph
+                    .hierarchy()
+                    .graph()
+                    .edges_directed(node, Direction::Incoming)
+                    .any(|(parent, _)| set_has_conditions(graph, parent))
+        }
+
+        let mut system_has_conditions_cache = HashMap::default();
+
+        fn is_valid_explicit_sync_point(
+            graph: &ScheduleGraph,
+            system: NodeId,
+            system_has_conditions_cache: &mut HashMap<usize, bool>,
+        ) -> bool {
+            let index = system.index();
+            is_apply_deferred(graph.systems[index].get().unwrap())
+                && !*system_has_conditions_cache
+                    .entry(index)
+                    .or_insert_with(|| system_has_conditions(graph, system))
+        }
+
         // calculate the number of sync points each sync point is from the beginning of the graph
         // use the same sync point if the distance is the same
-        let mut distances: HashMap<usize, Option<u32>> =
+        let mut distances: HashMap<usize, u32> =
             HashMap::with_capacity_and_hasher(topo.len(), Default::default());
+        // Keep track of any explicit sync nodes for a specific distance.
+        let mut distance_to_explicit_sync_node: HashMap<u32, NodeId> = HashMap::default();
         for node in &topo {
-            let add_sync_after = graph.systems[node.index()].get().unwrap().has_deferred();
+            let node_system = graph.systems[node.index()].get().unwrap();
+
+            let node_needs_sync =
+                if is_valid_explicit_sync_point(graph, *node, &mut system_has_conditions_cache) {
+                    distance_to_explicit_sync_node.insert(
+                        distances.get(&node.index()).copied().unwrap_or_default(),
+                        *node,
+                    );
+
+                    // This node just did a sync, so the only reason to do another sync is if one was
+                    // explicitly scheduled afterwards.
+                    false
+                } else {
+                    node_system.has_deferred()
+                };

             for target in dependency_flattened.neighbors_directed(*node, Direction::Outgoing) {
-                let add_sync_on_edge = add_sync_after
-                    && !is_apply_deferred(graph.systems[target.index()].get().unwrap())
-                    && !self.no_sync_edges.contains(&(*node, target));
-
-                let weight = if add_sync_on_edge { 1 } else { 0 };
+                let edge_needs_sync = node_needs_sync
+                    && !self.no_sync_edges.contains(&(*node, target))
+                    || is_valid_explicit_sync_point(
+                        graph,
+                        target,
+                        &mut system_has_conditions_cache,
+                    );
+
+                let weight = if edge_needs_sync { 1 } else { 0 };

                 // Use whichever distance is larger, either the current distance, or the distance to
                 // the parent plus the weight.
                 let distance = distances
                     .get(&target.index())
-                    .unwrap_or(&None)
-                    .or(Some(0))
-                    .map(|distance| {
-                        distance.max(
-                            distances.get(&node.index()).unwrap_or(&None).unwrap_or(0) + weight,
-                        )
-                    });
+                    .copied()
+                    .unwrap_or_default()
+                    .max(distances.get(&node.index()).copied().unwrap_or_default() + weight);

                 distances.insert(target.index(), distance);
+            }
+        }

-                if add_sync_on_edge {
-                    let sync_point =
-                        self.get_sync_point(graph, distances[&target.index()].unwrap());
-                    sync_point_graph.add_edge(*node, sync_point);
-                    sync_point_graph.add_edge(sync_point, target);
-
-                    // edge is now redundant
-                    sync_point_graph.remove_edge(*node, target);
-                }
+        // Find any edges which have a different number of sync points between them and make sure
+        // there is a sync point between them.
+        for node in &topo {
+            let node_distance = distances.get(&node.index()).copied().unwrap_or_default();
+            for target in dependency_flattened.neighbors_directed(*node, Direction::Outgoing) {
+                let target_distance = distances.get(&target.index()).copied().unwrap_or_default();
+                if node_distance == target_distance {
+                    // These nodes are the same distance, so they don't need an edge between them.
+                    continue;
+                }
+
+                if is_apply_deferred(graph.systems[target.index()].get().unwrap()) {
+                    // We don't need to insert a sync point since ApplyDeferred is a sync point
+                    // already!
+                    continue;
+                }
+                let sync_point = distance_to_explicit_sync_node
+                    .get(&target_distance)
+                    .copied()
+                    .unwrap_or_else(|| self.get_sync_point(graph, target_distance));
+
+                sync_point_graph.add_edge(*node, sync_point);
+                sync_point_graph.add_edge(sync_point, target);
+
+                sync_point_graph.remove_edge(*node, target);
             }
         }

124 changes: 123 additions & 1 deletion crates/bevy_ecs/src/schedule/schedule.rs
@@ -758,6 +758,26 @@ impl ScheduleGraph {
.unwrap()
}

/// Returns the conditions for the set at the given [`NodeId`], if it exists.
pub fn get_set_conditions_at(&self, id: NodeId) -> Option<&[BoxedCondition]> {
if !id.is_set() {
return None;
}
self.system_set_conditions
.get(id.index())
.map(Vec::as_slice)
}

/// Returns the conditions for the set at the given [`NodeId`].
///
/// Panics if it doesn't exist.
#[track_caller]
pub fn set_conditions_at(&self, id: NodeId) -> &[BoxedCondition] {
self.get_set_conditions_at(id)
.ok_or_else(|| format!("set with id {id:?} does not exist in this Schedule"))
.unwrap()
}

/// Returns an iterator over all systems in this schedule, along with the conditions for each system.
pub fn systems(&self) -> impl Iterator<Item = (NodeId, &ScheduleSystem, &[BoxedCondition])> {
self.systems
@@ -2036,7 +2056,7 @@ mod tests {
use bevy_ecs_macros::ScheduleLabel;

use crate::{
        prelude::{ApplyDeferred, Res, Resource},
schedule::{
tests::ResMut, IntoSystemConfigs, IntoSystemSetConfigs, Schedule,
ScheduleBuildSettings, SystemSet,
@@ -2088,6 +2108,108 @@ mod tests {
assert_eq!(schedule.executable.systems.len(), 3);
}

#[test]
fn explicit_sync_point_used_as_auto_sync_point() {
let mut schedule = Schedule::default();
let mut world = World::default();
schedule.add_systems(
(
|mut commands: Commands| commands.insert_resource(Resource1),
|_: Res<Resource1>| {},
)
.chain(),
);
schedule.add_systems((|| {}, ApplyDeferred, || {}).chain());
schedule.run(&mut world);

// No sync point was inserted, since we can reuse the explicit sync point.
assert_eq!(schedule.executable.systems.len(), 5);
}

#[test]
fn conditional_explicit_sync_point_not_used_as_auto_sync_point() {
let mut schedule = Schedule::default();
let mut world = World::default();
schedule.add_systems(
(
|mut commands: Commands| commands.insert_resource(Resource1),
|_: Res<Resource1>| {},
)
.chain(),
);
schedule.add_systems((|| {}, ApplyDeferred.run_if(|| false), || {}).chain());
schedule.run(&mut world);

// A sync point was inserted, since the explicit sync point is not always run.
assert_eq!(schedule.executable.systems.len(), 6);
}

#[test]
fn conditional_explicit_sync_point_not_used_as_auto_sync_point_condition_on_chain() {
let mut schedule = Schedule::default();
let mut world = World::default();
schedule.add_systems(
(
|mut commands: Commands| commands.insert_resource(Resource1),
|_: Res<Resource1>| {},
)
.chain(),
);
schedule.add_systems((|| {}, ApplyDeferred, || {}).chain().run_if(|| false));
schedule.run(&mut world);

// A sync point was inserted, since the explicit sync point is not always run.
assert_eq!(schedule.executable.systems.len(), 6);
}

#[test]
fn conditional_explicit_sync_point_not_used_as_auto_sync_point_condition_on_system_set() {
#[derive(SystemSet, Debug, Clone, PartialEq, Eq, Hash)]
struct Set;

let mut schedule = Schedule::default();
let mut world = World::default();
schedule.configure_sets(Set.run_if(|| false));
schedule.add_systems(
(
|mut commands: Commands| commands.insert_resource(Resource1),
|_: Res<Resource1>| {},
)
.chain(),
);
schedule.add_systems((|| {}, ApplyDeferred.in_set(Set), || {}).chain());
schedule.run(&mut world);

// A sync point was inserted, since the explicit sync point is not always run.
assert_eq!(schedule.executable.systems.len(), 6);
}

#[test]
fn conditional_explicit_sync_point_not_used_as_auto_sync_point_condition_on_nested_system_set()
{
#[derive(SystemSet, Debug, Clone, PartialEq, Eq, Hash)]
struct Set1;
#[derive(SystemSet, Debug, Clone, PartialEq, Eq, Hash)]
struct Set2;

let mut schedule = Schedule::default();
let mut world = World::default();
schedule.configure_sets(Set2.run_if(|| false));
schedule.configure_sets(Set1.in_set(Set2));
schedule.add_systems(
(
|mut commands: Commands| commands.insert_resource(Resource1),
|_: Res<Resource1>| {},
)
.chain(),
);
schedule.add_systems((|| {}, ApplyDeferred, || {}).chain().in_set(Set1));
schedule.run(&mut world);

// A sync point was inserted, since the explicit sync point is not always run.
assert_eq!(schedule.executable.systems.len(), 6);
}

#[test]
fn merges_sync_points_into_one() {
let mut schedule = Schedule::default();
