diff --git a/core/integration/trg/plugin.go b/core/integration/trg/plugin.go index 78abb68d..c070eadc 100644 --- a/core/integration/trg/plugin.go +++ b/core/integration/trg/plugin.go @@ -1285,6 +1285,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return runUnloadFunc(ctx, runNumber64) } stack["Cleanup"] = func() (out string) { + // obsolete, EnsureRunStop and EnsureRunUnload should be used instead one after another. envId, ok := varStack["environment_id"] if !ok { log.WithField("partition", envId). @@ -1303,7 +1304,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { log.WithField("run", runNumberStop). WithField("partition", envId). WithField("level", infologger.IL_Devel). - Debug("pending TRG Stop found, performing cleanup") + Info("pending TRG Stop found, performing cleanup") delete(p.pendingRunStops, envId) _ = runStopFunc(ctx, runNumberStop) @@ -1330,7 +1331,80 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { log.WithField("run", runNumberUnload). WithField("partition", envId). WithField("level", infologger.IL_Devel). - Debug("pending TRG Unload found, performing cleanup") + Info("pending TRG Unload found, performing cleanup") + + delete(p.pendingRunUnloads, envId) + _ = runUnloadFunc(ctx, runNumberUnload) + } else { + log.WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Debug("TRG cleanup: Unload not needed") + } + return + } + stack["EnsureRunStop"] = func() (out string) { + // if there is a run to stop, it is stopped, otherwise we do nothing (no errors, no events) + envId, ok := varStack["environment_id"] + if !ok { + log.WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Warn("no environment_id found for TRG EnsureRunStop") + return + } + + timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunStop", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // runStop if found pending + runNumberStop, ok := p.pendingRunStops[envId] + if ok { + log.WithField("run", runNumberStop). + WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Info("pending TRG Stop found, performing cleanup") + + delete(p.pendingRunStops, envId) + _ = runStopFunc(ctx, runNumberStop) + + trgEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10) + parentRole, ok := call.GetParentRole().(callable.ParentRole) + if ok { + parentRole.SetGlobalRuntimeVar("trg_end_time_ms", trgEndTime) + } else { + log.WithField("partition", envId). + WithField("run", runNumberStop). + WithField("trgEndTime", trgEndTime). + Debug("could not get parentRole and set TRG end time") + } + } else { + log.WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Debug("TRG cleanup: Stop not needed") + } + return + } + stack["EnsureRunUnload"] = func() (out string) { + // if there is a run to unload, it is unloaded, otherwise we do nothing (no errors, no events) + envId, ok := varStack["environment_id"] + if !ok { + log.WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Warn("no environment_id found for TRG EnsureRunUnload") + return + } + + timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "EnsureRunUnload", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // runUnload if found pending + runNumberUnload, ok := p.pendingRunUnloads[envId] + if ok { + log.WithField("run", runNumberUnload). + WithField("partition", envId). + WithField("level", infologger.IL_Devel). + Info("pending TRG Unload found, performing cleanup") delete(p.pendingRunUnloads, envId) _ = runUnloadFunc(ctx, runNumberUnload)