Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,80 @@ public Client alias(final Map<String, String> aliases) {
}
}

/**
* A {@code Client} implementation that operates in the context of a session. {@code ChildSessionClient} is tied to
* another client as a child, it borrows the connection from the parent client's pool for the transaction. Requests are
* sent to a single server, where each request is bound to the same thread and same connection with the same set of
* bindings across requests.
* Transaction are not automatically committed. It is up the client to issue commit/rollback commands.
*/
public final static class SessionedChildClient extends Client {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would have been best if there was a solution that kept this class package-private, as it now differs from the other Clients that are only supposed to be exposed via Cluster. It's fine for this PR, but I'd probably recommend adding a Jira to look into this again in the future after this is merged.

private final String sessionId;
private final boolean manageTransactions;
private final boolean maintainStateAfterException;
private final Client parentClient;
private Connection borrowedConnection;
private boolean closed;

public SessionedChildClient(final Client parentClient, String sessionId) {
super(parentClient.cluster, parentClient.settings);
this.parentClient = parentClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense for any type of Client to be the parent? SessionedClient only allows for one connection so maybe there should be a check here that disallows that particular client?

this.sessionId = sessionId;
this.closed = false;
this.manageTransactions = parentClient.settings.getSession().map(s -> s.manageTransactions).orElse(false);
this.maintainStateAfterException = parentClient.settings.getSession().map(s -> s.maintainStateAfterException).orElse(false);
}

public String getSessionId() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return sessionId;
}

@Override
public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
builder.processor("session");
builder.addArg(Tokens.ARGS_SESSION, sessionId);
builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
builder.addArg(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION, maintainStateAfterException);
return builder;
}

@Override
protected void initializeImplementation() {
// do nothing, parentClient is already initialized
}

@Override
protected synchronized Connection chooseConnection(RequestMessage msg) throws TimeoutException, ConnectionException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
protected synchronized Connection chooseConnection(RequestMessage msg) throws TimeoutException, ConnectionException {
protected synchronized Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's the overhead we are trying to reduce. I have updated the PR description as well.

if (borrowedConnection == null) {
//Borrow from parentClient's pool instead of creating new connection
borrowedConnection = parentClient.chooseConnection(msg);
}
//Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback
borrowedConnection.borrowed.incrementAndGet();
return borrowedConnection;
}

@Override
public synchronized CompletableFuture<Void> closeAsync() {
if (borrowedConnection != null && !borrowedConnection.isDead()) {

//Decrement borrowed one last time which was incremented by parentClient when the connection is borrowed initially
//returnToPool() does this
borrowedConnection.returnToPool();

borrowedConnection = null;
}
closed = true;
return CompletableFuture.completedFuture(null);
}

@Override
public boolean isClosing() {
return parentClient.isClosing() || closed;
}
}


/**
* A {@code Client} implementation that operates in the context of a session. Requests are sent to a single
* server, where each request is bound to the same thread with the same set of bindings across requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,13 @@ public int getPort() {
return manager.port;
}

/**
* Determines whether to reuse connections for transactions or create new ones.
*/
public boolean isReuseConnectionsForSessions() {
return manager.reuseConnectionsForSessions;
}

