From 359eb50f65eddb16c3dac4f33ca83b62d2c4d93a Mon Sep 17 00:00:00 2001 From: Mohammad Ibrahim <109541540+v-imohammad@users.noreply.github.com> Date: Wed, 13 Dec 2023 15:09:58 -0800 Subject: [PATCH] Hotfix4.28.3 - remove condition exception/fixing race condition (#9742) * Removing conditional exception handling from FileLogger (#9739) * fixing race condition in worker shutdown (#9738) * updating common prop --------- Co-authored-by: Mathew Charles Co-authored-by: Brett Samblanet --- build/common.props | 2 +- src/WebJobs.Script/Diagnostics/FileLogger.cs | 2 +- .../RpcFunctionInvocationDispatcher.cs | 6 ++-- .../Rpc/IWebHostRpcWorkerChannelManager.cs | 2 +- .../Rpc/WebHostRpcWorkerChannelManager.cs | 28 ++++++++-------- .../Rpc/TestRpcWorkerChannelManager.cs | 2 +- .../WebHostRpcWorkerChannelManagerTests.cs | 33 +++++++++++++++++++ 7 files changed, 53 insertions(+), 22 deletions(-) diff --git a/build/common.props b/build/common.props index b35fbea512..919c99c4ac 100644 --- a/build/common.props +++ b/build/common.props @@ -5,7 +5,7 @@ latest 4 28 - 2 + 3 0 diff --git a/src/WebJobs.Script/Diagnostics/FileLogger.cs b/src/WebJobs.Script/Diagnostics/FileLogger.cs index 12a42acc53..b74fee74a6 100644 --- a/src/WebJobs.Script/Diagnostics/FileLogger.cs +++ b/src/WebJobs.Script/Diagnostics/FileLogger.cs @@ -93,7 +93,7 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except { _fileWriter.AppendLine(formattedMessage); } - catch (Exception ex) when (!ex.IsFatal()) + catch (Exception) { // Make sure the Logger doesn't throw if there are Exceptions (disk full, etc). 
} diff --git a/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs b/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs index b4c820ecd7..e8de96d00c 100644 --- a/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs +++ b/src/WebJobs.Script/Workers/Rpc/FunctionRegistration/RpcFunctionInvocationDispatcher.cs @@ -187,7 +187,7 @@ internal async void ShutdownWebhostLanguageWorkerChannels() await _webHostLanguageWorkerChannelManager?.ShutdownChannelsAsync(); } - private void SetDispatcherStateToInitialized(Dictionary> webhostLanguageWorkerChannel = null) + private void SetDispatcherStateToInitialized(IDictionary> webhostLanguageWorkerChannel = null) { // RanToCompletion indicates successful process startup if (State != FunctionInvocationDispatcherState.Initialized @@ -198,7 +198,7 @@ private void SetDispatcherStateToInitialized(Dictionary, Task> startAction, bool initializeDispatcher = false, Dictionary> webhostLanguageWorkerChannel = null, IEnumerable functionLanguages = null) + private void StartWorkerProcesses(int startIndex, Func, Task> startAction, bool initializeDispatcher = false, IDictionary> webhostLanguageWorkerChannel = null, IEnumerable functionLanguages = null) { Task.Run(async () => { @@ -309,7 +309,7 @@ public async Task InitializeAsync(IEnumerable functions, Cance if (Utility.IsSupportedRuntime(_workerRuntime, _workerConfigs) || _environment.IsMultiLanguageRuntimeEnvironment()) { State = FunctionInvocationDispatcherState.Initializing; - Dictionary> webhostLanguageWorkerChannels = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime); + IDictionary> webhostLanguageWorkerChannels = _webHostLanguageWorkerChannelManager.GetChannels(_workerRuntime); if (webhostLanguageWorkerChannels != null) { int workerProcessCount = 0; diff --git a/src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs 
b/src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs index e8969ef218..9df04d3d62 100644 --- a/src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs +++ b/src/WebJobs.Script/Workers/Rpc/IWebHostRpcWorkerChannelManager.cs @@ -11,7 +11,7 @@ public interface IWebHostRpcWorkerChannelManager { Task InitializeChannelAsync(IEnumerable workerConfigs, string language); - Dictionary> GetChannels(string language); + IDictionary> GetChannels(string language); Task SpecializeAsync(); diff --git a/src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs b/src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs index ef2dae92f5..34ba5b8dc1 100644 --- a/src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs +++ b/src/WebJobs.Script/Workers/Rpc/WebHostRpcWorkerChannelManager.cs @@ -7,9 +7,7 @@ using System.Linq; using System.Reactive.Linq; using System.Threading.Tasks; -using Microsoft.Azure.AppService.Proxy.Common.Extensions; using Microsoft.Azure.AppService.Proxy.Common.Infra; -using Microsoft.Azure.AppService.Proxy.Runtime; using Microsoft.Azure.WebJobs.Script.Config; using Microsoft.Azure.WebJobs.Script.Diagnostics; using Microsoft.Azure.WebJobs.Script.Eventing; @@ -36,7 +34,7 @@ public class WebHostRpcWorkerChannelManager : IWebHostRpcWorkerChannelManager private Action _shutdownStandbyWorkerChannels; private IConfiguration _config; - private ConcurrentDictionary>> _workerChannels = new ConcurrentDictionary>>(StringComparer.OrdinalIgnoreCase); + private ConcurrentDictionary>> _workerChannels = new(StringComparer.OrdinalIgnoreCase); public WebHostRpcWorkerChannelManager(IScriptEventManager eventManager, IEnvironment environment, @@ -101,7 +99,7 @@ await rpcWorkerChannel.StartWorkerProcessAsync().ContinueWith(processStartTask = internal Task GetChannelAsync(string language) { - if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out Dictionary> workerChannels)) + if (!string.IsNullOrEmpty(language) && 
_workerChannels.TryGetValue(language, out ConcurrentDictionary> workerChannels)) { if (workerChannels.Count > 0 && workerChannels.TryGetValue(workerChannels.Keys.First(), out TaskCompletionSource valueTask)) { @@ -111,9 +109,9 @@ internal Task GetChannelAsync(string language) return Task.FromResult(null); } - public Dictionary> GetChannels(string language) + public IDictionary> GetChannels(string language) { - if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out Dictionary> workerChannels)) + if (!string.IsNullOrEmpty(language) && _workerChannels.TryGetValue(language, out ConcurrentDictionary> workerChannels)) { return workerChannels; } @@ -237,7 +235,7 @@ public Task ShutdownChannelIfExistsAsync(string language, string workerId, if (_hostingConfigOptions.Value.RevertWorkerShutdownBehaviour) { - if (_workerChannels.TryRemove(language, out Dictionary> rpcWorkerChannels)) + if (_workerChannels.TryRemove(language, out ConcurrentDictionary> rpcWorkerChannels)) { if (rpcWorkerChannels.TryGetValue(workerId, out TaskCompletionSource value)) { @@ -264,7 +262,7 @@ public Task ShutdownChannelIfExistsAsync(string language, string workerId, } else { - if (_workerChannels.TryGetValue(language, out Dictionary> rpcWorkerChannels) + if (_workerChannels.TryGetValue(language, out ConcurrentDictionary> rpcWorkerChannels) && rpcWorkerChannels.TryRemove(workerId, out TaskCompletionSource value)) { value?.Task.ContinueWith(channelTask => @@ -304,7 +302,7 @@ internal void ScheduleShutdownStandbyChannels() using (_metricsLogger.LatencyEvent(string.Format(MetricEventNames.SpecializationShutdownStandbyChannels, runtime.Key))) { _logger.LogInformation("Disposing standby channel for runtime:{language}", runtime.Key); - if (_workerChannels.TryRemove(runtime.Key, out Dictionary> standbyChannels)) + if (_workerChannels.TryRemove(runtime.Key, out ConcurrentDictionary> standbyChannels)) { foreach (string workerId in standbyChannels.Keys) { @@ -338,7 +336,7 @@ public 
async Task ShutdownChannelsAsync() foreach (string runtime in _workerChannels.Keys) { _logger.LogInformation("Shutting down language worker channels for runtime:{runtime}", runtime); - if (_workerChannels.TryRemove(runtime, out Dictionary> standbyChannels)) + if (_workerChannels.TryRemove(runtime, out ConcurrentDictionary> standbyChannels)) { foreach (string workerId in standbyChannels.Keys) { @@ -378,13 +376,13 @@ internal void AddOrUpdateWorkerChannels(string initializedRuntime, IRpcWorkerCha _workerChannels.AddOrUpdate(initializedRuntime, (runtime) => { - Dictionary> newLanguageWorkerChannels = new Dictionary>(); - newLanguageWorkerChannels.Add(initializedLanguageWorkerChannel.Id, new TaskCompletionSource()); + ConcurrentDictionary> newLanguageWorkerChannels = new(StringComparer.OrdinalIgnoreCase); + newLanguageWorkerChannels.TryAdd(initializedLanguageWorkerChannel.Id, new TaskCompletionSource()); return newLanguageWorkerChannels; }, (runtime, existingLanguageWorkerChannels) => { - existingLanguageWorkerChannels.Add(initializedLanguageWorkerChannel.Id, new TaskCompletionSource()); + existingLanguageWorkerChannels.TryAdd(initializedLanguageWorkerChannel.Id, new TaskCompletionSource()); return existingLanguageWorkerChannels; }); } @@ -392,7 +390,7 @@ internal void AddOrUpdateWorkerChannels(string initializedRuntime, IRpcWorkerCha internal void SetInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel) { _logger.LogDebug("Adding webhost language worker channel for runtime: {language}. 
workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id); - if (_workerChannels.TryGetValue(initializedRuntime, out Dictionary> channel)) + if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary> channel)) { if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource value)) { @@ -404,7 +402,7 @@ internal void SetInitializedWorkerChannel(string initializedRuntime, IRpcWorkerC internal void SetExceptionOnInitializedWorkerChannel(string initializedRuntime, IRpcWorkerChannel initializedLanguageWorkerChannel, Exception exception) { _logger.LogDebug("Failed to initialize webhost language worker channel for runtime: {language}. workerId:{id}", initializedRuntime, initializedLanguageWorkerChannel.Id); - if (_workerChannels.TryGetValue(initializedRuntime, out Dictionary> channel)) + if (_workerChannels.TryGetValue(initializedRuntime, out ConcurrentDictionary> channel)) { if (channel.TryGetValue(initializedLanguageWorkerChannel.Id, out TaskCompletionSource value)) { diff --git a/test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs b/test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs index c7f87df462..fe3a3187e9 100644 --- a/test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs +++ b/test/WebJobs.Script.Tests/Workers/Rpc/TestRpcWorkerChannelManager.cs @@ -35,7 +35,7 @@ public IRpcWorkerChannel GetChannel(string language) throw new System.NotImplementedException(); } - public Dictionary> GetChannels(string language) + public IDictionary> GetChannels(string language) { if (_workerChannels.TryGetValue(language, out Dictionary> workerChannels)) { diff --git a/test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs b/test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs index 2ee6adbe38..4382ca4867 100644 --- a/test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs +++ 
b/test/WebJobs.Script.Tests/Workers/Rpc/WebHostRpcWorkerChannelManagerTests.cs
@@ -4,6 +4,7 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using System.Threading.Tasks;
 using Microsoft.Azure.WebJobs.Script.Config;
 using Microsoft.Azure.WebJobs.Script.Description;
@@ -407,6 +408,38 @@ public async Task ShutdownChannelsIfExist_Succeeds()
             Assert.Null(initializedChannel);
         }
 
+        [Fact]
+        public void ShutdownChannelsIfExist_Race_Succeeds()
+        {
+            var channel = CreateTestChannel(RpcWorkerConstants.JavaLanguageWorkerName);
+            string id = channel.Id;
+
+            Task<bool>[] tasks = new Task<bool>[2]; // one slot per thread: List<T>.Add is not safe for concurrent writers
+            List<Thread> threads = new();
+            for (int i = 0; i < tasks.Length; i++)
+            {
+                Thread t = new(static (state) =>
+                {
+                    var (channelManager, tasks, id, slot) = ((WebHostRpcWorkerChannelManager, Task<bool>[], string, int))state;
+                    tasks[slot] = channelManager.ShutdownChannelIfExistsAsync(RpcWorkerConstants.JavaLanguageWorkerName, id); // the shutdown calls still race; only the result storage is per-thread
+                });
+                threads.Add(t);
+            }
+
+            for (int i = 0; i < threads.Count; i++)
+            {
+                threads[i].Start((_rpcWorkerChannelManager, tasks, id, i));
+            }
+
+            foreach (Thread t in threads)
+            {
+                t.Join();
+            }
+
+            // both threads have been joined, so every slot is populated; only one should successfully shut down
+            Assert.Single(tasks, t => t.Result == true);
+        }
+
         [Fact]
         public async Task ShutdownChannelsIfExistsAsync_StopsWorkerInvocations()
         {