Skip to content

Commit 4bf3160

Browse files
committed
[core] Fix crash caused by map contention in Bookkeeping plugin
1 parent 9caaa6e commit 4bf3160

1 file changed

Lines changed: 39 additions & 24 deletions

File tree

core/integration/bookkeeping/plugin.go

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"net/url"
4040
"strconv"
4141
"strings"
42+
"sync"
4243
"time"
4344

4445
bkpb "github.com/AliceO2Group/Control/core/integration/bookkeeping/protos"
@@ -66,12 +67,13 @@ type Plugin struct {
6667

6768
// Indicators linked to each ongoing environment that give some plugin-specific
6869
// data about the status of the current run (of that environment)
69-
missingUpdateRunStarts map[string] /*envId*/ bool
70-
pendingRunStops map[string] /*envId*/ int64
71-
pendingO2Starts map[string] /*envId*/ bool
72-
pendingO2Stops map[string] /*envId*/ bool
73-
pendingTrgStarts map[string] /*envId*/ bool
74-
pendingTrgStops map[string] /*envId*/ bool
70+
missingUpdateRunStarts map[string] /*envId*/ bool
71+
missingUpdateRunStartsMu sync.Mutex
72+
pendingRunStops map[string] /*envId*/ int64
73+
pendingO2Starts map[string] /*envId*/ bool
74+
pendingO2Stops map[string] /*envId*/ bool
75+
pendingTrgStarts map[string] /*envId*/ bool
76+
pendingTrgStops map[string] /*envId*/ bool
7577
}
7678

7779
/**********************************************/
@@ -120,7 +122,10 @@ func (p *Plugin) GetConnectionState() string {
120122
/*******************************************************/
121123
// Utility methods for GetData and GetEnvironmentsData //
122124
/*******************************************************/
123-
func (p *Plugin) missingUpdateRunStartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
125+
func (p *Plugin) getMissingUpdateRunStartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
126+
p.missingUpdateRunStartsMu.Lock()
127+
defer p.missingUpdateRunStartsMu.Unlock()
128+
124129
if p.missingUpdateRunStarts == nil {
125130
return nil
126131
}
@@ -135,7 +140,7 @@ func (p *Plugin) missingUpdateRunStartsForEnvs(envIds []uid.ID) map[uid.ID]bool
135140
return out
136141
}
137142

138-
func (p *Plugin) pendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string {
143+
func (p *Plugin) getPendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string {
139144
if p.pendingRunStops == nil {
140145
return nil
141146
}
@@ -150,7 +155,7 @@ func (p *Plugin) pendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string {
150155
return out
151156
}
152157

153-
func (p *Plugin) pendingO2StartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
158+
func (p *Plugin) getPendingO2StartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
154159
if p.pendingO2Starts == nil {
155160
return nil
156161
}
@@ -165,7 +170,7 @@ func (p *Plugin) pendingO2StartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
165170
return out
166171
}
167172

168-
func (p *Plugin) pendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
173+
func (p *Plugin) getPendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
169174
if p.pendingO2Stops == nil {
170175
return nil
171176
}
@@ -180,7 +185,7 @@ func (p *Plugin) pendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
180185
return out
181186
}
182187

183-
func (p *Plugin) pendingTrgStartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
188+
func (p *Plugin) getPendingTrgStartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
184189
if p.pendingTrgStarts == nil {
185190
return nil
186191
}
@@ -195,7 +200,7 @@ func (p *Plugin) pendingTrgStartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
195200
return out
196201
}
197202

