diff --git a/WebJobs.Extensions.DurableTask.sln b/WebJobs.Extensions.DurableTask.sln index 8efe24eee..54242679f 100644 --- a/WebJobs.Extensions.DurableTask.sln +++ b/WebJobs.Extensions.DurableTask.sln @@ -94,6 +94,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PerfTests", "PerfTests", "{ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DFPerfScenariosV4", "test\DFPerfScenarios\DFPerfScenariosV4.csproj", "{FC8AD123-F949-4D21-B817-E5A4BBF7F69B}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "IsolatedEntities", "test\IsolatedEntities\IsolatedEntities.csproj", "{8CBB856D-2D77-4052-9E50-2F635DE5C88F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -178,6 +180,10 @@ Global {FC8AD123-F949-4D21-B817-E5A4BBF7F69B}.Debug|Any CPU.Build.0 = Debug|Any CPU {FC8AD123-F949-4D21-B817-E5A4BBF7F69B}.Release|Any CPU.ActiveCfg = Release|Any CPU {FC8AD123-F949-4D21-B817-E5A4BBF7F69B}.Release|Any CPU.Build.0 = Release|Any CPU + {8CBB856D-2D77-4052-9E50-2F635DE5C88F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8CBB856D-2D77-4052-9E50-2F635DE5C88F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8CBB856D-2D77-4052-9E50-2F635DE5C88F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8CBB856D-2D77-4052-9E50-2F635DE5C88F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -211,6 +217,7 @@ Global {65F904AA-0F6F-48CB-BE19-593B7D68152A} = {7387E723-E153-4B7A-B105-8C67BFBD48CF} {7387E723-E153-4B7A-B105-8C67BFBD48CF} = {78BCF152-C22C-408F-9FB1-0F8C99B154B5} {FC8AD123-F949-4D21-B817-E5A4BBF7F69B} = {7387E723-E153-4B7A-B105-8C67BFBD48CF} + {8CBB856D-2D77-4052-9E50-2F635DE5C88F} = {78BCF152-C22C-408F-9FB1-0F8C99B154B5} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {5E9AC327-DE18-41A5-A55D-E44CB4281943} diff --git a/test/IsolatedEntities/Common/HttpTriggers.cs b/test/IsolatedEntities/Common/HttpTriggers.cs new file mode 100644 index 000000000..bb450f5d2 --- /dev/null +++ b/test/IsolatedEntities/Common/HttpTriggers.cs @@ -0,0 +1,79 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Runtime.Serialization; +using System.Text; +using System.Text.RegularExpressions; +using Azure.Core; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.Azure.Functions.Worker.Http; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace IsolatedEntities; + +/// +/// Provides an http trigger to run functional tests for entities. +/// +public static class HttpTriggers +{ + [Function(nameof(RunAllTests))] + public static async Task RunAllTests( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "tests/")] HttpRequestData request, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext) + { + var context = new TestContext(client, executionContext); + string result = await TestRunner.RunAsync(context, filter: null); + HttpResponseData response = request.CreateResponse(System.Net.HttpStatusCode.OK); + response.WriteString(result); + return response; + } + + [Function(nameof(RunFilteredTests))] + public static async Task RunFilteredTests( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "tests/{filter}")] HttpRequestData request, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + string filter) + { + var context = new TestContext(client, executionContext); + string result = await TestRunner.RunAsync(context, filter); + HttpResponseData response = request.CreateResponse(System.Net.HttpStatusCode.OK); + response.WriteString(result); + return response; + } + + [Function(nameof(ListAllTests))] + public static async Task ListAllTests( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "tests/")] HttpRequestData request, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext) + { + var context = new TestContext(client, executionContext); + string result = await TestRunner.RunAsync(context, filter: null, listOnly: true); + HttpResponseData response = request.CreateResponse(System.Net.HttpStatusCode.OK); + response.WriteString(result); + return response; + } + + [Function(nameof(ListFilteredTests))] + public static async Task ListFilteredTests( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "tests/{filter}")] HttpRequestData request, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + string filter) + { + var context = new TestContext(client, executionContext); + string result = await TestRunner.RunAsync(context, filter, listOnly: true); + HttpResponseData response = request.CreateResponse(System.Net.HttpStatusCode.OK); + response.WriteString(result); + return response; + } +} + + + diff --git a/test/IsolatedEntities/Common/ProblematicObject.cs b/test/IsolatedEntities/Common/ProblematicObject.cs new file mode 100644 index 000000000..b1bd7adbd --- /dev/null +++ b/test/IsolatedEntities/Common/ProblematicObject.cs @@ -0,0 +1,69 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.Serialization; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Tasks; +using Azure.Core.Serialization; + +namespace IsolatedEntities +{ + internal static class CustomSerialization + { + public static ProblematicObject CreateUnserializableObject() + { + return new ProblematicObject(serializable: false, deserializable: false); + } + + public static ProblematicObject CreateUndeserializableObject() + { + return new ProblematicObject(serializable: true, deserializable: false); + } + + public class ProblematicObject + { + public ProblematicObject(bool serializable = true, bool deserializable = true) + { + this.Serializable = serializable; + this.Deserializable = deserializable; + } + + public bool Serializable { get; set; } + + public bool Deserializable { get; set; } + } + + public class ProblematicObjectJsonConverter : JsonConverter + { + public override ProblematicObject Read( + ref Utf8JsonReader reader, + Type typeToConvert, + JsonSerializerOptions options) + { + bool deserializable = reader.GetBoolean(); + if (!deserializable) + { + throw new JsonException("problematic object: is not deserializable"); + } + return new ProblematicObject(serializable: true, deserializable: true); + } + + public override void Write( + Utf8JsonWriter writer, + ProblematicObject value, + JsonSerializerOptions options) + { + if (!value.Serializable) + { + throw new JsonException("problematic object: is not serializable"); + } + writer.WriteBooleanValue(value.Deserializable); + } + } + } +} diff --git a/test/IsolatedEntities/Common/Test.cs b/test/IsolatedEntities/Common/Test.cs new file mode 100644 index 000000000..73fc8b151 --- /dev/null +++ b/test/IsolatedEntities/Common/Test.cs @@ -0,0 +1,19 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace IsolatedEntities; + +internal abstract class Test +{ + public virtual string Name => this.GetType().Name; + + public abstract Task RunAsync(TestContext context); + + public virtual TimeSpan Timeout => TimeSpan.FromSeconds(30); +} diff --git a/test/IsolatedEntities/Common/TestContext.cs b/test/IsolatedEntities/Common/TestContext.cs new file mode 100644 index 000000000..6d8cf7789 --- /dev/null +++ b/test/IsolatedEntities/Common/TestContext.cs @@ -0,0 +1,36 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace IsolatedEntities; + +internal class TestContext +{ + public TestContext(DurableTaskClient client, FunctionContext executionContext) + { + this.ExecutionContext = executionContext; + this.Client = client; + this.Logger = executionContext.GetLogger(nameof(IsolatedEntities)); + } + + public FunctionContext ExecutionContext { get; } + + public DurableTaskClient Client { get; } + + public ILogger Logger { get; } + + public CancellationToken CancellationToken { get; set; } + + public bool BackendSupportsImplicitEntityDeletion { get; set; } = false; // false for Azure Storage, true for Netherite and MSSQL +} diff --git a/test/IsolatedEntities/Common/TestContextExtensions.cs b/test/IsolatedEntities/Common/TestContextExtensions.cs new file mode 100644 index 000000000..b9e891f2f --- /dev/null +++ b/test/IsolatedEntities/Common/TestContextExtensions.cs @@ -0,0 +1,77 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace IsolatedEntities; + +internal static class TestContextExtensions +{ + public static async Task WaitForEntityStateAsync( + this TestContext context, + EntityInstanceId entityInstanceId, + TimeSpan? timeout = null, + Func? describeWhatWeAreWaitingFor = null) + { + if (timeout == null) + { + timeout = Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(30); + } + + Stopwatch sw = Stopwatch.StartNew(); + + EntityMetadata? response; + + do + { + response = await context.Client.Entities.GetEntityAsync(entityInstanceId, includeState: true); + + if (response != null) + { + if (describeWhatWeAreWaitingFor == null) + { + break; + } + else + { + var waitForResult = describeWhatWeAreWaitingFor(response.State.ReadAs()); + + if (string.IsNullOrEmpty(waitForResult)) + { + break; + } + else + { + context.Logger.LogInformation($"Waiting for {entityInstanceId} : {waitForResult}"); + } + } + } + else + { + context.Logger.LogInformation($"Waiting for {entityInstanceId} to have state."); + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + while (sw.Elapsed < timeout); + + if (response != null) + { + string serializedState = response.State.Value; + context.Logger.LogInformation($"Found state: {serializedState}"); + return response.State.ReadAs(); + } + else + { + throw new TimeoutException($"Durable entity '{entityInstanceId}' still doesn't have any state!"); + } + } +} diff --git a/test/IsolatedEntities/Common/TestRunner.cs b/test/IsolatedEntities/Common/TestRunner.cs new file mode 100644 index 000000000..17ca3bd17 --- /dev/null +++ b/test/IsolatedEntities/Common/TestRunner.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Linq; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace IsolatedEntities; + +internal static class TestRunner +{ + public static async Task RunAsync(TestContext context, string? filter = null, bool listOnly = false) + { + var output = new StringBuilder(); + + foreach (var test in All.GetAllTests()) + { + if (filter == null || test.Name.ToLowerInvariant().Equals(filter.ToLowerInvariant())) + { + if (listOnly) + { + output.AppendLine(test.Name); + } + else + { + context.Logger.LogWarning("------------ starting {testName}", test.Name); + + // if debugging, time out after 60m + // otherwise, time out either when the http request times out or when the individual test time limit is exceeded + using CancellationTokenSource cancellationTokenSource + = Debugger.IsAttached ? new() : CancellationTokenSource.CreateLinkedTokenSource(context.ExecutionContext.CancellationToken); + cancellationTokenSource.CancelAfter(Debugger.IsAttached ? TimeSpan.FromMinutes(60) : test.Timeout); + context.CancellationToken = cancellationTokenSource.Token; + + try + { + await test.RunAsync(context); + output.AppendLine($"PASSED {test.Name}"); + } + catch (Exception ex) + { + context.Logger.LogError(ex, "test {testName} failed", test.Name); + output.AppendLine($"FAILED {test.Name} {ex.ToString()}"); + break; + } + } + } + } + + return output.ToString(); + } +} diff --git a/test/IsolatedEntities/Entities/BatchEntity.cs b/test/IsolatedEntities/Entities/BatchEntity.cs new file mode 100644 index 000000000..7f352f827 --- /dev/null +++ b/test/IsolatedEntities/Entities/BatchEntity.cs @@ -0,0 +1,54 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Entities; + +namespace IsolatedEntities; + +/// +/// An entity that records all batch positions and batch sizes +/// +class BatchEntity : ITaskEntity +{ + int operationCounter; + + public ValueTask RunAsync(TaskEntityOperation operation) + { + List? state = (List?) operation.State.GetState(typeof(List)); + int batchNo; + if (state == null) + { + batchNo = 0; + state = new List(); + } + else if (operationCounter == 0) + { + batchNo = state.Last().batch + 1; + } + else + { + batchNo = state.Last().batch; + } + + state.Add(new Entry(batchNo, operationCounter++)); + operation.State.SetState(state); + return default; + } + + public record struct Entry(int batch, int operation); + + [Function(nameof(BatchEntity))] + public static Task Boilerplate([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(new BatchEntity()); + } +} diff --git a/test/IsolatedEntities/Entities/Counter.cs b/test/IsolatedEntities/Entities/Counter.cs new file mode 100644 index 000000000..8c5c21ae8 --- /dev/null +++ b/test/IsolatedEntities/Entities/Counter.cs @@ -0,0 +1,43 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Entities; + +namespace IsolatedEntities; + +class Counter : TaskEntity +{ + public void Increment() + { + this.State++; + } + + public void Add(int amount) + { + this.State += amount; + } + + public int Get() + { + return this.State; + } + + public void Set(int value) + { + this.State = value; + } + + [Function(nameof(Counter))] + public static Task Boilerplate([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } +} diff --git a/test/IsolatedEntities/Entities/FaultyEntity.cs b/test/IsolatedEntities/Entities/FaultyEntity.cs new file mode 100644 index 000000000..854f6c8eb --- /dev/null +++ b/test/IsolatedEntities/Entities/FaultyEntity.cs @@ -0,0 +1,201 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Tasks; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace IsolatedEntities +{ + // we use a low-level ITaskEntity so we can intercept some of the operations without going through + // the default sequence of serialization and deserialization of state. This is needed to construct + // this type of test, it does not reflect typical useage. + public class FaultyEntity : ITaskEntity + { + class State + { + [JsonInclude] + public int Value { get; set; } + + [JsonInclude] + [JsonConverter(typeof(CustomSerialization.ProblematicObjectJsonConverter))] + public CustomSerialization.ProblematicObject? ProblematicObject { get; set; } + + [JsonInclude] + public int NumberIncrementsSent { get; set; } + + public Task Send(EntityInstanceId target, TaskEntityContext context) + { + var desc = $"{++this.NumberIncrementsSent}:{this.Value}"; + context.SignalEntity(target, desc); + return Task.CompletedTask; + } + } + + [Function(nameof(FaultyEntity))] + public async Task EntryPoint([EntityTrigger] TaskEntityDispatcher dispatcher) + { + await dispatcher.DispatchAsync(); + } + + public static void ThrowTestException() + { + throw new TestException("KABOOM"); + } + + [Serializable] + public class TestException : Exception + { + public TestException() : base() { } + public TestException(string message) : base(message) { } + public TestException(string message, Exception inner) : base(message, inner) { } + } + + public async ValueTask RunAsync(TaskEntityOperation operation) + { + State? Get() + { + return (State?)operation.State.GetState(typeof(State)); + } + State GetOrCreate() + { + State? s = Get(); + if (s is null) + { + operation.State.SetState(s = new State()); + } + return s; + } + + switch (operation.Name) + { + case "Exists": + { + try + { + return Get() != null; + } + catch (Exception) // the entity has state, even if that state is corrupted + { + return true; + } + } + case "Delay": + { + int delayInSeconds = (int)operation.GetInput(typeof(int))!; + await Task.Delay(TimeSpan.FromSeconds(delayInSeconds)); + return default; + } + case "Delete": + { + operation.State.SetState(null); + return default; + } + case "DeleteWithoutReading": + { + // do not read the state first otherwise the deserialization may throw before we can delete it + operation.State.SetState(null); + return default; + } + case "DeleteThenThrow": + { + operation.State.SetState(null); + ThrowTestException(); + return default; + } + case "Throw": + { + ThrowTestException(); + return default; + } + case "ThrowNested": + { + try + { + ThrowTestException(); + } + catch (Exception e) + { + throw new Exception("KABOOOOOM", e); + } + return default; + } + case "Get": + { + return GetOrCreate().Value; + } + case "GetNumberIncrementsSent": + { + return GetOrCreate().NumberIncrementsSent; + } + case "Set": + { + State state = GetOrCreate(); + state.Value = (int)operation.GetInput(typeof(int))!; + operation.State.SetState(state); + return default; + } + case "SetToUnserializable": + { + State state = GetOrCreate(); + state.ProblematicObject = CustomSerialization.CreateUnserializableObject(); + operation.State.SetState(state); + return default; + } + case "SetToUndeserializable": + { + State state = GetOrCreate(); + state.ProblematicObject = CustomSerialization.CreateUndeserializableObject(); + operation.State.SetState(state); + return default; + } + case "SetThenThrow": + { + State state = GetOrCreate(); + state.Value = (int)operation.GetInput(typeof(int))!; + operation.State.SetState(state); + ThrowTestException(); + return default; + } + case "Send": + { + State state = GetOrCreate(); + EntityInstanceId entityId = (EntityInstanceId)operation.GetInput(typeof(EntityId))!; + await state.Send(entityId, operation.Context); + operation.State.SetState(state); + return default; + } + case "SendThenThrow": + { + State state = GetOrCreate(); + EntityInstanceId entityId = (EntityInstanceId)operation.GetInput(typeof(EntityId))!; + await state.Send(entityId, operation.Context); + operation.State.SetState(state); + ThrowTestException(); + return default; + } + case "SendThenMakeUnserializable": + { + State state = GetOrCreate(); + EntityInstanceId entityId = (EntityInstanceId)operation.GetInput(typeof(EntityId))!; + await state.Send(entityId, operation.Context); + state.ProblematicObject = CustomSerialization.CreateUnserializableObject(); + operation.State.SetState(state); + return default; + } + default: + { + throw new InvalidOperationException($"undefined entity operation: {operation.Name}"); + } + } + } + } +} diff --git a/test/IsolatedEntities/Entities/Launcher.cs b/test/IsolatedEntities/Entities/Launcher.cs new file mode 100644 index 000000000..98b82a534 --- /dev/null +++ b/test/IsolatedEntities/Entities/Launcher.cs @@ -0,0 +1,63 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Castle.Core.Logging; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Entities; + +namespace IsolatedEntities; + +class Launcher +{ + public string? OrchestrationInstanceId { get; set; } + + public DateTime? ScheduledTime { get; set; } + + public bool IsDone { get; set; } + + public string? ErrorMessage { get; set; } + + public void Launch(TaskEntityContext context, DateTime? scheduledTime = null) + { + this.OrchestrationInstanceId = context.ScheduleNewOrchestration( + nameof(FireAndForget.SignallingOrchestration), + context.Id, + new StartOrchestrationOptions(StartAt: scheduledTime)); + } + + public string? Get() + { + if (this.ErrorMessage != null) + { + throw new Exception(this.ErrorMessage); + } + return this.IsDone ? this.OrchestrationInstanceId : null; + } + + public void Done() + { + this.IsDone = true; + + if (this.ScheduledTime != null) + { + DateTime now = DateTime.UtcNow; + if (now < this.ScheduledTime) + { + this.ErrorMessage = $"delay was too short, expected >= {this.ScheduledTime}, actual = {now}"; + } + } + } + + [Function(nameof(Launcher))] + public static Task Boilerplate([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } +} diff --git a/test/IsolatedEntities/Entities/Relay.cs b/test/IsolatedEntities/Entities/Relay.cs new file mode 100644 index 000000000..296a6d6e6 --- /dev/null +++ b/test/IsolatedEntities/Entities/Relay.cs @@ -0,0 +1,44 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace IsolatedEntities; + +/// +/// A stateless entity that forwards signals +/// +class Relay : ITaskEntity +{ + [Function(nameof(Relay))] + public static Task Boilerplate([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } + + public record Input(EntityInstanceId entityInstanceId, string operationName, object? input, DateTimeOffset? scheduledTime); + + public ValueTask RunAsync(TaskEntityOperation operation) + { + T GetInput() => (T)operation.GetInput(typeof(T))!; + + Input input = GetInput(); + + operation.Context.SignalEntity( + input.entityInstanceId, + input.operationName, + input.input, + new SignalEntityOptions() { SignalTime = input.scheduledTime }); + + return default; + } +} diff --git a/test/IsolatedEntities/Entities/SchedulerEntity.cs b/test/IsolatedEntities/Entities/SchedulerEntity.cs new file mode 100644 index 000000000..a31322411 --- /dev/null +++ b/test/IsolatedEntities/Entities/SchedulerEntity.cs @@ -0,0 +1,49 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace IsolatedEntities; + +class SchedulerEntity : ITaskEntity +{ + private readonly ILogger logger; + + public SchedulerEntity(ILogger logger) + { + this.logger = logger; + } + + [Function(nameof(SchedulerEntity))] + public static Task Boilerplate([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } + + public ValueTask RunAsync(TaskEntityOperation operation) + { + this.logger.LogInformation("{entityId} received {operationName} signal", operation.Context.Id, operation.Name); + + List state = (List?)operation.State.GetState(typeof(List)) ?? new List(); + + if (state.Contains(operation.Name)) + { + this.logger.LogError($"duplicate: {operation.Name}"); + } + else + { + state.Add(operation.Name); + } + + return default; + } +} diff --git a/test/IsolatedEntities/Entities/SelfSchedulingEntity.cs b/test/IsolatedEntities/Entities/SelfSchedulingEntity.cs new file mode 100644 index 000000000..39ea0d0cb --- /dev/null +++ b/test/IsolatedEntities/Entities/SelfSchedulingEntity.cs @@ -0,0 +1,58 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask.Entities; + +namespace IsolatedEntities +{ + public class SelfSchedulingEntity + { + public string Value { get; set; } = ""; + + public void Start(TaskEntityContext context) + { + var now = DateTime.UtcNow; + + var timeA = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1); + var timeB = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(2); + var timeC = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(3); + var timeD = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(4); + + context.SignalEntity(context.Id, nameof(D), options: timeD); + context.SignalEntity(context.Id, nameof(C), options: timeC); + context.SignalEntity(context.Id, nameof(B), options: timeB); + context.SignalEntity(context.Id, nameof(A), options: timeA); + } + + public void A() + { + this.Value += "A"; + } + + public Task B() + { + this.Value += "B"; + return Task.Delay(100); + } + + public void C() + { + this.Value += "C"; + } + + public Task D() + { + this.Value += "D"; + return Task.FromResult(111); + } + + [Function(nameof(SelfSchedulingEntity))] + public static Task SelfSchedulingEntityFunction([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } + } +} diff --git a/test/IsolatedEntities/Entities/StringStore.cs b/test/IsolatedEntities/Entities/StringStore.cs new file mode 100644 index 000000000..03c87b15c --- /dev/null +++ b/test/IsolatedEntities/Entities/StringStore.cs @@ -0,0 +1,119 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json.Serialization; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; + +namespace IsolatedEntities; + +// three variations of the same simple entity: an entity that stores a string +// supporting get, set, and delete operations. There are slight semantic differences. + +//-------------- a class-based implementation ----------------- + +public class StringStore +{ + [JsonInclude] + public string Value { get; set; } + + public StringStore() + { + this.Value = string.Empty; + } + + public string Get() + { + return this.Value; + } + + public void Set(string value) + { + this.Value = value; + } + + // Delete is implicitly defined + + [Function(nameof(StringStore))] + public static Task Boilerplate([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } +} + +//-------------- a TaskEntity-based implementation ----------------- + +public class StringStore2 : TaskEntity +{ + public string Get() + { + return this.State; + } + + public void Set(string value) + { + this.State = value; + } + + protected override string InitializeState(TaskEntityOperation operation) + { + return string.Empty; + } + + // Delete is implicitly defined + + [Function(nameof(StringStore2))] + public static Task Boilerplate([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } +} + +//-------------- a direct ITaskEntity-based implementation ----------------- + +class StringStore3 : ITaskEntity +{ + public ValueTask RunAsync(TaskEntityOperation operation) + { + switch (operation.Name) + { + case "set": + operation.State.SetState((string?)operation.GetInput(typeof(string))); + return default; + + case "get": + // note: this does not assign a state to the entity if it does not already exist + return new ValueTask((string?)operation.State.GetState(typeof(string))); + + case "delete": + if (operation.State.GetState(typeof(string)) == null) + { + return new ValueTask(false); + } + else + { + operation.State.SetState(null); + return new ValueTask(true); + } + + default: + throw new NotImplementedException("no such operation"); + } + } + + [Function(nameof(StringStore3))] + public static Task Boilerplate([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(); + } +} + + diff --git a/test/IsolatedEntities/IsolatedEntities.csproj b/test/IsolatedEntities/IsolatedEntities.csproj new file mode 100644 index 000000000..7db30d5ce --- /dev/null +++ b/test/IsolatedEntities/IsolatedEntities.csproj @@ -0,0 +1,33 @@ + + + net6.0 + v4 + exe + false + enable + enable + + + + + + + + + + + + + + + + + + PreserveNewest + + + PreserveNewest + Never + + + diff --git a/test/IsolatedEntities/Program.cs b/test/IsolatedEntities/Program.cs new file mode 100644 index 000000000..ae3ff181c --- /dev/null +++ b/test/IsolatedEntities/Program.cs @@ -0,0 +1,19 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace IsolatedEntities; + +public class Program +{ + public static void Main() + { + IHost host = new HostBuilder() + .ConfigureFunctionsWorkerDefaults() + .Build(); + host.Run(); + } +} diff --git a/test/IsolatedEntities/Tests/All.cs b/test/IsolatedEntities/Tests/All.cs new file mode 100644 index 000000000..c6aa8d1ba --- /dev/null +++ b/test/IsolatedEntities/Tests/All.cs @@ -0,0 +1,70 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Diagnostics; +using System.Runtime.Serialization; +using System.Text; +using System.Text.RegularExpressions; +using Azure.Core; +using IsolatedEntities.Tests; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.Azure.Functions.Worker.Http; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +/// +/// A collection containing all the unit tests. +/// +static class All +{ + public static IEnumerable GetAllTests() + { + yield return new SetAndGet(); + yield return new CallCounter(); + yield return new BatchedEntitySignals(100); + yield return new SignalAndCall(typeof(StringStore)); + yield return new SignalAndCall(typeof(StringStore2)); + yield return new SignalAndCall(typeof(StringStore3)); + yield return new CallAndDelete(typeof(StringStore)); + yield return new CallAndDelete(typeof(StringStore2)); + yield return new CallAndDelete(typeof(StringStore3)); + yield return new SignalThenPoll(direct: true, delayed: false); + yield return new SignalThenPoll(direct: true, delayed: true); + yield return new SignalThenPoll(direct: false, delayed: false); + yield return new SignalThenPoll(direct: false, delayed: true); + yield return new SelfScheduling(); + yield return new FireAndForget(null); + yield return new FireAndForget(0); + yield return new FireAndForget(5); + yield return new SingleLockedTransfer(); + yield return new MultipleLockedTransfers(2); + yield return new MultipleLockedTransfers(5); + yield return new MultipleLockedTransfers(100); + yield return new FaultyCriticalSection(); + yield return new LargeEntity(); + yield return new CallFaultyEntity(); + yield return new CallFaultyEntityBatches(); + yield return new EntityQueries1(); + yield return new EntityQueries2(); + yield return new CleanOrphanedLock(); + yield return new InvalidEntityId(InvalidEntityId.Location.ClientGet); + yield return new InvalidEntityId(InvalidEntityId.Location.ClientSignal); + yield return new InvalidEntityId(InvalidEntityId.Location.OrchestrationCall); + yield return new InvalidEntityId(InvalidEntityId.Location.OrchestrationSignal); + yield return new CallFaultyActivity(nested: false); + + // requires https://github.com/Azure/azure-functions-durable-extension/pull/2748 + yield return new CallFaultySuborchestration(nested: false); + + // these tests require us to implement better propagation of FailureDetails for activities and orchestrations + // yield return new CallFaultyActivity(nested: true); + //yield return new CallFaultySuborchestration(nested: true); + } + +} diff --git a/test/IsolatedEntities/Tests/BatchedEntitySignals.cs b/test/IsolatedEntities/Tests/BatchedEntitySignals.cs new file mode 100644 index 000000000..6cc7ea4dd --- /dev/null +++ b/test/IsolatedEntities/Tests/BatchedEntitySignals.cs @@ -0,0 +1,68 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class BatchedEntitySignals : Test +{ + readonly int numIterations; + + public BatchedEntitySignals(int numIterations) + { + this.numIterations = numIterations; + } + + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(nameof(BatchEntity), Guid.NewGuid().ToString().Substring(0,8)); + + // send a number of signals immediately after each other + List tasks = new List(); + for (int i = 0; i < numIterations; i++) + { + tasks.Add(context.Client.Entities.SignalEntityAsync(entityId, string.Empty, i)); + } + + await Task.WhenAll(tasks); + + var result = await context.WaitForEntityStateAsync>( + entityId, + timeout: default, + list => list.Count == this.numIterations ? null : $"waiting for {this.numIterations - list.Count} signals"); + + Assert.Equal(new BatchEntity.Entry(0, 0), result[0]); + Assert.Equal(this.numIterations, result.Count); + + for (int i = 0; i < numIterations - 1; i++) + { + if (result[i].batch == result[i + 1].batch) + { + Assert.Equal(result[i].operation + 1, result[i + 1].operation); + } + else + { + Assert.Equal(result[i].batch + 1, result[i + 1].batch); + Assert.Equal(0, result[i + 1].operation); + } + } + + // there should always be some batching going on + int numBatches = result.Last().batch + 1; + Assert.True(numBatches < numIterations); + context.Logger.LogInformation($"completed {numIterations} operations in {numBatches} batches"); + } +} diff --git a/test/IsolatedEntities/Tests/CallAndDelete.cs b/test/IsolatedEntities/Tests/CallAndDelete.cs new file mode 100644 index 000000000..7f242c1c7 --- /dev/null +++ b/test/IsolatedEntities/Tests/CallAndDelete.cs @@ -0,0 +1,102 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +class CallAndDelete : Test +{ + private readonly Type stringStoreType; + + public CallAndDelete(Type stringStoreType) + { + this.stringStoreType = stringStoreType; + } + + public override string Name => $"{base.Name}.{this.stringStoreType.Name}"; + + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(this.stringStoreType.Name, Guid.NewGuid().ToString()); + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(CallAndDeleteOrchestration), entityId); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + + // check that entity was deleted + var entityMetadata = await context.Client.Entities.GetEntityAsync(entityId); + Assert.Null(entityMetadata); + } + + static bool GetOperationInitializesEntity(EntityInstanceId entityInstanceId) + => !string.Equals(entityInstanceId.Name, nameof(StringStore3).ToLowerInvariant(), StringComparison.InvariantCulture); + + static bool DeleteReturnsBoolean(EntityInstanceId entityInstanceId) + => string.Equals(entityInstanceId.Name, nameof(StringStore3).ToLowerInvariant(), StringComparison.InvariantCulture); + + [Function(nameof(CallAndDeleteOrchestration))] + public static async Task CallAndDeleteOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + EntityInstanceId entityId = context.GetInput(); + await context.Entities.CallEntityAsync(entityId, "set", "333"); + + string value = await context.Entities.CallEntityAsync(entityId, "get"); + Assert.Equal("333", value); + + if (DeleteReturnsBoolean(entityId)) + { + bool deleted = await context.Entities.CallEntityAsync(entityId, "delete"); + Assert.True(deleted); + + bool deletedAgain = await context.Entities.CallEntityAsync(entityId, "delete"); + Assert.False(deletedAgain); + } + else + { + await context.Entities.CallEntityAsync(entityId, "delete"); + } + + string getValue = await context.Entities.CallEntityAsync(entityId, "get"); + if (GetOperationInitializesEntity(entityId)) + { + Assert.Equal("", getValue); + } + else + { + Assert.Equal(null, getValue); + } + + if (DeleteReturnsBoolean(entityId)) + { + bool deletedAgain = await context.Entities.CallEntityAsync(entityId, "delete"); + if (GetOperationInitializesEntity(entityId)) + { + Assert.True(deletedAgain); + } + else + { + Assert.False(deletedAgain); + } + } + else + { + await context.Entities.CallEntityAsync(entityId, "delete"); + } + + return "ok"; + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/Tests/CallCounter.cs b/test/IsolatedEntities/Tests/CallCounter.cs new file mode 100644 index 000000000..1dfd614d1 --- /dev/null +++ b/test/IsolatedEntities/Tests/CallCounter.cs @@ -0,0 +1,59 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +class CallCounter : Test +{ + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(nameof(Counter), Guid.NewGuid().ToString()); + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(CallCounterOrchestration), entityId); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("OK", metadata.ReadOutputAs()); + + // entity ids cannot be used for orchestration instance queries + await Assert.ThrowsAsync(() => context.Client.GetInstanceAsync(entityId.ToString())); + + // and are not returned by them + List results = await context.Client.GetAllInstancesAsync().ToListAsync(); + Assert.DoesNotContain(results, metadata => metadata.InstanceId.StartsWith("@")); + + // check that entity state is correct + EntityMetadata? entityMetadata = await context.Client.Entities.GetEntityAsync(entityId, includeState:true); + Assert.NotNull(entityMetadata); + Assert.Equal(33, entityMetadata!.State); + } + + [Function(nameof(CallCounterOrchestration))] + public static async Task CallCounterOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + EntityInstanceId entityId = context.GetInput(); + await context.Entities.CallEntityAsync(entityId, "set", 33); + int result = await context.Entities.CallEntityAsync(entityId, "get"); + + if (result == 33) + { + return "OK"; + } + else + { + return $"wrong result: {result} instead of 33"; + } + } +} diff --git a/test/IsolatedEntities/Tests/CallFaultyActivity.cs b/test/IsolatedEntities/Tests/CallFaultyActivity.cs new file mode 100644 index 000000000..6ea40393b --- /dev/null +++ b/test/IsolatedEntities/Tests/CallFaultyActivity.cs @@ -0,0 +1,120 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class CallFaultyActivity : Test +{ + // this is not an entity test... but it's a good place to put this test + + private readonly bool nested; + + public CallFaultyActivity(bool nested) + { + this.nested = nested; + } + public override string Name => $"{base.Name}.{(this.nested ? "Nested" : "NotNested")}"; + + public override async Task RunAsync(TestContext context) + { + string orchestrationName = nameof(CallFaultyActivityOrchestration); + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, this.nested); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + } +} + +class CallFaultyActivityOrchestration +{ + readonly ILogger logger; + + public CallFaultyActivityOrchestration(ILogger logger) + { + this.logger = logger; + } + + [Function(nameof(FaultyActivity))] + public void FaultyActivity([ActivityTrigger] bool nested) + { + if (!nested) + { + this.MethodThatThrowsException(); + } + else + { + this.MethodThatThrowsNestedException(); + } + } + + void MethodThatThrowsNestedException() + { + try + { + this.MethodThatThrowsException(); + } + catch (Exception e) + { + throw new Exception("KABOOOOOM", e); + } + } + + void MethodThatThrowsException() + { + throw new Exception("KABOOM"); + } + + [Function(nameof(CallFaultyActivityOrchestration))] + public async Task RunAsync([OrchestrationTrigger] TaskOrchestrationContext context) + { + bool nested = context.GetInput(); + + try + { + await context.CallActivityAsync(nameof(FaultyActivity), nested); + throw new Exception("expected activity to throw exception, but none was thrown"); + } + catch (TaskFailedException taskFailedException) + { + Assert.NotNull(taskFailedException.FailureDetails); + + if (!nested) + { + Assert.Equal("KABOOM", taskFailedException.FailureDetails.ErrorMessage); + Assert.Contains(nameof(MethodThatThrowsException), taskFailedException.FailureDetails.StackTrace); + } + else + { + Assert.Equal("KABOOOOOM", taskFailedException.FailureDetails.ErrorMessage); + Assert.Contains(nameof(MethodThatThrowsNestedException), taskFailedException.FailureDetails.StackTrace); + + Assert.NotNull(taskFailedException.FailureDetails.InnerFailure); + Assert.Equal("KABOOM", taskFailedException.FailureDetails.InnerFailure!.ErrorMessage); + Assert.Contains(nameof(MethodThatThrowsException), taskFailedException.FailureDetails.InnerFailure.StackTrace); + } + } + catch (Exception e) + { + throw new Exception($"wrong exception thrown", e); + } + + return "ok"; + } +} diff --git a/test/IsolatedEntities/Tests/CallFaultyEntity.cs b/test/IsolatedEntities/Tests/CallFaultyEntity.cs new file mode 100644 index 000000000..9d3e08fe4 --- /dev/null +++ b/test/IsolatedEntities/Tests/CallFaultyEntity.cs @@ -0,0 +1,187 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class CallFaultyEntity : Test +{ + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(nameof(FaultyEntity), Guid.NewGuid().ToString()); + string orchestrationName = nameof(CallFaultyEntityOrchestration); + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, entityId); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + } +} + +class CallFaultyEntityOrchestration +{ + readonly ILogger logger; + + public CallFaultyEntityOrchestration(ILogger logger) + { + this.logger = logger; + } + + [Function(nameof(CallFaultyEntityOrchestration))] + public async Task RunAsync([OrchestrationTrigger] TaskOrchestrationContext context) + { + // read entity id from input + var entityId = context.GetInput(); + + async Task ExpectOperationExceptionAsync(Task t, EntityInstanceId entityId, string operationName, + string errorMessage, string? errorMethod = null, string? innerErrorMessage = null, string innerErrorMethod = "") + { + try + { + await t; + throw new Exception("expected operation exception, but none was thrown"); + } + catch (EntityOperationFailedException entityException) + { + Assert.Equal(operationName, entityException.OperationName); + Assert.Equal(entityId, entityException.EntityId); + Assert.Contains(errorMessage, entityException.Message); + + Assert.NotNull(entityException.FailureDetails); + Assert.Equal(errorMessage, entityException.FailureDetails.ErrorMessage); + + if (errorMethod != null) + { + Assert.Contains(errorMethod, entityException.FailureDetails.StackTrace); + } + + if (innerErrorMessage != null) + { + Assert.NotNull(entityException.FailureDetails.InnerFailure); + Assert.Equal(innerErrorMessage, entityException.FailureDetails.InnerFailure!.ErrorMessage); + + if (innerErrorMethod != null) + { + Assert.Contains(innerErrorMethod, entityException.FailureDetails.InnerFailure.StackTrace); + } + } + else + { + Assert.Null(entityException.FailureDetails.InnerFailure); + } + } + catch (Exception e) + { + throw new Exception($"wrong exception thrown", e); + } + } + + try + { + Assert.False(await context.Entities.CallEntityAsync(entityId, "Exists")); + + await ExpectOperationExceptionAsync( + context.Entities.CallEntityAsync(entityId, "Throw"), + entityId, + "Throw", + "KABOOM", + "ThrowTestException"); + + await ExpectOperationExceptionAsync( + context.Entities.CallEntityAsync(entityId, "ThrowNested"), + entityId, + "ThrowNested", + "KABOOOOOM", + "FaultyEntity.RunAsync", + "KABOOM", + "ThrowTestException"); + + await ExpectOperationExceptionAsync( + context.Entities.CallEntityAsync(entityId, "SetToUnserializable"), + entityId, + "SetToUnserializable", + "problematic object: is not serializable", + "ProblematicObjectJsonConverter.Write"); + + // since the operations failed, the entity state is unchanged, meaning the entity still does not exist + Assert.False(await context.Entities.CallEntityAsync(entityId, "Exists")); + + await context.Entities.CallEntityAsync(entityId, "SetToUndeserializable"); + + Assert.True(await context.Entities.CallEntityAsync(entityId, "Exists")); + + await ExpectOperationExceptionAsync( + context.Entities.CallEntityAsync(entityId, "Get"), + entityId, + "Get", + "problematic object: is not deserializable", + "ProblematicObjectJsonConverter.Read"); + + await context.Entities.CallEntityAsync(entityId, "DeleteWithoutReading"); + + Assert.False(await context.Entities.CallEntityAsync(entityId, "Exists")); + + await context.Entities.CallEntityAsync(entityId, "Set", 3); + + Assert.Equal(3, await context.Entities.CallEntityAsync(entityId, "Get")); + + await ExpectOperationExceptionAsync( + context.Entities.CallEntityAsync(entityId, "SetThenThrow", 333), + entityId, + "SetThenThrow", + "KABOOM", + "FaultyEntity.RunAsync"); + + + // value should be unchanged + Assert.Equal(3, await context.Entities.CallEntityAsync(entityId, "Get")); + + await ExpectOperationExceptionAsync( + context.Entities.CallEntityAsync(entityId, "DeleteThenThrow"), + entityId, + "DeleteThenThrow", + "KABOOM", + "FaultyEntity.RunAsync"); + + // value should be unchanged + Assert.Equal(3, await context.Entities.CallEntityAsync(entityId, "Get")); + + await context.Entities.CallEntityAsync(entityId, "Delete"); + + // entity was deleted + Assert.False(await context.Entities.CallEntityAsync(entityId, "Exists")); + + await ExpectOperationExceptionAsync( + context.Entities.CallEntityAsync(entityId, "SetThenThrow", 333), + entityId, + "SetThenThrow", + "KABOOM", + "FaultyEntity.RunAsync"); + + // must have rolled back to non-existing state + Assert.False(await context.Entities.CallEntityAsync(entityId, "Exists")); + + return "ok"; + } + catch (Exception e) + { + logger.LogError("exception in CallFaultyEntityOrchestration: {exception}", e); + return e.ToString(); + } + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/Tests/CallFaultyEntityBatches.cs b/test/IsolatedEntities/Tests/CallFaultyEntityBatches.cs new file mode 100644 index 000000000..8abf6405c --- /dev/null +++ b/test/IsolatedEntities/Tests/CallFaultyEntityBatches.cs @@ -0,0 +1,147 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class CallFaultyEntityBatches : Test +{ + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(nameof(FaultyEntity), Guid.NewGuid().ToString()); + string orchestrationName = nameof(CallFaultyEntityBatchesOrchestration); + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, entityId); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + } +} + +class CallFaultyEntityBatchesOrchestration +{ + readonly ILogger logger; + + public CallFaultyEntityBatchesOrchestration(ILogger logger) + { + this.logger = logger; + } + + [Function(nameof(CallFaultyEntityBatchesOrchestration))] + public async Task RunAsync([OrchestrationTrigger] TaskOrchestrationContext context) + { + // read entity id from input + var entityId = context.GetInput(); + + // we use this utility function to try to enforce that a bunch of signals is delivered as a single batch. + // This is required for some of the tests here to work, since the batching affects the entity state management. + // The "enforcement" mechanism we use is not 100% failsafe (it still makes timing assumptions about the provider) + // but it should be more reliable than the original version of this test which failed quite frequently, as it was + // simply assuming that signals that are sent at the same time are always processed as a batch. + async Task ProcessSignalBatch(IEnumerable<(string,int?)> signals) + { + // first issue a signal that, when delivered, keeps the entity busy for a split second + await context.Entities.SignalEntityAsync(entityId, "Delay", 0.5); + + // we now need to yield briefly so that the delay signal is sent before the others + await context.CreateTimer(context.CurrentUtcDateTime + TimeSpan.FromMilliseconds(1), CancellationToken.None); + + // now send the signals one by one. These should all arrive and get queued (inside the storage provider) + // while the entity is executing the delay operation. Therefore, after the delay operation finishes, + // all of the signals are processed in a single batch. + foreach ((string operation, int? arg) in signals) + { + await context.Entities.SignalEntityAsync(entityId, operation, arg); + } + } + + try + { + await ProcessSignalBatch(new (string, int?)[] + { + new("Set", 42), // state that survives + new("SetThenThrow", 333), + new("DeleteThenThrow", null), + }); + + Assert.Equal(42, await context.Entities.CallEntityAsync(entityId, "Get")); + + await ProcessSignalBatch(new (string, int?)[] + { + new("Get", null), + new("Set", 42), + new("Delete", null), + new("Set", 43), // state that survives + new("DeleteThenThrow", null), + }); + + Assert.Equal(43, await context.Entities.CallEntityAsync(entityId, "Get")); + + await ProcessSignalBatch(new (string, int?)[] + { + new("Set", 55), // state that survives + new("SetToUnserializable", null), + }); + + + Assert.Equal(55, await context.Entities.CallEntityAsync(entityId, "Get")); + + await ProcessSignalBatch(new (string, int?)[] + { + new("Set", 1), + new("Delete", null), + new("Set", 2), + new("Delete", null), // state that survives + new("SetThenThrow", 333), + }); + + Assert.False(await context.Entities.CallEntityAsync(entityId, "Exists")); + + await ProcessSignalBatch(new (string, int?)[] + { + new("Set", 1), + new("Delete", null), + new("Set", 2), + new("Delete", null), // state that survives + new("SetThenThrow", 333), + }); + + // must have rolled back to non-existing state + Assert.False(await context.Entities.CallEntityAsync(entityId, "Exists")); + + await ProcessSignalBatch(new (string, int?)[] + { + new("Set", 1), + new("SetThenThrow", 333), + new("Set", 2), + new("DeleteThenThrow", null), + new("Delete", null), + new("Set", 3), // state that survives + }); + + Assert.Equal(3, await context.Entities.CallEntityAsync(entityId, "Get")); + + return "ok"; + } + catch (Exception e) + { + logger.LogError("exception in CallFaultyEntityBatchesOrchestration: {exception}", e); + return e.ToString(); + } + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/Tests/CallFaultySuborchestration.cs b/test/IsolatedEntities/Tests/CallFaultySuborchestration.cs new file mode 100644 index 000000000..e2ec11069 --- /dev/null +++ b/test/IsolatedEntities/Tests/CallFaultySuborchestration.cs @@ -0,0 +1,123 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class CallFaultySuborchestration : Test +{ + // this is not an entity test... but it's a good place to put this test + + private readonly bool nested; + + public CallFaultySuborchestration(bool nested) + { + this.nested = nested; + } + + public override string Name => $"{base.Name}.{(this.nested ? "Nested" : "NotNested")}"; + + public override async Task RunAsync(TestContext context) + { + string orchestrationName = nameof(CallFaultySuborchestrationOrchestration); + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, this.nested); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + } +} + +class CallFaultySuborchestrationOrchestration +{ + readonly ILogger logger; + + public CallFaultySuborchestrationOrchestration(ILogger logger) + { + this.logger = logger; + } + + [Function(nameof(FaultySuborchestration))] + public void FaultySuborchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + bool nested = context.GetInput(); + + if (!nested) + { + this.MethodThatThrowsException(); + } + else + { + this.MethodThatThrowsNestedException(); + } + } + + void MethodThatThrowsNestedException() + { + try + { + this.MethodThatThrowsException(); + } + catch (Exception e) + { + throw new Exception("KABOOOOOM", e); + } + } + + void MethodThatThrowsException() + { + throw new Exception("KABOOM"); + } + + [Function(nameof(CallFaultySuborchestrationOrchestration))] + public async Task RunAsync([OrchestrationTrigger] TaskOrchestrationContext context) + { + bool nested = context.GetInput(); + + try + { + await context.CallSubOrchestratorAsync(nameof(FaultySuborchestration), nested); + throw new Exception("expected suborchestrator to throw exception, but none was thrown"); + } + catch (TaskFailedException taskFailedException) + { + Assert.NotNull(taskFailedException.FailureDetails); + + if (!nested) + { + Assert.Equal("KABOOM", taskFailedException.FailureDetails.ErrorMessage); + Assert.Contains(nameof(MethodThatThrowsException), taskFailedException.FailureDetails.StackTrace); + } + else + { + Assert.Equal("KABOOOOOM", taskFailedException.FailureDetails.ErrorMessage); + Assert.Contains(nameof(MethodThatThrowsNestedException), taskFailedException.FailureDetails.StackTrace); + + Assert.NotNull(taskFailedException.FailureDetails.InnerFailure); + Assert.Equal("KABOOM", taskFailedException.FailureDetails.InnerFailure!.ErrorMessage); + Assert.Contains(nameof(MethodThatThrowsException), taskFailedException.FailureDetails.InnerFailure.StackTrace); + } + } + catch (Exception e) + { + throw new Exception($"wrong exception thrown", e); + } + + return "ok"; + } +} diff --git a/test/IsolatedEntities/Tests/CleanOrphanedLock.cs b/test/IsolatedEntities/Tests/CleanOrphanedLock.cs new file mode 100644 index 000000000..8050dc47d --- /dev/null +++ b/test/IsolatedEntities/Tests/CleanOrphanedLock.cs @@ -0,0 +1,133 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; +using DurableTask.Core.Entities.OperationFormat; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +class CleanOrphanedLock : Test +{ + + public override async Task RunAsync(TestContext context) + { + // clean the storage before starting the test so we start from a clean slate + await context.Client.Entities.CleanEntityStorageAsync(); + + DateTime startTime = DateTime.UtcNow; + + // construct unique names for this test + string prefix = Guid.NewGuid().ToString("N").Substring(0, 6); + var orphanedEntityId = new EntityInstanceId(nameof(Counter), $"{prefix}-orphaned"); + var orchestrationA = $"{prefix}-A"; + var orchestrationB = $"{prefix}-B"; + + // start an orchestration A that acquires the lock and then waits forever + await context.Client.ScheduleNewOrchestrationInstanceAsync( + nameof(BadLockerOrchestration), + orphanedEntityId, + new StartOrchestrationOptions() { InstanceId = orchestrationA }, + context.CancellationToken); + await context.Client.WaitForInstanceStartAsync(orchestrationA, context.CancellationToken); + + // start an orchestration B that queues behind A for the lock (and thus gets stuck) + await context.Client.ScheduleNewOrchestrationInstanceAsync( + nameof(UnluckyWaiterOrchestration), + orphanedEntityId, + new StartOrchestrationOptions() { InstanceId = orchestrationB }, + context.CancellationToken); + await context.Client.WaitForInstanceStartAsync(orchestrationB, context.CancellationToken); + + // brutally and unsafely purge the running orchestrationA from storage, leaving the lock orphaned + await context.Client.PurgeInstanceAsync(orchestrationA); + + // check the status of the entity to confirm that the lock is held + List results = await context.Client.Entities.GetAllEntitiesAsync( + new Microsoft.DurableTask.Client.Entities.EntityQuery() + { + InstanceIdStartsWith = orphanedEntityId.ToString(), + IncludeTransient = true, + IncludeState = true, + }).ToListAsync(); + Assert.Equal(1, results.Count); + Assert.Equal(orphanedEntityId, results[0].Id); + Assert.False(results[0].IncludesState); + Assert.True(results[0].LastModifiedTime > startTime); + Assert.Equal(orchestrationA, results[0].LockedBy); + Assert.Equal(1, results[0].BacklogQueueSize); // that's the request that is waiting for the lock + DateTimeOffset lastModified = results[0].LastModifiedTime; + + // clean the entity storage to remove the orphaned lock + var cleaningResponse = await context.Client.Entities.CleanEntityStorageAsync(); + Assert.Equal(0, cleaningResponse.EmptyEntitiesRemoved); + Assert.Equal(1, cleaningResponse.OrphanedLocksReleased); + + // now wait for orchestration B to finish + OrchestrationMetadata metadata = await context.Client.WaitForInstanceCompletionAsync(orchestrationB, getInputsAndOutputs: true, context.CancellationToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + + // clean the entity storage again, this time there should be nothing left to clean + cleaningResponse = await context.Client.Entities.CleanEntityStorageAsync(); + Assert.Equal(0, cleaningResponse.EmptyEntitiesRemoved); + Assert.Equal(0, cleaningResponse.OrphanedLocksReleased); + + // check the status of the entity to confirm that the lock is no longer held + results = await context.Client.Entities.GetAllEntitiesAsync( + new Microsoft.DurableTask.Client.Entities.EntityQuery() + { + InstanceIdStartsWith = orphanedEntityId.ToString(), + IncludeTransient = true, + IncludeState = true, + }).ToListAsync(); + Assert.Equal(1, results.Count); + Assert.Equal(orphanedEntityId, results[0].Id); + Assert.True(results[0].IncludesState); + Assert.Equal(1, results[0].State.ReadAs()); + Assert.True(results[0].LastModifiedTime > lastModified); + Assert.Null(results[0].LockedBy); + Assert.Equal(0, results[0].BacklogQueueSize); + } + + [Function(nameof(BadLockerOrchestration))] + public static async Task BadLockerOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + // read entity id from input + var entityId = context.GetInput(); + + await using (await context.Entities.LockEntitiesAsync(entityId)) + { + await context.CreateTimer(DateTime.UtcNow + TimeSpan.FromDays(365), CancellationToken.None); + } + + // will never reach the end here because we get purged in the middle + return "ok"; + } + + [Function(nameof(UnluckyWaiterOrchestration))] + public static async Task UnluckyWaiterOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + // read entity id from input + var entityId = context.GetInput(); + + await using (await context.Entities.LockEntitiesAsync(entityId)) + { + await context.Entities.CallEntityAsync(entityId, "increment"); + + // we got the entity + return "ok"; + } + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/Tests/EntityQueries1.cs b/test/IsolatedEntities/Tests/EntityQueries1.cs new file mode 100644 index 000000000..241f833bf --- /dev/null +++ b/test/IsolatedEntities/Tests/EntityQueries1.cs @@ -0,0 +1,246 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class EntityQueries1 : Test +{ + public override async Task RunAsync(TestContext context) + { + // ----- first, delete all already-existing entities in storage to ensure queries have predictable results + context.Logger.LogInformation("deleting existing entities"); + + // we simply delete all the running instances + await context.Client.PurgeAllInstancesAsync( + new PurgeInstancesFilter() + { + CreatedFrom = DateTime.MinValue, + Statuses = new OrchestrationRuntimeStatus[] { OrchestrationRuntimeStatus.Running } + }, + context.CancellationToken); + + var yesterday = DateTime.UtcNow.Subtract(TimeSpan.FromDays(1)); + var tomorrow = DateTime.UtcNow.Add(TimeSpan.FromDays(1)); + + // check that a blank entity query returns no elements now + + var e = context.Client.Entities.GetAllEntitiesAsync(new EntityQuery()).GetAsyncEnumerator(); + Assert.False(await e.MoveNextAsync()); + + // ----- next, run a number of orchestrations in order to create specific instances + context.Logger.LogInformation("creating entities"); + + List entityIds = new List() + { + new EntityInstanceId("StringStore", "foo"), + new EntityInstanceId("StringStore", "bar"), + new EntityInstanceId("StringStore", "baz"), + new EntityInstanceId("StringStore2", "foo"), + }; + + await Parallel.ForEachAsync( + Enumerable.Range(0, entityIds.Count), + context.CancellationToken, + async (int i, CancellationToken cancellation) => + { + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(SignalAndCall.SignalAndCallOrchestration), entityIds[i]); + await context.Client.WaitForInstanceCompletionAsync(instanceId, cancellation); + }); + + await Task.Delay(TimeSpan.FromSeconds(3)); // accounts for delay in updating instance tables + + // ----- to more easily read this, we first create a collection of (query, validation function) pairs + context.Logger.LogInformation("starting query tests"); + + var tests = new (EntityQuery query, Action> test)[] + { + (new EntityQuery + { + InstanceIdStartsWith = "StringStore", + }, + result => + { + Assert.Equal(4, result.Count()); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@StringStore", + }, + result => + { + Assert.Equal(4, result.Count()); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@stringstore", + }, + result => + { + Assert.Equal(4, result.Count()); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@StringStore@", + }, + result => + { + Assert.Equal(3, result.Count()); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "StringStore@", + }, + result => + { + Assert.Equal(3, result.Count()); + }), + + + (new EntityQuery + { + InstanceIdStartsWith = "@StringStore@foo", + }, + result => + { + Assert.Equal(1, result.Count); + Assert.True(result[0].IncludesState); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@StringStore@foo", + IncludeState = false, + }, + result => + { + Assert.Equal(1, result.Count); + Assert.False(result[0].IncludesState); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@StringStore@", + LastModifiedFrom = yesterday, + LastModifiedTo = tomorrow, + }, + result => + { + Assert.Equal(3, result.Count); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "StringStore@ba", + LastModifiedFrom = yesterday, + LastModifiedTo = tomorrow, + }, + result => + { + Assert.Equal(2, result.Count); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "stringstore@BA", + LastModifiedFrom = yesterday, + LastModifiedTo = tomorrow, + }, + result => + { + Assert.Equal(0, result.Count); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@StringStore@ba", + LastModifiedFrom = yesterday, + LastModifiedTo = tomorrow, + }, + result => + { + Assert.Equal(2, result.Count); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@stringstore@BA", + LastModifiedFrom = yesterday, + LastModifiedTo = tomorrow, + }, + result => + { + Assert.Equal(0, result.Count); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@StringStore@", + PageSize = 2, + }, + result => + { + Assert.Equal(3, result.Count()); + }), + + (new EntityQuery + { + InstanceIdStartsWith = "@noResult", + LastModifiedFrom = yesterday, + LastModifiedTo = tomorrow, + }, + result => + { + Assert.Equal(0, result.Count()); + }), + + (new EntityQuery + { + LastModifiedFrom = tomorrow, + }, + result => + { + Assert.Equal(0, result.Count()); + }), + + (new EntityQuery + { + LastModifiedTo = yesterday, + }, + result => + { + Assert.Equal(0, result.Count()); + }), + + }; + + foreach (var item in tests) + { + List results = new List(); + await foreach (var element in context.Client.Entities.GetAllEntitiesAsync(item.query)) + { + results.Add(element); + } + + item.test(results); + } + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/Tests/EntityQueries2.cs b/test/IsolatedEntities/Tests/EntityQueries2.cs new file mode 100644 index 000000000..954158333 --- /dev/null +++ b/test/IsolatedEntities/Tests/EntityQueries2.cs @@ -0,0 +1,146 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class EntityQueries2 : Test +{ + public override async Task RunAsync(TestContext context) + { + // ----- first, delete all already-existing entities in storage to ensure queries have predictable results + context.Logger.LogInformation("deleting existing entities"); + + // we simply delete all the running instances which does also purge all entities + await context.Client.PurgeAllInstancesAsync( + new PurgeInstancesFilter() + { + CreatedFrom = DateTime.MinValue, + Statuses = new OrchestrationRuntimeStatus[] { OrchestrationRuntimeStatus.Running } + }, + context.CancellationToken); + + var yesterday = DateTime.UtcNow.Subtract(TimeSpan.FromDays(1)); + var tomorrow = DateTime.UtcNow.Add(TimeSpan.FromDays(1)); + + // check that everything is completely blank, there are no entities, not even stateless ones + + var e = context.Client.Entities.GetAllEntitiesAsync(new EntityQuery() { IncludeTransient = true }).GetAsyncEnumerator(); + Assert.False(await e.MoveNextAsync()); + + // ----- next, run a number of orchestrations in order to create and/or delete specific instances + context.Logger.LogInformation("creating and deleting entities"); + + List orchestrations = new List() + { + nameof(SignalAndCall.SignalAndCallOrchestration), + nameof(CallAndDelete.CallAndDeleteOrchestration), + nameof(SignalAndCall.SignalAndCallOrchestration), + nameof(CallAndDelete.CallAndDeleteOrchestration), + nameof(SignalAndCall.SignalAndCallOrchestration), + nameof(CallAndDelete.CallAndDeleteOrchestration), + nameof(SignalAndCall.SignalAndCallOrchestration), + nameof(CallAndDelete.CallAndDeleteOrchestration), + }; + + List entityIds = new List() + { + new EntityInstanceId("StringStore", "foo"), + new EntityInstanceId("StringStore2", "bar"), + new EntityInstanceId("StringStore2", "baz"), + new EntityInstanceId("StringStore2", "foo"), + new EntityInstanceId("StringStore2", "ffo"), + new EntityInstanceId("StringStore2", "zzz"), + new EntityInstanceId("StringStore2", "aaa"), + new EntityInstanceId("StringStore2", "bbb"), + }; + + await Parallel.ForEachAsync( + Enumerable.Range(0, entityIds.Count), + context.CancellationToken, + async (int i, CancellationToken cancellation) => + { + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(orchestrations[i], entityIds[i]); + await context.Client.WaitForInstanceCompletionAsync(instanceId, cancellation); + }); + + await Task.Delay(TimeSpan.FromSeconds(3)); // accounts for delay in updating instance tables + + // ----- use a collection of (query, validation function) pairs + context.Logger.LogInformation("starting query tests"); + + var tests = new (EntityQuery query, Action> test)[] + { + (new EntityQuery + { + }, + result => + { + Assert.Equal(4, result.Count()); + }), + + (new EntityQuery + { + IncludeTransient = true, + }, + result => + { + Assert.Equal(context.BackendSupportsImplicitEntityDeletion ? 4 : 8, result.Count()); + }), + + + (new EntityQuery + { + PageSize = 3, + }, + result => + { + Assert.Equal(4, result.Count()); + }), + + (new EntityQuery + { + IncludeTransient = true, + PageSize = 3, + }, + result => + { + Assert.Equal(context.BackendSupportsImplicitEntityDeletion ? 4 : 8, result.Count()); // TODO this is provider-specific + }), + }; + + foreach (var item in tests) + { + List results = new List(); + await foreach (var element in context.Client.Entities.GetAllEntitiesAsync(item.query)) + { + results.Add(element); + } + + item.test(results); + } + + // ----- remove the 4 deleted entities whose metadata still lingers in Azure Storage provider + + context.Logger.LogInformation("starting storage cleaning"); + + var cleaningResponse = await context.Client.Entities.CleanEntityStorageAsync(); + + Assert.Equal(context.BackendSupportsImplicitEntityDeletion ? 0 : 4, cleaningResponse.EmptyEntitiesRemoved); + Assert.Equal(0, cleaningResponse.OrphanedLocksReleased); + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/Tests/FaultyCriticalSection.cs b/test/IsolatedEntities/Tests/FaultyCriticalSection.cs new file mode 100644 index 000000000..493d76e11 --- /dev/null +++ b/test/IsolatedEntities/Tests/FaultyCriticalSection.cs @@ -0,0 +1,74 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class FaultyCriticalSection : Test +{ + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(nameof(Counter), Guid.NewGuid().ToString()); + string orchestrationName = nameof(FaultyCriticalSectionOrchestration); + + // run the critical section but fail in the middle + { + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, new FaultyCriticalSectionOrchestration.Input(entityId, true)); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + Assert.True(metadata.SerializedOutput!.Contains("KABOOM")); + } + + // run the critical section again without failing this time - this will time out if lock was not released properly. + { + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, new FaultyCriticalSectionOrchestration.Input(entityId, false)); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + } + } +} + +class FaultyCriticalSectionOrchestration +{ + readonly ILogger logger; + + public record Input(EntityInstanceId EntityInstanceId, bool Fail); + + public FaultyCriticalSectionOrchestration(ILogger logger) + { + this.logger = logger; + } + + [Function(nameof(FaultyCriticalSectionOrchestration))] + public async Task RunAsync([OrchestrationTrigger] TaskOrchestrationContext context) + { + // read input + var input = context.GetInput()!; + + await using (await context.Entities.LockEntitiesAsync(input.EntityInstanceId)) + { + if (input.Fail) + { + throw new Exception("KABOOM"); + } + } + + return "ok"; + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/Tests/FireAndForget.cs b/test/IsolatedEntities/Tests/FireAndForget.cs new file mode 100644 index 000000000..a54d29d33 --- /dev/null +++ b/test/IsolatedEntities/Tests/FireAndForget.cs @@ -0,0 +1,90 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +/// +/// Scenario that starts a new orchestration from an entity. +/// +class FireAndForget : Test +{ + private readonly int? delay; + + public FireAndForget(int? delay) + { + this.delay = delay; + } + + public override string Name => $"{base.Name}.{(this.delay.HasValue ? "Delay" + this.delay.Value.ToString() : "NoDelay")}"; + + public override async Task RunAsync(TestContext context) + { + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(LaunchOrchestrationFromEntity), this.delay, context.CancellationToken); + OrchestrationMetadata metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, context.CancellationToken); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + string? launchedId = metadata.ReadOutputAs(); + Assert.NotNull(launchedId); + var launchedMetadata = await context.Client.GetInstanceAsync(launchedId!, getInputsAndOutputs: true, context.CancellationToken); + Assert.NotNull(launchedMetadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, launchedMetadata!.RuntimeStatus); + Assert.Equal("ok", launchedMetadata!.ReadOutputAs()); + } + + [Function(nameof(LaunchOrchestrationFromEntity))] + public static async Task LaunchOrchestrationFromEntity([OrchestrationTrigger] TaskOrchestrationContext context) + { + int? delay = context.GetInput(); + + var entityId = new EntityInstanceId("Launcher", context.NewGuid().ToString().Substring(0, 8)); + + if (delay.HasValue) + { + await context.Entities.CallEntityAsync(entityId, "launch", context.CurrentUtcDateTime + TimeSpan.FromSeconds(delay.Value)); + } + else + { + await context.Entities.CallEntityAsync(entityId, "launch"); + } + + while (true) + { + string? launchedOrchestrationId = await context.Entities.CallEntityAsync(entityId, "get"); + + if (launchedOrchestrationId != null) + { + return launchedOrchestrationId; + } + + await context.CreateTimer(DateTime.UtcNow + TimeSpan.FromSeconds(1), CancellationToken.None); + } + } + + [Function(nameof(SignallingOrchestration))] + public static async Task SignallingOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + var entityId = context.GetInput(); + + await context.CreateTimer(DateTime.UtcNow + TimeSpan.FromSeconds(.2), CancellationToken.None); + + await context.Entities.SignalEntityAsync(entityId, "done"); + + return "ok"; + } +} diff --git a/test/IsolatedEntities/Tests/InvalidEntityId.cs b/test/IsolatedEntities/Tests/InvalidEntityId.cs new file mode 100644 index 000000000..fbf62c346 --- /dev/null +++ b/test/IsolatedEntities/Tests/InvalidEntityId.cs @@ -0,0 +1,81 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +/// +/// This test is not entity related, but discovered an issue with how failures in orchestrators are captured. +/// +class InvalidEntityId : Test +{ + public enum Location + { + ClientSignal, + ClientGet, + OrchestrationSignal, + OrchestrationCall, + } + + readonly Location location; + + public InvalidEntityId(Location location) + { + this.location = location; + } + + public override string Name => $"{base.Name}.{this.location}"; + + public override async Task RunAsync(TestContext context) + { + switch (this.location) + { + case Location.ClientSignal: + await Assert.ThrowsAsync( + typeof(ArgumentNullException), + async () => + { + await context.Client.Entities.SignalEntityAsync(default, "add", 1); + }); + return; + + case Location.ClientGet: + await Assert.ThrowsAsync( + typeof(ArgumentNullException), + async () => + { + await context.Client.Entities.GetEntityAsync(default); + }); + return; + + case Location.OrchestrationSignal: + { + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(SignalAndCall.SignalAndCallOrchestration) /* missing input */); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + //Assert.NotNull(metadata.FailureDetails); // TODO currently failing because FailureDetails are not propagated for some reason + } + break; + + case Location.OrchestrationCall: + { + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(CallCounter.CallCounterOrchestration) /* missing input */); + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + //Assert.NotNull(metadata.FailureDetails); // TODO currently failing because FailureDetails are not propagated for some reason + } + break; + } + } +} diff --git a/test/IsolatedEntities/Tests/LargeEntity.cs b/test/IsolatedEntities/Tests/LargeEntity.cs new file mode 100644 index 000000000..e1af2412a --- /dev/null +++ b/test/IsolatedEntities/Tests/LargeEntity.cs @@ -0,0 +1,85 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities.Tests +{ + /// + /// validates a simple entity scenario where an entity's state is + /// larger than what fits into Azure table rows. + /// + internal class LargeEntity : Test + { + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(nameof(StringStore2), Guid.NewGuid().ToString().Substring(0, 8)); + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(LargeEntityOrchestration), entityId); + + // wait for completion of the orchestration + { + var metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, true); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + } + + // read untyped without including state + { + EntityMetadata? metadata = await context.Client.Entities.GetEntityAsync(entityId, includeState: false, context.CancellationToken); + Assert.NotNull(metadata); + Assert.Throws(() => metadata!.State); + } + + // read untyped including state + { + EntityMetadata? metadata = await context.Client.Entities.GetEntityAsync(entityId, includeState:true, context.CancellationToken); + Assert.NotNull(metadata); + Assert.NotNull(metadata!.State); + Assert.Equal(100000, metadata!.State.ReadAs().Length); + } + + // read typed without including state + { + EntityMetadata? metadata = await context.Client.Entities.GetEntityAsync(entityId, includeState: false, context.CancellationToken); + Assert.NotNull(metadata); + Assert.Throws(() => metadata!.State); + } + + // read typed including state + { + EntityMetadata? metadata = await context.Client.Entities.GetEntityAsync(entityId, includeState: true, context.CancellationToken); + Assert.NotNull(metadata); + Assert.NotNull(metadata!.State); + Assert.Equal(100000, metadata!.State.Length); + } + } + + [Function(nameof(LargeEntityOrchestration))] + public static async Task LargeEntityOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + var entityId = context.GetInput(); + string content = new string('.', 100000); + + await context.Entities.CallEntityAsync(entityId, "set", content); + + var result = await context.Entities.CallEntityAsync(entityId, "get"); + + if (result != content) + { + return $"fail: wrong entity state"; + } + + return "ok"; + } + } +} diff --git a/test/IsolatedEntities/Tests/MultipleLockedTransfers.cs b/test/IsolatedEntities/Tests/MultipleLockedTransfers.cs new file mode 100644 index 000000000..ed7f559c6 --- /dev/null +++ b/test/IsolatedEntities/Tests/MultipleLockedTransfers.cs @@ -0,0 +1,85 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +class MultipleLockedTransfers : Test +{ + readonly int numberEntities; + + public MultipleLockedTransfers(int numberEntities) + { + this.numberEntities = numberEntities; + } + + public override string Name => $"{base.Name}.{this.numberEntities}"; + + public override async Task RunAsync(TestContext context) + { + // create specified number of counters + var counters = new EntityInstanceId[this.numberEntities]; + for (int i = 0; i < this.numberEntities; i++) + { + counters[i] = new EntityInstanceId(nameof(Counter), Guid.NewGuid().ToString().Substring(0, 8)); + } + + // in parallel, start one transfer per counter, each decrementing a counter and incrementing + // its successor (where the last one wraps around to the first) + // This is a pattern that would deadlock if we didn't order the lock acquisition. + var instances = new Task[this.numberEntities]; + for (int i = 0; i < this.numberEntities; i++) + { + instances[i] = context.Client.ScheduleNewOrchestrationInstanceAsync( + nameof(SingleLockedTransfer.LockedTransferOrchestration), + new[] { counters[i], counters[(i + 1) % this.numberEntities] }, + context.CancellationToken); + } + await Task.WhenAll(instances); + + + // in parallel, wait for all transfers to complete + var metadata = new Task[this.numberEntities]; + for (int i = 0; i < this.numberEntities; i++) + { + metadata[i] = context.Client.WaitForInstanceCompletionAsync(instances[i].Result, getInputsAndOutputs: true, context.CancellationToken); + } + await Task.WhenAll(metadata); + + // check that they all completed + for (int i = 0; i < this.numberEntities; i++) + { + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata[i].Result.RuntimeStatus); + } + + // in parallel, read all the entity states + var entityMetadata = new Task?>[this.numberEntities]; + for (int i = 0; i < this.numberEntities; i++) + { + entityMetadata[i] = context.Client.Entities.GetEntityAsync(counters[i], includeState: true, context.CancellationToken); + } + await Task.WhenAll(entityMetadata); + + // check that the counter states are all back to 0 + // (since each participated in 2 transfers, one incrementing and one decrementing) + for (int i = 0; i < numberEntities; i++) + { + EntityMetadata? response = entityMetadata[i].Result; + Assert.NotNull(response); + Assert.Equal(0, response!.State); + } + } +} diff --git a/test/IsolatedEntities/Tests/SelfScheduling.cs b/test/IsolatedEntities/Tests/SelfScheduling.cs new file mode 100644 index 000000000..394d4aee2 --- /dev/null +++ b/test/IsolatedEntities/Tests/SelfScheduling.cs @@ -0,0 +1,36 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace IsolatedEntities; + +class SelfScheduling : Test +{ + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(nameof(SelfSchedulingEntity), Guid.NewGuid().ToString().Substring(0,8)); + + await context.Client.Entities.SignalEntityAsync(entityId, "start"); + + var result = await context.WaitForEntityStateAsync( + entityId, + timeout: default, + entityState => entityState.Value.Length == 4 ? null : "expect 4 letters"); + + Assert.NotNull(result); + Assert.Equal("ABCD", result.Value); + } +} diff --git a/test/IsolatedEntities/Tests/SetAndGet.cs b/test/IsolatedEntities/Tests/SetAndGet.cs new file mode 100644 index 000000000..93b501228 --- /dev/null +++ b/test/IsolatedEntities/Tests/SetAndGet.cs @@ -0,0 +1,47 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +class SetAndGet : Test +{ + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(nameof(Counter), Guid.NewGuid().ToString()); + + // entity should not yet exist + EntityMetadata? result = await context.Client.Entities.GetEntityAsync(entityId); + Assert.Null(result); + + // entity should still not exist + result = await context.Client.Entities.GetEntityAsync(entityId, includeState:true); + Assert.Null(result); + + // send one signal + await context.Client.Entities.SignalEntityAsync(entityId, "Set", 1); + + // wait for state + int state = await context.WaitForEntityStateAsync(entityId); + Assert.Equal(1, state); + + // if we query the entity state again it should still be the same + result = await context.Client.Entities.GetEntityAsync(entityId); + + Assert.NotNull(result); + Assert.Equal(1,result!.State); + } +} diff --git a/test/IsolatedEntities/Tests/SignalAndCall.cs b/test/IsolatedEntities/Tests/SignalAndCall.cs new file mode 100644 index 000000000..29893300a --- /dev/null +++ b/test/IsolatedEntities/Tests/SignalAndCall.cs @@ -0,0 +1,68 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +class SignalAndCall : Test +{ + readonly Type entityType; + + public SignalAndCall(Type entityType) + { + this.entityType = entityType; + } + + public override string Name => $"{base.Name}.{entityType.Name}"; + + public override async Task RunAsync(TestContext context) + { + var entityId = new EntityInstanceId(this.entityType.Name, Guid.NewGuid().ToString().Substring(0, 8)); + + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(SignalAndCallOrchestration), entityId, context.CancellationToken); + OrchestrationMetadata metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs:true, context.CancellationToken); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + } + + [Function(nameof(SignalAndCallOrchestration))] + public static async Task SignalAndCallOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + // read entity id from input + var entity = context.GetInput(); + + // signal and call (both of these will be delivered close together, typically in the same batch, and always in order) + await context.Entities.SignalEntityAsync(entity, "set", "333"); + + string? result = await context.Entities.CallEntityAsync(entity, "get"); + + if (result != "333") + { + return $"fail: wrong entity state: expected 333, got {result}"; + } + + // make another call to see if the state survives replay + result = await context.Entities.CallEntityAsync(entity, "get"); + + if (result != "333") + { + return $"fail: wrong entity state: expected 333 still, but got {result}"; + } + + return "ok"; + } +} diff --git a/test/IsolatedEntities/Tests/SignalThenPoll.cs b/test/IsolatedEntities/Tests/SignalThenPoll.cs new file mode 100644 index 000000000..2e46aaf65 --- /dev/null +++ b/test/IsolatedEntities/Tests/SignalThenPoll.cs @@ -0,0 +1,103 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +class SignalThenPoll : Test +{ + private readonly bool direct; + private readonly bool delayed; + + public SignalThenPoll(bool direct, bool delayed) + { + this.direct = direct; + this.delayed = delayed; + } + + public override string Name => $"{base.Name}.{(this.direct ? "Direct" : "Indirect")}.{(this.delayed ? "Delayed" : "Immediately")}"; + + public override async Task RunAsync(TestContext context) + { + var counterEntityId = new EntityInstanceId(nameof(Counter), Guid.NewGuid().ToString().Substring(0,8)); + var relayEntityId = new EntityInstanceId("Relay", ""); + + string pollingInstance = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(PollingOrchestration), counterEntityId, context.CancellationToken); + DateTimeOffset? scheduledTime = this.delayed ? DateTime.UtcNow + TimeSpan.FromSeconds(5) : null; + + if (this.direct) + { + await context.Client.Entities.SignalEntityAsync( + counterEntityId, + "set", + 333, + new SignalEntityOptions() { SignalTime = scheduledTime }, + context.CancellationToken); + } + else + { + await context.Client.Entities.SignalEntityAsync( + relayEntityId, + operationName: "", + input: new Relay.Input(counterEntityId, "set", 333, scheduledTime), + options: null, + context.CancellationToken); + } + + var metadata = await context.Client.WaitForInstanceCompletionAsync(pollingInstance, true, context.CancellationToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("ok", metadata.ReadOutputAs()); + + if (this.delayed) + { + Assert.True(metadata.LastUpdatedAt > scheduledTime - TimeSpan.FromMilliseconds(100)); + } + + int counterState = await context.WaitForEntityStateAsync( + counterEntityId, + timeout: default); + + Assert.Equal(333, counterState); + } + + [Function(nameof(PollingOrchestration))] + public static async Task PollingOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + // read entity id from input + var entityId = context.GetInput(); + DateTime startTime = context.CurrentUtcDateTime; + + while (context.CurrentUtcDateTime < startTime + TimeSpan.FromSeconds(30)) + { + var result = await context.Entities.CallEntityAsync(entityId, "get"); + + if (result != 0) + { + if (result == 333) + { + return "ok"; + } + else + { + return $"fail: wrong entity state: expected 333, got {result}"; + } + } + + await context.CreateTimer(DateTime.UtcNow + TimeSpan.FromSeconds(1), CancellationToken.None); + } + + return "timed out while waiting for entity to have state"; + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/Tests/SingleLockedTransfer.cs b/test/IsolatedEntities/Tests/SingleLockedTransfer.cs new file mode 100644 index 000000000..1ff43cc91 --- /dev/null +++ b/test/IsolatedEntities/Tests/SingleLockedTransfer.cs @@ -0,0 +1,104 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Mime; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Xunit; + +namespace IsolatedEntities; + +class SingleLockedTransfer : Test +{ + public override async Task RunAsync(TestContext context) + { + var counter1 = new EntityInstanceId("Counter", Guid.NewGuid().ToString().Substring(0, 8)); + var counter2 = new EntityInstanceId("Counter", Guid.NewGuid().ToString().Substring(0, 8)); + + string instanceId = await context.Client.ScheduleNewOrchestrationInstanceAsync(nameof(LockedTransferOrchestration), new[] { counter1, counter2 }, context.CancellationToken); + OrchestrationMetadata metadata = await context.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs:true, context.CancellationToken); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(new[] { -1, 1 }, metadata.ReadOutputAs()); + + // validate the state of the counters + EntityMetadata? response1 = await context.Client.Entities.GetEntityAsync(counter1, true, context.CancellationToken); + EntityMetadata? response2 = await context.Client.Entities.GetEntityAsync(counter2, true, context.CancellationToken); + Assert.NotNull(response1); + Assert.NotNull(response2); + Assert.Equal(-1, response1!.State); + Assert.Equal(1, response2!.State); + } + + [Function(nameof(LockedTransferOrchestration))] + public static async Task LockedTransferOrchestration([OrchestrationTrigger] TaskOrchestrationContext context) + { + // read entity id from input + var entities = context.GetInput(); + var from = entities![0]; + var to = entities![1]; + + if (from.Equals(to)) + { + throw new ArgumentException("from and to must be distinct"); + } + + ExpectSynchState(false); + + int fromBalance; + int toBalance; + + await using (await context.Entities.LockEntitiesAsync(from, to)) + { + ExpectSynchState(true, from, to); + + // read balances in parallel + var t1 = context.Entities.CallEntityAsync(from, "get"); + ExpectSynchState(true, to); + var t2 = context.Entities.CallEntityAsync(to, "get"); + ExpectSynchState(true); + + + fromBalance = await t1; + toBalance = await t2; + ExpectSynchState(true, from, to); + + // modify + fromBalance--; + toBalance++; + + // write balances in parallel + var t3 = context.Entities.CallEntityAsync(from, "set", fromBalance); + ExpectSynchState(true, to); + var t4 = context.Entities.CallEntityAsync(to, "set", toBalance); + ExpectSynchState(true); + await t4; + await t3; + ExpectSynchState(true, to, from); + + } // lock is released here + + ExpectSynchState(false); + + return new int[] { fromBalance, toBalance }; + + void ExpectSynchState(bool inCriticalSection, params EntityInstanceId[] ids) + { + Assert.Equal(inCriticalSection, context.Entities.InCriticalSection(out var currentLocks)); + if (inCriticalSection) + { + Assert.Equal( + ids.Select(i => i.ToString()).OrderBy(s => s), + currentLocks!.Select(i => i.ToString()).OrderBy(s => s)); + } + } + } +} diff --git a/test/IsolatedEntities/host.json b/test/IsolatedEntities/host.json new file mode 100644 index 000000000..cb3864033 --- /dev/null +++ b/test/IsolatedEntities/host.json @@ -0,0 +1,16 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensions": { + "durableTask": { + "entityMessageReorderWindowInMinutes": 0 // need this just for testing the CleanEntityStorage + } + } +} \ No newline at end of file diff --git a/test/IsolatedEntities/local.settings.json b/test/IsolatedEntities/local.settings.json new file mode 100644 index 000000000..88e9efa1e --- /dev/null +++ b/test/IsolatedEntities/local.settings.json @@ -0,0 +1,7 @@ +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated" + } +} \ No newline at end of file