Skip to content

Commit

Permalink
Update copy logic to use dedicated threads.
Browse files Browse the repository at this point in the history
Also updated some checks to help avoid hitting the file system if possible.
  • Loading branch information
Erarndt committed Jan 10, 2025
1 parent ab7c289 commit d0500f7
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 69 deletions.
15 changes: 12 additions & 3 deletions src/Framework/NativeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1522,9 +1522,18 @@ private static unsafe int GetCurrentDirectoryWin32(int nBufferLength, char* lpBu
[SupportedOSPlatform("windows")]
internal static unsafe string GetFullPath(string path)
{
int bufferSize = GetFullPathWin32(path, 0, null, IntPtr.Zero);
char* buffer = stackalloc char[bufferSize];
int fullPathLength = GetFullPathWin32(path, bufferSize, buffer, IntPtr.Zero);
char* buffer = stackalloc char[MAX_PATH];
int fullPathLength = GetFullPathWin32(path, MAX_PATH, buffer, IntPtr.Zero);

// if user is using long paths we could need to allocate a larger buffer
if (fullPathLength > MAX_PATH)
{
char* newBuffer = stackalloc char[fullPathLength];
fullPathLength = GetFullPathWin32(path, fullPathLength, newBuffer, IntPtr.Zero);

buffer = newBuffer;
}

// Avoid creating new strings unnecessarily
return AreStringsEqual(buffer, fullPathLength, path) ? path : new string(buffer, startIndex: 0, length: fullPathLength);
}
Expand Down
185 changes: 119 additions & 66 deletions src/Tasks/Copy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
using Microsoft.Build.Shared.FileSystem;
using Microsoft.Build.Utilities;

using TPLTask = System.Threading.Tasks.Task;

#nullable disable

namespace Microsoft.Build.Tasks
Expand Down Expand Up @@ -44,6 +46,33 @@ public class Copy : TaskExtension, IIncrementalTask, ICancelableTask
// taking up the whole threadpool esp. when hosted in Visual Studio. IOW we use a specific number
// instead of int.MaxValue.
private static readonly int DefaultCopyParallelism = NativeMethodsShared.GetLogicalCoreCount() > 4 ? 6 : 4;
private static Thread[] copyThreads;
private static AutoResetEvent[] copyThreadSignals;
private AutoResetEvent _signalCopyTasksCompleted;

private static ConcurrentQueue<Action> _copyActionQueue = new ConcurrentQueue<Action>();

private static void InitializeCopyThreads()
{
lock (_copyActionQueue)
{
if (copyThreads == null)
{
copyThreadSignals = new AutoResetEvent[DefaultCopyParallelism];
copyThreads = new Thread[DefaultCopyParallelism];
for (int i = 0; i < copyThreads.Length; ++i)
{
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
copyThreadSignals[i] = autoResetEvent;
Thread newThread = new Thread(ParallelCopyTask);
newThread.IsBackground = true;
newThread.Name = "Parallel Copy Thread";
newThread.Start(autoResetEvent);
copyThreads[i] = newThread;
}
}
}
}

/// <summary>
/// Constructor.
Expand All @@ -63,6 +92,8 @@ public Copy()
RemovingReadOnlyAttribute = Log.GetResourceMessage("Copy.RemovingReadOnlyAttribute");
SymbolicLinkComment = Log.GetResourceMessage("Copy.SymbolicLinkComment");
}

_signalCopyTasksCompleted = new AutoResetEvent(false);
}

private static string CreatesDirectory;
Expand All @@ -79,7 +110,7 @@ public Copy()
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

// Bool is just a placeholder, we're mainly interested in a threadsafe key set.
private readonly ConcurrentDictionary<string, bool> _directoriesKnownToExist = new ConcurrentDictionary<string, bool>(StringComparer.OrdinalIgnoreCase);
private readonly ConcurrentDictionary<string, bool> _directoriesKnownToExist = new ConcurrentDictionary<string, bool>(DefaultCopyParallelism, DefaultCopyParallelism, StringComparer.OrdinalIgnoreCase);

/// <summary>
/// Force the copy to retry even when it hits ERROR_ACCESS_DENIED -- normally we wouldn't retry in this case since
Expand Down Expand Up @@ -288,6 +319,7 @@ private void LogAlwaysRetryDiagnosticFromResources(string messageResourceName, p
}

