Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,8 @@ private List<ProducerBatch> drainBatchesForOneNode(MetadataSnapshot metadataSnap
OptionalInt leaderEpoch = metadataSnapshot.leaderEpochFor(tp);

final ProducerBatch batch;
final boolean isTransactional;
final boolean needsTxnStateUpdate;
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
ProducerBatch first = deque.peekFirst();
Expand All @@ -896,33 +898,35 @@ private List<ProducerBatch> drainBatchesForOneNode(MetadataSnapshot metadataSnap
}

batch = deque.pollFirst();
isTransactional = transactionManager != null && transactionManager.isTransactional();
needsTxnStateUpdate = transactionManager != null
&& transactionManager.producerIdAndEpoch() != null
&& !batch.hasSequence();
}

boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
ProducerIdAndEpoch producerIdAndEpoch =
transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
if (producerIdAndEpoch != null && !batch.hasSequence()) {
// If the producer id/epoch of the partition do not match the latest one
// of the producer, we update it and reset the sequence. This should be
// only done when all its in-flight batches have completed. This is guarantee
// in `shouldStopDrainBatchesForPartition`.
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);

// If the batch already has an assigned sequence, then we should not change the producer id and
// sequence number, since this may introduce duplicates. In particular, the previous attempt
// may actually have been accepted, and if we change the producer id and sequence here, this
// attempt will also be accepted, causing a duplicate.
//
// Additionally, we update the next sequence number bound for the partition, and also have
// the transaction manager track the batch so as to ensure that sequence ordering is maintained
// even if we receive out of order responses.
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);

transactionManager.addInFlightBatch(batch);
}
if (transactionManager != null && needsTxnStateUpdate) {
// If the producer id/epoch of the partition do not match the latest one
// of the producer, we update it and reset the sequence. This should be
// only done when all its in-flight batches have completed. This is guarantee
// in `shouldStopDrainBatchesForPartition`.
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
ProducerIdAndEpoch producerIdAndEpoch = transactionManager.producerIdAndEpoch();

// If the batch already has an assigned sequence, then we should not change the producer id and
// sequence number, since this may introduce duplicates. In particular, the previous attempt
// may actually have been accepted, and if we change the producer id and sequence here, this
// attempt will also be accepted, causing a duplicate.
//
// Additionally, we update the next sequence number bound for the partition, and also have
// the transaction manager track the batch so as to ensure that sequence ordering is maintained
// even if we receive out of order responses.
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);

transactionManager.addInFlightBatch(batch);
}

// the rest of the work by processing outside the lock
Expand Down