Skip to content

Commit 71dd256

Browse files
miltalexteo
authored andcommitted
[core] handle workflow state error
1 parent d335a34 commit 71dd256

2 files changed

Lines changed: 57 additions & 0 deletions

File tree

core/environment/environment.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/AliceO2Group/Control/core/workflow"
4444
"github.com/gobwas/glob"
4545
"github.com/looplab/fsm"
46+
"github.com/pborman/uuid"
4647
"github.com/sirupsen/logrus"
4748
)
4849

@@ -65,6 +66,7 @@ type Environment struct {
6566
GlobalVars gera.StringMap // From Consul
6667
UserVars gera.StringMap // From user input
6768
stateChangedCh chan *event.TasksStateChangedEvent
69+
unsubscribe chan struct{}
6870
}
6971

7072
func (env *Environment) NotifyEvent(e event.DeviceEvent) {
@@ -363,4 +365,57 @@ func (env *Environment) setState(state string) {
363365
env.Mu.Lock()
364366
defer env.Mu.Unlock()
365367
env.Sm.SetState(state)
368+
}
369+
370+
func (env *Environment) subscribeToWfState(taskman *task.Manager) {
371+
go func() {
372+
wf := env.Workflow()
373+
notify := make(chan task.State)
374+
subscriptionId := uuid.NewUUID().String()
375+
env.wfAdapter.SubscribeToStateChange(subscriptionId, notify)
376+
defer env.wfAdapter.UnsubscribeFromStateChange(subscriptionId)
377+
env.unsubscribe = make(chan struct{})
378+
379+
wfState := wf.GetState()
380+
if wfState != task.ERROR {
381+
WORKFLOW_STATE_LOOP:
382+
for {
383+
select {
384+
case wfState = <-notify:
385+
if wfState == task.ERROR {
386+
env.setState(wfState.String())
387+
toStop := env.Workflow().GetTasks().Filtered(func(t *task.Task) bool {
388+
t.SetSafeToStop(true)
389+
return t.IsSafeToStop()
390+
})
391+
if len(toStop) > 0 {
392+
taskmanMessage := task.NewTransitionTaskMessage(
393+
toStop,
394+
task.RUNNING.String(),
395+
task.STOP.String(),
396+
task.CONFIGURED.String(),
397+
nil,
398+
env.Id(),
399+
)
400+
taskman.MessageChannel <- taskmanMessage
401+
<-env.stateChangedCh
402+
}
403+
break WORKFLOW_STATE_LOOP
404+
}
405+
if wfState == task.DONE {
406+
break WORKFLOW_STATE_LOOP
407+
}
408+
case <- env.unsubscribe:
409+
env.unsubscribe = nil
410+
break WORKFLOW_STATE_LOOP
411+
}
412+
}
413+
}
414+
}()
415+
}
416+
417+
func (env *Environment) unsubscribeFromWfState() {
418+
if env.unsubscribe != nil {
419+
env.unsubscribe <- struct{}{}
420+
}
366421
}

core/environment/manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
126126

127127
envs.m[env.id] = env
128128
envs.pendingStateChangeCh[env.id] = env.stateChangedCh
129+
env.subscribeToWfState(envs.taskman)
129130

130131
err = env.TryTransition(NewConfigureTransition(
131132
envs.taskman,
@@ -214,6 +215,7 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
214215
}
215216

216217
delete(envs.m, environmentId)
218+
env.unsubscribeFromWfState()
217219
return err
218220
}
219221

0 commit comments

Comments
 (0)