Skip to content

Commit

Permalink
Process events for non-running orchestrations (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum authored May 31, 2022
1 parent c938483 commit 891d3ee
Show file tree
Hide file tree
Showing 20 changed files with 551 additions and 146 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Changelog

## v1.0.0
## v1.0.0-rc2

### New

Expand All @@ -12,7 +12,9 @@
* Removed unnecessary .NET Standard 2.1 target ([#82](https://github.com/microsoft/durabletask-mssql/pull/82))
* Fixed problem terminating orchestration with running activity ([#83](https://github.com/microsoft/durabletask-mssql/pull/83))
* Fixed payload data leak for completed activities (same PR as above)
* Fixed NewEvents leak for completed or continued-as-new instances ([#97](https://github.com/microsoft/durabletask-mssql/pull/97))
* Activity payload IDs are now consistently saved to the history table ([#90](https://github.com/microsoft/durabletask-mssql/issues/90))
* Remove Microsoft.SqlServer.SqlManagementObjects dependency ([#92](https://github.com/microsoft/durabletask-mssql/pull/92)) - contributed by [@IGx89](https://github.com/IGx89)

### Breaking changes

Expand Down
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ steps:
vsVersion: 'latest'
logFileVerbosity: minimal
configuration: Release
msbuildArgs: /p:GITHUB_RUN_NUMBER=$(Build.BuildId) /p:ContinuousIntegrationBuild=true
msbuildArgs: /p:FileVersionRevision=$(Build.BuildId) /p:ContinuousIntegrationBuild=true

# Authenticode sign all the DLLs with the Microsoft certificate.
# This appears to be an in-place signing job, which is convenient.
Expand Down
16 changes: 16 additions & 0 deletions src/DurableTask.SqlServer/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ public void CreatedDatabase(string databaseName)
this.WriteLog(logEvent);
}

public void DiscardingEvent(string instanceId, string eventType, int taskEventId, string details)
{
var logEvent = new LogEvents.DiscardingEventEvent(
instanceId,
eventType,
taskEventId,
details);
this.WriteLog(logEvent);
}

public void GenericInfoEvent(string details, string? instanceId)
{
var logEvent = new LogEvents.GenericInfo(details, instanceId);
this.WriteLog(logEvent);
}

void WriteLog(ILogEvent logEvent)
{
// LogDurableEvent is an extension method defined in DurableTask.Core
Expand Down
34 changes: 33 additions & 1 deletion src/DurableTask.SqlServer/Logging/DefaultEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ internal void PurgedInstances(
}

[Event(EventIds.CommandCompleted, Level = EventLevel.Verbose)]
public void CommandCompleted(
internal void CommandCompleted(
string? InstanceId,
string CommandText,
long LatencyMs,
Expand Down Expand Up @@ -266,5 +266,37 @@ internal void CreatedDatabase(
AppName,
ExtensionVersion);
}

[Event(EventIds.DiscardingEvent, Level = EventLevel.Warning, Version = 1)]
internal void DiscardingEvent(
string InstanceId,
string EventType,
int TaskEventId,
string Details,
string AppName,
string ExtensionVersion)
{
// TODO: Use WriteEventCore for better performance
this.WriteEvent(
EventIds.DiscardingEvent,
InstanceId,
EventType,
TaskEventId,
Details,
AppName,
ExtensionVersion);
}

[Event(EventIds.GenericInfo, Level = EventLevel.Informational, Version = 1)]
internal void GenericInfo(string Details, string InstanceId, string AppName, string ExtensionVersion)
{
// TODO: Use WriteEventCore for better performance
this.WriteEvent(
EventIds.GenericInfo,
InstanceId,
Details,
AppName,
ExtensionVersion);
}
}
}
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/Logging/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ static class EventIds
public const int PurgedInstances = 310;
public const int CommandCompleted = 311;
public const int CreatedDatabase = 312;
public const int DiscardingEvent = 313;
public const int GenericInfo = 314;
}
}
73 changes: 73 additions & 0 deletions src/DurableTask.SqlServer/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -481,5 +481,78 @@ void IEventSourceEvent.WriteEventSource() =>
DTUtils.AppName,
DTUtils.ExtensionVersionString);
}

internal class DiscardingEventEvent : StructuredLogEvent, IEventSourceEvent
{
public DiscardingEventEvent(string instanceId, string eventType, int taskEventId, string details)
{
this.InstanceId = instanceId;
this.EventType = eventType;
this.TaskEventId = taskEventId;
this.Details = details;
}

[StructuredLogField]
public string InstanceId { get; }

[StructuredLogField]
public string EventType { get; }

[StructuredLogField]
public int TaskEventId { get; }

[StructuredLogField]
public string Details { get; }

public override EventId EventId => new EventId(
EventIds.DiscardingEvent,
nameof(EventIds.DiscardingEvent));

public override LogLevel Level => LogLevel.Warning;

protected override string CreateLogMessage() =>
$"{this.InstanceId}: Discarding {GetEventDescription(this.EventType, this.TaskEventId)}: {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
DefaultEventSource.Log.DiscardingEvent(
this.InstanceId,
this.EventType,
this.TaskEventId,
this.Details,
DTUtils.AppName,
DTUtils.ExtensionVersionString);
}

internal class GenericInfo : StructuredLogEvent, IEventSourceEvent
{
public GenericInfo(string details, string? instanceId)
{
this.Details = details;
this.InstanceId = instanceId;
}

[StructuredLogField]
public string Details { get; }

[StructuredLogField]
public string? InstanceId { get; }

public override EventId EventId => new EventId(
EventIds.GenericInfo,
nameof(EventIds.GenericInfo));

public override LogLevel Level => LogLevel.Information;

protected override string CreateLogMessage() => string.IsNullOrEmpty(this.InstanceId) ?
this.Details :
$"{this.InstanceId}: {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
DefaultEventSource.Log.GenericInfo(
this.Details,
this.InstanceId ?? string.Empty,
DTUtils.AppName,
DTUtils.ExtensionVersionString);
}
}
}
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/Scripts/drop-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DROP PROCEDURE IF EXISTS dt.PurgeInstanceStateByTime
DROP PROCEDURE IF EXISTS dt._AddOrchestrationEvents
DROP PROCEDURE IF EXISTS dt._CheckpointOrchestration
DROP PROCEDURE IF EXISTS dt._CompleteTasks
DROP PROCEDURE IF EXISTS dt._DiscardEventsAndUnlockInstance
DROP PROCEDURE IF EXISTS dt._GetVersions
DROP PROCEDURE IF EXISTS dt._LockNextOrchestration
DROP PROCEDURE IF EXISTS dt._LockNextTask
Expand Down
33 changes: 31 additions & 2 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ BEGIN
DECLARE @instanceID varchar(100)
DECLARE @parentInstanceID varchar(100)
DECLARE @version varchar(100)
DECLARE @runtimeStatus varchar(30)
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()

BEGIN TRANSACTION
Expand All @@ -532,14 +533,15 @@ BEGIN
[LockExpiration] = @LockExpiration,
@instanceID = I.[InstanceID],
@parentInstanceID = I.[ParentInstanceID],
@runtimeStatus = I.[RuntimeStatus],
@version = I.[Version]
FROM
dt.Instances I WITH (READPAST) INNER JOIN NewEvents E WITH (READPAST) ON
E.[TaskHub] = @TaskHub AND
E.[InstanceID] = I.[InstanceID]
WHERE
I.TaskHub = @TaskHub AND
I.[RuntimeStatus] IN ('Pending', 'Running') AND
I.[RuntimeStatus] NOT IN ('Suspended') AND
(I.[LockExpiration] IS NULL OR I.[LockExpiration] < @now) AND
(E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now)

Expand Down Expand Up @@ -580,7 +582,10 @@ BEGIN
RETURN
END

-- Result #2: The full event history for the locked instance
-- Result #2: Basic information about this instance, including its runtime status
SELECT @instanceID AS [InstanceID], @runtimeStatus AS [RuntimeStatus]

-- Result #3: The full event history for the locked instance
-- NOTE: This must be kept consistent with the dt.HistoryEvents custom data type
SELECT
H.[InstanceID],
Expand Down Expand Up @@ -901,6 +906,30 @@ END
GO


CREATE OR ALTER PROCEDURE dt._DiscardEventsAndUnlockInstance
@InstanceID varchar(100),
@DeletedEvents MessageIDs READONLY
AS
BEGIN
DECLARE @taskHub varchar(50) = dt.CurrentTaskHub()

-- We return the list of deleted messages so that the caller can issue a
-- warning about missing messages
DELETE E
OUTPUT DELETED.InstanceID, DELETED.SequenceNumber
FROM dt.NewEvents E WITH (FORCESEEK(PK_NewEvents(TaskHub, InstanceID, SequenceNumber)))
INNER JOIN @DeletedEvents D ON
D.InstanceID = E.InstanceID AND
D.SequenceNumber = E.SequenceNumber AND
E.TaskHub = @taskHub

-- Release the lock on this instance
UPDATE Instances SET [LastUpdatedTime] = SYSUTCDATETIME(), [LockExpiration] = NULL
WHERE [TaskHub] = @taskHub and [InstanceID] = @InstanceID
END
GO


CREATE OR ALTER PROCEDURE dt._AddOrchestrationEvents
@NewOrchestrationEvents OrchestrationEvents READONLY
AS
Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/Scripts/permissions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ END
-- Functions
GRANT EXECUTE ON OBJECT::dt.GetScaleMetric TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.GetScaleRecommendation TO dt_runtime
GRANT EXECUTE ON OBJECT::dt.CurrentTaskHub TO dt_runtime

-- Public sprocs
GRANT EXECUTE ON OBJECT::dt.CreateInstance TO dt_runtime
Expand All @@ -30,6 +31,7 @@ GRANT EXECUTE ON OBJECT::dt.PurgeInstanceStateByTime TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._AddOrchestrationEvents TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._CheckpointOrchestration TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._CompleteTasks TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._DiscardEventsAndUnlockInstance TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._GetVersions TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._LockNextOrchestration TO dt_runtime
GRANT EXECUTE ON OBJECT::dt._LockNextTask TO dt_runtime
Expand Down
76 changes: 75 additions & 1 deletion src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public override Task DeleteAsync(bool deleteInstanceStore)
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
bool isWaiting = false;
Stopwatch stopwatch = Stopwatch.StartNew();
do
{
Expand Down Expand Up @@ -166,14 +167,87 @@ public override Task DeleteAsync(bool deleteInstanceStore)

if (messages.Count == 0)
{
if (!isWaiting)
{
this.traceHelper.GenericInfoEvent(
"No events were found. Waiting for new events to appear.",
instanceId: null);
isWaiting = true;
}

// TODO: Make this dynamic based on the number of readers
await this.orchestrationBackoffHelper.WaitAsync(cancellationToken);
continue;
}

this.orchestrationBackoffHelper.Reset();
isWaiting = false;

// Result #2: The runtime status of the orchestration instance
if (await reader.NextResultAsync(cancellationToken))
{
bool instanceExists = await reader.ReadAsync(cancellationToken);
string instanceId;
OrchestrationStatus? currentStatus;

bool isRunning = false;
if (instanceExists)
{
instanceId = SqlUtils.GetInstanceId(reader);
currentStatus = SqlUtils.GetRuntimeStatus(reader);
isRunning =
currentStatus == OrchestrationStatus.Running ||
currentStatus == OrchestrationStatus.Pending;
}
else
{
instanceId = messages.Select(msg => msg.OrchestrationInstance.InstanceId).First();
currentStatus = null;
}

// If the instance is in a terminal state, log and discard the new events.
// NOTE: In the future, we may want to allow processing of some events if, for example, they may
// change the state of a completed instance. For example, a rewind command.
if (!isRunning)
{
string warningMessage = instanceExists ?
$"Target is in the {currentStatus} state." :
$"Target doesn't exist (either never existed or continued-as-new).";

messages.ForEach(msg => this.traceHelper.DiscardingEvent(
msg.OrchestrationInstance.InstanceId,
msg.Event.EventType.ToString(),
DTUtils.GetTaskEventId(msg.Event),
warningMessage));

// Close the already opened reader so that we can execute a new command
reader.Close();

// Delete the events and release the orchestration instance lock
using SqlCommand discardCommand = this.GetSprocCommand(
connection,
"dt._DiscardEventsAndUnlockInstance");
discardCommand.Parameters.Add("@InstanceID", SqlDbType.VarChar, 100).Value = instanceId;
discardCommand.Parameters.AddMessageIdParameter("@DeletedEvents", messages);
try
{
await SqlUtils.ExecuteNonQueryAsync(
discardCommand,
this.traceHelper,
instanceId,
cancellationToken);
}
catch (Exception e)
{
this.traceHelper.ProcessingError(e, new OrchestrationInstance { InstanceId = instanceId });
throw;
}

continue;
}
}

// Result #2: The full event history for the locked instance
// Result #3: The full event history for the locked instance
IList<HistoryEvent> history;
if (await reader.NextResultAsync(cancellationToken))
{
Expand Down
Loading

0 comments on commit 891d3ee

Please sign in to comment.