Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ internal SqlOrchestrationServiceSettings GetOrchestrationServiceSettings(
settings.MaxActiveOrchestrations = extensionOptions.MaxConcurrentOrchestratorFunctions.Value;
}

settings.ExtendedSessionsEnabled = extensionOptions.ExtendedSessionsEnabled;
if (extensionOptions.ExtendedSessionIdleTimeoutInSeconds > 0)
{
settings.ExtendedSessionIdleTimeout =
TimeSpan.FromSeconds(extensionOptions.ExtendedSessionIdleTimeoutInSeconds);
}

return settings;
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/Scripts/drop-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._LockNextTask
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._QueryManyOrchestrations
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._RenewOrchestrationLocks
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._RenewTaskLocks
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._FetchOrchestrationMessages
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._ReleaseOrchestrationLock
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._UpdateVersion
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._RewindInstance
DROP PROCEDURE IF EXISTS __SchemaNamePlaceholder__._RewindInstanceRecursive
Expand Down
114 changes: 109 additions & 5 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,10 @@ CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._CheckpointOrchestration
@DeletedEvents MessageIDs READONLY,
@NewHistoryEvents HistoryEvents READONLY,
@NewOrchestrationEvents OrchestrationEvents READONLY,
@NewTaskEvents TaskEvents READONLY
@NewTaskEvents TaskEvents READONLY,
@KeepLocked bit = 0,
@LockedBy varchar(100) = NULL,
@NewLockExpiration datetime2 = NULL
AS
BEGIN
BEGIN TRANSACTION
Expand Down Expand Up @@ -869,17 +872,31 @@ BEGIN
[RuntimeStatus] = @RuntimeStatus,
[LastUpdatedTime] = SYSUTCDATETIME(),
[CompletedTime] = (CASE WHEN @IsCompleted = 1 THEN SYSUTCDATETIME() ELSE NULL END),
[LockExpiration] = NULL, -- release the lock
-- Release the lock unless the caller asked to keep it for an extended session and the instance is not in a terminal state.
[LockExpiration] = (CASE WHEN @KeepLocked = 1 AND @IsCompleted = 0 THEN @NewLockExpiration ELSE NULL END),
[LockedBy] = (CASE WHEN @KeepLocked = 1 AND @IsCompleted = 0 THEN @LockedBy ELSE NULL END),
[CustomStatusPayloadID] = @CustomStatusPayloadID,
[InputPayloadID] = @InputPayloadID,
[OutputPayloadID] = @OutputPayloadID
FROM Instances
WHERE [TaskHub] = @TaskHub and [InstanceID] = @InstanceID
WHERE
[TaskHub] = @TaskHub
AND [InstanceID] = @InstanceID
-- Do not overwrite a row that was taken over by a different worker after our lock expired.
AND (@KeepLocked = 0 OR [LockedBy] = @LockedBy)

IF @@ROWCOUNT = 0
BEGIN
ROLLBACK TRANSACTION;
THROW 50000, 'The instance does not exist.', 1;
IF @KeepLocked = 1
BEGIN
ROLLBACK TRANSACTION;
THROW 50003, 'Lock lost.', 1;
END
ELSE
BEGIN
ROLLBACK TRANSACTION;
THROW 50000, 'The instance does not exist.', 1;
END
END
-- External event messages can create new instances
-- NOTE: There is a chance this could result in deadlocks if two
Expand Down Expand Up @@ -1337,6 +1354,93 @@ END
GO


-- Used by extended sessions to fetch any new events for an already-locked
-- instance without going through the normal lock-acquisition flow.
--
-- Parameters:
--   @InstanceID     - the instance whose pending events are fetched.
--   @LockedBy       - the lock owner; must match Instances.[LockedBy].
--   @LockExpiration - the new lock expiration written once ownership is confirmed.
--   @BatchSize      - the maximum number of events returned.
--
-- Behavior:
--   * Throws 50003 ('Lock lost.') when no row in the current task hub is locked
--     by @LockedBy with an unexpired lock.
--   * Returns no result set when the instance is Completed, Failed or Terminated.
--   * Otherwise returns up to @BatchSize visible events ordered by sequence number.
CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._FetchOrchestrationMessages
    @InstanceID varchar(100),
    @LockedBy varchar(100),
    @LockExpiration datetime2,
    @BatchSize int
