diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index b73364ae6d..8e01e5bc43 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -49,7 +50,6 @@ import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.option.RpcOptions; import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.rpc.RpcServer; @@ -86,8 +86,12 @@ public synchronized boolean init(PDConfig.Raft config) { } this.config = config; + // Wire configured rpc timeout into RaftRpcClient so the Bolt transport + // timeout and the future.get() caller timeout in getLeaderGrpcAddress() are consistent. raftRpcClient = new RaftRpcClient(); - raftRpcClient.init(new RpcOptions()); + RpcOptions rpcOptions = new RpcOptions(); + rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout()); + raftRpcClient.init(rpcOptions); String raftPath = config.getDataPath() + "/" + groupId; new File(raftPath).mkdirs(); @@ -119,10 +123,7 @@ public synchronized boolean init(PDConfig.Raft config) { nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout()); nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout()); nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout()); - // Set the raft configuration - RaftOptions raftOptions = nodeOptions.getRaftOptions(); - - nodeOptions.setEnableMetrics(true); + // TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine for reference) final PeerId serverId = JRaftUtils.getPeerId(config.getAddress()); @@ -228,7 +229,7 @@ public PeerId getLeader() { } /** - * Send a message to the leader to get the grpc address; + * Send a message to the leader to get the grpc address. */ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException { if (isLeader()) { @@ -236,11 +237,41 @@ public String getLeaderGrpcAddress() throws ExecutionException, InterruptedExcep } if (raftNode.getLeaderId() == null) { - waitingForLeader(10000); + waitingForLeader(config.getRpcTimeout()); } - return raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get() - .getGrpcAddress(); + // Cache leader to avoid repeated getLeaderId() calls and guard against + // waitingForLeader() returning without a leader being elected. + PeerId leader = raftNode.getLeaderId(); + if (leader == null) { + throw new ExecutionException(new IllegalStateException("Leader is not ready")); + } + + try { + RaftRpcProcessor.GetMemberResponse response = raftRpcClient + .getGrpcAddress(leader.getEndpoint().toString()) + .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS); + if (response != null && response.getGrpcAddress() != null) { + return response.getGrpcAddress(); + } + } catch (TimeoutException e) { + // TODO: a more complete fix would need a source of truth for the leader's + // actual grpcAddress rather than deriving it from the local node's port config. + throw new ExecutionException( + String.format("Timed out while resolving leader gRPC address for %s", leader), + e); + } catch (ExecutionException e) { + // TODO: a more complete fix would need a source of truth for the leader's + // actual grpcAddress rather than deriving it from the local node's port config. + Throwable cause = e.getCause() != null ? e.getCause() : e; + throw new ExecutionException( + String.format("Failed to resolve leader gRPC address for %s", leader), cause); + } + + log.warn("Leader gRPC address field is null in RPC response for {}", leader); + throw new ExecutionException( + new IllegalStateException( + String.format("Leader gRPC address unavailable for %s", leader))); } /** @@ -322,14 +353,7 @@ public Status changePeerList(String peerList) { newPeers.parse(peerList); CountDownLatch latch = new CountDownLatch(1); this.raftNode.changePeers(newPeers, status -> { - // Use compareAndSet so a late callback does not overwrite a timeout status result.compareAndSet(null, status); - // Refresh inside callback so it fires even if caller already timed out - // Note: changePeerList() uses Configuration.parse() which only supports - // plain comma-separated peer addresses with no learner syntax. - // getLearners() will always be empty here. Learner support is handled - // in PDService.updatePdRaft() which uses PeerUtil.parseConfig() - // and supports the /learner suffix. if (status != null && status.isOk()) { IpAuthHandler handler = IpAuthHandler.getInstance(); if (handler != null) { @@ -347,16 +371,12 @@ public Status changePeerList(String peerList) { } latch.countDown(); }); - // Use 3x configured RPC timeout — bare await() would block forever if - // the callback is never invoked (e.g. node not started / RPC failure) - boolean completed = latch.await(3L * config.getRpcTimeout(), - TimeUnit.MILLISECONDS); + boolean completed = latch.await(3L * config.getRpcTimeout(), TimeUnit.MILLISECONDS); if (!completed && result.get() == null) { Status timeoutStatus = new Status(RaftError.EINTERNAL, "changePeerList timed out after %d ms", 3L * config.getRpcTimeout()); if (!result.compareAndSet(null, timeoutStatus)) { - // Callback arrived just before us — keep its result timeoutStatus = null; } if (timeoutStatus != null) { @@ -395,7 +415,6 @@ public PeerId waitingForLeader(long timeOut) { } return leader; } - } public Node getRaftNode() {