Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions opendj-server-legacy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,18 @@
<artifactId>handler-jdbc</artifactId>
</dependency>

<!-- slf4j libraries -->
<!-- <dependency> -->
<!-- <groupId>org.slf4j</groupId> -->
<!-- <artifactId>slf4j-jdk14</artifactId> -->
<!-- </dependency> -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>

<!-- Source: https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.2.3</version>
<scope>compile</scope>
</dependency>

<!-- mail -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,19 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public class Storage implements org.opends.server.backends.pluggable.spi.Storage, ConfigurationChangeListener<CASBackendCfg>{

private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

//private final ServerContext serverContext;
private CASBackendCfg config;

public Storage(CASBackendCfg cfg, ServerContext serverContext) {
//this.serverContext = serverContext;
this.config = cfg;
cfg.addCASChangeListener(this);
cfg.addCASChangeListener(this);
}

//config
Expand All @@ -88,37 +86,32 @@ public boolean isConfigurationChangeAcceptable(CASBackendCfg configuration,List<
@Override
public ConfigChangeResult applyConfigurationChange(CASBackendCfg cfg) {
final ConfigChangeResult ccr = new ConfigChangeResult();
try
{
this.config = cfg;
}
catch (Exception e)
{
addErrorMessage(ccr, LocalizableMessage.raw(stackTraceToSingleLineString(e)));
}
return ccr;
try
{
this.config = cfg;
}
catch (Exception e)
{
addErrorMessage(ccr, LocalizableMessage.raw(stackTraceToSingleLineString(e)));
}
return ccr;
}

CqlSession session=null;

final LoadingCache<String,PreparedStatement> prepared=CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10))
.maximumSize(4096)
.build(new CacheLoader<String,PreparedStatement>(){
@Override
public PreparedStatement load(String query) throws Exception {
return session.prepare(query);
}
});


final LoadingCache<String,PreparedStatement> prepared = Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10))
.maximumSize(4096)
.build(query -> session.prepare(query));

