Skip to content

Commit 02df889

Browse files
claireguyotteo
authored and committed
[OCTRL-649] Second iteration of implementation of non-critical tasks.
1 parent d254ce2 commit 02df889

10 files changed

Lines changed: 89 additions & 51 deletions

File tree

core/environment/environment.go

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -496,9 +496,6 @@ func (env *Environment) runTasksAsHooks(hooksToTrigger task.Tasks) (errorMap map
496496
}
497497
}
498498

499-
////////////////
500-
// CHECK HERE //
501-
////////////////
502499
case e := <-env.incomingEvents:
503500
if evt, ok := e.(*event.BasicTaskTerminated); ok {
504501
tid := evt.GetOrigin().TaskId.Value

core/environment/manager.go

Lines changed: 26 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -79,19 +79,43 @@ func NewEnvManager(tm *task.Manager, incomingEventCh <-chan event.Event) *Manage
7979
switch typedEvent := incomingEvent.(type) {
8080
case event.DeviceEvent:
8181
instance.handleDeviceEvent(typedEvent)
82+
8283
case *event.TasksReleasedEvent:
8384
// If we got a TasksReleasedEvent, it must be matched with a pending
84-
// environment teardown.
85+
// environment teardown if the task is critical.
8586
if thisEnvCh, ok := instance.pendingTeardownsCh[typedEvent.GetEnvironmentId()]; ok {
8687
thisEnvCh <- typedEvent
8788
close(thisEnvCh)
8889
delete(instance.pendingTeardownsCh, typedEvent.GetEnvironmentId())
90+
} else {
91+
var releaseCriticalTask = false
92+
for _, v := range typedEvent.GetTaskIds() {
93+
if tm.GetTask(v).GetTraits().Critical == true || tm.GetTask(v).GetParent().GetTaskTraits().Critical == true {
94+
releaseCriticalTask = true
95+
}
96+
}
97+
if releaseCriticalTask {
98+
thisEnvCh <- typedEvent
99+
close(thisEnvCh)
100+
delete(instance.pendingTeardownsCh, typedEvent.GetEnvironmentId())
101+
}
89102
}
103+
90104
case *event.TasksStateChangedEvent:
91105
// If we got a TasksStateChangedEvent, it must be matched with a pending
92-
// environment transition.
106+
// environment transition if the task is critical.
93107
if thisEnvCh, ok := instance.pendingStateChangeCh[typedEvent.GetEnvironmentId()]; ok {
94108
thisEnvCh <- typedEvent
109+
} else {
110+
var changeCriticalTask = false
111+
for _, v := range typedEvent.GetTaskIds() {
112+
if tm.GetTask(v).GetTraits().Critical == true || tm.GetTask(v).GetParent().GetTaskTraits().Critical == true {
113+
changeCriticalTask = true
114+
}
115+
}
116+
if changeCriticalTask {
117+
thisEnvCh <- typedEvent
118+
}
95119
}
96120
default:
97121
// noop
@@ -549,9 +573,6 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
549573
return
550574
doFallthrough:
551575
fallthrough
552-
////////////////
553-
// CHECK HERE //
554-
////////////////
555576
case pb.DeviceEventType_END_OF_STREAM:
556577
taskId := evt.GetOrigin().TaskId
557578
t := envs.taskman.GetTask(taskId.Value)

core/environment/transition_deploy.go

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -169,9 +169,6 @@ func (t DeployTransition) do(env *Environment) (err error) {
169169
170170
return*/
171171

172-
////////////////
173-
// CHECK HERE //
174-
////////////////
175172
notifyStatus := make(chan task.Status)
176173
subscriptionId := uuid.NewUUID().String()
177174
env.wfAdapter.SubscribeToStatusChange(subscriptionId, notifyStatus)

core/task/errors.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -77,12 +77,12 @@ func (r GenericTasksError) Error() string {
7777

7878
type TasksDeploymentError struct {
7979
tasksErrorBase
80-
failedDescriptors Descriptors
81-
failedCriticalDescriptors Descriptors
80+
failedNonCriticalDescriptors Descriptors
81+
failedCriticalDescriptors Descriptors
8282
}
8383

8484
func (r TasksDeploymentError) Error() string {
85-
return fmt.Sprintf("deployment failed for %d critical tasks [%s], and %d non-critical tasks [%s]", len(r.failedCriticalDescriptors), r.failedCriticalDescriptors.String(), len(r.failedDescriptors), r.failedDescriptors.String())
85+
return fmt.Sprintf("deployment failed for %d critical tasks [%s], and %d non-critical tasks [%s]", len(r.failedCriticalDescriptors), r.failedCriticalDescriptors.String(), len(r.failedNonCriticalDescriptors), r.failedNonCriticalDescriptors.String())
8686
}
8787

8888
type TaskAlreadyReleasedError taskErrorBase

core/task/manager.go

Lines changed: 60 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -444,6 +444,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
444444
// - awaiting Task deployment in tasksToRun
445445
deploymentSuccess := true // hopefully
446446
undeployedDescriptors := make(Descriptors, 0)
447+
undeployedNonCriticalDescriptors := make(Descriptors, 0)
447448
undeployedCriticalDescriptors := make(Descriptors, 0)
448449

449450
deployedTasks := make(DeploymentMap)
@@ -496,9 +497,6 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
496497
log.WithField("partition", envId).
497498
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))
498499

499-
////////////////
500-
// CHECK HERE //
501-
////////////////
502500
for _, t := range undeployedDescriptors {
503501
if t.TaskRole.GetTaskTraits().Critical == true {
504502
deploymentSuccess = false
@@ -507,6 +505,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
507505
log.WithField("partition", envId).
508506
Errorf("critical task deployment failure: %s", printname)
509507
} else {
508+
undeployedNonCriticalDescriptors = append(undeployedNonCriticalDescriptors, t)
510509
printname := fmt.Sprintf("%s->%s", t.TaskRole.GetPath(), t.TaskClassName)
511510
log.WithField("partition", envId).
512511
Warnf("non-critical task deployment failure: %s", printname)
@@ -538,9 +537,9 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
538537
}
539538

540539
err = TasksDeploymentError{
541-
tasksErrorBase: tasksErrorBase{taskIds: deployedTaskIds},
542-
failedDescriptors: undeployedDescriptors,
543-
failedCriticalDescriptors: undeployedCriticalDescriptors,
540+
tasksErrorBase: tasksErrorBase{taskIds: deployedTaskIds},
541+
failedNonCriticalDescriptors: undeployedNonCriticalDescriptors,
542+
failedCriticalDescriptors: undeployedCriticalDescriptors,
544543
}
545544
}
546545

@@ -664,7 +663,8 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
664663
}
665664

666665
if response.IsMultiResponse() {
667-
taskErrors := make([]string, len(response.Errors()))
666+
taskCriticalErrors := make([]string, 0)
667+
taskNonCriticalErrors := make([]string, 0)
668668
i := 0
669669
for k, v := range response.Errors() {
670670
task := m.GetTask(k.TaskId.Value)
@@ -680,12 +680,20 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
680680
} else {
681681
taskDescription = fmt.Sprintf("unknown task (id %s) failed with error: %s", k.TaskId.Value, v.Error())
682682
}
683-
taskErrors[i] = taskDescription
683+
if task.GetTraits().Critical == true || task.parent.GetTaskTraits().Critical == true {
684+
taskCriticalErrors[i] = taskDescription
685+
} else {
686+
taskNonCriticalErrors[i] = taskDescription
687+
}
684688
i++
685689
}
686690

687-
if len(taskErrors) > 0 {
688-
return fmt.Errorf("CONFIGURE could not complete, errors: %s", strings.Join(taskErrors, "; "))
691+
if len(taskNonCriticalErrors) > 0 {
692+
log.WithField("partition", envId).
693+
Warnf("non-critical task configuration failure, errors: %s", strings.Join(taskNonCriticalErrors, "; "))
694+
}
695+
if len(taskCriticalErrors) > 0 {
696+
return fmt.Errorf("CONFIGURE could not complete, errors: %s", strings.Join(taskCriticalErrors, "; "))
689697
}
690698
return nil
691699
} else {
@@ -732,13 +740,49 @@ func (m *Manager) transitionTasks(envId uid.ID, tasks Tasks, src string, event s
732740
return errors.New("unknown MesosCommand error: nil response received")
733741
}
734742

735-
respError := response.Err()
736-
if respError != nil {
737-
errText := respError.Error()
738-
if len(strings.TrimSpace(errText)) != 0 {
739-
return errors.New(response.Err().Error())
743+
if response.IsMultiResponse() {
744+
taskCriticalErrors := make([]string, 0)
745+
taskNonCriticalErrors := make([]string, 0)
746+
i := 0
747+
for k, v := range response.Errors() {
748+
task := m.GetTask(k.TaskId.Value)
749+
var taskDescription string
750+
if task != nil {
751+
tci := task.GetTaskCommandInfo()
752+
tciValue := "unknown command"
753+
if tci.Value != nil {
754+
tciValue = *tci.Value
755+
}
756+
757+
taskDescription = fmt.Sprintf("task '%s' on %s (id %s) failed with error: %s", tciValue, task.GetHostname(), task.GetTaskId(), v.Error())
758+
} else {
759+
taskDescription = fmt.Sprintf("unknown task (id %s) failed with error: %s", k.TaskId.Value, v.Error())
760+
}
761+
if task.GetTraits().Critical == true || task.parent.GetTaskTraits().Critical == true {
762+
taskCriticalErrors[i] = taskDescription
763+
} else {
764+
taskNonCriticalErrors[i] = taskDescription
765+
}
766+
i++
767+
}
768+
769+
if len(taskNonCriticalErrors) > 0 {
770+
log.WithField("partition", envId).
771+
Warnf("non-critical task transition failure, errors: %s", strings.Join(taskNonCriticalErrors, "; "))
772+
}
773+
if len(taskCriticalErrors) > 0 {
774+
return fmt.Errorf("transition could not complete, errors: %s", strings.Join(taskCriticalErrors, "; "))
775+
}
776+
return nil
777+
} else {
778+
respError := response.Err()
779+
if respError != nil {
780+
errText := respError.Error()
781+
if len(strings.TrimSpace(errText)) != 0 {
782+
return errors.New(response.Err().Error())
783+
}
784+
// FIXME: improve error handling ↑
740785
}
741-
// FIXME: improve error handling ↑
742786
}
743787

744788
return nil

core/task/state.go

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -64,9 +64,6 @@ func StateFromString(s string) State {
6464
return UNKNOWN
6565
}
6666

67-
////////////////
68-
// CHECK HERE //
69-
////////////////
7067
func (s State) X(other State) State {
7168
if s == other {
7269
return s

core/workflow/aggregator.go

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -62,9 +62,6 @@ type _roleUnion struct {
6262
*includeRole
6363
}
6464

65-
////////////////
66-
// CHECK HERE //
67-
////////////////
6865
func (union *_roleUnion) UnmarshalYAML(unmarshal func(interface{}) error) (unionErr error) {
6966
_probe := _unionTypeProbe{}
7067
unionErr = unmarshal(&_probe)

core/workflow/aggregatorrole.go

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -239,9 +239,6 @@ func (r *aggregatorRole) updateStatus(s task.Status) {
239239
}
240240
}
241241

242-
////////////////
243-
// CHECK HERE //
244-
////////////////
245242
func (r *aggregatorRole) updateState(s task.State) {
246243
if r == nil {
247244
return

core/workflow/safestate.go

Lines changed: 0 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -34,10 +34,6 @@ type SafeState struct {
3434
state task.State
3535
}
3636

37-
// //////////////
38-
// CHECK HERE //
39-
// //////////////
40-
// Aggregate the state of multiple tasks using the "task/state.go" X function
4137
func aggregateState(roles []Role) (state task.State) {
4238
if len(roles) == 0 {
4339
state = task.INVARIANT
@@ -58,9 +54,6 @@ func aggregateState(roles []Role) (state task.State) {
5854
return
5955
}
6056

61-
// //////////////
62-
// CHECK HERE //
63-
// //////////////
6457
func (t *SafeState) merge(s task.State, r Role) {
6558
t.mu.Lock()
6659
defer t.mu.Unlock()

core/workflow/taskrole.go

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -94,11 +94,6 @@ func (t *taskRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error)
9494
}
9595
}
9696

97-
////////////////
98-
// CHECK HERE //
99-
////////////////
100-
// If the task has a "Critical" trait set to either true or false, assign it to the Task Role
101-
// Else, the Task Role is always Critical := true
10297
if aux.Task.Critical != nil { // default for critical is always true
10398
role.Critical = *aux.Task.Critical
10499
} else {

0 commit comments

Comments (0)