Skip to content

Commit de51471

Browse files
committed
[core] Ensure all DDsched calls are bound by a context with timeout
1 parent d6cc634 commit de51471

1 file changed

Lines changed: 72 additions & 66 deletions

File tree

core/integration/ddsched/plugin.go

Lines changed: 72 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ import (
4545
"google.golang.org/grpc/connectivity"
4646
)
4747

48-
const DDSCHED_DIAL_TIMEOUT = 2 * time.Second
48+
const (
49+
DDSCHED_DIAL_TIMEOUT = 2 * time.Second
50+
DDSCHED_INITIALIZE_TIMEOUT = 30 * time.Second
51+
DDSCHED_TERMINATE_TIMEOUT = 30 * time.Second
52+
DDSCHED_DEFAULT_POLLING_TIMEOUT = 30 * time.Second
53+
)
4954

5055

5156
type Plugin struct {
@@ -148,9 +153,16 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
148153
return
149154
}
150155
varStack := call.VarStack
156+
envId, ok := varStack["environment_id"]
157+
if !ok {
158+
log.Error("cannot acquire environment ID")
159+
return
160+
}
161+
151162
stack = make(map[string]interface{})
152163
stack["PartitionInitialize"] = func() (out string) { // must formally return string even when we return nothing
153-
log.Debug("performing DD scheduler PartitionInitialize")
164+
log.WithField("partition", envId).
165+
Debug("performing DD scheduler PartitionInitialize")
154166

155167
parentRoleI := call.GetParentRole()
156168
parentRole, ok := parentRoleI.(workflow.Role)
@@ -162,7 +174,9 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
162174
workflow.LeafWalk(root, func(role workflow.Role) {
163175
roleVS, err := role.ConsolidatedVarStack()
164176
if err != nil {
165-
log.WithError(err).Error("error processing DD host_id_map")
177+
log.WithError(err).
178+
WithField("partition", envId).
179+
Error("error processing DD host_id_map")
166180
return
167181
}
168182
var(
@@ -185,12 +199,6 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
185199
}
186200
})
187201

188-
envId, ok := varStack["environment_id"]
189-
if !ok {
190-
log.Error("cannot acquire environment ID for DD scheduler PartitionInitialize")
191-
return
192-
}
193-
194202
in := ddpb.PartitionInitRequest{
195203
PartitionInfo: &ddpb.PartitionInfo{
196204
EnvironmentId: envId,
@@ -201,14 +209,14 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
201209
}
202210
if p.ddSchedClient == nil {
203211
log.WithError(fmt.Errorf("DD scheduler plugin not initialized")).
204-
WithField("environment_id", envId).
212+
WithField("partition", envId).
205213
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
206214
Error("failed to perform DD scheduler PartitionInitialize")
207215
return
208216
}
209217
if p.ddSchedClient.GetConnState() != connectivity.Ready {
210218
log.WithError(fmt.Errorf("DD scheduler client connection not available")).
211-
WithField("environment_id", envId).
219+
WithField("partition", envId).
212220
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
213221
Error("failed to perform DD scheduler PartitionInitialize")
214222
return
@@ -218,48 +226,51 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
218226
response *ddpb.PartitionResponse
219227
err error
220228
)
221-
response, err = p.ddSchedClient.PartitionInitialize(context.Background(), &in, grpc.EmptyCallOption{})
229+
timeout := callable.AcquireTimeout(DDSCHED_INITIALIZE_TIMEOUT, varStack, "Initialize", envId)
230+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
231+
defer cancel()
232+
response, err = p.ddSchedClient.PartitionInitialize(ctx, &in, grpc.EmptyCallOption{})
222233
if err != nil {
223234
log.WithError(err).
224-
WithField("environment_id", envId).
235+
WithField("partition", envId).
225236
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
226237
Error("failed to perform DD scheduler PartitionInitialize")
227238
return
228239
}
229240
if response.PartitionState != ddpb.PartitionState_PARTITION_CONFIGURING &&
230241
response.PartitionState != ddpb.PartitionState_PARTITION_CONFIGURED {
231242
log.WithError(fmt.Errorf("PartitionInitialize returned unexpected state %s (expected: PARTITION_CONFIGURING)", response.PartitionState.String())).
232-
WithField("environment_id", envId).
243+
WithField("partition", envId).
233244
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
234245
Error("failed to perform DD scheduler PartitionInitialize")
235246
return
236247
}
237248

238249
pollingSeconds, ok := varStack["dd_polling_timeout"]
239-
if !ok {
240-
pollingSeconds = "30"
250+
pollingTimeout := DDSCHED_DEFAULT_POLLING_TIMEOUT
251+
if ok {
252+
pollingSecondsInt, _ := strconv.Atoi(pollingSeconds)
253+
pollingTimeout = time.Duration(pollingSecondsInt) * time.Second
241254
}
242-
pollingSecondsInt, _ := strconv.Atoi(pollingSeconds)
243-
pollingTimeout := time.Duration(pollingSecondsInt) * time.Second
244255

245256
PARTITION_STATE_POLLING:
246257
for startPolling := time.Now(); ; {
247-
response, err = p.ddSchedClient.PartitionStatus(context.Background(), in.PartitionInfo, grpc.EmptyCallOption{})
258+
response, err = p.ddSchedClient.PartitionStatus(ctx, in.PartitionInfo, grpc.EmptyCallOption{})
248259
switch response.PartitionState {
249260
case ddpb.PartitionState_PARTITION_CONFIGURING:
250261
time.Sleep(100 * time.Millisecond)
251262
case ddpb.PartitionState_PARTITION_CONFIGURED:
252263
break PARTITION_STATE_POLLING
253264
default:
254265
log.WithError(fmt.Errorf("PartitionInitialize landed on unexpected state %s (expected: PARTITION_CONFIGURED)", response.PartitionState.String())).
255-
WithField("environment_id", envId).
266+
WithField("partition", envId).
256267
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
257268
Error("failed to perform DD scheduler PartitionInitialize")
258269
break PARTITION_STATE_POLLING
259270
}
260271
if time.Since(startPolling) > pollingTimeout {
261272
log.WithError(fmt.Errorf("PartitionInitialize timeout exceeded. Latest state %s (expected: PARTITION_CONFIGURED)", response.PartitionState.String())).
262-
WithField("environment_id", envId).
273+
WithField("partition", envId).
263274
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
264275
WithField("timeout", pollingTimeout).
265276
Error("failed to perform DD scheduler PartitionInitialize")
@@ -269,13 +280,8 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
269280
return
270281
}
271282
stack["PartitionTerminate"] = func() (out string) { // must formally return string even when we return nothing
272-
log.Debug("performing DD scheduler PartitionTerminate")
273-
274-
envId, ok := varStack["environment_id"]
275-
if !ok {
276-
log.Error("cannot acquire environment ID for DD scheduler PartitionTerminate")
277-
return
278-
}
283+
log.WithField("partition", envId).
284+
Debug("performing DD scheduler PartitionTerminate")
279285

280286
in := ddpb.PartitionTermRequest{
281287
PartitionInfo: &ddpb.PartitionInfo{
@@ -285,14 +291,14 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
285291
}
286292
if p.ddSchedClient == nil {
287293
log.WithError(fmt.Errorf("DD scheduler plugin not initialized")).
288-
WithField("environment_id", envId).
294+
WithField("partition", envId).
289295
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
290296
Error("failed to perform DD scheduler PartitionTerminate")
291297
return
292298
}
293299
if p.ddSchedClient.GetConnState() != connectivity.Ready {
294300
log.WithError(fmt.Errorf("DD scheduler client connection not available")).
295-
WithField("environment_id", envId).
301+
WithField("partition", envId).
296302
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
297303
Error("failed to perform DD scheduler PartitionTerminate")
298304
return
@@ -302,47 +308,49 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
302308
response *ddpb.PartitionResponse
303309
err error
304310
)
305-
response, err = p.ddSchedClient.PartitionTerminate(context.Background(), &in, grpc.EmptyCallOption{})
311+
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
312+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
313+
defer cancel()
314+
response, err = p.ddSchedClient.PartitionTerminate(ctx, &in, grpc.EmptyCallOption{})
306315
if err != nil {
307316
log.WithError(err).
308-
WithField("environment_id", envId).
317+
WithField("partition", envId).
309318
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
310319
Error("failed to perform DD scheduler PartitionTerminate")
311320
}
312321
if response.PartitionState != ddpb.PartitionState_PARTITION_TERMINATING &&
313322
response.PartitionState != ddpb.PartitionState_PARTITION_TERMINATED {
314323
log.WithError(fmt.Errorf("PartitionTerminate returned unexpected state %s (expected: PARTITION_TERMINATING)", response.PartitionState.String())).
315-
WithField("environment_id", envId).
324+
WithField("partition", envId).
316325
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
317326
Error("failed to perform DD scheduler PartitionTerminate")
318327
}
319328

320329
pollingSeconds, ok := varStack["dd_polling_timeout"]
321-
if !ok {
322-
// Default to 30s when dd_polling_timeout is not set
323-
pollingSeconds = "30"
330+
pollingTimeout := DDSCHED_DEFAULT_POLLING_TIMEOUT
331+
if ok {
332+
pollingSecondsInt, _ := strconv.Atoi(pollingSeconds)
333+
pollingTimeout = time.Duration(pollingSecondsInt) * time.Second
324334
}
325-
pollingSecondsInt, _ := strconv.Atoi(pollingSeconds)
326-
pollingTimeout := time.Duration(pollingSecondsInt) * time.Second
327335

328336
PARTITION_STATE_POLLING:
329337
for startPolling := time.Now(); ; {
330-
response, err = p.ddSchedClient.PartitionStatus(context.Background(), in.PartitionInfo, grpc.EmptyCallOption{})
338+
response, err = p.ddSchedClient.PartitionStatus(ctx, in.PartitionInfo, grpc.EmptyCallOption{})
331339
switch response.PartitionState {
332340
case ddpb.PartitionState_PARTITION_TERMINATING:
333341
time.Sleep(100 * time.Millisecond)
334342
case ddpb.PartitionState_PARTITION_TERMINATED:
335343
break PARTITION_STATE_POLLING
336344
default:
337345
log.WithError(fmt.Errorf("PartitionTerminate landed on unexpected state %s (expected: PARTITION_TERMINATED)", response.PartitionState.String())).
338-
WithField("environment_id", envId).
346+
WithField("partition", envId).
339347
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
340348
Error("failed to perform DD scheduler PartitionTerminate")
341349
break PARTITION_STATE_POLLING
342350
}
343351
if time.Since(startPolling) > pollingTimeout {
344352
log.WithError(fmt.Errorf("PartitionTerminate timeout exceeded. Latest state %s (expected: PARTITION_TERMINATED)", response.PartitionState.String())).
345-
WithField("environment_id", envId).
353+
WithField("partition", envId).
346354
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
347355
WithField("timeout", pollingTimeout).
348356
Error("failed to perform DD scheduler PartitionTerminate")
@@ -352,24 +360,19 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
352360
return
353361
}
354362
stack["EnsureTermination"] = func() (out string) {
355-
log.Debug("performing DD scheduler session cleanup")
356-
357-
envId, ok := varStack["environment_id"]
358-
if !ok {
359-
log.Error("cannot acquire environment ID for DD scheduler session cleanup")
360-
return
361-
}
363+
log.WithField("partition", envId).
364+
Debug("performing DD scheduler session cleanup")
362365

363366
if p.ddSchedClient == nil {
364367
log.WithError(fmt.Errorf("DD scheduler plugin not initialized")).
365-
WithField("environment_id", envId).
368+
WithField("partition", envId).
366369
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
367370
Error("failed to perform DD scheduler session cleanup")
368371
return
369372
}
370373
if p.ddSchedClient.GetConnState() != connectivity.Ready {
371374
log.WithError(fmt.Errorf("DD scheduler client connection not available")).
372-
WithField("environment_id", envId).
375+
WithField("partition", envId).
373376
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
374377
Error("failed to perform DD scheduler session cleanup")
375378
return
@@ -384,18 +387,21 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
384387
EnvironmentId: envId,
385388
PartitionId: envId,
386389
}
387-
response, err = p.ddSchedClient.PartitionStatus(context.Background(), &infoReq, grpc.EmptyCallOption{})
390+
timeout := callable.AcquireTimeout(DDSCHED_TERMINATE_TIMEOUT, varStack, "Terminate", envId)
391+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
392+
defer cancel()
393+
response, err = p.ddSchedClient.PartitionStatus(ctx, &infoReq, grpc.EmptyCallOption{})
388394
if err != nil {
389395
log.WithError(err).
390-
WithField("environment_id", envId).
396+
WithField("partition", envId).
391397
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
392398
Error("failed to perform DD scheduler session cleanup")
393399
return
394400
}
395401

396402
if response == nil {
397403
log.WithError(errors.New("nil response")).
398-
WithField("environment_id", envId).
404+
WithField("partition", envId).
399405
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
400406
Error("failed to perform DD scheduler session cleanup")
401407
return
@@ -410,7 +416,7 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
410416
response.PartitionState == ddpb.PartitionState_PARTITION_TERMINATED {
411417
// DDsched is in an acceptable state, so we return
412418
log.WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
413-
WithField("environment_id", envId).
419+
WithField("partition", envId).
414420
WithField("partition_state", response.PartitionState).
415421
Trace("DD scheduler session cleanup not needed")
416422
return
@@ -427,50 +433,50 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
427433
}
428434

429435
log.WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
430-
WithField("environment_id", envId).
436+
WithField("partition", envId).
431437
WithField("partition_state", response.PartitionState).
432438
Warn("DD scheduler partition still active, performing PartitionTerminate")
433439

434-
response, err = p.ddSchedClient.PartitionTerminate(context.Background(), &in, grpc.EmptyCallOption{})
440+
response, err = p.ddSchedClient.PartitionTerminate(ctx, &in, grpc.EmptyCallOption{})
435441
if err != nil {
436442
log.WithError(err).
437-
WithField("environment_id", envId).
443+
WithField("partition", envId).
438444
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
439445
Error("failed to perform DD scheduler PartitionTerminate")
440446
}
441447
if response.PartitionState != ddpb.PartitionState_PARTITION_TERMINATING &&
442448
response.PartitionState != ddpb.PartitionState_PARTITION_TERMINATED {
443449
log.WithError(fmt.Errorf("PartitionTerminate returned unexpected state %s (expected: PARTITION_TERMINATING)", response.PartitionState.String())).
444-
WithField("environment_id", envId).
450+
WithField("partition", envId).
445451
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
446452
Error("failed to perform DD scheduler PartitionTerminate")
447453
}
448454

449455
pollingSeconds, ok := varStack["dd_polling_timeout"]
450-
if !ok {
451-
// Default to 30s when dd_polling_timeout is not set
452-
pollingSeconds = "30"
456+
pollingTimeout := DDSCHED_DEFAULT_POLLING_TIMEOUT
457+
if ok {
458+
pollingSecondsInt, _ := strconv.Atoi(pollingSeconds)
459+
pollingTimeout = time.Duration(pollingSecondsInt) * time.Second
453460
}
454-
pollingSecondsInt, _ := strconv.Atoi(pollingSeconds)
455-
pollingTimeout := time.Duration(pollingSecondsInt) * time.Second
461+
456462
PARTITION_STATE_POLLING:
457463
for startPolling := time.Now(); ; {
458-
response, err = p.ddSchedClient.PartitionStatus(context.Background(), in.PartitionInfo, grpc.EmptyCallOption{})
464+
response, err = p.ddSchedClient.PartitionStatus(ctx, in.PartitionInfo, grpc.EmptyCallOption{})
459465
switch response.PartitionState {
460466
case ddpb.PartitionState_PARTITION_TERMINATING:
461467
time.Sleep(100 * time.Millisecond)
462468
case ddpb.PartitionState_PARTITION_TERMINATED:
463469
break PARTITION_STATE_POLLING
464470
default:
465471
log.WithError(fmt.Errorf("PartitionTerminate landed on unexpected state %s (expected: PARTITION_TERMINATED)", response.PartitionState.String())).
466-
WithField("environment_id", envId).
472+
WithField("partition", envId).
467473
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
468474
Error("failed to perform DD scheduler PartitionTerminate")
469475
break PARTITION_STATE_POLLING
470476
}
471477
if time.Since(startPolling) > pollingTimeout {
472478
log.WithError(fmt.Errorf("PartitionTerminate timeout exceeded. Latest state %s (expected: PARTITION_TERMINATED)", response.PartitionState.String())).
473-
WithField("environment_id", envId).
479+
WithField("partition", envId).
474480
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
475481
WithField("timeout", pollingTimeout).
476482
Error("failed to perform DD scheduler PartitionTerminate")

0 commit comments

Comments
 (0)