Skip to content

Commit

Permalink
EXPERIMENTAL - IJobParallelFor
Browse files Browse the repository at this point in the history
  • Loading branch information
genaray committed Feb 12, 2024
1 parent 210f528 commit 7ffa0ee
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 63 deletions.
14 changes: 7 additions & 7 deletions src/Arch.SourceGen/Queries/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public struct ForEachJob<{{generics}}> : IChunkJob
public ForEach<{{generics}}> ForEach;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Execute(int index, ref Chunk chunk)
public void Execute(ref Chunk chunk)
{
var chunkSize = chunk.Size;
{{getFirstElement}}

for (var entityIndex = chunkSize - 1; entityIndex >= 0; --entityIndex)
foreach(var entityIndex in chunk)
{
{{getComponents}}
ForEach({{insertParams}});
Expand Down Expand Up @@ -63,7 +63,7 @@ public struct ForEachWithEntityJob<{{generics}}> : IChunkJob
public ForEachWithEntity<{{generics}}> ForEach;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Execute(int index, ref Chunk chunk)
public void Execute(ref Chunk chunk)
{
ref var entityFirstElement = ref chunk.Entity(0);
{{getFirstElement}}
Expand Down Expand Up @@ -104,12 +104,12 @@ public struct IForEachJob<T,{{generics}}> : IChunkJob where T : struct, IForEach
public T ForEach;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Execute(int index, ref Chunk chunk)
public void Execute(ref Chunk chunk)
{
var chunkSize = chunk.Size;
{{getFirstElement}}

for (var entityIndex = chunkSize - 1; entityIndex >= 0; --entityIndex)
foreach(var entityIndex in chunk)
{
{{getComponents}}
ForEach.Update({{insertParams}});
Expand Down Expand Up @@ -143,13 +143,13 @@ public struct IForEachWithEntityJob<T,{{generics}}> : IChunkJob where T : struct
public T ForEach;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Execute(int index, ref Chunk chunk)
public void Execute(ref Chunk chunk)
{
var chunkSize = chunk.Size;
ref var entityFirstElement = ref chunk.Entity(0);
{{getFirstElement}}

for (var entityIndex = chunkSize - 1; entityIndex >= 0; --entityIndex)
foreach(var entityIndex in chunk)
{
var entity = Unsafe.Add(ref entityFirstElement, entityIndex);
{{getComponents}}
Expand Down
80 changes: 50 additions & 30 deletions src/Arch/Core/Jobs/Jobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public Range(int start, int length)
/// </summary>
public interface IChunkJob
{
public void Execute(int index, ref Chunk chunk);
public void Execute(ref Chunk chunk);
}

/// <summary>
Expand All @@ -82,10 +82,9 @@ public struct ForEachJob : IChunkJob
/// <summary>
/// Called on each <see cref="Chunk"/> and iterates over all <see cref="Entity"/>'s to call the <see cref="ForEach"/> callback for each.
/// </summary>
/// <param name="index">The chunk index.</param>
/// <param name="chunk">A reference to the chunk which is currently processed.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public readonly void Execute(int index, ref Chunk chunk)
public readonly void Execute(ref Chunk chunk)
{
ref var entityFirstElement = ref chunk.Entity(0);
foreach(var entityIndex in chunk)
Expand Down Expand Up @@ -116,7 +115,7 @@ public struct IForEachJob<T> : IChunkJob where T : IForEach
/// <param name="index">The chunk index.</param>
/// <param name="chunk">A reference to the chunk which is currently processed.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Execute(int index, ref Chunk chunk)
public void Execute(ref Chunk chunk)
{
ref var entityFirstElement = ref chunk.Entity(0);
foreach(var entityIndex in chunk)
Expand All @@ -132,35 +131,27 @@ public void Execute(int index, ref Chunk chunk)
/// is an <see cref="IJob"/> that can be scheduled using the <see cref="JobScheduler"/> and the <see cref="World"/> to iterate multithreaded over chunks.
/// </summary>
/// <typeparam name="T">The generic type that implements the <see cref="IChunkJob"/> interface.</typeparam>
public sealed class ChunkIterationJob<T> : IJob where T : IChunkJob
public sealed class ChunkIterationJob<T> : IJobParallelFor where T : IChunkJob
{

/// <summary>
/// Initializes a new instance of the <see cref="ChunkIterationJob{T}"/> class.
/// Represents a section of chunk iteration from one archetype.
/// </summary>
public ChunkIterationJob()
private struct ChunkIterationPart
{
Chunks = Array.Empty<Chunk>();
public int Start;
public int Size;
public Chunk[]? Chunks;
}

/// <summary>
/// Initializes a new instance of the <see cref="ChunkIterationJob{T}"/> class.
/// </summary>
/// <param name="start">The start at which this job begins to process the <see cref="Chunks"/>.</param>
/// <param name="size">The size or lengths, how man <see cref="Chunks"/> this job will process.</param>
/// <param name="chunks">The <see cref="Chunk"/> array being processed.</param>
public ChunkIterationJob(int start, int size, Chunk[] chunks)
public ChunkIterationJob()
{
Start = start;
Size = size;
Chunks = chunks;
Parts = new List<ChunkIterationPart>();
}

/// <summary>
/// A <see cref="Chunk"/> array, this will be processed.
/// </summary>
public Chunk[] Chunks { get; set; }

/// <summary>
/// An instance of the generic type <typeparamref name="T"/>, being invoked upon each chunk.
/// </summary>
Expand All @@ -171,22 +162,51 @@ public ChunkIterationJob(int start, int size, Chunk[] chunks)
/// </summary>
public int Size { get; set; }

/// <summary>
/// The start index.
/// </summary>
public int Start;

private List<ChunkIterationPart> Parts { get; set; }

public int ThreadCount { get; } = Environment.ProcessorCount;
public int BatchSize { get; } = 16;

/// <summary>
/// Iterates over all <see cref="Chunks"/> between <see cref="Start"/> and <see cref="Size"/> and calls <see cref="Instance"/>.
/// Add an array of chunks to be processed by this job.
/// </summary>
public void Execute()
/// <param name="chunks">The chunks to add.</param>
/// <param name="start">The first chunk to process in <paramref name="chunks"/></param>
/// <param name="size">The amount of chunks to process in <paramref name="chunks"/></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void AddChunks(Chunk[] chunks, int start, int size)
{
ref var chunk = ref Chunks.DangerousGetReferenceAt(Start);
Parts.Add(new ChunkIterationPart{
Chunks = chunks,
Start = start,
Size = size
});
}

for (var chunkIndex = 0; chunkIndex < Size; chunkIndex++)
public void Execute(int index)
{
var sizeSoFar = 0;
for (var i = 0; i < Parts.Count; i++)
{
ref var currentChunk = ref Unsafe.Add(ref chunk, chunkIndex);
Instance?.Execute(Start + chunkIndex, ref currentChunk);
// If we're about to go over, we're ready to execute
var part = Parts[i];
if (sizeSoFar + part.Size > index)
{
// this had better be not null!
ref var chunk = ref part.Chunks!.DangerousGetReferenceAt(index - sizeSoFar + part.Start);
Instance?.Execute(ref chunk);
return;
}

sizeSoFar += part.Size;
}

throw new InvalidOperationException("Reached end of chunk, but could not find the correct index!");
}

public void Finish()
{
Parts.Clear();
}
}
47 changes: 22 additions & 25 deletions src/Arch/Core/Jobs/World.Jobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public partial class World
/// <summary>
/// A cache used for the parallel queries to prevent list allocations.
/// </summary>
internal List<IJob> JobsCache { get; set; }
internal List<IJobParallelFor> JobsCache { get; set; }

/// <summary>
/// Searches all matching <see cref="Entity"/>'s by a <see cref="QueryDescription"/> and calls the passed <see cref="ForEach"/> delegate.
Expand Down Expand Up @@ -97,42 +97,39 @@ public void InlineParallelChunkQuery<T>(in QueryDescription queryDescription, in
throw new Exception("JobScheduler was not initialized, create one instance of JobScheduler. This creates a singleton used for parallel iterations.");
}

if (!SharedJobScheduler.IsMainThread)
{
throw new Exception("JobScheduler must be called from MainThread.");
}

// Cast pool in an unsafe fast way and run the query.
var pool = JobMeta<ChunkIterationJob<T>>.Pool;
var query = Query(in queryDescription);

var pool = JobMeta<ChunkIterationJob<T>>.Pool;
var job = pool.Get();
job.Instance = innerJob;

var size = 0;
foreach (var archetype in query.GetArchetypeIterator())
{
var archetypeSize = archetype.ChunkCount;
var part = new RangePartitioner(Environment.ProcessorCount, archetypeSize);
foreach (var range in part)
{
var job = pool.Get();
job.Start = range.Start;
job.Size = range.Length;
job.Chunks = archetype.Chunks;
job.Instance = innerJob;

var jobHandle = SharedJobScheduler.Schedule(job);
JobsCache.Add(job);
JobHandles.Add(jobHandle);
job.AddChunks(archetype.Chunks, range.Start, range.Length);
size += range.Length;
}
}

// Schedule, flush, wait, return.
var handle = SharedJobScheduler.CombineDependencies(JobHandles.Span);
SharedJobScheduler.Flush();
handle.Complete();

for (var index = 0; index < JobsCache.Count; index++)
{
var job = Unsafe.As<ChunkIterationJob<T>>(JobsCache[index]);
pool.Return(job);
}
// Schedule, flush, wait, return.
var handle = SharedJobScheduler.Schedule(job, size);
SharedJobScheduler.Flush();
//handle.Complete();

JobHandles.Clear();
JobsCache.Clear();
}
pool.Return(job);
}

/*
/// <summary>
/// Finds all matching <see cref="Chunk"/>'s by a <see cref="QueryDescription"/> and calls an <see cref="IChunkJob"/> on them.
/// </summary>
Expand Down Expand Up @@ -179,5 +176,5 @@ public JobHandle ScheduleInlineParallelChunkQuery<T>(in QueryDescription queryDe
JobHandles.Clear();
return handle;
}
}*/
}
2 changes: 1 addition & 1 deletion src/Arch/Core/World.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private World(int id)

// Multithreading/Jobs.
JobHandles = new PooledList<JobHandle>(Environment.ProcessorCount);
JobsCache = new List<IJob>(Environment.ProcessorCount);
JobsCache = new List<IJobParallelFor>(Environment.ProcessorCount);
}

/// <summary>
Expand Down

0 comments on commit 7ffa0ee

Please sign in to comment.