Skip to content

Commit 2449e86

Browse files
knopers8teo
authored andcommitted
[OCTRL-791] Allow to fetch LHC fill info from BK, propagate to varStack
1 parent 82d6cc3 commit 2449e86

18 files changed

Lines changed: 1138 additions & 234 deletions

core/integration/bookkeeping/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type RpcClient struct {
4545
bkpb.FlpServiceClient
4646
bkpb.LogServiceClient
4747
bkpb.RunServiceClient
48+
bkpb.LhcFillServiceClient
4849
conn *grpc.ClientConn
4950
cancel context.CancelFunc
5051
}
@@ -119,6 +120,7 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
119120
FlpServiceClient: bkpb.NewFlpServiceClient(conn),
120121
LogServiceClient: bkpb.NewLogServiceClient(conn),
121122
RunServiceClient: bkpb.NewRunServiceClient(conn),
123+
LhcFillServiceClient: bkpb.NewLhcFillServiceClient(conn),
122124
conn: conn,
123125
cancel: cancel,
124126
}

core/integration/bookkeeping/plugin.go

Lines changed: 161 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:. protos/flp.proto
2929
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:. protos/log.proto
3030
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:. protos/run.proto
31+
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_out=require_unimplemented_servers=false:. protos/lhcFill.proto
3132

3233
package bookkeeping
3334

@@ -54,8 +55,9 @@ import (
5455
)
5556

5657
const (
57-
BKP_RUN_TIMEOUT = 30 * time.Second
58-
BKP_ENV_TIMEOUT = 30 * time.Second
58+
BKP_RUN_TIMEOUT = 30 * time.Second
59+
BKP_ENV_TIMEOUT = 30 * time.Second
60+
BKP_FILL_TIMEOUT = 30 * time.Second
5961
)
6062

6163
type Plugin struct {
@@ -1267,6 +1269,163 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
12671269
return
12681270
}
12691271