ResultSet execute(Statement<?> statement) {
if (logger.isTraceEnabled()) {
final ResultSet res=session.execute(statement.setTracing(true));
logger.trace(LocalizableMessage.raw(
"cassandra: %s"
,res.getExecutionInfo().getQueryTrace().getParameters()
)
);
)
);
return res;
}
return session.execute(statement);
Expand All @@ -129,11 +122,11 @@ ResultSet execute(Statement<?> statement) {
public void open(AccessMode accessMode) throws Exception {
this.accessMode=accessMode;
session=CqlSession.builder()
.withApplicationName("OpenDJ "+getKeyspaceName()+"."+config.getBackendId())
.withConfigLoader(DriverConfigLoader.fromDefaults(Storage.class.getClassLoader()))
.build();
.withApplicationName("OpenDJ "+getKeyspaceName()+"."+config.getBackendId())
.withConfigLoader(DriverConfigLoader.fromDefaults(Storage.class.getClassLoader()))
.build();
if (AccessMode.READ_WRITE.equals(accessMode)) {
execute(prepared.getUnchecked("CREATE KEYSPACE IF NOT EXISTS "+getKeyspaceName()+" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};").bind().setExecutionProfileName(profile));
execute(prepared.get("CREATE KEYSPACE IF NOT EXISTS "+getKeyspaceName()+" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};").bind().setExecutionProfileName(profile));
}
storageStatus = StorageStatus.working();
}
Expand Down Expand Up @@ -163,7 +156,7 @@ String getTableName() {

@Override
public void removeStorageFiles() throws StorageRuntimeException {
final Boolean isOpen=getStorageStatus().isWorking();
final boolean isOpen=getStorageStatus().isWorking();
if (!isOpen) {
try {
open(AccessMode.READ_WRITE);
Expand All @@ -172,7 +165,7 @@ public void removeStorageFiles() throws StorageRuntimeException {
}
}
try {
execute(prepared.getUnchecked("TRUNCATE TABLE "+getTableName()+";").bind().setExecutionProfileName(profile));
execute(prepared.get("TRUNCATE TABLE "+getTableName()+";").bind().setExecutionProfileName(profile));
}catch (Throwable e) {}
if (!isOpen) {
close();
Expand Down Expand Up @@ -210,7 +203,7 @@ public TransactionImpl(AccessMode accessMode) {
@Override
public void openTree(TreeName name, boolean createOnDemand) {
if (createOnDemand) {
execute(prepared.getUnchecked("CREATE TABLE IF NOT EXISTS "+getTableName()+" (baseDN text,indexId text,key blob,value blob,PRIMARY KEY ((baseDN,indexId),key));").bind().setExecutionProfileName(profile));
execute(prepared.get("CREATE TABLE IF NOT EXISTS "+getTableName()+" (baseDN text,indexId text,key blob,value blob,PRIMARY KEY ((baseDN,indexId),key));").bind().setExecutionProfileName(profile));
}
}

Expand All @@ -222,10 +215,10 @@ public void clearTree(TreeName treeName) {
@Override
public ByteString read(TreeName treeName, ByteSequence key) {
final Row row=execute(
prepared.getUnchecked("SELECT value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
.setByteBuffer("key", ByteBuffer.wrap(key.toByteArray()))
).one();
prepared.get("SELECT value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
.setByteBuffer("key", ByteBuffer.wrap(key.toByteArray()))
).one();
return row==null?null:ByteString.wrap(row.getByteBuffer("value").array());
}

Expand All @@ -237,30 +230,30 @@ public Cursor<ByteString, ByteString> openCursor(TreeName treeName) {
@Override
public long getRecordCount(TreeName treeName) {
return execute(
prepared.getUnchecked("SELECT count(*) FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
).one().getLong(0);
prepared.get("SELECT count(*) FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
).one().getLong(0);
}

@Override
public void deleteTree(TreeName treeName) {
checkReadOnly();
openTree(treeName,true);
execute(
prepared.getUnchecked("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
);
prepared.get("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
);
}

@Override
public void put(TreeName treeName, ByteSequence key, ByteSequence value) {
checkReadOnly();
execute(
prepared.getUnchecked("INSERT INTO "+getTableName()+" (baseDN,indexId,key,value) VALUES (:baseDN,:indexId,:key,:value)").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
prepared.get("INSERT INTO "+getTableName()+" (baseDN,indexId,key,value) VALUES (:baseDN,:indexId,:key,:value)").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
.setByteBuffer("key", ByteBuffer.wrap(key.toByteArray()))
.setByteBuffer("value",ByteBuffer.wrap(value.toByteArray()))
);
);
}

@Override
Expand All @@ -269,26 +262,26 @@ public boolean update(TreeName treeName, ByteSequence key, UpdateFunction f) {
final ByteString oldValue=read(treeName,key);
final ByteSequence newValue=f.computeNewValue(oldValue);
if (Objects.equals(newValue, oldValue))
{
{
return false;
}
if (newValue == null)
{
delete(treeName, key);
return true;
}
put(treeName,key,newValue);
}
if (newValue == null)
{
delete(treeName, key);
return true;
}
put(treeName,key,newValue);
return true;
}

@Override
public boolean delete(TreeName treeName, ByteSequence key) {
checkReadOnly();
execute(
prepared.getUnchecked("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
.setByteBuffer("key", ByteBuffer.wrap(key.toByteArray()))
);
prepared.get("DELETE FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId and key=:key").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
.setByteBuffer("key", ByteBuffer.wrap(key.toByteArray()))
);
return true;
}

Expand Down Expand Up @@ -316,9 +309,9 @@ public CursorImpl(TransactionImpl tx,TreeName treeName) {

ResultSet full(){
return execute(
prepared.getUnchecked("SELECT key,value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId ORDER BY key").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
);
prepared.get("SELECT key,value FROM "+getTableName()+" WHERE baseDN=:baseDN and indexId=:indexId ORDER BY key").bind()
.setString("baseDN", treeName.getBaseDN()).setString("indexId", treeName.getIndexId())
);
}

@Override
Expand Down Expand Up @@ -424,7 +417,7 @@ public boolean positionToIndex(int index) {
return true;
}
ct++;
}
}
current=null;
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,72 +15,66 @@
*/
package org.opends.server.backends.jdbc;

import com.google.common.cache.*;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;

import java.sql.*;
import java.util.LinkedList;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.*;

public class CachedConnection implements Connection {
final Connection parent;

static LoadingCache<String, BlockingQueue<CachedConnection>> cached= CacheBuilder.newBuilder()
.expireAfterAccess(Long.parseLong(System.getProperty("org.openidentityplatform.opendj.jdbc.ttl","15000")), TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener<String, BlockingQueue<CachedConnection>>() {
@Override
public void onRemoval(RemovalNotification<String, BlockingQueue<CachedConnection>> notification) {
assert notification.getValue() != null;
for (CachedConnection con: notification.getValue()) {
try {
if (!con.isClosed()) {
con.parent.close();
}
} catch (SQLException e) {
}
}
}
})
.build(new CacheLoader<String, BlockingQueue<CachedConnection>>() {
@Override
public BlockingQueue<CachedConnection> load(String connectionString) throws Exception {
return new LinkedBlockingQueue<>();
static LoadingCache<String, BlockingQueue<CachedConnection>> cached = Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMillis(Long.parseLong(System.getProperty("org.openidentityplatform.opendj.jdbc.ttl","15000"))))
.removalListener((String key, BlockingQueue<CachedConnection> value, RemovalCause cause) -> {
for (CachedConnection con : value) {
try {
if (!con.isClosed()) {
con.parent.close();
}
} catch (SQLException e) {
// ignore
}
});
}
})
.build(conStr -> new LinkedBlockingQueue<>());

final String connectionString;
public CachedConnection(String connectionString,Connection parent) {
this.connectionString=connectionString;
public CachedConnection(String connectionString, Connection parent) {
this.connectionString = connectionString;
this.parent = parent;
}

static Connection getConnection(String connectionString) throws Exception {
return getConnection(connectionString,0);
return getConnection(connectionString, 0);
}

static Connection getConnection(String connectionString, final int waitTime) throws Exception {
CachedConnection con=cached.get(connectionString).poll(waitTime,TimeUnit.MILLISECONDS);
while(con!=null) {
CachedConnection con = cached.get(connectionString).poll(waitTime, TimeUnit.MILLISECONDS);

while (con != null) {
if (!con.isValid(0)) {
try {
con.parent.close();
} catch (SQLException e) {
con=null;
con = null;
}
con=cached.get(connectionString).poll();
}else{
con = cached.get(connectionString).poll();
} else {
return con;
}
}
try {
final Connection conNew= DriverManager.getConnection(connectionString);
final Connection conNew = DriverManager.getConnection(connectionString);
conNew.setAutoCommit(false);
conNew.setTransactionIsolation(TRANSACTION_READ_COMMITTED);
return new CachedConnection(connectionString, conNew);
}catch (SQLException e) { //max_connection server error: try recursion for reuse connection
return getConnection(connectionString,(waitTime==0)?1:waitTime*2);
} catch (SQLException e) { // max_connection server error: try recursion for reuse connection
return getConnection(connectionString, (waitTime == 0) ? 1 : waitTime * 2);
}
}

Expand Down Expand Up @@ -127,11 +121,7 @@ public void rollback() throws SQLException {
@Override
public void close() throws SQLException {
rollback();
try {
cached.get(connectionString).add(this);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
cached.get(connectionString).add(this);
}

@Override
Expand Down Expand Up @@ -196,7 +186,7 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res

@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return parent.prepareCall(sql, resultSetType, resultSetConcurrency) ;
return parent.prepareCall(sql, resultSetType, resultSetConcurrency);
}

@Override
Expand Down
Loading