From 3a5dd0517cfd1bc05e0c82e257644e9a4fbc6c1f Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Mon, 19 May 2025 15:58:07 +0200 Subject: [PATCH] [core] split TRG cleanup into two calls for distinguishable kafka events This commit splits trg.Cleanup() into two calls which should be used subsequently instead of trg.Cleanup. Thanks to this, the GUIs can listen to kafka events for a specific operationName ("trg.EnsureRunStop") for trigger stop timestamp. We preferred to avoid having the GUIs hunt for operationStep "perform TRG call: RunStop", which seems to be a message targeted for humans and could be more prone to change, thus breaking behaviour in GUIs. Fixes OCTRL-1017. --- core/integration/trg/plugin.go | 78 +++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 2 deletions(-) diff --git a/core/integration/trg/plugin.go b/core/integration/trg/plugin.go index 78abb68d..c070eadc 100644 --- a/core/integration/trg/plugin.go +++ b/core/integration/trg/plugin.go @@ -1285,6 +1285,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return runUnloadFunc(ctx, runNumber64) } stack["Cleanup"] = func() (out string) { + // obsolete, EnsureRunStop and EnsureRunUnload should be used instead one after another. envId, ok := varStack["environment_id"] if !ok { log.WithField("partition", envId). @@ -1303,7 +1304,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { log.WithField("run", runNumberStop). WithField("partition", envId). WithField("level", infologger.IL_Devel). - Debug("pending TRG Stop found, performing cleanup") + Info("pending TRG Stop found, performing cleanup") delete(p.pendingRunStops, envId) _ = runStopFunc(ctx, runNumberStop) @@ -1330,7 +1331,80 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { log.WithField("run", runNumberUnload). WithField("partition", envId). WithField("level", infologger.IL_Devel). - Debug("pending TRG Unload found, performing cleanup") + Info("pending TRG Unload found, performing cleanup") + + delete(p.pendingRunUnloads, envId) + _ = runUnloadFunc(ctx, runNumberUnload) + } else { + log.WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Debug("TRG cleanup: Unload not needed") + } + return + } + stack["EnsureRunStop"] = func() (out string) { + // if there is a run to stop, it is stopped, otherwise we do nothing (no errors, no events) + envId, ok := varStack["environment_id"] + if !ok { + log.WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Warn("no environment_id found for TRG EnsureRunStop") + return + } + + timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunStop", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // runStop if found pending + runNumberStop, ok := p.pendingRunStops[envId] + if ok { + log.WithField("run", runNumberStop). + WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Info("pending TRG Stop found, performing cleanup") + + delete(p.pendingRunStops, envId) + _ = runStopFunc(ctx, runNumberStop) + + trgEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10) + parentRole, ok := call.GetParentRole().(callable.ParentRole) + if ok { + parentRole.SetGlobalRuntimeVar("trg_end_time_ms", trgEndTime) + } else { + log.WithField("partition", envId). + WithField("run", runNumberStop). + WithField("trgEndTime", trgEndTime). + Debug("could not get parentRole and set TRG end time") + } + } else { + log.WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Debug("TRG cleanup: Stop not needed") + } + return + } + stack["EnsureRunUnload"] = func() (out string) { + // if there is a run to unload, it is unloaded, otherwise we do nothing (no errors, no events) + envId, ok := varStack["environment_id"] + if !ok { + log.WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Warn("no environment_id found for TRG EnsureRunUnload") + return + } + + timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunUnload", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // runUnload if found pending + runNumberUnload, ok := p.pendingRunUnloads[envId] + if ok { + log.WithField("run", runNumberUnload). + WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Info("pending TRG Unload found, performing cleanup") delete(p.pendingRunUnloads, envId) _ = runUnloadFunc(ctx, runNumberUnload)