Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,35 @@ public class OmidTransactionContext implements PhoenixTransactionContext {

private static final Logger LOGGER = LoggerFactory.getLogger(OmidTransactionContext.class);

private HBaseTransactionManager tm;
// The client is a thin view onto the OmidTransactionProvider singleton's
// current HBaseTransactionManager. Do not cache the manager itself here so that
// tests which swap the singleton's manager (via OmidTransactionProvider.injectTestService)
// do not leave this context holding a reference to a closed manager. In production the
// singleton's manager is set exactly once and never replaced, so resolving on each call
// is functionally identical to caching.
private OmidTransactionProvider.OmidTransactionClient client;
private HBaseTransaction tx;

public OmidTransactionContext() {
this.tx = null;
this.tm = null;
this.client = null;
}

public OmidTransactionContext(PhoenixConnection connection) throws SQLException {
PhoenixTransactionClient client =
connection.getQueryServices().initTransactionClient(getProvider());
assert (client instanceof OmidTransactionProvider.OmidTransactionClient);
this.tm = ((OmidTransactionProvider.OmidTransactionClient) client).getTransactionClient();
PhoenixTransactionClient c = connection.getQueryServices().initTransactionClient(getProvider());
assert (c instanceof OmidTransactionProvider.OmidTransactionClient);
this.client = (OmidTransactionProvider.OmidTransactionClient) c;
this.tx = null;
}

public OmidTransactionContext(byte[] txnBytes) throws InvalidProtocolBufferException {
this();
if (txnBytes != null && txnBytes.length > 0) {
TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(txnBytes);
HBaseTransactionManager m = tm();
tx = new HBaseTransaction(transaction.getTimestamp(), transaction.getEpoch(),
new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), null, tm.isLowLatency());
new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), null,
m != null && m.isLowLatency());
} else {
tx = null;
}
Expand All @@ -75,33 +82,40 @@ public OmidTransactionContext(PhoenixTransactionContext ctx, boolean subTask) {
assert (ctx instanceof OmidTransactionContext);
OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx;

this.tm = omidTransactionContext.tm;
this.client = omidTransactionContext.client;

if (subTask) {
if (omidTransactionContext.isTransactionRunning()) {
Transaction transaction = omidTransactionContext.getTransaction();
HBaseTransactionManager m = tm();
this.tx = new HBaseTransaction(transaction.getTransactionId(), transaction.getEpoch(),
new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), this.tm,
transaction.getReadTimestamp(), transaction.getWriteTimestamp(), tm.isLowLatency());
new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), m,
transaction.getReadTimestamp(), transaction.getWriteTimestamp(),
m != null && m.isLowLatency());
} else {
this.tx = null;
}

this.tm = null;
this.client = null;
} else {
this.tx = omidTransactionContext.getTransaction();
}
}

private HBaseTransactionManager tm() {
return client == null ? null : client.getTransactionClient();
}

@Override
public void begin() throws SQLException {
if (tm == null) {
HBaseTransactionManager m = tm();
if (m == null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build()
.buildException();
}

try {
tx = (HBaseTransaction) tm.begin();
tx = (HBaseTransaction) m.begin();
} catch (TransactionException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build().buildException();
Expand All @@ -110,10 +124,11 @@ public void begin() throws SQLException {

@Override
public void commit() throws SQLException {
if (tx == null || tm == null) return;
HBaseTransactionManager m = tm();
if (tx == null || m == null) return;

try {
tm.commit(tx);
m.commit(tx);
} catch (TransactionException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build().buildException();
Expand All @@ -125,12 +140,13 @@ public void commit() throws SQLException {

@Override
public void abort() throws SQLException {
if (tx == null || tm == null || tx.getStatus() != Status.RUNNING) {
HBaseTransactionManager m = tm();
if (tx == null || m == null || tx.getStatus() != Status.RUNNING) {
return;
}

try {
tm.rollback(tx);
m.rollback(tx);
} catch (TransactionException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED)
.setMessage(e.getMessage()).setRootCause(e).build().buildException();
Expand All @@ -149,9 +165,14 @@ public void checkpoint(boolean hasUncommittedData) throws SQLException {

@Override
public void commitDDLFence(PTable dataTable) throws SQLException {
HBaseTransactionManager m = tm();
if (m == null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build()
.buildException();
}

try {
tx = (HBaseTransaction) tm.fence(dataTable.getName().getBytes());
tx = (HBaseTransaction) m.fence(dataTable.getName().getBytes());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Added write fence at ~" + tx.getReadTimestamp());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
public class OmidTransactionProvider implements PhoenixTransactionProvider {
private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();

private HBaseTransactionManager transactionManager = null;
// volatile so that test-framework injection (injectTestService) is published to other
// threads without requiring all readers to synchronize on the singleton.
private volatile HBaseTransactionManager transactionManager = null;
private volatile CommitTable.Client commitTableClient = null;

public static final OmidTransactionProvider getInstance() {
Expand Down Expand Up @@ -78,18 +80,22 @@ public PhoenixTransactionClient getTransactionClient(Configuration config,
}
}

return new OmidTransactionClient(transactionManager);
return new OmidTransactionClient(this);
}

/**
* A thin view onto the {@link OmidTransactionProvider} singleton's current
* {@link HBaseTransactionManager}.
*/
static class OmidTransactionClient implements PhoenixTransactionClient {
private final HBaseTransactionManager transactionManager;
private final OmidTransactionProvider provider;

public OmidTransactionClient(HBaseTransactionManager transactionManager) {
this.transactionManager = transactionManager;
public OmidTransactionClient(OmidTransactionProvider provider) {
this.provider = provider;
}

public HBaseTransactionManager getTransactionClient() {
return transactionManager;
return provider.transactionManager;
}

@Override
Expand Down