Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ public H2AsyncRequester(
final TlsStrategy tlsStrategy,
final Timeout handshakeTimeout,
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
final IOWorkerSelector workerSelector,
final int maxPendingCommandsPerConnection) {
super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
tlsStrategy, handshakeTimeout, threadPoolListener, workerSelector);
tlsStrategy, handshakeTimeout, threadPoolListener, workerSelector, maxPendingCommandsPerConnection);
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.Cancellable;
Expand Down Expand Up @@ -87,6 +88,12 @@ public class H2MultiplexingRequester extends AsyncRequester {

private final H2ConnPool connPool;

/**
* Hard cap on per-connection queued / in-flight commands.
* {@code <= 0} disables the cap.
*/
private final int maxCommandsPerConnection;

/**
* Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class.
*/
Expand All @@ -100,11 +107,13 @@ public H2MultiplexingRequester(
final Resolver<HttpHost, InetSocketAddress> addressResolver,
final TlsStrategy tlsStrategy,
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
final IOWorkerSelector workerSelector,
final int maxCommandsPerConnection) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE,
threadPoolListener, workerSelector);
this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
this.maxCommandsPerConnection = maxCommandsPerConnection;
}

public void closeIdle(final TimeValue idleTime) {
Expand Down Expand Up @@ -245,6 +254,16 @@ public void failed(final Exception cause) {
}

};
final int max = maxCommandsPerConnection;
if (max > 0) {
final int current = ioSession.getPendingCommandCount();
if (current >= 0 && current >= max) {
exchangeHandler.failed(new RejectedExecutionException(
"Maximum number of pending commands per connection reached (max=" + max + ")"));
exchangeHandler.releaseResources();
return;
}
}
final Timeout socketTimeout = ioSession.getSocketTimeout();
ioSession.enqueue(new RequestExecutionCommand(
handlerProxy,
Expand Down Expand Up @@ -349,5 +368,4 @@ public final <T> Future<T> execute(
public H2ConnPool getConnPool() {
return connPool;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.function.Supplier;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap {

private IOReactorMetricsListener threadPoolListener;

private int maxCommandsPerConnection;

private H2MultiplexingRequesterBootstrap() {
this.routeEntries = new ArrayList<>();
}
Expand Down Expand Up @@ -180,6 +183,23 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final
return this;
}

/**
* Sets a hard limit on the number of pending commands execution commands that can be queued per connection.
* When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
* A value {@code <= 0} disables the limit (default).
* Note: this limit applies to commands waiting in the connection's internal queue (backlog). HTTP/2 in-flight
* concurrency is governed separately by protocol settings (e.g. MAX_CONCURRENT_STREAMS).
*
* @param max maximum number of pending commands per connection; {@code <= 0} to disable the limit.
* @return this instance.
* @since 5.5
*/
@Experimental
public final H2MultiplexingRequesterBootstrap setMaxCommandsPerConnection(final int max) {
this.maxCommandsPerConnection = max;
return this;
}

/**
* Sets {@link H2StreamListener} instance.
*
Expand Down Expand Up @@ -274,7 +294,8 @@ public H2MultiplexingRequester create() {
DefaultAddressResolver.INSTANCE,
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
threadPoolListener,
null);
null,
maxCommandsPerConnection);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class H2RequesterBootstrap {
private ConnPoolListener<HttpHost> connPoolListener;
private IOReactorMetricsListener threadPoolListener;
private FrameFactory frameFactory;
private int maxPendingCommandsPerConnection;


private H2RequesterBootstrap() {
Expand Down Expand Up @@ -210,6 +211,11 @@ public final H2RequesterBootstrap setPoolConcurrencyPolicy(final PoolConcurrency
return this;
}

public final H2RequesterBootstrap setMaxPendingCommandsPerConnection(final int maxPendingCommandsPerConnection) {
this.maxPendingCommandsPerConnection = maxPendingCommandsPerConnection;
return this;
}

/**
* Sets {@link TlsStrategy} instance.
*
Expand Down Expand Up @@ -433,7 +439,8 @@ public H2AsyncRequester create() {
actualTlsStrategy,
handshakeTimeout,
threadPoolListener,
null);
null,
maxPendingCommandsPerConnection);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.http2.impl.nio.bootstrap;

import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class TestH2MultiplexingRequesterMaxRequestsPerConnection {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arturobernalg This test should be moved to client and be made a part of a larger suite of related integration tests, but if you want to keep it here, so be it, but at least be consistent and provide a similar one for HttpAsyncRequester.


@Test
@org.junit.jupiter.api.Timeout(value = 60, unit = TimeUnit.SECONDS)
void testRejectsWhenLimitReached() throws Exception {
final int maxPerConn = 2;
final int totalRequests = 30;
final Timeout timeout = Timeout.ofSeconds(30);
final long serverDelayMillis = 5000;

final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setIoThreadCount(1)
.build();

final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

final H2Config serverH2Config = H2Config.custom()
.setPushEnabled(false)
.setMaxConcurrentStreams(1) // force backlog
.build();

final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
.setIOReactorConfig(ioReactorConfig)
.setH2Config(serverH2Config)
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
.setCanonicalHostName("127.0.0.1") // avoid 421
.register("*", new AsyncServerRequestHandler<Message<HttpRequest, Void>>() {

@Override
public AsyncRequestConsumer<Message<HttpRequest, Void>> prepare(
final HttpRequest request,
final EntityDetails entityDetails,
final HttpContext context) {
return new BasicRequestConsumer<>(
entityDetails != null ? new DiscardingEntityConsumer<>() : null);
}

@Override
public void handle(
final Message<HttpRequest, Void> message,
final ResponseTrigger responseTrigger,
final HttpContext localContext) {

final HttpCoreContext context = HttpCoreContext.cast(localContext);
final String path = message.getHead().getPath();

final Runnable task = () -> {
try {
responseTrigger.submitResponse(
AsyncResponseBuilder.create(200)
.setEntity("ok\n", ContentType.TEXT_PLAIN)
.build(),
context);
} catch (final Exception ignore) {
// ignore
}
};

if ("/warmup".equals(path)) {
task.run();
} else {
scheduler.schedule(task, serverDelayMillis, TimeUnit.MILLISECONDS);
}
}

})
.create();

server.start();
final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get();
final int port = ((InetSocketAddress) ep.getAddress()).getPort();

final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap()
.setIOReactorConfig(ioReactorConfig)
.setH2Config(H2Config.custom().setPushEnabled(false).build())
.setMaxCommandsPerConnection(maxPerConn)
.create();


requester.start();

try {
final HttpHost target = new HttpHost("http", "127.0.0.1", port);

// Warmup
requester.execute(
target,
AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
timeout,
HttpCoreContext.create(),
null).get();

final AtomicInteger ok = new AtomicInteger(0);
final AtomicInteger rejected = new AtomicInteger(0);
final AtomicInteger failed = new AtomicInteger(0);

final CountDownLatch done = new CountDownLatch(totalRequests);
final CountDownLatch start = new CountDownLatch(1);
final ExecutorService exec = Executors.newFixedThreadPool(16);

for (int i = 0; i < totalRequests; i++) {
final int id = i;
exec.execute(new Runnable() {
@Override
public void run() {
try {
start.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
done.countDown();
return;
}

requester.execute(
target,
AsyncRequestBuilder.get().setHttpHost(target).setPath("/slow?i=" + id).build(),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
timeout,
HttpCoreContext.create(),
new FutureCallback<Message<HttpResponse, String>>() {

@Override
public void completed(final Message<HttpResponse, String> message) {
ok.incrementAndGet();
done.countDown();
}

@Override
public void failed(final Exception ex) {
if (ex instanceof RejectedExecutionException) {
rejected.incrementAndGet();
} else {
failed.incrementAndGet();
}
done.countDown();
}

@Override
public void cancelled() {
failed.incrementAndGet();
done.countDown();
}
});
}
});
}

start.countDown();

final boolean allDone = done.await(60, TimeUnit.SECONDS);
exec.shutdownNow();

Assertions.assertTrue(allDone, "Timed out");
Assertions.assertEquals(totalRequests, ok.get() + rejected.get() + failed.get());
Assertions.assertTrue(rejected.get() > 0, "Expected at least one RejectedExecutionException");
Assertions.assertEquals(0, failed.get(), "Unexpected non-rejection failures: " + failed.get());
} finally {
requester.close(CloseMode.GRACEFUL);
server.close(CloseMode.GRACEFUL);
scheduler.shutdownNow();
}
}
}
Loading