Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@ var (
EmptyInstanceID = InstanceID("")
)

type CreateOrchestrationAction = protos.CreateOrchestrationAction

const (
REUSE_ID_ACTION_ERROR CreateOrchestrationAction = protos.CreateOrchestrationAction_ERROR
REUSE_ID_ACTION_IGNORE CreateOrchestrationAction = protos.CreateOrchestrationAction_IGNORE
REUSE_ID_ACTION_TERMINATE CreateOrchestrationAction = protos.CreateOrchestrationAction_TERMINATE
)

type OrchestrationStatus = protos.OrchestrationStatus

const (
Expand Down Expand Up @@ -59,6 +51,7 @@ type OrchestrationMetadata struct {
SerializedOutput string
SerializedCustomStatus string
FailureDetails *protos.TaskFailureDetails
ParentInstanceID *string
Comment thread
jeanmartins marked this conversation as resolved.
}

// NewOrchestrationOptions configures options for starting a new orchestration.
Expand Down Expand Up @@ -90,8 +83,7 @@ func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) N
return func(req *protos.CreateInstanceRequest) error {
// initialize CreateInstanceOption
req.OrchestrationIdReusePolicy = &protos.OrchestrationIdReusePolicy{
Action: policy.Action,
OperationStatus: policy.OperationStatus,
ReplaceableStatus: policy.ReplaceableStatus,
}
return nil
}
Expand Down Expand Up @@ -259,6 +251,9 @@ func (m *OrchestrationMetadata) MarshalJSON() ([]byte, error) {
}
obj["failureDetails"] = root
}
if m.ParentInstanceID != nil {
obj["parentInstanceId"] = *m.ParentInstanceID
}
Comment thread
jeanmartins marked this conversation as resolved.
Comment thread
jeanmartins marked this conversation as resolved.
Comment thread
jeanmartins marked this conversation as resolved.
return json.Marshal(obj)
}

Expand All @@ -278,6 +273,9 @@ func (m *OrchestrationMetadata) UnmarshalJSON(data []byte) (err error) {
return fmt.Errorf("failed to unmarshal orchestration metadata json: %w", err)
}

// Save a reference to the original map before we potentially modify it for failureDetails
originalObj := obj

