Skip to content

Commit 219599f

Browse files
committed
[core] Parallelize ODC Shutdown calls during cleanup
1 parent 37687e0 commit 219599f

1 file changed

Lines changed: 47 additions & 38 deletions

File tree

core/integration/odc/handlers.go

Lines changed: 47 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"errors"
3030
"fmt"
3131
"strings"
32+
"sync"
3233
"time"
3334

3435
"github.com/AliceO2Group/Control/common/logger/infologger"
@@ -90,7 +91,7 @@ func handleGetState(ctx context.Context, odcClient *RpcClient, envId string) (st
9091
}
9192

9293
func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
93-
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
94+
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId))
9495
req := &odcpb.StartRequest{
9596
Request: &odcpb.StateRequest{
9697
Partitionid: envId,
@@ -141,7 +142,8 @@ func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string
141142
if replyStatus := setPropertiesResponse.Status; replyStatus != odcpb.ReplyStatus_SUCCESS {
142143
return fmt.Errorf("status %s from ODC", replyStatus.String())
143144
}
144-
log.WithFields(logrus.Fields{
145+
log.WithField("partition", envId).
146+
WithFields(logrus.Fields{
145147
"odcMsg": setPropertiesResponse.Msg,
146148
"odcStatus": setPropertiesResponse.Status.String(),
147149
"odcExectime": setPropertiesResponse.Exectime,
@@ -167,19 +169,20 @@ func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string
167169
if replyStatus := rep.Reply.Status; replyStatus != odcpb.ReplyStatus_SUCCESS {
168170
return fmt.Errorf("status %s from ODC", replyStatus.String())
169171
}
170-
log.WithFields(logrus.Fields{
171-
"odcMsg": rep.Reply.Msg,
172-
"odcStatus": rep.Reply.Status.String(),
173-
"odcExectime": rep.Reply.Exectime,
174-
"odcRunid": rep.Reply.Partitionid,
175-
"odcSessionid": rep.Reply.Sessionid,
176-
}).
172+
log.WithField("partition", envId).
173+
WithFields(logrus.Fields{
174+
"odcMsg": rep.Reply.Msg,
175+
"odcStatus": rep.Reply.Status.String(),
176+
"odcExectime": rep.Reply.Exectime,
177+
"odcRunid": rep.Reply.Partitionid,
178+
"odcSessionid": rep.Reply.Sessionid,
179+
}).
177180
Debug("call to ODC complete")
178181
return err
179182
}
180183

181184
func handleStop(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
182-
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
185+
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId))
183186
req := &odcpb.StopRequest{
184187
Request: &odcpb.StateRequest{
185188
Partitionid: envId,
@@ -211,19 +214,20 @@ func handleStop(ctx context.Context, odcClient *RpcClient, arguments map[string]
211214
if replyStatus := rep.Reply.Status; replyStatus != odcpb.ReplyStatus_SUCCESS {
212215
return fmt.Errorf("status %s from ODC", replyStatus.String())
213216
}
214-
log.WithFields(logrus.Fields{
215-
"odcMsg": rep.Reply.Msg,
216-
"odcStatus": rep.Reply.Status.String(),
217-
"odcExectime": rep.Reply.Exectime,
218-
"odcRunid": rep.Reply.Partitionid,
219-
"odcSessionid": rep.Reply.Sessionid,
220-
}).
217+
log.WithField("partition", envId).
218+
WithFields(logrus.Fields{
219+
"odcMsg": rep.Reply.Msg,
220+
"odcStatus": rep.Reply.Status.String(),
221+
"odcExectime": rep.Reply.Exectime,
222+
"odcRunid": rep.Reply.Partitionid,
223+
"odcSessionid": rep.Reply.Sessionid,
224+
}).
221225
Debug("call to ODC complete")
222226
return err
223227
}
224228

225229
func handleReset(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
226-
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
230+
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId))
227231
if envId == "" {
228232
return errors.New("cannot proceed with empty environment id")
229233
}
@@ -246,7 +250,7 @@ func handleReset(ctx context.Context, odcClient *RpcClient, arguments map[string
246250
}
247251

248252
func handleCleanupLegacy(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
249-
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
253+
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId))
250254
if envId == "" {
251255
return errors.New("cannot proceed with empty environment id")
252256
}
@@ -280,8 +284,9 @@ func handleCleanupLegacy(ctx context.Context, odcClient *RpcClient, arguments ma
280284
}
281285

282286
func handleCleanup(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
283-
log.Debug("handleCleanup starting")
284-
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
287+
log.WithField("partition", envId).
288+
Debug("handleCleanup starting")
289+
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId))
285290

286291
// First we query ODC for the full list of active partitions
287292
req := &odcpb.StatusRequest{Running: true}
@@ -290,7 +295,6 @@ func handleCleanup(ctx context.Context, odcClient *RpcClient, arguments map[stri
290295
var rep *odcpb.StatusReply
291296

292297
rep, err = odcClient.Status(ctx, req, grpc.EmptyCallOption{})
293-
log.Debug("ODC status query done")
294298
if err != nil {
295299
return printGrpcError(err)
296300
}
@@ -306,7 +310,8 @@ func handleCleanup(ctx context.Context, odcClient *RpcClient, arguments map[stri
306310
if replyStatus := rep.GetStatus(); replyStatus != odcpb.ReplyStatus_SUCCESS {
307311
return fmt.Errorf("status %s from ODC", replyStatus.String())
308312
}
309-
log.WithFields(logrus.Fields{
313+
log.WithField("partition", envId).
314+
WithFields(logrus.Fields{
310315
"odcCall": "Status",
311316
"odcMsg": rep.GetMsg(),
312317
"odcStatus": rep.GetStatus().String(),
@@ -318,7 +323,8 @@ func handleCleanup(ctx context.Context, odcClient *RpcClient, arguments map[stri
318323
for i, v := range rep.GetPartitions() {
319324
partitionIdsKnownToOdc[i] = v.Partitionid
320325
}
321-
log.Debugf("partitions known to ODC: %s", strings.Join(partitionIdsKnownToOdc, ", "))
326+
log.WithField("partition", envId).
327+
Debugf("partitions known to ODC: %s", strings.Join(partitionIdsKnownToOdc, ", "))
322328

323329
knownEnvs := environment.ManagerInstance().Ids()
324330
partitionsToClean := make(map[string]struct{})
@@ -349,19 +355,26 @@ func handleCleanup(ctx context.Context, odcClient *RpcClient, arguments map[stri
349355
partitionsToCleanStr[i] = k
350356
i++
351357
}
352-
log.Debugf("partitions about to be cleaned: %s", strings.Join(partitionIdsKnownToOdc, ", "))
358+
log.WithField("partition", envId).
359+
Debugf("partitions about to be cleaned: %s", strings.Join(partitionIdsKnownToOdc, ", "))
353360

354-
// Then the actual cleanup calls begin, one partition at a time...
355-
for odcPartitionId, _ := range partitionsToClean {
361+
wg := &sync.WaitGroup{}
362+
wg.Add(len(partitionsToClean))
356363

357-
err = doShutdown(ctx, odcClient, arguments, odcPartitionId) // FIXME make this parallel
358-
if err != nil {
359-
log.WithError(printGrpcError(err)).
360-
WithField("level", infologger.IL_Devel).
361-
WithField("partition", odcPartitionId).
362-
Warn("ODC Shutdown call failed")
363-
}
364+
// Then the actual cleanup calls begin, in parallel...
365+
for odcPartitionId, _ := range partitionsToClean {
366+
go func(odcPartitionId string) {
367+
defer wg.Done()
368+
err = doShutdown(ctx, odcClient, arguments, odcPartitionId) // FIXME: err belongs to the enclosing function and is written by every goroutine (data race); use a goroutine-local variable
369+
if err != nil {
370+
log.WithError(printGrpcError(err)).
371+
WithField("level", infologger.IL_Devel).
372+
WithField("partition", odcPartitionId).
373+
Warn("ODC Shutdown call failed")
374+
}
375+
}(odcPartitionId)
364376
}
377+
wg.Wait()
365378

366379
return nil // We clobber the error because nothing can be done for a failed cleanup
367380
}
@@ -481,10 +494,6 @@ func doShutdown(ctx context.Context, odcClient *RpcClient, arguments map[string]
481494
return err
482495
}
483496

484-
func handleExit(ctx context.Context, odcClient *RpcClient, arguments map[string]string ) error {
485-
return nil
486-
}
487-
488497
func handleRun(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
489498
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
490499
if envId == "" {

0 commit comments

Comments
 (0)