From d1091bcd4f5fb4b6f32bf9951a1bb5b2be77ba28 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 12 Jan 2026 17:37:26 +0800 Subject: [PATCH] fix --- .../common/thread/ThreadPoolMonitor.java | 7 ++++--- .../rocketmq/proxy/config/ProxyConfig.java | 19 ++++++++++++++++++ .../proxy/grpc/GrpcServerBuilder.java | 20 ++++++++++--------- .../remoting/RemotingProtocolServer.java | 13 ++++++------ .../src/main/resources/rmq.proxy.logback.xml | 2 +- 5 files changed, 41 insertions(+), 20 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java index a79674568bf..02acd78ba16 100644 --- a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java +++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java @@ -142,9 +142,10 @@ public static void logThreadPoolStatus() { List monitors = threadPoolWrapper.getStatusPrinters(); for (ThreadPoolStatusMonitor monitor : monitors) { double value = monitor.value(threadPoolWrapper.getThreadPoolExecutor()); - String nameFormatted = String.format("%-40s", threadPoolWrapper.getName()); - String descFormatted = String.format("%-12s", monitor.describe()); - waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted, value); + waterMarkLogger.info("\t{}\t{}\t{}", threadPoolWrapper.getName(), + monitor.describe(), + value); + if (enablePrintJstack) { if (monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) && System.currentTimeMillis() - jstackTime > jstackPeriodTime) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index d44b82aff55..5a1a5859305 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -95,6 +95,17 @@ public class ProxyConfig implements ConfigFile { private boolean enableGrpcEpoll = false; private int grpcThreadPoolNums = 16 + PROCESSOR_NUMBER * 2; private int grpcThreadPoolQueueCapacity = 100000; + + /** + * Maximum number of concurrent gRPC calls allowed per client connection. + *

+ * A single client issuing excessively high concurrent requests may skew the validation load balancing + * and overload a single proxy instance (hotspot), potentially bringing it down. Limiting + * {@code grpcMaxConcurrentCallsPerConnection} helps mitigate this per-connection hotspot risk. + *

+ * Note: Setting this limit too low may cause send/consume failures (e.g., backpressure or rejected calls). + */ + private int grpcMaxConcurrentCallsPerConnection = Integer.MAX_VALUE; private String brokerConfigPath = ConfigurationManager.getProxyHome() + "/conf/broker.conf"; /** * gRPC max message size @@ -1581,4 +1592,12 @@ public int getReturnHandleGroupThreadPoolNums() { public void setReturnHandleGroupThreadPoolNums(int returnHandleGroupThreadPoolNums) { this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums; } + + public int getGrpcMaxConcurrentCallsPerConnection() { + return grpcMaxConcurrentCallsPerConnection; + } + + public void setGrpcMaxConcurrentCallsPerConnection(int grpcMaxConcurrentCallsPerConnection) { + this.grpcMaxConcurrentCallsPerConnection = grpcMaxConcurrentCallsPerConnection; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java index 163e799f413..1f012e6f40d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java @@ -24,16 +24,16 @@ import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel; import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor; import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor; - -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager; public class GrpcServerBuilder { @@ -52,18 +52,20 @@ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port } protected GrpcServerBuilder(ThreadPoolExecutor executor, int port, TlsCertificateManager tlsCertificateManager) { + ProxyConfig config = ConfigurationManager.getProxyConfig(); this.tlsCertificateManager = tlsCertificateManager; - serverBuilder = NettyServerBuilder.forPort(port); + serverBuilder = NettyServerBuilder.forPort(port) + .maxConcurrentCallsPerConnection(config.getGrpcMaxConcurrentCallsPerConnection()); serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator()); // build server - int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum(); - int workerLoopNum = ConfigurationManager.getProxyConfig().getGrpcWorkerLoopNum(); - int maxInboundMessageSize = ConfigurationManager.getProxyConfig().getGrpcMaxInboundMessageSize(); - long idleTimeMills = ConfigurationManager.getProxyConfig().getGrpcClientIdleTimeMills(); + int bossLoopNum = config.getGrpcBossLoopNum(); + int workerLoopNum = config.getGrpcWorkerLoopNum(); + int maxInboundMessageSize = config.getGrpcMaxInboundMessageSize(); + long idleTimeMills = config.getGrpcClientIdleTimeMills(); - if (ConfigurationManager.getProxyConfig().isEnableGrpcEpoll()) { + if (config.isEnableGrpcEpoll()) { serverBuilder.bossEventLoopGroup(new EpollEventLoopGroup(bossLoopNum)) .workerEventLoopGroup(new EpollEventLoopGroup(workerLoopNum)) .channelType(EpollServerSocketChannel.class) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index a01c23fce6b..c26f6bc2ef4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -19,6 +19,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.channel.Channel; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.auth.config.AuthConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; @@ -59,12 +64,6 @@ import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -194,7 +193,7 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertific this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build() ); - this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS); + this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 100, 100, TimeUnit.MILLISECONDS); this.registerRemotingServer(this.defaultRemotingServer); } diff --git a/proxy/src/main/resources/rmq.proxy.logback.xml b/proxy/src/main/resources/rmq.proxy.logback.xml index aee4cbc71b6..3eccf5f0238 100644 --- a/proxy/src/main/resources/rmq.proxy.logback.xml +++ b/proxy/src/main/resources/rmq.proxy.logback.xml @@ -52,7 +52,7 @@ 128MB - %d{yyy-MM-dd HH:mm:ss,GMT+8} %m%n + %d{yyy-MM-dd HH:mm:ss,GMT+8}%m%n UTF-8