Skip to content

Commit eefb2da

Browse files
committed
[executor] Split out actions.go from handlers file
1 parent 969fa29 commit eefb2da

2 files changed

Lines changed: 105 additions & 70 deletions

File tree

executor/actions.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2018-2022 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package executor
26+
27+
import (
28+
"context"
29+
"encoding/json"
30+
31+
"github.com/AliceO2Group/Control/common/event"
32+
"github.com/AliceO2Group/Control/common/utils"
33+
"github.com/AliceO2Group/Control/common/utils/uid"
34+
"github.com/AliceO2Group/Control/executor/executable"
35+
mesos "github.com/mesos/mesos-go/api/v1/lib"
36+
"github.com/mesos/mesos-go/api/v1/lib/executor/calls"
37+
"github.com/sirupsen/logrus"
38+
)
39+
40+
func makeSendStatusUpdateFunc(state *internalState, task mesos.TaskInfo) executable.SendStatusFunc {
41+
return func(envId uid.ID, mesosState mesos.TaskState, message string) {
42+
status := newStatus(envId, state, task.TaskID)
43+
status.State = &mesosState
44+
status.Message = utils.ProtoString(message)
45+
state.statusCh <- status
46+
}
47+
}
48+
49+
func makeSendDeviceEventFunc(state *internalState) executable.SendDeviceEventFunc {
50+
return func(envId uid.ID, event event.DeviceEvent) {
51+
jsonEvent, err := json.Marshal(event)
52+
if err != nil {
53+
log.WithError(err).
54+
Warning("error marshaling event from task")
55+
return
56+
}
57+
state.messageCh <- jsonEvent
58+
}
59+
}
60+
61+
func makeSendMessageFunc(state *internalState) executable.SendMessageFunc {
62+
return func(message []byte) {
63+
// to send task events using state.
64+
state.messageCh <- message
65+
}
66+
}
67+
68+
func sendOutgoingMessage(state *internalState, message []byte) {
69+
_, _ = state.cli.Send(context.TODO(), calls.NonStreaming(calls.Message(message)))
70+
}
71+
72+
func performStatusUpdate(state *internalState, status mesos.TaskStatus) {
73+
if status.State == nil {
74+
log.Warn("status with nil state received")
75+
} else if *status.State == mesos.TASK_FAILED { // failed task updates are sent separately with less priority
76+
state.activeTasksMu.Lock()
77+
state.failedTasks[status.TaskID] = status
78+
delete(state.activeTasks, status.TaskID)
79+
state.activeTasksMu.Unlock()
80+
} else {
81+
switch *status.State {
82+
case mesos.TASK_DROPPED:
83+
fallthrough
84+
case mesos.TASK_FINISHED:
85+
fallthrough
86+
case mesos.TASK_GONE:
87+
fallthrough
88+
case mesos.TASK_KILLED:
89+
fallthrough
90+
case mesos.TASK_LOST:
91+
state.activeTasksMu.Lock()
92+
delete(state.activeTasks, status.TaskID)
93+
state.activeTasksMu.Unlock()
94+
}
95+
err := update(state, status)
96+
if err != nil { // in case of failed update, we just print an error message
97+
log.WithFields(logrus.Fields{
98+
"task": status.TaskID,
99+
"state": status.State.String(),
100+
}).
101+
Warn("executor failed to send task status update")
102+
}
103+
}
104+
}

executor/handlers.go

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* === This file is part of ALICE O² ===
33
*
4-
* Copyright 2018-2019 CERN and copyright holders of ALICE O².
4+
* Copyright 2018-2022 CERN and copyright holders of ALICE O².
55
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
66
*
77
* This program is free software: you can redistribute it and/or modify
@@ -31,10 +31,8 @@ import (
3131
"fmt"
3232
"time"
3333

34-
"github.com/AliceO2Group/Control/common/event"
3534
"github.com/AliceO2Group/Control/common/logger/infologger"
3635
"github.com/AliceO2Group/Control/common/utils"
37-
"github.com/AliceO2Group/Control/common/utils/uid"
3836
"github.com/AliceO2Group/Control/core/controlcommands"
3937
"github.com/AliceO2Group/Control/executor/executable"
4038
"github.com/AliceO2Group/Control/executor/executorcmd"
@@ -45,73 +43,6 @@ import (
4543
"github.com/sirupsen/logrus"
4644
)
4745

48-
func makeSendStatusUpdateFunc(state *internalState, task mesos.TaskInfo) executable.SendStatusFunc {
49-
return func(envId uid.ID, mesosState mesos.TaskState, message string) {
50-
status := newStatus(envId, state, task.TaskID)
51-
status.State = &mesosState
52-
status.Message = utils.ProtoString(message)
53-
state.statusCh <- status
54-
}
55-
}
56-
57-
func makeSendDeviceEventFunc(state *internalState) executable.SendDeviceEventFunc {
58-
return func(envId uid.ID, event event.DeviceEvent) {
59-
jsonEvent, err := json.Marshal(event)
60-
if err != nil {
61-
log.WithError(err).
62-
Warning("error marshaling event from task")
63-
return
64-
}
65-
state.messageCh <- jsonEvent
66-
}
67-
}
68-
69-
func makeSendMessageFunc(state *internalState) executable.SendMessageFunc {
70-
return func(message []byte) {
71-
// to send task events using state.
72-
state.messageCh <- message
73-
}
74-
}
75-
76-
func handleOutgoingMessage(state *internalState, message []byte) {
77-
_, _ = state.cli.Send(context.TODO(), calls.NonStreaming(calls.Message(message)))
78-
}
79-
80-
func handleStatusUpdate(state *internalState, status mesos.TaskStatus) {
81-
if status.State == nil {
82-
log.Warn("status with nil state received")
83-
} else if *status.State == mesos.TASK_FAILED { // failed task updates are sent separately with less priority
84-
state.activeTasksMu.Lock()
85-
state.failedTasks[status.TaskID] = status
86-
delete(state.activeTasks, status.TaskID)
87-
state.activeTasksMu.Unlock()
88-
} else {
89-
switch *status.State {
90-
case mesos.TASK_DROPPED:
91-
fallthrough
92-
case mesos.TASK_FINISHED:
93-
fallthrough
94-
case mesos.TASK_GONE:
95-
fallthrough
96-
case mesos.TASK_KILLED:
97-
fallthrough
98-
case mesos.TASK_LOST:
99-
state.activeTasksMu.Lock()
100-
delete(state.activeTasks, status.TaskID)
101-
state.activeTasksMu.Unlock()
102-
}
103-
err := update(state, status)
104-
if err != nil { // in case of failed update, we just print an error message
105-
log.WithFields(logrus.Fields{
106-
"task": status.TaskID,
107-
"state": status.State.String(),
108-
}).
109-
Warn("executor failed to send task status update")
110-
}
111-
}
112-
113-
}
114-
11546
// Handle incoming message event. This function is thread-safe with respect to state.
11647
func handleMessageEvent(state *internalState, data []byte) (err error) {
11748
var incoming struct {

0 commit comments

Comments
 (0)