diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index fc706b85369..258c63b2d76 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -773,6 +773,80 @@ public Client alias(final Map aliases) { } } + /** + * A {@code Client} implementation that operates in the context of a session. {@code ChildSessionClient} is tied to + * another client as a child, it borrows the connection from the parent client's pool for the transaction. Requests are + * sent to a single server, where each request is bound to the same thread and same connection with the same set of + * bindings across requests. + * Transaction are not automatically committed. It is up the client to issue commit/rollback commands. + */ + public final static class SessionedChildClient extends Client { + private final String sessionId; + private final boolean manageTransactions; + private final boolean maintainStateAfterException; + private final Client parentClient; + private Connection borrowedConnection; + private boolean closed; + + public SessionedChildClient(final Client parentClient, String sessionId) { + super(parentClient.cluster, parentClient.settings); + this.parentClient = parentClient; + this.sessionId = sessionId; + this.closed = false; + this.manageTransactions = parentClient.settings.getSession().map(s -> s.manageTransactions).orElse(false); + this.maintainStateAfterException = parentClient.settings.getSession().map(s -> s.maintainStateAfterException).orElse(false); + } + + public String getSessionId() { + return sessionId; + } + + @Override + public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) { + builder.processor("session"); + builder.addArg(Tokens.ARGS_SESSION, sessionId); + builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions); + builder.addArg(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION, maintainStateAfterException); + return builder; + } + + @Override + protected void initializeImplementation() { + // do nothing, parentClient is already initialized + } + + @Override + protected synchronized Connection chooseConnection(RequestMessage msg) throws TimeoutException, ConnectionException { + if (borrowedConnection == null) { + //Borrow from parentClient's pool instead of creating new connection + borrowedConnection = parentClient.chooseConnection(msg); + } + //Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback + borrowedConnection.borrowed.incrementAndGet(); + return borrowedConnection; + } + + @Override + public synchronized CompletableFuture closeAsync() { + if (borrowedConnection != null && !borrowedConnection.isDead()) { + + //Decrement borrowed one last time which was incremented by parentClient when the connection is borrowed initially + //returnToPool() does this + borrowedConnection.returnToPool(); + + borrowedConnection = null; + } + closed = true; + return CompletableFuture.completedFuture(null); + } + + @Override + public boolean isClosing() { + return parentClient.isClosing() || closed; + } + } + + /** * A {@code Client} implementation that operates in the context of a session. Requests are sent to a single * server, where each request is bound to the same thread with the same set of bindings across requests. diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index f8180c6347b..2b531808bfa 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -459,6 +459,13 @@ public int getPort() { return manager.port; } + /** + * Determines whether to reuse connections for transactions or create new ones. + */ + public boolean isReuseConnectionsForSessions() { + return manager.reuseConnectionsForSessions; + } + /** * Gets a list of all the configured hosts. */ @@ -624,6 +631,7 @@ public final static class Builder { private long connectionSetupTimeoutMillis = Connection.CONNECTION_SETUP_TIMEOUT_MILLIS; private boolean enableUserAgentOnConnect = true; private boolean enableCompression = true; + private boolean reuseConnectionsForSessions = false; private Builder() { // empty to prevent direct instantiation @@ -872,6 +880,14 @@ public Builder maxWaitForConnection(final int maxWait) { return this; } + /** + * If true, reuses the connections for transactions + */ + public Builder reuseConnectionsForSessions(final boolean reuseConnectionsForSessions) { + this.reuseConnectionsForSessions = reuseConnectionsForSessions; + return this; + } + /** * The amount of time in milliseconds to wait the connection to close before timing out where the default * value is 3000. This timeout allows for a delay to occur in waiting for remaining messages that may still @@ -1118,6 +1134,7 @@ class Manager { private final String path; private final boolean enableUserAgentOnConnect; private final boolean enableCompression; + private final boolean reuseConnectionsForSessions; private final AtomicReference> closeFuture = new AtomicReference<>(); @@ -1132,6 +1149,7 @@ private Manager(final Builder builder) { this.interceptor = builder.interceptor; this.enableUserAgentOnConnect = builder.enableUserAgentOnConnect; this.enableCompression = builder.enableCompression; + this.reuseConnectionsForSessions = builder.reuseConnectionsForSessions; connectionPoolSettings = new Settings.ConnectionPoolSettings(); connectionPoolSettings.maxInProcessPerConnection = builder.maxInProcessPerConnection; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 749789a86c4..3633d397c68 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -267,7 +267,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab return requestPromise; } - private void returnToPool() { + public void returnToPool() { try { if (pool != null) pool.returnConnection(this); } catch (ConnectionException ce) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java index 12be922b66e..f07bc095ce1 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java @@ -110,6 +110,13 @@ final class Settings { */ public boolean enableCompression = true; + /** + * Determines whether to use SessionedChildClient (true) or SessionedClient (false) for transactions. + * SessionedChildClient reuses the existing connections whereas SessoinedClient creates a new one for every transaction. + * Defaults to false for backward compatibility. + */ + public boolean reuseConnectionsForSessions = false; + /** * Read configuration from a file into a new {@link Settings} object. * @@ -162,6 +169,9 @@ public static Settings from(final Configuration conf) { if (conf.containsKey("enableCompression")) settings.enableCompression = conf.getBoolean("enableCompression"); + if (conf.containsKey("reuseConnectionsForSessions")) + settings.reuseConnectionsForSessions = conf.getBoolean("reuseConnectionsForSessions"); + if (conf.containsKey("hosts")) settings.hosts = conf.getList("hosts").stream().map(Object::toString).collect(Collectors.toList()); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java index ee2364ec445..9eb6ad5382d 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java @@ -19,6 +19,7 @@ package org.apache.tinkerpop.gremlin.driver.remote; import org.apache.commons.configuration2.Configuration; +import org.apache.tinkerpop.gremlin.driver.Channelizer; import org.apache.tinkerpop.gremlin.driver.Client; import org.apache.tinkerpop.gremlin.driver.Cluster; import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection; @@ -259,9 +260,24 @@ public void close() throws Exception { */ @Override public Transaction tx() { - final DriverRemoteConnection session = new DriverRemoteConnection( - client.getCluster().connect(UUID.randomUUID().toString()), remoteTraversalSourceName, true); - return new DriverRemoteTransaction(session); + if (client.getCluster().getChannelizer().equalsIgnoreCase(Channelizer.HttpChannelizer.class.getName())) { + throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", Channelizer.HttpChannelizer.class.getSimpleName())); + } + + final boolean reuseConnections = client.getCluster().isReuseConnectionsForSessions(); + final String sessionId = UUID.randomUUID().toString(); + final DriverRemoteConnection connection; + + if (reuseConnections) { + client.init(); + final Client.SessionedChildClient childClient = new Client.SessionedChildClient(client, sessionId); + connection = new DriverRemoteConnection( + childClient, remoteTraversalSourceName, true); + } else { + connection = new DriverRemoteConnection( + client.getCluster().connect(sessionId), remoteTraversalSourceName, true); + } + return new DriverRemoteTransaction(connection); } @Override diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/SettingsTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/SettingsTest.java index 26eb8a9f7f0..4a85e94ff8f 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/SettingsTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/SettingsTest.java @@ -48,6 +48,7 @@ public void shouldCreateFromConfiguration() { conf.setProperty("serializer.config.any", "thing"); conf.setProperty("enableUserAgentOnConnect", false); conf.setProperty("enableCompression", false); + conf.setProperty("reuseConnectionsForSessions", true); conf.setProperty("connectionPool.enableSsl", true); conf.setProperty("connectionPool.keyStore", "server.jks"); conf.setProperty("connectionPool.keyStorePassword", "password2"); @@ -87,6 +88,7 @@ public void shouldCreateFromConfiguration() { assertEquals(false, settings.enableUserAgentOnConnect); assertEquals(false, settings.enableCompression); assertThat(settings.connectionPool.enableSsl, is(true)); + assertEquals(true, settings.reuseConnectionsForSessions); assertEquals("server.jks", settings.connectionPool.keyStore); assertEquals("password2", settings.connectionPool.keyStorePassword); assertEquals("pkcs12", settings.connectionPool.keyStoreType);