198-
func (p *Plugin) pendingTrgStopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
203+
func (p *Plugin) getPendingTrgStopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
199204
if p.pendingTrgStops == nil {
200205
return nil
201206
}
@@ -223,12 +228,12 @@ func (p *Plugin) GetData(_ []any) string {
223228
envIds := environment.ManagerInstance().Ids()
224229

225230
outMap := make(map[string]interface{})
226-
outMap["missingUpdateRunStarts"] = p.missingUpdateRunStartsForEnvs(envIds)
227-
outMap["pendingRunStops"] = p.pendingRunStopsForEnvs(envIds)
228-
outMap["pendingO2Starts"] = p.pendingO2StartsForEnvs(envIds)
229-
outMap["pendingO2Stops"] = p.pendingO2StopsForEnvs(envIds)
230-
outMap["pendingTrgStarts"] = p.pendingTrgStartsForEnvs(envIds)
231-
outMap["pendingTrgStops"] = p.pendingTrgStopsForEnvs(envIds)
231+
outMap["missingUpdateRunStarts"] = p.getMissingUpdateRunStartsForEnvs(envIds)
232+
outMap["pendingRunStops"] = p.getPendingRunStopsForEnvs(envIds)
233+
outMap["pendingO2Starts"] = p.getPendingO2StartsForEnvs(envIds)
234+
outMap["pendingO2Stops"] = p.getPendingO2StopsForEnvs(envIds)
235+
outMap["pendingTrgStarts"] = p.getPendingTrgStartsForEnvs(envIds)
236+
outMap["pendingTrgStops"] = p.getPendingTrgStopsForEnvs(envIds)
232237

233238
out, err := json.Marshal(outMap)
234239
if err != nil {
@@ -243,12 +248,12 @@ func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string {
243248
return nil
244249
}
245250

246-
inMissingStart := p.missingUpdateRunStartsForEnvs(envIds)
247-
inRunStopMap := p.pendingRunStopsForEnvs(envIds)
248-
inO2StartMap := p.pendingO2StartsForEnvs(envIds)
249-
inO2StopMap := p.pendingO2StopsForEnvs(envIds)
250-
inTrgStartMap := p.pendingTrgStartsForEnvs(envIds)
251-
inTrgStopMap := p.pendingTrgStopsForEnvs(envIds)
251+
inMissingStart := p.getMissingUpdateRunStartsForEnvs(envIds)
252+
inRunStopMap := p.getPendingRunStopsForEnvs(envIds)
253+
inO2StartMap := p.getPendingO2StartsForEnvs(envIds)
254+
inO2StopMap := p.getPendingO2StopsForEnvs(envIds)
255+
inTrgStartMap := p.getPendingTrgStartsForEnvs(envIds)
256+
inTrgStopMap := p.getPendingTrgStopsForEnvs(envIds)
252257

253258
envMap := make(map[string]interface{})
254259
out := make(map[uid.ID]string)
@@ -527,7 +532,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
527532
return
528533
} else {
529534
// If the run creation request succeeds, we set the run status indicators
535+
p.missingUpdateRunStartsMu.Lock()
530536
p.missingUpdateRunStarts[envId] = true
537+
p.missingUpdateRunStartsMu.Unlock()
531538
p.pendingRunStops[envId] = runNumber64
532539
p.pendingO2Starts[envId] = true
533540
p.pendingO2Stops[envId] = true
@@ -719,6 +726,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
719726
}
720727
// If UpdateRunStart was not called and the Trg start time is missing,
721728
// it is set to the O2 start time.
729+
p.missingUpdateRunStartsMu.Lock()
722730
if p.missingUpdateRunStarts[envId] == true && trgEnabled && timeTrgStartOutput == nil {
723731
if p.pendingO2Starts[envId] == false {
724732
timeTrgStartOutput = timeO2StartOutput
@@ -732,6 +740,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
732740
Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing O2 start time after missing UpdateRunStart call")
733741
}
734742
}
743+
p.missingUpdateRunStartsMu.Unlock()
735744
timeTrgEndTemp, err = strconv.ParseInt(timeTrgEndInput, 10, 64)
736745
if err != nil {
737746
log.WithField("run", runNumber64).
@@ -769,6 +778,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
769778
// Else if O2 start time or Trg start time is available, we can update it safely
770779
// because it means that even if they weren't missing during the UpdateRunStart call,
771780
// they will be the same and can be overwritten.
781+
p.missingUpdateRunStartsMu.Lock()
772782
if p.missingUpdateRunStarts[envId] == true && timeO2StartOutput == nil {
773783
log.WithField("run", runNumber64).
774784
WithField("partition", envId).
@@ -806,6 +816,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
806816
ReadoutCfgUri: &readoutUri,
807817
}
808818
}
819+
p.missingUpdateRunStartsMu.Unlock()
809820
}
810821

811822
// Send the run update request
@@ -828,10 +839,12 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
828839
if function, ok := varStack["__call_func"]; ok && strings.Contains(function, "UpdateRunStop") {
829840
// If the update is successful and it is an UpdateRunStop call, we check if we are missing
830841
// EOR timestamps, and if that is the case, we set them to Now() and send a new update request
842+
p.missingUpdateRunStartsMu.Lock()
831843
if p.missingUpdateRunStarts[envId] == true {
832844
defer delete(p.pendingO2Starts, envId)
833845
defer delete(p.pendingTrgStarts, envId)
834846
}
847+
p.missingUpdateRunStartsMu.Unlock()
835848
defer delete(p.pendingRunStops, envId)
836849
defer delete(p.pendingO2Stops, envId)
837850
defer delete(p.pendingTrgStops, envId)
@@ -979,7 +992,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
979992
p.pendingTrgStarts[envId] = false
980993
}
981994

995+
p.missingUpdateRunStartsMu.Lock()
982996
p.missingUpdateRunStarts[envId] = false
997+
p.missingUpdateRunStartsMu.Unlock()
983998

984999
return updateRunFunc(runNumber64, "test", O2StartTime, "", TrgStartTime, "")
9851000
}

0 commit comments

Comments
 (0)