1272+
/*************************************/
1273+
/******** Fill info functions ********/
1274+
/*************************************/
1275+
1276+
fetchLHCInfoForRun := func(runNumber int32) (out *bkpb.LHCFill, err error) {
1277+
runFetchRequest := bkpb.RunFetchRequest{RunNumber: runNumber}
1278+
1279+
timeout := callable.AcquireTimeout(BKP_FILL_TIMEOUT, varStack, "RetrieveFillInfo", envId)
1280+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1281+
defer cancel()
1282+
log.WithField("partition", envId).
1283+
WithField("level", infologger.IL_Devel).
1284+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1285+
WithField("call", "RetrieveFillInfo").
1286+
Debugf("requesting BK the run info for run %d", runNumber)
1287+
runWithRelations, err := p.bookkeepingClient.RunServiceClient.Get(ctx, &runFetchRequest, grpc.EmptyCallOption{})
1288+
if err != nil {
1289+
return nil, err
1290+
}
1291+
lhcFill := runWithRelations.GetLhcFill()
1292+
if lhcFill == nil {
1293+
return nil, fmt.Errorf("lhcFill for run %d is nil", runNumber)
1294+
}
1295+
return lhcFill, nil
1296+
}
1297+
1298+
fetchLatestLHCInfo := func() (out *bkpb.LHCFill, err error) {
1299+
lhcFillFetchRequest := bkpb.LastLhcFillFetchRequest{}
1300+
1301+
timeout := callable.AcquireTimeout(BKP_FILL_TIMEOUT, varStack, "RetrieveFillInfo", envId)
1302+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
1303+
defer cancel()
1304+
log.WithField("partition", envId).
1305+
WithField("level", infologger.IL_Devel).
1306+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1307+
WithField("call", "RetrieveFillInfo").
1308+
Debug("requesting BK the latest LHC fill info")
1309+
LhcFillWithRelations, err := p.bookkeepingClient.LhcFillServiceClient.GetLast(ctx, &lhcFillFetchRequest, grpc.EmptyCallOption{})
1310+
if err != nil {
1311+
return nil, err
1312+
}
1313+
lhcFill := LhcFillWithRelations.GetLhcFill()
1314+
if lhcFill == nil {
1315+
return nil, fmt.Errorf("lhcFill is nil")
1316+
}
1317+
return lhcFill, nil
1318+
}
1319+
1320+
propagateLHCInfoToVarStack := func(lhcInfo *bkpb.LHCFill, varStack map[string]string) {
1321+
call.VarStack["fill_info_fill_number"] = string(lhcInfo.FillNumber)
1322+
call.VarStack["fill_info_filling_scheme"] = lhcInfo.FillingSchemeName
1323+
call.VarStack["fill_info_beam_type"] = lhcInfo.BeamType
1324+
if lhcInfo.StableBeamStart != nil {
1325+
call.VarStack["fill_info_stable_beam_start_ms"] = strconv.FormatInt(*lhcInfo.StableBeamStart, 10)
1326+
}
1327+
if lhcInfo.StableBeamEnd != nil {
1328+
call.VarStack["fill_info_stable_beam_end_ms"] = strconv.FormatInt(*lhcInfo.StableBeamEnd, 10)
1329+
}
1330+
log.WithField("partition", envId).
1331+
WithField("level", infologger.IL_Devel).
1332+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1333+
WithField("call", "RetrieveFillInfo").
1334+
Infof("successfully updated the LHC Fill info (number %d, scheme %s, beam type %s)",
1335+
lhcInfo.FillNumber, lhcInfo.FillingSchemeName, lhcInfo.BeamType)
1336+
}
1337+
deleteLHCInfoInVarStack := func(varStack map[string]string) {
1338+
delete(call.VarStack, "fill_info_fill_number")
1339+
delete(call.VarStack, "fill_info_filling_scheme")
1340+
delete(call.VarStack, "fill_info_beam_type")
1341+
delete(call.VarStack, "fill_info_stable_beam_start_ms")
1342+
delete(call.VarStack, "fill_info_stable_beam_end_ms")
1343+
}
1344+
1345+
stack["RetrieveFillInfo"] = func() (out string) {
1346+
callFailedStr := "Bookkeeping RetrieveFillInfo call failed"
1347+
if p.bookkeepingClient == nil {
1348+
log.WithError(err).
1349+
WithField("level", infologger.IL_Support).
1350+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1351+
WithField("partition", envId).
1352+
WithField("call", "RetrieveFillInfo").
1353+
Error("bookkeeping plugin RetrieveFillInfo error")
1354+
call.VarStack["__call_error_reason"] = "bookkeeping plugin not initialized, RetrieveFillInfo impossible"
1355+
call.VarStack["__call_error"] = callFailedStr
1356+
return
1357+
}
1358+
1359+
// First, we try to get fill info associated to a run.
1360+
// At the time of writing, it is not correctly associated for SYNTHETIC runs,
1361+
// but it might be in the future.
1362+
// Only if there is no fill info associated with a run (e.g. because we ask BK too early during SOR),
1363+
// we ask for the latest LHC fill and will use if the end time of stable beams is not set.
1364+
rn := varStack["run_number"]
1365+
runNumber64, err := strconv.ParseInt(rn, 10, 32)
1366+
if err != nil {
1367+
log.WithField("partition", envId).
1368+
WithField("level", infologger.IL_Devel).
1369+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1370+
WithField("call", "RetrieveFillInfo").
1371+
WithError(err).
1372+
Info("cannot acquire run number for Bookkeeping fill info fetch, perhaps we are not in RUNNING")
1373+
1374+
call.VarStack["__call_error_reason"] = err.Error()
1375+
call.VarStack["__call_error"] = callFailedStr
1376+
} else {
1377+
lhcFill, err := fetchLHCInfoForRun(int32(runNumber64))
1378+
if err != nil {
1379+
log.WithField("partition", envId).
1380+
WithField("level", infologger.IL_Devel).
1381+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1382+
WithField("call", "RetrieveFillInfo").
1383+
Infof("could not get LHC fill info associated to run %d, will try to get the latest fill. Details: %s", runNumber64, err.Error())
1384+
} else {
1385+
log.WithField("partition", envId).
1386+
WithField("level", infologger.IL_Devel).
1387+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1388+
WithField("call", "RetrieveFillInfo").
1389+
Infof("received a reply about fill info associated to run %d, filling", runNumber64)
1390+
propagateLHCInfoToVarStack(lhcFill, varStack)
1391+
log.WithField("partition", envId).
1392+
WithField("level", infologger.IL_Devel).
1393+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1394+
WithField("call", "RetrieveFillInfo").
1395+
Infof("successfully updated the LHC Fill info for run %d (number %d, scheme %s, beam type %s)",
1396+
runNumber64, lhcFill.FillNumber, lhcFill.FillingSchemeName, lhcFill.BeamType)
1397+
return
1398+
}
1399+
}
1400+
1401+
lhcFill, err := fetchLatestLHCInfo()
1402+
if err != nil {
1403+
log.WithError(err).
1404+
WithField("level", infologger.IL_Support).
1405+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1406+
WithField("partition", envId).
1407+
WithField("call", "RetrieveFillInfo").
1408+
Error("bookkeeping plugin RetrieveFillInfo error")
1409+
call.VarStack["__call_error_reason"] = err.Error()
1410+
call.VarStack["__call_error"] = callFailedStr
1411+
} else if lhcFill.StableBeamEnd == nil || *lhcFill.StableBeamEnd != 0 {
1412+
log.WithField("partition", envId).
1413+
WithField("level", infologger.IL_Devel).
1414+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1415+
WithField("call", "RetrieveFillInfo").
1416+
Debug("received a reply about fill info, filling the var stack")
1417+
propagateLHCInfoToVarStack(lhcFill, varStack)
1418+
} else {
1419+
log.WithField("partition", envId).
1420+
WithField("level", infologger.IL_Devel).
1421+
WithField("endpoint", viper.GetString("bookkeepingBaseUri")).
1422+
WithField("call", "RetrieveFillInfo").
1423+
Debug("received a reply about fill info, but the stable beam end is in the past, will not read the fill info and will delete any existing")
1424+
deleteLHCInfoInVarStack(varStack)
1425+
}
1426+
return
1427+
}
1428+
12701429
return
12711430
}
12721431

0 commit comments

Comments
 (0)