AS
BEGIN
    DECLARE @now datetime2 = SYSUTCDATETIME()
    DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()
    DECLARE @parentInstanceID varchar(100)
    DECLARE @version varchar(100)
    DECLARE @runtimeStatus varchar(30)
    DECLARE @tags varchar(8000)

    -- Confirm lock ownership, renew the lock and capture the instance metadata
    -- in a single statement so that no other worker can slip in between.
    UPDATE Instances
    SET
        [LockExpiration] = @LockExpiration,
        @parentInstanceID = [ParentInstanceID],
        @version = [Version],
        @runtimeStatus = [RuntimeStatus],
        @tags = [Tags]
    WHERE
        [TaskHub] = @TaskHub
        AND [InstanceID] = @InstanceID
        AND [LockedBy] = @LockedBy
        AND [LockExpiration] IS NOT NULL
        AND [LockExpiration] > @now;

    -- Zero rows updated means the lock expired or now belongs to another worker.
    IF @@ROWCOUNT = 0
        THROW 50003, 'Lock lost.', 1;

    -- Instances in a terminal state have nothing further to process.
    IF @runtimeStatus IN ('Completed', 'Failed', 'Terminated')
        RETURN;

    -- Same column shape as the first result-set of _LockNextOrchestration
    SELECT TOP (@BatchSize)
        N.[SequenceNumber],
        N.[Timestamp],
        N.[VisibleTime],
        N.[DequeueCount],
        N.[InstanceID],
        N.[ExecutionID],
        N.[EventType],
        N.[Name],
        N.[RuntimeStatus],
        N.[TaskID],
        P.[Reason],
        P.[Text] AS [PayloadText],
        P.[PayloadID],
        DATEDIFF(SECOND, N.[Timestamp], @now) AS [WaitTime],
        @parentInstanceID as [ParentInstanceID],
        @version as [Version],
        N.[TraceContext],
        @tags as [Tags]
    FROM NewEvents N
        LEFT OUTER JOIN __SchemaNamePlaceholder__.[Payloads] P ON
            P.[TaskHub] = @TaskHub AND
            P.[InstanceID] = N.[InstanceID] AND
            P.[PayloadID] = N.[PayloadID]
    WHERE
        N.[TaskHub] = @TaskHub AND
        N.[InstanceID] = @InstanceID AND
        (N.[VisibleTime] IS NULL OR N.[VisibleTime] < @now)
    -- TOP without ORDER BY returns an arbitrary subset in an arbitrary order;
    -- order explicitly so each batch holds the lowest sequence numbers first.
    ORDER BY N.[SequenceNumber]
END
GO


-- Clears the lock on an instance that stayed locked across an extended session.
-- Only a row in the current task hub whose [LockedBy] equals @LockedBy is touched;
-- when no such row exists the statement updates nothing.
CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._ReleaseOrchestrationLock
    @InstanceID varchar(100),
    @LockedBy varchar(100)
AS
BEGIN
    DECLARE @currentHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()

    UPDATE Instances
    SET
        [LockedBy] = NULL,
        [LockExpiration] = NULL
    WHERE
        [InstanceID] = @InstanceID
        AND [TaskHub] = @currentHub
        AND [LockedBy] = @LockedBy
END
GO


CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._RenewTaskLocks
@RenewingTasks MessageIDs READONLY,
@LockExpiration datetime2
Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.SqlServer/Scripts/permissions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._LockNextTask TO __SchemaName
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._QueryManyOrchestrations TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._RenewOrchestrationLocks TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._RenewTaskLocks TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._FetchOrchestrationMessages TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._ReleaseOrchestrationLock TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._UpdateVersion TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._RewindInstance TO __SchemaNamePlaceholder___runtime
GRANT EXECUTE ON OBJECT::__SchemaNamePlaceholder__._RewindInstanceRecursive TO __SchemaNamePlaceholder___runtime
Expand Down
48 changes: 47 additions & 1 deletion src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,24 @@ await SqlUtils.ExecuteNonQueryAsync(
instance = new OrchestrationInstance();
}

string orchestrationInstanceId = messages[0].OrchestrationInstance.InstanceId;