if id, ok := obj["id"]; ok {
m.InstanceID = InstanceID(id.(string))
} else {
Expand Down Expand Up @@ -344,6 +342,10 @@ func (m *OrchestrationMetadata) UnmarshalJSON(data []byte) (err error) {
}
}
}
if parentInstanceId, ok := originalObj["parentInstanceId"]; ok {
parentInstanceID := parentInstanceId.(string)
m.ParentInstanceID = &parentInstanceID
}
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ type OrchestrationIdReusePolicyOptions func(*protos.OrchestrationIdReusePolicy)
func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) OrchestrationIdReusePolicyOptions {
return func(po *protos.OrchestrationIdReusePolicy) error {
if policy != nil {
po.Action = policy.Action
po.OperationStatus = policy.OperationStatus
po.ReplaceableStatus = policy.ReplaceableStatus
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ func createGetInstanceResponse(req *protos.GetInstanceRequest, metadata *api.Orc
LastUpdatedTimestamp: timestamppb.New(metadata.LastUpdatedAt),
}

if metadata.ParentInstanceID != nil {
state.ParentInstanceId = wrapperspb.String(*metadata.ParentInstanceID)
Comment thread
jeanmartins marked this conversation as resolved.
}

if req.GetInputsAndOutputs {
state.Input = wrapperspb.String(metadata.SerializedInput)
state.CustomStatus = wrapperspb.String(metadata.SerializedCustomStatus)
Expand Down
63 changes: 31 additions & 32 deletions backend/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (be *postgresBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi
}
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

var visibleTime*time.Time = nil
var visibleTime *time.Time = nil
if delay := wi.GetAbandonDelay(); delay > 0 {
t := time.Now().UTC().Add(delay)
visibleTime = &t
Expand Down Expand Up @@ -363,8 +363,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi
if es := msg.HistoryEvent.GetExecutionStarted(); es != nil {
// Need to insert a new row into the DB
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, backend.WithOrchestrationIdReusePolicy(&protos.OrchestrationIdReusePolicy{
OperationStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED},
Action: api.REUSE_ID_ACTION_TERMINATE,
ReplaceableStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED},
})); err != nil {
if errors.Is(err, backend.ErrDuplicateEvent) {
be.logger.Warnf(
Expand Down Expand Up @@ -497,6 +496,12 @@ func (be *postgresBackend) createOrchestrationInstanceInternal(ctx context.Conte
}

func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx pgx.Tx, e *backend.HistoryEvent, startEvent *protos.ExecutionStartedEvent) (int64, error) {
var parentInstanceID *string
if pi := startEvent.GetParentInstance(); pi != nil {
Comment thread
jeanmartins marked this conversation as resolved.
if orchestrationInstance := pi.GetOrchestrationInstance(); orchestrationInstance != nil {
parentInstanceID = &orchestrationInstance.InstanceId
}
}
res, err := tx.Exec(
ctx,
`INSERT INTO Instances (
Expand All @@ -506,15 +511,17 @@ func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx pgx.Tx, e *back
ExecutionID,
Input,
RuntimeStatus,
CreatedTime
) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING`,
CreatedTime,
ParentInstanceID
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING`,
Comment thread
jeanmartins marked this conversation as resolved.
startEvent.Name,
startEvent.Version.GetValue(),
startEvent.OrchestrationInstance.InstanceId,
startEvent.OrchestrationInstance.ExecutionId.GetValue(),
startEvent.Input.GetValue(),
"PENDING",
e.Timestamp.AsTime(),
parentInstanceID,
Comment thread
jeanmartins marked this conversation as resolved.
)
if err != nil {
return -1, fmt.Errorf("failed to insert into Instances table: %w", err)
Expand Down Expand Up @@ -543,35 +550,25 @@ func (be *postgresBackend) handleInstanceExists(ctx context.Context, tx pgx.Tx,
}

// status not match, return instance duplicate error
if !isStatusMatch(policy.OperationStatus, helpers.FromRuntimeStatusString(*runtimeStatus)) {
if !isStatusMatch(policy.ReplaceableStatus, helpers.FromRuntimeStatusString(*runtimeStatus)) {
return api.ErrDuplicateInstance
}

// status match
switch policy.Action {
case protos.CreateOrchestrationAction_IGNORE:
// Log an warning message and ignore creating new instance
be.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", startEvent.OrchestrationInstance.InstanceId)
return api.ErrIgnoreInstance
case protos.CreateOrchestrationAction_TERMINATE:
// terminate existing instance
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId), false); err != nil {
return fmt.Errorf("failed to cleanup orchestration status: %w", err)
}
// create a new instance
var rows int64
if rows, err = insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent); err != nil {
return err
}
// status match - terminate existing instance and create a new one
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId), false); err != nil {
return fmt.Errorf("failed to cleanup orchestration status: %w", err)
}
// create a new instance
var rows int64
if rows, err = insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent); err != nil {
return err
}

// should never happen, because we clean up instance before create new one
if rows <= 0 {
return fmt.Errorf("failed to insert into Instances table because entry already exists")
}
return nil
// should never happen, because we clean up instance before create new one
if rows <= 0 {
return fmt.Errorf("failed to insert into Instances table because entry already exists")
}
// default behavior
return api.ErrDuplicateInstance
return nil
}

func isStatusMatch(statuses []protos.OrchestrationStatus, runtimeStatus protos.OrchestrationStatus) bool {
Expand Down Expand Up @@ -665,7 +662,7 @@ func (be *postgresBackend) GetOrchestrationMetadata(ctx context.Context, iid api

row := be.db.QueryRow(
ctx,
`SELECT InstanceID, Name, RuntimeStatus, CreatedTime, LastUpdatedTime, Input, Output, CustomStatus, FailureDetails
`SELECT InstanceID, Name, RuntimeStatus, CreatedTime, LastUpdatedTime, Input, Output, CustomStatus, FailureDetails, ParentInstanceID
FROM Instances WHERE InstanceID = $1`,
string(iid),
)
Expand All @@ -679,9 +676,10 @@ func (be *postgresBackend) GetOrchestrationMetadata(ctx context.Context, iid api
var output *string
var customStatus *string
var failureDetails *protos.TaskFailureDetails
var parentInstanceID *string

var failureDetailsPayload []byte
err := row.Scan(&instanceID, &name, &runtimeStatus, &createdAt, &lastUpdatedAt, &input, &output, &customStatus, &failureDetailsPayload)
err := row.Scan(&instanceID, &name, &runtimeStatus, &createdAt, &lastUpdatedAt, &input, &output, &customStatus, &failureDetailsPayload, &parentInstanceID)
if errors.Is(err, pgx.ErrNoRows) {
return nil, api.ErrInstanceNotFound
} else if err != nil {
Expand Down Expand Up @@ -718,6 +716,7 @@ func (be *postgresBackend) GetOrchestrationMetadata(ctx context.Context, iid api
*customStatus,
failureDetails,
)
metadata.ParentInstanceID = parentInstanceID
return metadata, nil
}

Expand Down Expand Up @@ -769,7 +768,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

now := time.Now().UTC()
newLockExpiration:= now.Add(be.options.OrchestrationLockTimeout)
newLockExpiration := now.Add(be.options.OrchestrationLockTimeout)

// Place a lock on an orchestration instance that has new events that are ready to be executed.
row := tx.QueryRow(
Expand Down
59 changes: 29 additions & 30 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
if es := msg.HistoryEvent.GetExecutionStarted(); es != nil {
// Need to insert a new row into the DB
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, backend.WithOrchestrationIdReusePolicy(&protos.OrchestrationIdReusePolicy{
OperationStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED},
Action: api.REUSE_ID_ACTION_TERMINATE,
ReplaceableStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED},
})); err != nil {
if errors.Is(err, backend.ErrDuplicateEvent) {
be.logger.Warnf(
Expand Down Expand Up @@ -470,6 +469,12 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context
}

func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx *sql.Tx, e *backend.HistoryEvent, startEvent *protos.ExecutionStartedEvent) (int64, error) {
var parentInstanceID *string
if pi := startEvent.GetParentInstance(); pi != nil {
if orchestrationInstance := pi.GetOrchestrationInstance(); orchestrationInstance != nil {
parentInstanceID = &orchestrationInstance.InstanceId
}
Comment thread
jeanmartins marked this conversation as resolved.
}
res, err := tx.ExecContext(
ctx,
`INSERT OR IGNORE INTO [Instances] (
Expand All @@ -479,15 +484,17 @@ func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx *sql.Tx, e *bac
[ExecutionID],
[Input],
[RuntimeStatus],
[CreatedTime]
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
[CreatedTime],
[ParentInstanceID]
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
Comment thread
jeanmartins marked this conversation as resolved.
startEvent.Name,
startEvent.Version.GetValue(),
startEvent.OrchestrationInstance.InstanceId,
startEvent.OrchestrationInstance.ExecutionId.GetValue(),
startEvent.Input.GetValue(),
"PENDING",
e.Timestamp.AsTime(),
parentInstanceID,
Comment thread
jeanmartins marked this conversation as resolved.
)
if err != nil {
return -1, fmt.Errorf("failed to insert into [Instances] table: %w", err)
Expand Down Expand Up @@ -516,35 +523,25 @@ func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, s
}

// status not match, return instance duplicate error
if !isStatusMatch(policy.OperationStatus, helpers.FromRuntimeStatusString(*runtimeStatus)) {
if !isStatusMatch(policy.ReplaceableStatus, helpers.FromRuntimeStatusString(*runtimeStatus)) {
return api.ErrDuplicateInstance
}

// status match
switch policy.Action {
case protos.CreateOrchestrationAction_IGNORE:
// Log an warning message and ignore creating new instance
be.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", startEvent.OrchestrationInstance.InstanceId)
return api.ErrIgnoreInstance
case protos.CreateOrchestrationAction_TERMINATE:
// terminate existing instance
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId), false); err != nil {
return fmt.Errorf("failed to cleanup orchestration status: %w", err)
}
// create a new instance
var rows int64
if rows, err = insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent); err != nil {
return err
}
// status match - terminate existing instance and create a new one
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId), false); err != nil {
return fmt.Errorf("failed to cleanup orchestration status: %w", err)
}
// create a new instance
var rows int64
if rows, err = insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent); err != nil {
return err
}

// should never happen, because we clean up instance before create new one
if rows <= 0 {
return fmt.Errorf("failed to insert into [Instances] table because entry already exists")
}
return nil
// should never happen, because we clean up instance before create new one
if rows <= 0 {
return fmt.Errorf("failed to insert into [Instances] table because entry already exists")
}
// default behavior
return api.ErrDuplicateInstance
return nil
}

func isStatusMatch(statuses []protos.OrchestrationStatus, runtimeStatus protos.OrchestrationStatus) bool {
Expand Down Expand Up @@ -642,7 +639,7 @@ func (be *sqliteBackend) GetOrchestrationMetadata(ctx context.Context, iid api.I

row := be.db.QueryRowContext(
ctx,
`SELECT [InstanceID], [Name], [RuntimeStatus], [CreatedTime], [LastUpdatedTime], [Input], [Output], [CustomStatus], [FailureDetails]
`SELECT [InstanceID], [Name], [RuntimeStatus], [CreatedTime], [LastUpdatedTime], [Input], [Output], [CustomStatus], [FailureDetails], [ParentInstanceID]
FROM Instances WHERE [InstanceID] = ?`,
string(iid),
)
Expand All @@ -663,9 +660,10 @@ func (be *sqliteBackend) GetOrchestrationMetadata(ctx context.Context, iid api.I
var output *string
var customStatus *string
var failureDetails *protos.TaskFailureDetails
var parentInstanceID *string

var failureDetailsPayload []byte
err = row.Scan(&instanceID, &name, &runtimeStatus, &createdAt, &lastUpdatedAt, &input, &output, &customStatus, &failureDetailsPayload)
err = row.Scan(&instanceID, &name, &runtimeStatus, &createdAt, &lastUpdatedAt, &input, &output, &customStatus, &failureDetailsPayload, &parentInstanceID)
if errors.Is(err, sql.ErrNoRows) {
return nil, api.ErrInstanceNotFound
} else if err != nil {
Expand Down Expand Up @@ -702,6 +700,7 @@ func (be *sqliteBackend) GetOrchestrationMetadata(ctx context.Context, iid api.I
*customStatus,
failureDetails,
)
metadata.ParentInstanceID = parentInstanceID
return metadata, nil
}

Expand Down
4 changes: 4 additions & 0 deletions client/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,5 +246,9 @@ func makeOrchestrationMetadata(resp *protos.GetInstanceResponse) (*api.Orchestra
if resp.OrchestrationState.LastUpdatedTimestamp != nil {
metadata.LastUpdatedAt = resp.OrchestrationState.LastUpdatedTimestamp.AsTime()
}
if resp.OrchestrationState.ParentInstanceId != nil {
parentInstanceID := resp.OrchestrationState.ParentInstanceId.GetValue()
metadata.ParentInstanceID = &parentInstanceID
Comment thread
jeanmartins marked this conversation as resolved.
}
return metadata, nil
}
Loading
Loading