diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core-client/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index ab862423b66..9f73dfc45bf 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -44,19 +44,24 @@ public class OmidTransactionContext implements PhoenixTransactionContext { private static final Logger LOGGER = LoggerFactory.getLogger(OmidTransactionContext.class); - private HBaseTransactionManager tm; + // The client is a thin view onto the OmidTransactionProvider singleton's + // current HBaseTransactionManager. Do not cache the manager itself here so that + // tests which swap the singleton's manager (via OmidTransactionProvider.injectTestService) + // do not leave this context holding a reference to a closed manager. In production the + // singleton's manager is set exactly once and never replaced, so resolving on each call + // is functionally identical to caching. + private OmidTransactionProvider.OmidTransactionClient client; private HBaseTransaction tx; public OmidTransactionContext() { this.tx = null; - this.tm = null; + this.client = null; } public OmidTransactionContext(PhoenixConnection connection) throws SQLException { - PhoenixTransactionClient client = - connection.getQueryServices().initTransactionClient(getProvider()); - assert (client instanceof OmidTransactionProvider.OmidTransactionClient); - this.tm = ((OmidTransactionProvider.OmidTransactionClient) client).getTransactionClient(); + PhoenixTransactionClient c = connection.getQueryServices().initTransactionClient(getProvider()); + assert (c instanceof OmidTransactionProvider.OmidTransactionClient); + this.client = (OmidTransactionProvider.OmidTransactionClient) c; this.tx = null; } @@ -64,8 +69,10 @@ public OmidTransactionContext(byte[] txnBytes) throws InvalidProtocolBufferExcep this(); if (txnBytes != null && txnBytes.length > 0) { TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(txnBytes); + HBaseTransactionManager m = tm(); tx = new HBaseTransaction(transaction.getTimestamp(), transaction.getEpoch(), - new HashSet(), new HashSet(), null, tm.isLowLatency()); + new HashSet(), new HashSet(), null, + m != null && m.isLowLatency()); } else { tx = null; } @@ -75,33 +82,40 @@ public OmidTransactionContext(PhoenixTransactionContext ctx, boolean subTask) { assert (ctx instanceof OmidTransactionContext); OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx; - this.tm = omidTransactionContext.tm; + this.client = omidTransactionContext.client; if (subTask) { if (omidTransactionContext.isTransactionRunning()) { Transaction transaction = omidTransactionContext.getTransaction(); + HBaseTransactionManager m = tm(); this.tx = new HBaseTransaction(transaction.getTransactionId(), transaction.getEpoch(), - new HashSet(), new HashSet(), this.tm, - transaction.getReadTimestamp(), transaction.getWriteTimestamp(), tm.isLowLatency()); + new HashSet(), new HashSet(), m, + transaction.getReadTimestamp(), transaction.getWriteTimestamp(), + m != null && m.isLowLatency()); } else { this.tx = null; } - this.tm = null; + this.client = null; } else { this.tx = omidTransactionContext.getTransaction(); } } + private HBaseTransactionManager tm() { + return client == null ? null : client.getTransactionClient(); + } + @Override public void begin() throws SQLException { - if (tm == null) { + HBaseTransactionManager m = tm(); + if (m == null) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build() .buildException(); } try { - tx = (HBaseTransaction) tm.begin(); + tx = (HBaseTransaction) m.begin(); } catch (TransactionException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) .setMessage(e.getMessage()).setRootCause(e).build().buildException(); @@ -110,10 +124,11 @@ public void begin() throws SQLException { @Override public void commit() throws SQLException { - if (tx == null || tm == null) return; + HBaseTransactionManager m = tm(); + if (tx == null || m == null) return; try { - tm.commit(tx); + m.commit(tx); } catch (TransactionException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) .setMessage(e.getMessage()).setRootCause(e).build().buildException(); @@ -125,12 +140,13 @@ public void commit() throws SQLException { @Override public void abort() throws SQLException { - if (tx == null || tm == null || tx.getStatus() != Status.RUNNING) { + HBaseTransactionManager m = tm(); + if (tx == null || m == null || tx.getStatus() != Status.RUNNING) { return; } try { - tm.rollback(tx); + m.rollback(tx); } catch (TransactionException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) .setMessage(e.getMessage()).setRootCause(e).build().buildException(); @@ -149,9 +165,14 @@ public void checkpoint(boolean hasUncommittedData) throws SQLException { @Override public void commitDDLFence(PTable dataTable) throws SQLException { + HBaseTransactionManager m = tm(); + if (m == null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build() + .buildException(); + } try { - tx = (HBaseTransaction) tm.fence(dataTable.getName().getBytes()); + tx = (HBaseTransaction) m.fence(dataTable.getName().getBytes()); if (LOGGER.isInfoEnabled()) { LOGGER.info("Added write fence at ~" + tx.getReadTimestamp()); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core-client/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java index d71472752a6..ba622f2f9c6 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java @@ -36,7 +36,9 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider(); - private HBaseTransactionManager transactionManager = null; + // volatile so that test-framework injection (injectTestService) is published to other + // threads without requiring all readers to synchronize on the singleton. + private volatile HBaseTransactionManager transactionManager = null; private volatile CommitTable.Client commitTableClient = null; public static final OmidTransactionProvider getInstance() { @@ -78,18 +80,22 @@ public PhoenixTransactionClient getTransactionClient(Configuration config, } } - return new OmidTransactionClient(transactionManager); + return new OmidTransactionClient(this); } + /** + * A thin view onto the {@link OmidTransactionProvider} singleton's current + * {@link HBaseTransactionManager}. + */ static class OmidTransactionClient implements PhoenixTransactionClient { - private final HBaseTransactionManager transactionManager; + private final OmidTransactionProvider provider; - public OmidTransactionClient(HBaseTransactionManager transactionManager) { - this.transactionManager = transactionManager; + public OmidTransactionClient(OmidTransactionProvider provider) { + this.provider = provider; } public HBaseTransactionManager getTransactionClient() { - return transactionManager; + return provider.transactionManager; } @Override