/**
* Gets a list of all the configured hosts.
*/
Expand Down Expand Up @@ -624,6 +631,7 @@ public final static class Builder {
private long connectionSetupTimeoutMillis = Connection.CONNECTION_SETUP_TIMEOUT_MILLIS;
private boolean enableUserAgentOnConnect = true;
private boolean enableCompression = true;
private boolean reuseConnectionsForSessions = false;

private Builder() {
// empty to prevent direct instantiation
Expand Down Expand Up @@ -872,6 +880,14 @@ public Builder maxWaitForConnection(final int maxWait) {
return this;
}

/**
* If true, reuses the connections for transactions
*/
public Builder reuseConnectionsForSessions(final boolean reuseConnectionsForSessions) {
this.reuseConnectionsForSessions = reuseConnectionsForSessions;
return this;
}

/**
* The amount of time in milliseconds to wait the connection to close before timing out where the default
* value is 3000. This timeout allows for a delay to occur in waiting for remaining messages that may still
Expand Down Expand Up @@ -1118,6 +1134,7 @@ class Manager {
private final String path;
private final boolean enableUserAgentOnConnect;
private final boolean enableCompression;
private final boolean reuseConnectionsForSessions;

private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

Expand All @@ -1132,6 +1149,7 @@ private Manager(final Builder builder) {
this.interceptor = builder.interceptor;
this.enableUserAgentOnConnect = builder.enableUserAgentOnConnect;
this.enableCompression = builder.enableCompression;
this.reuseConnectionsForSessions = builder.reuseConnectionsForSessions;

connectionPoolSettings = new Settings.ConnectionPoolSettings();
connectionPoolSettings.maxInProcessPerConnection = builder.maxInProcessPerConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab
return requestPromise;
}

private void returnToPool() {
public void returnToPool() {
try {
if (pool != null) pool.returnConnection(this);
} catch (ConnectionException ce) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ final class Settings {
*/
public boolean enableCompression = true;

/**
* Determines whether to use SessionedChildClient (true) or SessionedClient (false) for transactions.
* SessionedChildClient reuses the existing connections whereas SessoinedClient creates a new one for every transaction.
* Defaults to false for backward compatibility.
*/
public boolean reuseConnectionsForSessions = false;

/**
* Read configuration from a file into a new {@link Settings} object.
*
Expand Down Expand Up @@ -162,6 +169,9 @@ public static Settings from(final Configuration conf) {
if (conf.containsKey("enableCompression"))
settings.enableCompression = conf.getBoolean("enableCompression");

if (conf.containsKey("reuseConnectionsForSessions"))
settings.reuseConnectionsForSessions = conf.getBoolean("reuseConnectionsForSessions");

if (conf.containsKey("hosts"))
settings.hosts = conf.getList("hosts").stream().map(Object::toString).collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.driver.remote;

import org.apache.commons.configuration2.Configuration;
import org.apache.tinkerpop.gremlin.driver.Channelizer;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
Expand Down Expand Up @@ -259,9 +260,24 @@ public void close() throws Exception {
*/
@Override
public Transaction tx() {
final DriverRemoteConnection session = new DriverRemoteConnection(
client.getCluster().connect(UUID.randomUUID().toString()), remoteTraversalSourceName, true);
return new DriverRemoteTransaction(session);
if (client.getCluster().getChannelizer().equalsIgnoreCase(Channelizer.HttpChannelizer.class.getName())) {
throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", Channelizer.HttpChannelizer.class.getSimpleName()));
}

final boolean reuseConnections = client.getCluster().isReuseConnectionsForSessions();
final String sessionId = UUID.randomUUID().toString();
final DriverRemoteConnection connection;

if (reuseConnections) {
client.init();
final Client.SessionedChildClient childClient = new Client.SessionedChildClient(client, sessionId);
connection = new DriverRemoteConnection(
childClient, remoteTraversalSourceName, true);
} else {
connection = new DriverRemoteConnection(
client.getCluster().connect(sessionId), remoteTraversalSourceName, true);
}
return new DriverRemoteTransaction(connection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void shouldCreateFromConfiguration() {
conf.setProperty("serializer.config.any", "thing");
conf.setProperty("enableUserAgentOnConnect", false);
conf.setProperty("enableCompression", false);
conf.setProperty("reuseConnectionsForSessions", true);
conf.setProperty("connectionPool.enableSsl", true);
conf.setProperty("connectionPool.keyStore", "server.jks");
conf.setProperty("connectionPool.keyStorePassword", "password2");
Expand Down Expand Up @@ -87,6 +88,7 @@ public void shouldCreateFromConfiguration() {
assertEquals(false, settings.enableUserAgentOnConnect);
assertEquals(false, settings.enableCompression);
assertThat(settings.connectionPool.enableSsl, is(true));
assertEquals(true, settings.reuseConnectionsForSessions);
assertEquals("server.jks", settings.connectionPool.keyStore);
assertEquals("password2", settings.connectionPool.keyStorePassword);
assertEquals("pkcs12", settings.connectionPool.keyStoreType);
Expand Down
Loading