Skip to content

Commit f17fbbe

Browse files
committed
[core] Add machine counter to scheduler.go offers handler
1 parent 53c9a84 commit f17fbbe

1 file changed

Lines changed: 29 additions & 7 deletions

File tree

core/task/scheduler.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/AliceO2Group/Control/common/utils/uid"
4444
"github.com/AliceO2Group/Control/core/task/channel"
4545
"github.com/AliceO2Group/Control/core/task/schedutil"
46+
"github.com/AliceO2Group/Control/core/the"
4647
"github.com/spf13/viper"
4748

4849
"github.com/AliceO2Group/Control/common/event"
@@ -453,6 +454,8 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
453454

454455
timeGotDescriptors := time.Now()
455456

457+
machinesUsed := make(map[string]struct{})
458+
456459
// by default we get ready to decline all offers
457460
offerIDsToDecline := make(map[mesos.OfferID]struct{}, len(offers))
458461
for i := range offers {
@@ -611,6 +614,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
611614
Hostname: offer.Hostname,
612615
}
613616
state.taskman.AgentCache.Update(agentForCache) //thread safe
617+
machinesUsed[offer.Hostname] = struct{}{}
614618

615619
taskPtr := state.taskman.newTaskForMesosOffer(&offer, descriptor, bindMap, targetExecutorId)
616620
if taskPtr == nil {
@@ -900,17 +904,33 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
900904
if viper.GetBool("veryVerbose") {
901905
log.WithPrefix("scheduler").
902906
WithField("partition", envId.String()).
907+
WithField("level", infologger.IL_Devel).
903908
Trace("offers cycle complete, no tasks launched")
904909
}
905910
} else {
911+
machinesUsedSlice := func(machines map[string]struct{}) []string { // StringSet to StringSlice
912+
out := make([]string, len(machines))
913+
i := 0
914+
for k, _ := range machines {
915+
out[i] = k
916+
i++
917+
}
918+
return out
919+
}(machinesUsed)
920+
921+
detectorsForHosts, _ := the.ConfSvc().GetDetectorsForHosts(machinesUsedSlice)
922+
906923
log.WithPrefix("scheduler").
907924
WithField("partition", envId.String()).
908-
WithField("tasks", tasksLaunchedThisCycle).
909-
Debug("offers cycle complete, tasks launched")
925+
WithField("hosts", strings.Join(machinesUsedSlice, " ")).
926+
WithField("hostCount", len(machinesUsedSlice)).
927+
WithField("detectors", strings.Join(detectorsForHosts, " ")).
928+
Debugf("offers cycle complete, %d tasks launched for %d detectors", tasksLaunchedThisCycle, len(detectorsForHosts))
910929
}
911930
utils.TimeTrack(timeBeforeDecline, "resourceOffers: decline-notify-return", log.
912931
WithField("partition", envId.String()).
913932
WithField("tasksLaunched", tasksLaunchedThisCycle).
933+
WithField("level", infologger.IL_Devel).
914934
WithField("offersDeclined", offersDeclined))
915935

916936
return nil
@@ -923,11 +943,13 @@ func (state *schedulerState) statusUpdate() events.HandlerFunc {
923943
return func(ctx context.Context, e *scheduler.Event) error {
924944
s := e.GetUpdate().GetStatus()
925945
if viper.GetBool("verbose") {
926-
log.WithPrefix("scheduler").WithFields(logrus.Fields{
927-
"task": s.TaskID.Value,
928-
"state": s.GetState().String(),
929-
"message": s.GetMessage(),
930-
}).Trace("task status update received")
946+
log.WithPrefix("scheduler").
947+
WithFields(logrus.Fields{
948+
"task": s.TaskID.Value,
949+
"state": s.GetState().String(),
950+
"message": s.GetMessage(),
951+
}).
952+
Trace("task status update received")
931953
}
932954

933955
// What's the new task state?

0 commit comments

Comments
 (0)