Skip to content

Commit d7cb32a

Browse files
miltalexteo
authored andcommitted
[core] CTP run start read config from consul
1 parent 2fe5d85 commit d7cb32a

1 file changed

Lines changed: 37 additions & 30 deletions

File tree

core/integration/ctp/plugin.go

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/AliceO2Group/Control/common/utils/uid"
3939
"github.com/AliceO2Group/Control/core/integration"
4040
ctpecspb "github.com/AliceO2Group/Control/core/integration/ctp/protos"
41+
"github.com/AliceO2Group/Control/core/the"
4142
"github.com/AliceO2Group/Control/core/workflow/callable"
4243
"github.com/spf13/viper"
4344
"google.golang.org/grpc"
@@ -47,11 +48,10 @@ import (
4748
const CTP_DIAL_TIMEOUT = 2 * time.Second
4849

4950
type Plugin struct {
50-
ctpHost string
51-
ctpPort int
52-
53-
ctpClient *RpcClient
51+
ctpHost string
52+
ctpPort int
5453

54+
ctpClient *RpcClient
5555
}
5656

5757
func NewPlugin(endpoint string) integration.Plugin {
@@ -121,12 +121,12 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
121121
varStack := call.VarStack
122122
stack = make(map[string]interface{})
123123
// global runs only
124-
stack["RunLoad"] = func() (out string) { // must formally return string even when we return nothing
124+
stack["RunLoad"] = func() (out string) { // must formally return string even when we return nothing
125125
log.Debug("performing CTP Run load Request")
126126

127127
parameters, ok := varStack["ctp_load_parameters"]
128128
if !ok {
129-
log.Debug("no CTP config set, using default configuration")
129+
log.Debug("no CTP Global config set")
130130
parameters = ""
131131
}
132132
// TODO (malexis): pass consul key to CTP if avail
@@ -151,16 +151,16 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
151151
}
152152

153153
// standalone run
154-
if len(strings.Split(detectors," ")) < 2 && varStack["ctp_global_run_enabled"] == "false" {
154+
if len(strings.Split(detectors, " ")) < 2 && varStack["ctp_global_run_enabled"] == "false" {
155155
// we do not load any run cause it is standalone
156-
log.Debug("not a CTP Global Run")
157-
return
156+
log.Debug("not a CTP Global Run, continuing with CTP Run Start")
157+
return
158158
}
159159

160160
in := ctpecspb.RunLoadRequest{
161-
Runn: uint32(runNumber64),
161+
Runn: uint32(runNumber64),
162162
Detectors: detectors,
163-
Config: parameters,
163+
Config: parameters,
164164
}
165165
if p.ctpClient == nil {
166166
log.WithError(fmt.Errorf("CTP plugin not initialized")).
@@ -191,15 +191,21 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
191191
}
192192
return
193193
}
194-
stack["RunStart"] = func() (out string) { // must formally return string even when we return nothing
194+
stack["RunStart"] = func() (out string) { // must formally return string even when we return nothing
195195
log.Debug("performing CTP Run Start")
196196

197-
parameters, ok := varStack["ctp_run_parameters"]
197+
parameters, ok := varStack["ctp_runtime_config"]
198198
if !ok {
199199
log.Debug("no CTP config set, using default configuration")
200200
parameters = ""
201201
}
202-
// TODO (malexis): pass consul key to CTP if avail
202+
ctpConfig, ctpErr := the.ConfSvc().GetRuntimeEntry("ctp", parameters)
203+
if ctpErr != nil {
204+
log.WithError(ctpErr).
205+
WithField("endpoint", viper.GetString("ctpServiceEndpoint")).
206+
Error("failed to load config")
207+
return
208+
}
203209

204210
rn := varStack["run_number"]
205211
var runNumber64 int64
@@ -220,16 +226,16 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
220226
return
221227
}
222228

223-
// if global run then start with empty
224-
if len(strings.Split(detectors," ")) >= 2 || varStack["ctp_global_run_enabled"] == "true" {
229+
// if global run then start with empty string in detectors
230+
if len(strings.Split(detectors, " ")) >= 2 || varStack["ctp_global_run_enabled"] == "true" {
225231
// global run detectors ""
226232
detectors = ""
227233
}
228234

229235
in := ctpecspb.RunStartRequest{
230-
Runn: uint32(runNumber64),
236+
Runn: uint32(runNumber64),
231237
Detector: detectors,
232-
Config: parameters,
238+
Config: ctpConfig,
233239
}
234240

235241
if p.ctpClient == nil {
@@ -287,13 +293,13 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
287293
}
288294

289295
// if global run then start with empty
290-
if len(strings.Split(detectors," ")) >= 2 || varStack["ctp_global_run_enabled"] == "true" {
296+
if len(strings.Split(detectors, " ")) >= 2 || varStack["ctp_global_run_enabled"] == "true" {
291297
// global run detectors ""
292298
detectors = ""
293299
}
294300

295301
in := ctpecspb.RunStopRequest{
296-
Runn: uint32(runNumber64),
302+
Runn: uint32(runNumber64),
297303
Detector: detectors,
298304
}
299305

@@ -318,7 +324,7 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
318324
Error("failed to perform Run Stop request")
319325
}
320326
if response != nil {
321-
if response.Rc != 0 {
327+
if response.Rc != 0 {
322328
log.WithField("response rc", response.Rc).
323329
WithField("Message", response.Msg).
324330
Error("Run Stop failed")
@@ -347,15 +353,16 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
347353
return
348354
}
349355

350-
// if global run then unloa
351-
if len(strings.Split(detectors," ")) < 2 && varStack["ctp_global_run_enabled"] == "false" {
356+
// if global run then unload
357+
if len(strings.Split(detectors, " ")) < 2 && varStack["ctp_global_run_enabled"] == "false" {
358+
log.Debug("not a CTP Global Run, skipping CTP Run Unload")
352359
return
353360
}
354361

355362
in := ctpecspb.RunStopRequest{
356-
Runn: uint32(runNumber64),
363+
Runn: uint32(runNumber64),
357364
// "" when unloading global run
358-
Detector: "",
365+
Detector: "",
359366
}
360367

361368
if p.ctpClient == nil {
@@ -379,10 +386,10 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
379386
Error("failed to perform Run Unload request")
380387
}
381388
if response != nil {
382-
if response.Rc != 0 {
383-
log.WithField("response rc", response.Rc).
384-
WithField("Message", response.Msg).
385-
Error("Run Unload failed")
389+
if response.Rc != 0 {
390+
log.WithField("response rc", response.Rc).
391+
WithField("Message", response.Msg).
392+
Error("Run Unload failed")
386393
}
387394
}
388395
return
@@ -401,7 +408,7 @@ func (p *Plugin) parseDetectors(ctsDetectorsParam string) (detectors string, err
401408
}
402409

403410
detectors = strings.ToLower(strings.Join(detectorsSlice, " "))
404-
return
411+
return
405412
}
406413

407414
func (p *Plugin) Destroy() error {

0 commit comments

Comments
 (0)