Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/core/cm/launcher/itf/instancerunner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ class InstanceRunnerItf {
virtual ~InstanceRunnerItf() = default;

/**
* Updates instances on specified node.
* Runs instances on specified node.
*
* @param nodeID node ID.
* @param stopInstances instance list to stop.
* @param startInstances instance list to start.
* @param instances instance list to run.
* @return Error.
*/
virtual Error UpdateInstances(const String& nodeID, const Array<aos::InstanceInfo>& stopInstances,
const Array<aos::InstanceInfo>& startInstances)
= 0;
virtual Error RunInstances(const String& nodeID, const Array<aos::InstanceInfo>& instances) = 0;
};

/** @}*/
Expand Down
124 changes: 56 additions & 68 deletions src/core/cm/launcher/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,46 +320,25 @@ Error Node::ReserveResources(const InstanceIdent& instanceIdent, const String& r
Error Node::SendScheduledInstances(
const Array<SharedPtr<Instance>>& scheduledInstances, const Array<InstanceStatus>& runningInstances)
{
auto stopInstances = MakeUnique<StaticArray<aos::InstanceInfo, cMaxNumInstances>>(mAllocator);
auto startInstances = MakeUnique<StaticArray<aos::InstanceInfo, cMaxNumInstances>>(mAllocator);

for (const auto& status : FilterActiveNodeInstances(runningInstances, mInfo.mNodeID)) {
// Check if the instance is scheduled on this node.
auto isScheduled = scheduledInstances.ContainsIf([&status, this](const SharedPtr<Instance>& item) {
return static_cast<const InstanceIdent&>(status) == item->GetInfo().mInstanceIdent
&& status.mRuntimeID == item->GetInfo().mRuntimeID && item->GetInfo().mNodeID == mInfo.mNodeID;
});

if (!isScheduled) {
if (auto err = stopInstances->EmplaceBack(); !err.IsNone()) {
return AOS_ERROR_WRAP(err);
}
(void)runningInstances;

Convert(status, stopInstances->Back());
}
}
auto instancesToRun = MakeUnique<StaticArray<aos::InstanceInfo, cMaxNumInstances>>(mAllocator);

for (const auto& instance : FilterByNode(scheduledInstances, mInfo.mNodeID)) {
if (auto err = startInstances->PushBack(instance->GetSMInfo()); !err.IsNone()) {
if (auto err = instancesToRun->PushBack(instance->GetSMInfo()); !err.IsNone()) {
return AOS_ERROR_WRAP(err);
}
}

LOG_INF() << "Update node instances" << Log::Field("nodeID", mInfo.mNodeID)
<< Log::Field("stopInstances", stopInstances->Size())
<< Log::Field("startInstances", startInstances->Size());
LOG_INF() << "Run node instances" << Log::Field("nodeID", mInfo.mNodeID)
<< Log::Field("instances", instancesToRun->Size());

for (const auto& instance : *stopInstances) {
LOG_INF() << "Update node stop instance" << Log::Field("instance", static_cast<const InstanceIdent&>(instance))
for (const auto& instance : *instancesToRun) {
LOG_INF() << "Run node instance" << Log::Field("instance", static_cast<const InstanceIdent&>(instance))
<< Log::Field("version", instance.mVersion) << Log::Field("runtimeID", instance.mRuntimeID);
}

for (const auto& instance : *startInstances) {
LOG_INF() << "Update node start instance" << Log::Field("instance", static_cast<const InstanceIdent&>(instance))
<< Log::Field("version", instance.mVersion) << Log::Field("runtimeID", instance.mRuntimeID);
}

if (auto err = mInstanceRunner->UpdateInstances(mInfo.mNodeID, *stopInstances, *startInstances); !err.IsNone()) {
if (auto err = mInstanceRunner->RunInstances(mInfo.mNodeID, *instancesToRun); !err.IsNone()) {
return AOS_ERROR_WRAP(err);
}

Expand All @@ -369,54 +348,39 @@ Error Node::SendScheduledInstances(
RetWithError<bool> Node::ResendInstances(
const Array<SharedPtr<Instance>>& activeInstances, const Array<InstanceStatus>& runningInstances, bool forceRestart)
{
auto stopInstances = MakeUnique<StaticArray<aos::InstanceInfo, cMaxNumInstances>>(mAllocator);
auto startInstances = MakeUnique<StaticArray<aos::InstanceInfo, cMaxNumInstances>>(mAllocator);
size_t runningNodeInstances = 0;

for (const auto& status : FilterActiveNodeInstances(runningInstances, mInfo.mNodeID)) {
runningNodeInstances++;

auto isActive = activeInstances.ContainsIf([&status](const SharedPtr<Instance>& item) {
return static_cast<const InstanceIdent&>(status) == item->GetInfo().mInstanceIdent
&& status.mRuntimeID == item->GetInfo().mRuntimeID;
});

if (!isActive || forceRestart) {
if (auto err = stopInstances->EmplaceBack(); !err.IsNone()) {
return {false, AOS_ERROR_WRAP(err)};
}

Convert(status, stopInstances->Back());
}
}
auto instancesToRun = MakeUnique<StaticArray<aos::InstanceInfo, cMaxNumInstances>>(mAllocator);

for (const auto& instance : FilterByNode(activeInstances, mInfo.mNodeID)) {
if (auto err = startInstances->PushBack(instance->GetSMInfo()); !err.IsNone()) {
if (auto err = instancesToRun->PushBack(instance->GetSMInfo()); !err.IsNone()) {
return {false, AOS_ERROR_WRAP(err)};
}
}

// Instance list didn't change, skip update.
if (stopInstances->IsEmpty() && startInstances->Size() == runningNodeInstances) {
return {false, ErrorEnum::eNone};
if (!forceRestart) {
// Instance list didn't change, skip update.
auto changed = AreInstancesChanged(*instancesToRun, runningInstances);
if (!changed) {
return {false, ErrorEnum::eNone};
}
}

// Send request to node.
LOG_INF() << "Resend instance update" << Log::Field("nodeID", mInfo.mNodeID)
<< Log::Field("stopInstances", stopInstances->Size())
<< Log::Field("startInstances", startInstances->Size());
LOG_INF() << "Resend node instances" << Log::Field("nodeID", mInfo.mNodeID)
<< Log::Field("instances", instancesToRun->Size()) << Log::Field("forceRestart", forceRestart);

for (const auto& instance : *stopInstances) {
LOG_INF() << "Update node stop instance" << Log::Field("instance", static_cast<const InstanceIdent&>(instance))
for (const auto& instance : *instancesToRun) {
LOG_INF() << "Resend node instance" << Log::Field("instance", static_cast<const InstanceIdent&>(instance))
<< Log::Field("version", instance.mVersion) << Log::Field("runtimeID", instance.mRuntimeID);
}

for (const auto& instance : *startInstances) {
LOG_INF() << "Update node start instance" << Log::Field("instance", static_cast<const InstanceIdent&>(instance))
<< Log::Field("version", instance.mVersion) << Log::Field("runtimeID", instance.mRuntimeID);
if (forceRestart) {
auto emptyList = MakeUnique<StaticArray<aos::InstanceInfo, cMaxNumInstances>>(mAllocator);
if (auto err = mInstanceRunner->RunInstances(mInfo.mNodeID, *emptyList); !err.IsNone()) {
return {false, AOS_ERROR_WRAP(err)};
}
}

if (auto err = mInstanceRunner->UpdateInstances(mInfo.mNodeID, *stopInstances, *startInstances); !err.IsNone()) {
if (auto err = mInstanceRunner->RunInstances(mInfo.mNodeID, *instancesToRun); !err.IsNone()) {
return {false, AOS_ERROR_WRAP(err)};
}

Expand All @@ -427,6 +391,36 @@ RetWithError<bool> Node::ResendInstances(
* Private
**********************************************************************************************************************/

bool Node::AreInstancesChanged(
const Array<aos::InstanceInfo>& instancesToRun, const Array<InstanceStatus>& runningInstances) const
{
size_t runningNodeInstances = 0;

for (const auto& _ : FilterActiveNodeInstances(runningInstances, mInfo.mNodeID)) {
(void)_;

runningNodeInstances++;
}

if (instancesToRun.Size() != runningNodeInstances) {
return true;
}

for (const auto& desired : instancesToRun) {
const auto found = runningInstances.ContainsIf([&](const InstanceStatus& status) {
return static_cast<const InstanceIdent&>(status) == static_cast<const InstanceIdent&>(desired)
&& status.mVersion == desired.mVersion && status.mNodeID == mInfo.mNodeID
&& status.mState != aos::InstanceStateEnum::eInactive && status.mRuntimeID == desired.mRuntimeID;
});

if (!found) {
return true;
}
}

return false;
}

size_t Node::GetSystemCPUUsage(const monitoring::NodeMonitoringData& monitoringData) const
{
size_t instanceUsage = 0;
Expand Down Expand Up @@ -514,10 +508,4 @@ size_t* Node::GetPtrToMaxNumInstances(const String& runtimeID)
return &mMaxInstances.Find(runtimeID)->mSecond;
}

void Node::Convert(const InstanceStatus& status, aos::InstanceInfo& info)
{
static_cast<InstanceIdent&>(info) = static_cast<const InstanceIdent&>(status);
info.mRuntimeID = status.mRuntimeID;
}

} // namespace aos::cm::launcher
5 changes: 3 additions & 2 deletions src/core/cm/launcher/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ class Node : public NodeItf {
// Returns CPU usage without Aos service instances.
size_t GetSystemRAMUsage(const monitoring::NodeMonitoringData& monitoringData) const;

bool AreInstancesChanged(
const Array<aos::InstanceInfo>& instancesToRun, const Array<InstanceStatus>& runningInstances) const;

size_t* GetPtrToAvailableCPU(const String& runtimeID);
size_t* GetPtrToAvailableRAM(const String& runtimeID);
size_t* GetPtrToMaxNumInstances(const String& runtimeID);

void Convert(const InstanceStatus& status, aos::InstanceInfo& info);

unitconfig::NodeConfigProviderItf* mNodeConfigProvider {};
InstanceRunnerItf* mInstanceRunner {};

Expand Down
Loading
Loading