return new ExtendedOrchestrationWorkItem(orchestrationName, instance, eventPayloadMappings)
{
InstanceId = messages[0].OrchestrationInstance.InstanceId,
InstanceId = orchestrationInstanceId,
LockedUntilUtc = lockExpiration,
NewMessages = messages,
OrchestrationRuntimeState = runtimeState,
Session = this.settings.ExtendedSessionsEnabled
? new SqlOrchestrationSession(
this.settings,
this.orchestrationBackoffHelper,
this.traceHelper,
eventPayloadMappings,
orchestrationInstanceId,
this.lockedByValue,
this.ShutdownToken)
: null,
};
}
} while (stopwatch.Elapsed < receiveTimeout);
Expand Down Expand Up @@ -361,6 +373,15 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
command.Parameters.Add("@RuntimeStatus", SqlDbType.VarChar, size: 30).Value = orchestrationState.OrchestrationStatus.ToString();
command.Parameters.Add("@CustomStatusPayload", SqlDbType.VarChar).Value = orchestrationState.Status ?? SqlString.Null;

bool keepLocked = workItem.Session != null && !IsTerminalStatus(orchestrationState.OrchestrationStatus);
DateTime newLockExpiration = DateTime.UtcNow.Add(this.settings.WorkItemLockTimeout);
if (keepLocked)
{
command.Parameters.Add("@KeepLocked", SqlDbType.Bit).Value = true;
command.Parameters.Add("@LockedBy", SqlDbType.VarChar, size: 100).Value = this.lockedByValue;
command.Parameters.Add("@NewLockExpiration", SqlDbType.DateTime2).Value = newLockExpiration;
}

currentWorkItem.EventPayloadMappings.Add(outboundMessages);
currentWorkItem.EventPayloadMappings.Add(orchestratorMessages);

Expand Down Expand Up @@ -400,6 +421,16 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
this.traceHelper.DuplicateExecutionDetected(instance, orchestrationState.Name);
return;
}
catch (SqlException e) when (keepLocked && SqlUtils.HasErrorNumber(e, SqlOrchestrationSession.LockLostErrorNumber))
{
throw new SessionAbortedException(
$"Lost the lock for instance '{instance.InstanceId}' during checkpoint.", e);
}

if (keepLocked)
{
workItem.LockedUntilUtc = newLockExpiration;
}

// notify pollers that new messages may be available
if (outboundMessages.Count > 0)
Expand All @@ -415,10 +446,25 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
this.traceHelper.CheckpointCompleted(orchestrationState, sw);
}

/// <summary>
/// Returns <c>true</c> when <paramref name="status"/> is Completed, Failed, or Terminated;
/// otherwise <c>false</c>.
/// </summary>
static bool IsTerminalStatus(OrchestrationStatus status)
{
    switch (status)
    {
        case OrchestrationStatus.Completed:
        case OrchestrationStatus.Failed:
        case OrchestrationStatus.Terminated:
            return true;
        default:
            return false;
    }
}

// Abandoning is deliberately "lazy": nothing is written to the database and the work item's lock is
// simply left to expire. That saves a DB access and also ensures that a work-item can't spam the
// error logs in a tight loop.
public override Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
    return Task.CompletedTask;
}

/// <summary>
/// Releases the lock held by the work item's session when that session is a
/// <c>SqlOrchestrationSession</c>; otherwise completes immediately without doing anything.
/// </summary>
/// <param name="workItem">The orchestration work item being released.</param>
/// <returns>The session's lock-release task, or an already-completed task.</returns>
public override Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem) =>
    workItem.Session is SqlOrchestrationSession sqlSession
        ? sqlSession.ReleaseLockAsync()
        : Task.CompletedTask;

public override async Task<TaskActivityWorkItem?> LockNextTaskActivityWorkItem(
TimeSpan receiveTimeout,
CancellationToken shutdownCancellationToken)
Expand Down
12 changes: 12 additions & 0 deletions src/DurableTask.SqlServer/SqlOrchestrationServiceSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ public SqlOrchestrationServiceSettings(string connectionString, string? taskHubN
[JsonProperty("maxActiveOrchestrations")]
public int MaxActiveOrchestrations { get; set; } = Environment.ProcessorCount;

/// <summary>
/// Gets or sets a flag indicating whether to enable extended sessions.
/// Defaults to <c>false</c>.
/// </summary>
[JsonProperty("extendedSessionsEnabled")]
public bool ExtendedSessionsEnabled { get; set; } = false;

/// <summary>
/// Gets or sets the amount of time before an idle session times out.
/// The value is a <see cref="TimeSpan"/> (not a raw number of seconds) and defaults to 30 seconds.
/// </summary>
[JsonProperty("extendedSessionIdleTimeout")]
public TimeSpan ExtendedSessionIdleTimeout { get; set; } = TimeSpan.FromSeconds(30);

/// <summary>
/// Gets or sets the minimum interval to poll for orchestrations.
/// Polling interval increases when no orchestrations or activities are found.
Expand Down
Loading