From c549eb4c30a8db0deeb9a25bc91ab8b0566d7acb Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Thu, 1 Jan 2026 08:21:10 +0100 Subject: [PATCH] HTTP/2 requester: add request submission policy for per-connection request cap Provide optional queuing mode and keep default fail-fast rejection semantics Derive request scheme from target host to ensure HTTP/2 :scheme is always set --- .../bootstrap/H2MultiplexingRequester.java | 428 +++++++++++++++++- .../H2MultiplexingRequesterBootstrap.java | 17 +- .../bootstrap/RequestSubmissionPolicy.java | 71 +++ ...2MaxRequestsPerConnectionLocalExample.java | 236 ++++++++++ ...questsPerConnectionRejectLocalExample.java | 245 ++++++++++ ...xingRequesterMaxRequestsPerConnection.java | 192 ++++++++ 6 files changed, 1186 insertions(+), 3 deletions(-) create mode 100644 httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/RequestSubmissionPolicy.java create mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java create mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionRejectLocalExample.java create mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index f03bd20577..4ba4295fb9 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -32,7 +32,13 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.Cancellable; @@ -48,6 +54,7 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.impl.DefaultAddressResolver; import org.apache.hc.core5.http.impl.bootstrap.AsyncRequester; @@ -85,8 +92,221 @@ */ public class H2MultiplexingRequester extends AsyncRequester { + private static final class ConnRequestState { + + private final AtomicInteger active; + private final ConcurrentLinkedQueue queue; + + ConnRequestState() { + this.active = new AtomicInteger(0); + this.queue = new ConcurrentLinkedQueue<>(); + } + + boolean tryAcquire(final int max) { + for (;;) { + final int current = active.get(); + if (current >= max) { + return false; + } + if (active.compareAndSet(current, current + 1)) { + return true; + } + } + } + + void release() { + active.decrementAndGet(); + } + + void enqueue(final QueuedRequest request) { + queue.add(request); + } + + boolean remove(final QueuedRequest request) { + return queue.remove(request); + } + + QueuedRequest poll() { + return queue.poll(); + } + + void abortPending(final Exception cause) { + for (;;) { + final QueuedRequest request = queue.poll(); + if (request == null) { + return; + } + request.failed(cause); + } + } + + } + + private static final class QueuedRequest implements Cancellable { + + private final ConnRequestState connRequestState; + private final IOSession ioSession; + private final HttpRequest request; + private final EntityDetails entityDetails; + private final AsyncClientExchangeHandler exchangeHandler; + private final HandlerFactory pushHandlerFactory; + private final CancellableDependency cancellableDependency; + private final HttpContext context; + + private final AtomicBoolean cancelled; + private final AtomicBoolean submitted; + private volatile AsyncClientExchangeHandler handlerProxy; + + QueuedRequest( + final ConnRequestState connRequestState, + final IOSession ioSession, + final HttpRequest request, + final EntityDetails entityDetails, + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final CancellableDependency cancellableDependency, + final HttpContext context) { + this.connRequestState = connRequestState; + this.ioSession = ioSession; + this.request = request; + this.entityDetails = entityDetails; + this.exchangeHandler = exchangeHandler; + this.pushHandlerFactory = pushHandlerFactory; + this.cancellableDependency = cancellableDependency; + this.context = context; + this.cancelled = new AtomicBoolean(false); + this.submitted = new AtomicBoolean(false); + } + + @Override + public boolean cancel() { + if (!cancelled.compareAndSet(false, true)) { + return false; + } + final AsyncClientExchangeHandler proxy = handlerProxy; + if (proxy != null) { + proxy.cancel(); + } else { + connRequestState.remove(this); + exchangeHandler.cancel(); + exchangeHandler.releaseResources(); + } + return true; + } + + void failed(final Exception cause) { + if (!cancelled.compareAndSet(false, true)) { + return; + } + final AsyncClientExchangeHandler proxy = handlerProxy; + if (proxy != null) { + proxy.failed(cause); + } else { + exchangeHandler.failed(cause); + exchangeHandler.releaseResources(); + } + } + + boolean submit(final H2MultiplexingRequester requester) { + if (cancelled.get()) { + return false; + } + if (!submitted.compareAndSet(false, true)) { + return true; + } + requester.submitRequest(ioSession, connRequestState, request, entityDetails, exchangeHandler, pushHandlerFactory, + cancellableDependency, context, this); + return true; + } + + } + + private static final class Init { + + private final int maxRequestsPerConnection; + private final RequestSubmissionPolicy requestSubmissionPolicy; + private final ConcurrentMap connRequestStates; + private final IOSessionListener sessionListener; + + Init(final IOSessionListener sessionListener, final int maxRequestsPerConnection, final RequestSubmissionPolicy requestSubmissionPolicy) { + this.maxRequestsPerConnection = maxRequestsPerConnection; + this.requestSubmissionPolicy = requestSubmissionPolicy != null ? requestSubmissionPolicy : RequestSubmissionPolicy.REJECT; + this.connRequestStates = maxRequestsPerConnection > 0 ? new ConcurrentHashMap() : null; + this.sessionListener = new IOSessionListener() { + + @Override + public void connected(final IOSession session) { + if (sessionListener != null) { + sessionListener.connected(session); + } + } + + @Override + public void startTls(final IOSession session) { + if (sessionListener != null) { + sessionListener.startTls(session); + } + } + + @Override + public void inputReady(final IOSession session) { + if (sessionListener != null) { + sessionListener.inputReady(session); + } + } + + @Override + public void outputReady(final IOSession session) { + if (sessionListener != null) { + sessionListener.outputReady(session); + } + } + + @Override + public void timeout(final IOSession session) { + if (sessionListener != null) { + sessionListener.timeout(session); + } + } + + @Override + public void exception(final IOSession session, final Exception ex) { + if (sessionListener != null) { + sessionListener.exception(session, ex); + } + } + + @Override + public void disconnected(final IOSession session) { + try { + if (sessionListener != null) { + sessionListener.disconnected(session); + } + } finally { + if (connRequestStates != null) { + final ConnRequestState state = connRequestStates.remove(session.getId()); + if (state != null) { + state.abortPending(new ConnectionClosedException()); + } + } + } + } + + }; + } + + } + + private static Init init(final IOSessionListener sessionListener, final int maxRequestsPerConnection, final RequestSubmissionPolicy requestSubmissionPolicy) { + return new Init(sessionListener, maxRequestsPerConnection, requestSubmissionPolicy); + } + private final H2ConnPool connPool; + private final int maxRequestsPerConnection; + private final RequestSubmissionPolicy requestSubmissionPolicy; + private final ConcurrentMap connRequestStates; + /** * Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class. */ @@ -101,10 +321,192 @@ public H2MultiplexingRequester( final TlsStrategy tlsStrategy, final IOReactorMetricsListener threadPoolListener, final IOWorkerSelector workerSelector) { - super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, + this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, + addressResolver, tlsStrategy, threadPoolListener, workerSelector, init(sessionListener, 0, RequestSubmissionPolicy.REJECT)); + } + + @Internal + public H2MultiplexingRequester( + final IOReactorConfig ioReactorConfig, + final IOEventHandlerFactory eventHandlerFactory, + final Decorator ioSessionDecorator, + final Callback exceptionCallback, + final IOSessionListener sessionListener, + final Resolver addressResolver, + final TlsStrategy tlsStrategy, + final IOReactorMetricsListener threadPoolListener, + final IOWorkerSelector workerSelector, + final int maxRequestsPerConnection, + final RequestSubmissionPolicy requestSubmissionPolicy) { + this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, + addressResolver, tlsStrategy, threadPoolListener, workerSelector, init(sessionListener, maxRequestsPerConnection, requestSubmissionPolicy)); + } + + private H2MultiplexingRequester( + final IOReactorConfig ioReactorConfig, + final IOEventHandlerFactory eventHandlerFactory, + final Decorator ioSessionDecorator, + final Callback exceptionCallback, + final IOSessionListener sessionListener, + final Resolver addressResolver, + final TlsStrategy tlsStrategy, + final IOReactorMetricsListener threadPoolListener, + final IOWorkerSelector workerSelector, + final Init init) { + super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, init.sessionListener, ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener, workerSelector); this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); + this.maxRequestsPerConnection = init.maxRequestsPerConnection; + this.requestSubmissionPolicy = init.requestSubmissionPolicy; + this.connRequestStates = init.connRequestStates; + } + + private ConnRequestState getConnRequestState(final IOSession ioSession) { + final String id = ioSession.getId(); + ConnRequestState state = connRequestStates.get(id); + if (state == null) { + final ConnRequestState newState = new ConnRequestState(); + final ConnRequestState existing = connRequestStates.putIfAbsent(id, newState); + state = existing != null ? existing : newState; + } + return state; + } + + private void drainQueue(final IOSession ioSession, final ConnRequestState state) { + if (!ioSession.isOpen()) { + state.abortPending(new ConnectionClosedException()); + return; + } + for (;;) { + if (!ioSession.isOpen()) { + state.abortPending(new ConnectionClosedException()); + return; + } + if (!state.tryAcquire(maxRequestsPerConnection)) { + return; + } + final QueuedRequest queuedRequest = state.poll(); + if (queuedRequest == null) { + state.release(); + return; + } + if (!queuedRequest.submit(this)) { + state.release(); + } + } + } + + private void submitRequest( + final IOSession ioSession, + final ConnRequestState connRequestState, + final HttpRequest request, + final EntityDetails entityDetails, + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final CancellableDependency cancellableDependency, + final HttpContext context, + final QueuedRequest queuedRequest) { + final AtomicBoolean released = new AtomicBoolean(false); + final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() { + + private void releaseSlotIfNeeded() { + if (released.compareAndSet(false, true)) { + connRequestState.release(); + if (requestSubmissionPolicy == RequestSubmissionPolicy.QUEUE) { + drainQueue(ioSession, connRequestState); + } + } + } + + @Override + public void releaseResources() { + try { + exchangeHandler.releaseResources(); + } finally { + releaseSlotIfNeeded(); + } + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { + channel.sendRequest(request, entityDetails, httpContext); + } + + @Override + public int available() { + return exchangeHandler.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + exchangeHandler.produce(channel); + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { + exchangeHandler.consumeInformation(response, httpContext); + } + + @Override + public void consumeResponse( + final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { + exchangeHandler.consumeResponse(response, entityDetails, httpContext); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + exchangeHandler.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + exchangeHandler.consume(src); + } + + @Override + public void streamEnd(final List trailers) throws HttpException, IOException { + exchangeHandler.streamEnd(trailers); + } + + @Override + public void cancel() { + try { + exchangeHandler.cancel(); + } finally { + releaseSlotIfNeeded(); + } + } + + @Override + public void failed(final Exception cause) { + try { + exchangeHandler.failed(cause); + } finally { + releaseSlotIfNeeded(); + } + } + + }; + if (queuedRequest != null) { + queuedRequest.handlerProxy = handlerProxy; + } + final Timeout socketTimeout = ioSession.getSocketTimeout(); + ioSession.enqueue(new RequestExecutionCommand( + handlerProxy, + pushHandlerFactory, + context, + streamControl -> { + cancellableDependency.setDependency(streamControl); + if (socketTimeout != null) { + streamControl.setTimeout(socketTimeout); + } + }), + Command.Priority.NORMAL); + if (!ioSession.isOpen()) { + handlerProxy.failed(new ConnectionClosedException()); + handlerProxy.releaseResources(); + } } public void closeIdle(final TimeValue idleTime) { @@ -179,6 +581,9 @@ private void execute( try { exchangeHandler.produceRequest((request, entityDetails, httpContext) -> { final HttpHost host = target != null ? target : defaultTarget(request); + if (request.getScheme() == null) { + request.setScheme(host.getSchemeName()); + } if (request.getAuthority() == null) { request.setAuthority(new URIAuthority(host)); } @@ -186,6 +591,25 @@ private void execute( @Override public void completed(final IOSession ioSession) { + if (maxRequestsPerConnection > 0) { + final ConnRequestState connRequestState = getConnRequestState(ioSession); + if (connRequestState.tryAcquire(maxRequestsPerConnection)) { + submitRequest(ioSession, connRequestState, request, entityDetails, exchangeHandler, pushHandlerFactory, + cancellableDependency, context, null); + return; + } + if (requestSubmissionPolicy == RequestSubmissionPolicy.QUEUE) { + final QueuedRequest queuedRequest = new QueuedRequest(connRequestState, ioSession, request, entityDetails, + exchangeHandler, pushHandlerFactory, cancellableDependency, context); + cancellableDependency.setDependency(queuedRequest); + connRequestState.enqueue(queuedRequest); + return; + } + exchangeHandler.failed(new RejectedExecutionException( + "Maximum number of pending requests per connection reached (max=" + maxRequestsPerConnection + ")")); + exchangeHandler.releaseResources(); + return; + } final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() { @Override @@ -350,4 +774,4 @@ public H2ConnPool getConnPool() { return connPool; } -} +} \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index a19e7913fc..ae9c1cefcc 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -76,6 +76,9 @@ public class H2MultiplexingRequesterBootstrap { private IOReactorMetricsListener threadPoolListener; + private int maxRequestsPerConnection; + private RequestSubmissionPolicy requestSubmissionPolicy; + private H2MultiplexingRequesterBootstrap() { this.routeEntries = new ArrayList<>(); } @@ -211,6 +214,16 @@ public final H2MultiplexingRequesterBootstrap setUriPatternType(final UriPattern return this; } + public final H2MultiplexingRequesterBootstrap setMaxRequestsPerConnection(final int maxRequestsPerConnection) { + this.maxRequestsPerConnection = maxRequestsPerConnection; + return this; + } + + public final H2MultiplexingRequesterBootstrap setRequestSubmissionPolicy(final RequestSubmissionPolicy requestSubmissionPolicy) { + this.requestSubmissionPolicy = requestSubmissionPolicy; + return this; + } + /** * Registers the given {@link AsyncPushConsumer} {@link Supplier} as a default handler for URIs * matching the given pattern. @@ -274,7 +287,9 @@ public H2MultiplexingRequester create() { DefaultAddressResolver.INSTANCE, tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), threadPoolListener, - null); + null, + maxRequestsPerConnection, + requestSubmissionPolicy != null ? requestSubmissionPolicy : RequestSubmissionPolicy.REJECT); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/RequestSubmissionPolicy.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/RequestSubmissionPolicy.java new file mode 100644 index 0000000000..fedcefaa4d --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/RequestSubmissionPolicy.java @@ -0,0 +1,71 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http2.impl.nio.bootstrap; + +/** + * Controls how an {@link H2MultiplexingRequester} behaves when it reaches a configured + * per-connection limit for active / pending request executions (for example when a connection + * has reached a cap such as {@code maxRequestsPerConnection}). + *

+ * In {@link #REJECT} mode, submissions that exceed the configured cap are failed immediately + * (typically with {@link java.util.concurrent.RejectedExecutionException}). + *

+ * In {@link #QUEUE} mode, submissions that exceed the configured cap are queued and will be + * executed later when the number of active requests drops below the cap. This mode improves + * throughput in bursty workloads at the cost of potentially increased latency for queued + * requests. + *

+ * Important: QUEUE mode may retain queued requests in memory until capacity becomes + * available. Applications should ensure reasonable upper bounds on concurrency and use timeouts. + * A bounded queue option may be introduced in the future. + * + * @since 5.5 + */ +public enum RequestSubmissionPolicy { + + /** + * Reject submissions that would exceed the configured per-connection cap. + *

+ * This mode provides fast failure and avoids unbounded memory usage from queued requests. + * + * @since 5.5 + */ + REJECT, + + /** + * Queue submissions that would exceed the configured per-connection cap and execute them + * when capacity becomes available. + *

+ * This mode avoids {@code RejectedExecutionException} under bursts, but queued requests may + * experience additional latency. + * + * @since 5.5 + */ + QUEUE + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java new file mode 100644 index 0000000000..affa375944 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java @@ -0,0 +1,236 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http2.examples; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.RequestSubmissionPolicy; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.Timeout; + +public final class H2MaxRequestsPerConnectionLocalExample { + + public static void main(final String[] args) throws Exception { + final int maxPerConn = 2; + final int totalRequests = 50; + final Timeout timeout = Timeout.ofSeconds(30); + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(maxPerConn) + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(serverH2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setCanonicalHostName("127.0.0.1") + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer(entityDetails != null ? new DiscardingEntityConsumer<>() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext localContext) { + final HttpCoreContext context = HttpCoreContext.cast(localContext); + scheduler.schedule(() -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ex) { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(500) + .setEntity(ex.toString(), ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + } + } + }, 200, TimeUnit.MILLISECONDS); + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + System.out.println("server on 127.0.0.1:" + port); + + final H2Config clientH2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(clientH2Config) + .setMaxRequestsPerConnection(maxPerConn) + .setRequestSubmissionPolicy(RequestSubmissionPolicy.QUEUE) + .create(); + + requester.start(); + + final HttpHost target = new HttpHost("http", "127.0.0.1", port); + + requester.execute( + target, + AsyncRequestBuilder.get() + .setPath("/warmup") + .build(), + new BasicResponseConsumer(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + @Override + public void completed(final Message result) { + System.out.println("warmup -> " + result.getHead().getCode()); + } + + @Override + public void failed(final Exception ex) { + System.out.println("warmup failed -> " + ex.getClass().getName() + ": " + ex.getMessage()); + } + + @Override + public void cancelled() { + System.out.println("warmup cancelled"); + } + }).get(); + + final AtomicInteger ok = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(totalRequests); + + final ExecutorService exec = Executors.newFixedThreadPool(Math.min(16, totalRequests)); + + for (int i = 0; i < totalRequests; i++) { + final int id = i; + exec.execute(() -> { + final String path = "/slow?i=" + id; + + requester.execute( + target, + AsyncRequestBuilder.get() + .setPath(path) + .build(), + new BasicResponseConsumer(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + if (message.getHead().getCode() == 200) { + ok.incrementAndGet(); + } else { + failed.incrementAndGet(); + System.out.println("FAILED " + path + " -> HTTP " + message.getHead().getCode()); + } + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + failed.incrementAndGet(); + System.out.println("FAILED " + path + " -> " + ex.getClass().getName() + ": " + ex.getMessage()); + latch.countDown(); + } + + @Override + public void cancelled() { + failed.incrementAndGet(); + latch.countDown(); + } + }); + }); + } + + final boolean done = latch.await(60, TimeUnit.SECONDS); + exec.shutdownNow(); + + System.out.println("done=" + done + " ok=" + ok.get() + ", failed=" + failed.get()); + + requester.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } + + private H2MaxRequestsPerConnectionLocalExample() { + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionRejectLocalExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionRejectLocalExample.java new file mode 100644 index 0000000000..c12f1bb847 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionRejectLocalExample.java @@ -0,0 +1,245 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http2.examples; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.RequestSubmissionPolicy; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.Timeout; + +public final class H2MaxRequestsPerConnectionRejectLocalExample { + + public static void main(final String[] args) throws Exception { + final int maxPerConn = 2; + final int totalRequests = 50; + final Timeout timeout = Timeout.ofSeconds(30); + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + // Keep it low to make rejections deterministic / easy to reproduce. + .setMaxConcurrentStreams(maxPerConn) + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(serverH2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setCanonicalHostName("127.0.0.1") + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer<>(entityDetails != null ? new DiscardingEntityConsumer<>() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext localContext) { + final HttpCoreContext context = HttpCoreContext.cast(localContext); + // Add a delay so the connection stays "busy" and the client hits the cap. + scheduler.schedule(() -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ex) { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(500) + .setEntity(ex.toString(), ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + } + } + }, 200, TimeUnit.MILLISECONDS); + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + System.out.println("server on 127.0.0.1:" + port); + + final H2Config clientH2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(clientH2Config) + .setMaxRequestsPerConnection(maxPerConn) + .setRequestSubmissionPolicy(RequestSubmissionPolicy.REJECT) + .create(); + + requester.start(); + + final HttpHost target = new HttpHost("http", "127.0.0.1", port); + + requester.execute( + target, + AsyncRequestBuilder.get() + .setPath("/warmup") + .build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + @Override + public void completed(final Message result) { + System.out.println("warmup -> " + result.getHead().getCode()); + } + + @Override + public void failed(final Exception ex) { + System.out.println("warmup failed -> " + ex.getClass().getName() + ": " + ex.getMessage()); + } + + @Override + public void cancelled() { + System.out.println("warmup cancelled"); + } + }).get(); + + final AtomicInteger ok = new AtomicInteger(0); + final AtomicInteger rejected = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(totalRequests); + + final ExecutorService exec = Executors.newFixedThreadPool(Math.min(16, totalRequests)); + + for (int i = 0; i < totalRequests; i++) { + final int id = i; + exec.execute(() -> { + final String path = "/slow?i=" + id; + + requester.execute( + target, + AsyncRequestBuilder.get() + .setPath(path) + .build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + if (message.getHead().getCode() == 200) { + ok.incrementAndGet(); + } else { + failed.incrementAndGet(); + System.out.println("FAILED " + path + " -> HTTP " + message.getHead().getCode()); + } + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + // In REJECT mode, you should see this hit often under burst load. + if (ex instanceof java.util.concurrent.RejectedExecutionException) { + rejected.incrementAndGet(); + System.out.println("REJECTED " + path + " -> " + ex.getMessage()); + } else { + failed.incrementAndGet(); + System.out.println("FAILED " + path + " -> " + ex.getClass().getName() + ": " + ex.getMessage()); + } + latch.countDown(); + } + + @Override + public void cancelled() { + failed.incrementAndGet(); + latch.countDown(); + } + }); + }); + } + + final boolean done = latch.await(60, TimeUnit.SECONDS); + exec.shutdownNow(); + + System.out.println("done=" + done + " ok=" + ok.get() + ", rejected=" + rejected.get() + ", failed=" + failed.get()); + + requester.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } + + private H2MaxRequestsPerConnectionRejectLocalExample() { + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java new file mode 100644 index 0000000000..b50a2052c2 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java @@ -0,0 +1,192 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http2.impl.nio.bootstrap; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Test; + +class TestH2MultiplexingRequesterMaxRequestsPerConnection { + + @Test + @org.junit.jupiter.api.Timeout(30) + void rejectsWhenPerConnectionLimitReached() throws Exception { + final int maxPerConn = 2; + final int totalRequests = 20; + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(1) + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(serverH2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer<>(entityDetails != null ? new DiscardingEntityConsumer() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext localContext) { + final HttpCoreContext context = HttpCoreContext.cast(localContext); + scheduler.schedule(() -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + } + }, 2, TimeUnit.SECONDS); + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + + final H2Config clientH2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(clientH2Config) + .setMaxRequestsPerConnection(maxPerConn) + .create(); + + requester.start(); + + final HttpHost target = new HttpHost("http", "127.0.0.1", port); + final Timeout timeout = Timeout.ofSeconds(30); + + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + @Override public void completed(final Message result) { } + @Override public void failed(final Exception ex) { } + @Override public void cancelled() { } + }).get(); + + final AtomicInteger rejected = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(totalRequests); + + for (int i = 0; i < totalRequests; i++) { + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath("/slow?i=" + i).build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + @Override + public void completed(final Message message) { + latch.countDown(); + } + @Override + public void failed(final Exception ex) { + if (ex instanceof RejectedExecutionException) { + rejected.incrementAndGet(); + } + latch.countDown(); + } + @Override + public void cancelled() { + latch.countDown(); + } + }); + } + + latch.await(20, TimeUnit.SECONDS); + + try { + assertTrue(rejected.get() > 0); + } finally { + requester.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } + } + +}