Skip to content

Commit d6cc634

Browse files
committed
[core] Ensure all DCS calls are bound by a context with timeout
1 parent 55e0ea0 commit d6cc634

1 file changed

Lines changed: 81 additions & 25 deletions

File tree

core/integration/dcs/plugin.go

Lines changed: 81 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ import (
4848
"google.golang.org/grpc/connectivity"
4949
)
5050

51-
const DCS_DIAL_TIMEOUT = 2 * time.Hour
51+
const (
52+
DCS_DIAL_TIMEOUT = 2 * time.Hour
53+
DCS_GENERAL_OP_TIMEOUT = 45 * time.Second
54+
)
5255

5356
type Plugin struct {
5457
dcsHost string
@@ -156,29 +159,41 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
156159
return
157160
}
158161
varStack := call.VarStack
162+
envId, ok := varStack["environment_id"]
163+
if !ok {
164+
log.Error("cannot acquire environment ID")
165+
return
166+
}
167+
159168
stack = make(map[string]interface{})
160169
stack["StartOfRun"] = func() (out string) { // must formally return string even when we return nothing
161-
log.Debug("performing DCS SOR")
170+
log.WithField("partition", envId).
171+
Debug("performing DCS SOR")
162172

163173
parameters, ok := varStack["dcs_sor_parameters"]
164174
if !ok {
165-
log.Debug("no DCS SOR parameters set")
175+
log.WithField("partition", envId).
176+
Debug("no DCS SOR parameters set")
166177
parameters = "{}"
167178
}
168179

169180
argMap := make(map[string]string)
170181
bytes := []byte(parameters)
171182
err := json.Unmarshal(bytes, &argMap)
172183
if err != nil {
173-
log.WithError(err).Error("error processing DCS SOR parameters")
184+
log.WithField("partition", envId).
185+
WithError(err).
186+
Error("error processing DCS SOR parameters")
174187
return
175188
}
176189

177190
rn := varStack["run_number"]
178191
var runNumber64 int64
179192
runNumber64, err = strconv.ParseInt(rn, 10, 32)
180193
if err != nil {
181-
log.WithError(err).Error("cannot acquire run number for DCS SOR")
194+
log.WithField("partition", envId).
195+
WithError(err).
196+
Error("cannot acquire run number for DCS SOR")
182197
}
183198

184199
rt := dcspb.RunType_TECHNICAL
@@ -196,7 +211,8 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
196211

197212
dcsDetectorsParam, ok := varStack["dcs_detectors"]
198213
if !ok {
199-
log.Debug("empty DCS detectors list provided")
214+
log.WithField("partition", envId).
215+
Debug("empty DCS detectors list provided")
200216
dcsDetectorsParam = "[\"NULL_DETECTOR\"]"
201217
}
202218

@@ -214,7 +230,8 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
214230
for i, det := range detectors {
215231
perDetectorParameters, ok := varStack[strings.ToLower(det.String()) + "_dcs_sor_parameters"]
216232
if !ok {
217-
log.Debug("empty DCS detectors list provided")
233+
log.WithField("partition", envId).
234+
Debug("empty DCS detectors list provided")
218235
perDetectorParameters = "{}"
219236
}
220237
detectorArgMap := make(map[string]string)
@@ -223,6 +240,7 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
223240
if err != nil {
224241
log.WithError(err).
225242
WithField("detector", det.String()).
243+
WithField("partition", envId).
226244
Errorf("error processing DCS SOR parameters for detector %s", det.String())
227245
return
228246
}
@@ -232,6 +250,7 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
232250
if err != nil {
233251
log.WithError(err).
234252
WithField("detector", det.String()).
253+
WithField("partition", envId).
235254
Errorf("error building parameter map for detector %s", det.String())
236255
return
237256
}
@@ -245,47 +264,58 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
245264
if p.dcsClient == nil {
246265
log.WithError(fmt.Errorf("DCS plugin not initialized")).
247266
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
267+
WithField("partition", envId).
248268
Error("failed to perform DCS SOR")
249269
return
250270
}
251271
if p.dcsClient.GetConnState() != connectivity.Ready {
252272
log.WithError(fmt.Errorf("DCS client connection not available")).
253273
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
274+
WithField("partition", envId).
254275
Error("failed to perform DCS SOR")
255276
return
256277
}
257278

258279
var stream dcspb.Configurator_StartOfRunClient
259-
stream, err = p.dcsClient.StartOfRun(context.Background(), &in, grpc.EmptyCallOption{})
280+
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "SOR", envId)
281+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
282+
defer cancel()
283+
stream, err = p.dcsClient.StartOfRun(ctx, &in, grpc.EmptyCallOption{})
260284
if err != nil {
261285
log.WithError(err).
262286
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
287+
WithField("partition", envId).
263288
Error("failed to perform DCS SOR")
264289
}
265290
var dcsEvent *dcspb.RunEvent
266291
for {
267292
dcsEvent, err = stream.Recv()
268293
if err == io.EOF {
269-
log.Debug("DCS SOR event stream EOF, closed")
294+
log.WithField("partition", envId).
295+
Debug("DCS SOR event stream EOF, closed")
270296
break // no more data
271297
}
272298
if err != nil || dcsEvent == nil {
273299
if dcsEvent == nil {
274-
log.Warn("nil DCS event received")
300+
log.WithField("partition", envId).
301+
Warn("nil DCS event received")
275302
err = errors.New("nil DCS event")
276303
}
277-
log.WithError(err).Warn("bad DCS event received")
304+
log.WithError(err).WithField("partition", envId).
305+
Warn("bad DCS event received")
278306
break
279307
}
280308

281309
if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE {
282310
log.WithField("event", dcsEvent).
283311
WithField("detector", dcsEvent.GetDetector().String()).
312+
WithField("partition", envId).
284313
Warn("DCS SOR failure")
285314
return
286315
}
287316
if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK && dcsEvent.GetDetector() == dcspb.Detector_DCS {
288317
log.WithField("event", dcsEvent).
318+
WithField("partition", envId).
289319
Debug("DCS SOR success")
290320
envId, ok := varStack["environment_id"]
291321
if !ok {
@@ -294,30 +324,35 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
294324
p.pendingEORs[envId] = runNumber64
295325
break
296326
}
297-
log.WithField("event", dcsEvent).Debug("incoming DCS SOR event")
327+
log.WithField("event", dcsEvent).
328+
WithField("partition", envId).
329+
Debug("incoming DCS SOR event")
298330
}
299331
return
300332
}
301333
eorFunc := func(runNumber int64) (out string) { // must formally return string even when we return nothing
302-
log.Debug("performing DCS EOR")
334+
log.WithField("partition", envId).Debug("performing DCS EOR")
303335

304336
parameters, ok := varStack["dcs_eor_parameters"]
305337
if !ok {
306-
log.Debug("no DCS EOR parameters set")
338+
log.WithField("partition", envId).Debug("no DCS EOR parameters set")
307339
parameters = "{}"
308340
}
309341

310342
argMap := make(map[string]string)
311343
bytes := []byte(parameters)
312344
err := json.Unmarshal(bytes, &argMap)
313345
if err != nil {
314-
log.WithError(err).Error("error processing DCS EOR parameters")
346+
log.WithError(err).
347+
WithField("partition", envId).
348+
Error("error processing DCS EOR parameters")
315349
return
316350
}
317351

318352
dcsDetectorsParam, ok := varStack["dcs_detectors"]
319353
if !ok {
320-
log.Debug("empty DCS detectors list provided")
354+
log.WithField("partition", envId).
355+
Debug("empty DCS detectors list provided")
321356
dcsDetectorsParam = "[\"NULL_DETECTOR\"]"
322357
}
323358

@@ -334,7 +369,8 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
334369
for i, det := range detectors {
335370
perDetectorParameters, ok := varStack[strings.ToLower(det.String()) + "_dcs_eor_parameters"]
336371
if !ok {
337-
log.Debug("empty DCS detectors list provided")
372+
log.WithField("partition", envId).
373+
Debug("empty DCS detectors list provided")
338374
perDetectorParameters = "{}"
339375
}
340376
detectorArgMap := make(map[string]string)
@@ -343,6 +379,7 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
343379
if err != nil {
344380
log.WithError(err).
345381
WithField("detector", det.String()).
382+
WithField("partition", envId).
346383
Errorf("error processing DCS EOR parameters for detector %s", det.String())
347384
return
348385
}
@@ -352,6 +389,7 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
352389
if err != nil {
353390
log.WithError(err).
354391
WithField("detector", det.String()).
392+
WithField("partition", envId).
355393
Errorf("error building parameter map for detector %s", det.String())
356394
return
357395
}
@@ -365,33 +403,41 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
365403
if p.dcsClient == nil {
366404
log.WithError(fmt.Errorf("DCS plugin not initialized")).
367405
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
406+
WithField("partition", envId).
368407
Error("failed to perform DCS EOR")
369408
return
370409
}
371410
if p.dcsClient.GetConnState() != connectivity.Ready {
372411
log.WithError(fmt.Errorf("DCS client connection not available")).
373412
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
413+
WithField("partition", envId).
374414
Error("failed to perform DCS EOR")
375415
return
376416
}
377417

378418
var stream dcspb.Configurator_EndOfRunClient
379-
stream, err = p.dcsClient.EndOfRun(context.Background(), &in, grpc.EmptyCallOption{})
419+
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "EOR", envId)
420+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
421+
defer cancel()
422+
stream, err = p.dcsClient.EndOfRun(ctx, &in, grpc.EmptyCallOption{})
380423
if err != nil {
381424
log.WithError(err).
382425
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
426+
WithField("partition", envId).
383427
Error("failed to perform DCS EOR")
384428
}
385429
var dcsEvent *dcspb.RunEvent
386430
for {
387431
dcsEvent, err = stream.Recv()
388432
if err == io.EOF {
389-
log.Debug("DCS EOR event stream EOF, closed")
433+
log.WithField("partition", envId).
434+
Debug("DCS EOR event stream EOF, closed")
390435
break // no more data
391436
}
392437
if err != nil || dcsEvent == nil {
393438
if dcsEvent == nil {
394-
log.Warn("nil DCS event received")
439+
log.WithField("partition", envId).
440+
Warn("nil DCS event received")
395441
err = errors.New("nil DCS event")
396442
}
397443
log.WithError(err).Warn("bad DCS event received")
@@ -401,11 +447,13 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
401447
if dcsEvent.GetState() == dcspb.DetectorState_EOR_FAILURE {
402448
log.WithField("event", dcsEvent).
403449
WithField("detector", dcsEvent.GetDetector().String()).
450+
WithField("partition", envId).
404451
Warn("DCS EOR failure")
405452
return
406453
}
407454
if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK && dcsEvent.GetDetector() == dcspb.Detector_DCS {
408455
log.WithField("event", dcsEvent).
456+
WithField("partition", envId).
409457
Debug("DCS EOR success")
410458
envId, ok := varStack["environment_id"]
411459
if !ok {
@@ -415,7 +463,9 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
415463
break
416464
}
417465

418-
log.WithField("event", dcsEvent).Debug("incoming DCS EOR event")
466+
log.WithField("event", dcsEvent).
467+
WithField("partition", envId).
468+
Debug("incoming DCS EOR event")
419469
}
420470
return
421471
}
@@ -425,24 +475,30 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
425475
var err error
426476
runNumber64, err = strconv.ParseInt(rn, 10, 32)
427477
if err != nil {
428-
log.WithError(err).Error("cannot acquire run number for DCS EOR")
478+
log.WithError(err).
479+
WithField("partition", envId).
480+
Error("cannot acquire run number for DCS EOR")
429481
}
430482
return eorFunc(runNumber64)
431483
}
432484
stack["Cleanup"] = func() (out string) {
433485
envId, ok := varStack["environment_id"]
434486
if !ok {
435-
log.Warn("no environment_id found for DCS cleanup")
487+
log.WithField("partition", envId).
488+
Warn("no environment_id found for DCS cleanup")
436489
return
437490
}
438491

439492
runNumber, ok := p.pendingEORs[envId]
440493
if !ok {
441-
log.Debug("DCS cleanup: nothing to do")
494+
log.WithField("partition", envId).
495+
Debug("DCS cleanup: nothing to do")
442496
return
443497
}
444498

445-
log.WithField("runNumber", runNumber).Debug("pending DCS EOR found, performing cleanup")
499+
log.WithField("runNumber", runNumber).
500+
WithField("partition", envId).
501+
Debug("pending DCS EOR found, performing cleanup")
446502
return eorFunc(runNumber)
447503
}
448504

0 commit comments

Comments
 (0)