if (!Traits.Instance.EscapeHatches.CopyWithoutDelete &&
(UseHardlinksIfPossible || UseSymboliclinksIfPossible) &&
destinationFileState.FileExists &&
!destinationFileState.IsReadOnly)
{
Expand Down Expand Up @@ -507,6 +539,22 @@ private bool CopySingleThreaded(
return success;
}

private static void ParallelCopyTask(object state)
{
AutoResetEvent autoResetEvent = (AutoResetEvent)state;
while (true)
{
if (_copyActionQueue.TryDequeue(out Action copyAction))
{
copyAction();
}
else
{
autoResetEvent.WaitOne();
}
}
}

/// <summary>
/// Parallelize I/O with the same semantics as the single-threaded copy method above.
/// ResolveAssemblyReferences tends to generate longer and longer lists of files to send
Expand Down Expand Up @@ -559,77 +607,23 @@ private bool CopyParallel(

// Lockless flags updated from each thread - each needs to be a processor word for atomicity.
var successFlags = new IntPtr[DestinationFiles.Length];
var actionBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = parallelism,
CancellationToken = _cancellationTokenSource.Token
};
var partitionCopyActionBlock = new ActionBlock<List<int>>(
async (List<int> partition) =>
{
// Break from synchronous thread context of caller to get onto thread pool thread.
await System.Threading.Tasks.Task.Yield();

for (int partitionIndex = 0; partitionIndex < partition.Count && !_cancellationTokenSource.IsCancellationRequested; partitionIndex++)
{
int fileIndex = partition[partitionIndex];
ITaskItem sourceItem = SourceFiles[fileIndex];
ITaskItem destItem = DestinationFiles[fileIndex];
string sourcePath = sourceItem.ItemSpec;

// Check if we just copied from this location to the destination, don't copy again.
MSBuildEventSource.Log.CopyUpToDateStart(destItem.ItemSpec);
bool copyComplete = partitionIndex > 0 &&
String.Equals(
sourcePath,
SourceFiles[partition[partitionIndex - 1]].ItemSpec,
StringComparison.OrdinalIgnoreCase);

if (!copyComplete)
{
if (DoCopyIfNecessary(
new FileState(sourceItem.ItemSpec),
new FileState(destItem.ItemSpec),
copyFile))
{
copyComplete = true;
}
else
{
// Thread race to set outer variable but they race to set the same (false) value.
success = false;
}
}
else
{
MSBuildEventSource.Log.CopyUpToDateStop(destItem.ItemSpec, true);
}
ConcurrentQueue<List<int>> partitionQueue = new ConcurrentQueue<List<int>>(partitionsByDestination.Values);

if (copyComplete)
{
sourceItem.CopyMetadataTo(destItem);
successFlags[fileIndex] = (IntPtr)1;
}
}
},
actionBlockOptions);
int activeCopyThreads = DefaultCopyParallelism;
for (int i = 0; i < DefaultCopyParallelism; ++i)
{
_copyActionQueue.Enqueue(ProcessPartition);
}

foreach (List<int> partition in partitionsByDestination.Values)
InitializeCopyThreads();

for (int i = 0; i < DefaultCopyParallelism; ++i)
{
bool partitionAccepted = partitionCopyActionBlock.Post(partition);
if (_cancellationTokenSource.IsCancellationRequested)
{
break;
}
else if (!partitionAccepted)
{
// Retail assert...
ErrorUtilities.ThrowInternalError("Failed posting a file copy to an ActionBlock. Should not happen with block at max int capacity.");
}
copyThreadSignals[i].Set();
}

partitionCopyActionBlock.Complete();
partitionCopyActionBlock.Completion.GetAwaiter().GetResult();
_signalCopyTasksCompleted.WaitOne();

// Assemble an in-order list of destination items that succeeded.
destinationFilesSuccessfullyCopied = new List<ITaskItem>(DestinationFiles.Length);
Expand All @@ -642,6 +636,65 @@ private bool CopyParallel(
}

return success;

void ProcessPartition()
{
try
{
while (partitionQueue.TryDequeue(out List<int> partition))
{
for (int partitionIndex = 0; partitionIndex < partition.Count && !_cancellationTokenSource.IsCancellationRequested; partitionIndex++)
{
int fileIndex = partition[partitionIndex];
ITaskItem sourceItem = SourceFiles[fileIndex];
ITaskItem destItem = DestinationFiles[fileIndex];
string sourcePath = sourceItem.ItemSpec;

// Check if we just copied from this location to the destination, don't copy again.
MSBuildEventSource.Log.CopyUpToDateStart(destItem.ItemSpec);
bool copyComplete = partitionIndex > 0 &&
String.Equals(
sourcePath,
SourceFiles[partition[partitionIndex - 1]].ItemSpec,
StringComparison.OrdinalIgnoreCase);

if (!copyComplete)
{
if (DoCopyIfNecessary(
new FileState(sourceItem.ItemSpec),
new FileState(destItem.ItemSpec),
copyFile))
{
copyComplete = true;
}
else
{
// Thread race to set outer variable but they race to set the same (false) value.
success = false;
}
}
else
{
MSBuildEventSource.Log.CopyUpToDateStop(destItem.ItemSpec, true);
}

if (copyComplete)
{
sourceItem.CopyMetadataTo(destItem);
successFlags[fileIndex] = (IntPtr)1;
}
}
}
}
finally
{
int count = System.Threading.Interlocked.Decrement(ref activeCopyThreads);
if (count == 0)
{
_signalCopyTasksCompleted.Set();
}
}
}
}

private bool IsSourceSetEmpty()
Expand Down

0 comments on commit d0500f7

Please sign in to comment.