Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.zstack.compute.host;

import org.zstack.header.core.workflow.Flow;
import org.zstack.header.host.ConnectHostInfo;
import org.zstack.header.host.HostInventory;

/**
* Runs after the hypervisor connect hook and before pre-connect extensions.
*/
public interface HostAfterConnectHookExtensionPoint {
/**
* Returns a flow to run after host connection.
*
* @param reconnect whether this connect was triggered by host reconnect
* @return null to skip, or a flow to join the connect chain
*/
Flow createAfterConnectHookFlow(HostInventory host, ConnectHostInfo info, boolean reconnect);
}
107 changes: 107 additions & 0 deletions compute/src/main/java/org/zstack/compute/host/HostBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
public abstract class HostBase extends AbstractHost {
protected static final CLogger logger = Utils.getLogger(HostBase.class);
private static final String AFTER_CONNECT_HOOK_RECONNECT = "afterConnectHookReconnect";
protected HostVO self;

@Autowired
Expand Down Expand Up @@ -1063,6 +1064,7 @@ public void run(final SyncTaskChain chain) {
ConnectHostMsg connectMsg = new ConnectHostMsg(self.getUuid());
connectMsg.setNewAdd(false);
connectMsg.setCalledByAPI(msg.isCalledByAPI());
connectMsg.putHeaderEntry(AFTER_CONNECT_HOOK_RECONNECT, Boolean.TRUE.toString());
bus.makeTargetServiceIdByResourceUuid(connectMsg, HostConstant.SERVICE_ID, self.getUuid());
bus.send(connectMsg, new CloudBusCallBack(msg, chain, completion) {
@Override
Expand Down Expand Up @@ -1325,6 +1327,111 @@ public void fail(ErrorCode errorCode) {
}
});

flow(new Flow() {
String __name__ = "call-after-connect-hook-extensions";
private final List<Flow> afterConnectHookFlows = new ArrayList<>();
private boolean done;

@Override
public void run(FlowTrigger trigger, Map data) {
FlowChain afterConnectHookChain = FlowChainBuilder.newSimpleFlowChain();
afterConnectHookChain.allowEmptyFlow();

self = dbf.reload(self);
HostInventory inv = getSelfInventory();
ConnectHostInfo info = ConnectHostInfo.fromConnectHostMsg(msg);
boolean reconnect = Boolean.parseBoolean(msg.getHeaderEntry(AFTER_CONNECT_HOOK_RECONNECT));

for (HostAfterConnectHookExtensionPoint p : pluginRgty.getExtensionList(HostAfterConnectHookExtensionPoint.class)) {
Flow flow = p.createAfterConnectHookFlow(inv, info, reconnect);
if (flow != null) {
Flow wrapper = new Flow() {
private boolean ran;
private Map runData;

@Override
public void run(FlowTrigger trigger, Map data) {
ran = true;
runData = data;
flow.run(trigger, data);
}

@Override
public void rollback(FlowRollback trigger, Map data) {
if (!ran) {
trigger.rollback();
return;
}

flow.rollback(trigger, runData);
}

@Override
public boolean skip(Map data) {
return flow.skip(data);
}
};
afterConnectHookFlows.add(wrapper);
afterConnectHookChain.then(wrapper);
}
}

afterConnectHookChain.done(new FlowDoneHandler(trigger) {
@Override
public void handle(Map data) {
done = true;
trigger.next();
}
}).error(new FlowErrorHandler(trigger) {
@Override
public void handle(ErrorCode errCode, Map data) {
trigger.fail(errCode);
}
}).start();
}

@Override
public void rollback(FlowRollback trigger, Map data) {
if (!done) {
trigger.rollback();
return;
}

ListIterator<Flow> iterator = afterConnectHookFlows.listIterator(afterConnectHookFlows.size());
rollbackAfterConnectHookFlows(iterator, trigger, data);
}

private void rollbackAfterConnectHookFlows(ListIterator<Flow> iterator, FlowRollback trigger, Map data) {
if (!iterator.hasPrevious()) {
trigger.rollback();
return;
}

try {
iterator.previous().rollback(new FlowRollback() {
@Override
public void rollback() {
rollbackAfterConnectHookFlows(iterator, trigger, data);
}

@Override
public void skipRestRollbacks() {
trigger.skipRestRollbacks();
}

@Override
public ErrorCode getErrorCode() {
return trigger.getErrorCode();
}

}, data);
} catch (Throwable t) {
logger.warn("unhandled exception when rolling back after-connect-hook flow", t);
rollbackAfterConnectHookFlows(iterator, trigger, data);
}
}
});

flow(new NoRollbackFlow() {
String __name__ = "call-pre-connect-extensions";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ private void reportHostCapacity() {
new While<>(hostUuids).step((hostUuid, completion) -> {
CheckHostCapacityMsg msg = new CheckHostCapacityMsg();
msg.setHostUuid(hostUuid);
msg.setTimeout(HostConstant.CHECK_HOST_CAPACITY_TIMEOUT);
bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, hostUuid);
bus.send(msg, new CloudBusCallBack(completion) {
@Override
Expand Down
3 changes: 3 additions & 0 deletions header/src/main/java/org/zstack/header/host/HostConstant.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.zstack.header.host;

import java.util.concurrent.TimeUnit;

public interface HostConstant {
public static final String SERVICE_ID = "host";

public static final String ACTION_CATEGORY = "host";

long CHECK_HOST_CAPACITY_TIMEOUT = TimeUnit.MINUTES.toMillis(30);

String HOST_SYNC_SIGNATURE_PREFIX = "Host-";

String HOST_ARCHITECTURE_X86_64 = "x86_64";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4833,7 +4833,7 @@ public String call(CephPrimaryStorageMonVO arg) {
}

final SetupSelfFencerOnKvmHostReply reply = new SetupSelfFencerOnKvmHostReply();
new KvmCommandSender(param.getHostUuid()).send(cmd, KVM_HA_SETUP_SELF_FENCER, new KvmCommandFailureChecker() {
new KvmCommandSender(param.getHostUuid(), param.isNoStatusCheck()).send(cmd, KVM_HA_SETUP_SELF_FENCER, new KvmCommandFailureChecker() {
@Override
public ErrorCode getError(KvmResponseWrapper wrapper) {
AgentResponse rsp = wrapper.getResponse(AgentResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.errorcode.ErrorFacade;
import org.zstack.core.timeout.ApiTimeoutManager;
import org.zstack.header.core.Completion;
import org.zstack.header.core.NopeCompletion;
import org.zstack.header.core.workflow.Flow;
Expand All @@ -24,13 +23,12 @@ public class KVMHostCapacityExtension implements KVMHostConnectExtensionPoint, H
@Autowired
private ErrorFacade errf;
@Autowired
private ApiTimeoutManager timeoutMgr;
@Autowired
private ResourceConfigFacade rcf;

public void reportCapacity(HostInventory host, Completion completion) {
CheckHostCapacityMsg msg = new CheckHostCapacityMsg();
msg.setHostUuid(host.getUuid());
msg.setTimeout(HostConstant.CHECK_HOST_CAPACITY_TIMEOUT);
bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, host.getUuid());
bus.send(msg, new CloudBusCallBack(completion) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class KvmSetupSelfFencerParam {
private PrimaryStorageInventory primaryStorage;
private String strategy;
private List<String> fencers;
private boolean noStatusCheck;
// all previous self-fencer configurations on the ps will be removed after applying the new one
private boolean flushPrevious = true;

Expand Down Expand Up @@ -97,6 +98,14 @@ public void setFencers(List<String> fencers) {
this.fencers = fencers;
}

public boolean isNoStatusCheck() {
return noStatusCheck;
}

public void setNoStatusCheck(boolean noStatusCheck) {
this.noStatusCheck = noStatusCheck;
}

public boolean isFlushPrevious() {
return flushPrevious;
}
Expand Down