diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index ef6a03d107040..87ebff5655b02 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -43,10 +43,6 @@
-
- com.google.protobuf
- protobuf-java
-
${project.groupId}
@@ -168,18 +164,13 @@
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- ${protobuf-maven-plugin.version}
-
- com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}
- true
-
+ io.streamnative.lightproto
+ lightproto-maven-plugin
+ ${lightproto-maven-plugin.version}
- generate-sources
- compile
+ generate
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 11148ef1a59f5..c1740ba9ff3a3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -26,7 +26,7 @@
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.OffloadContext;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
@@ -198,7 +198,7 @@ CompletableFuture readOffloaded(long ledgerId, UUID uid,
CompletableFuture deleteOffloaded(long ledgerId, UUID uid,
Map offloadDriverMetadata);
- default CompletableFuture readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+ default CompletableFuture readOffloaded(long ledgerId, OffloadContext ledgerContext,
Map offloadDriverMetadata) {
throw new UnsupportedOperationException();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 8fb083bcd026c..0455f0efa8bb6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -35,7 +35,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.jspecify.annotations.Nullable;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 592fd2d385e5c..a915d651bd6b2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -21,7 +21,7 @@
import lombok.Data;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.CompressionType;
/**
* Configuration for a {@link ManagedLedgerFactory}.
@@ -134,7 +134,7 @@ public class ManagedLedgerFactoryConfig {
/**
* ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
*/
- private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
+ private String managedLedgerInfoCompressionType = CompressionType.NONE.name();
/**
* ManagedLedgerInfo compression threshold. If the origin metadata size below configuration.
@@ -145,7 +145,7 @@ public class ManagedLedgerFactoryConfig {
/**
* ManagedCursorInfo compression type. If the compression type is null or invalid, don't compress data.
*/
- private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
+ private String managedCursorInfoCompressionType = CompressionType.NONE.name();
/**
* ManagedCursorInfo compression threshold. If the origin metadata size below configuration.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/MetadataCompressionConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/MetadataCompressionConfig.java
index 7ce23192663e1..e246f73acb90c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/MetadataCompressionConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/MetadataCompressionConfig.java
@@ -21,14 +21,14 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.CompressionType;
import org.apache.commons.lang3.StringUtils;
@Data
@AllArgsConstructor
@ToString
public class MetadataCompressionConfig {
- MLDataFormats.CompressionType compressionType;
+ CompressionType compressionType;
long compressSizeThresholdInBytes;
public MetadataCompressionConfig(String compressionType) throws IllegalArgumentException {
@@ -41,15 +41,15 @@ public MetadataCompressionConfig(String compressionType, long compressThreshold)
}
public static MetadataCompressionConfig noCompression =
- new MetadataCompressionConfig(MLDataFormats.CompressionType.NONE, 0);
+ new MetadataCompressionConfig(CompressionType.NONE, 0);
- private MLDataFormats.CompressionType parseCompressionType(String value) throws IllegalArgumentException {
+ private CompressionType parseCompressionType(String value) throws IllegalArgumentException {
if (StringUtils.isEmpty(value)) {
- return MLDataFormats.CompressionType.NONE;
+ return CompressionType.NONE;
}
- MLDataFormats.CompressionType compressionType;
- compressionType = MLDataFormats.CompressionType.valueOf(value);
+ CompressionType compressionType;
+ compressionType = CompressionType.valueOf(value);
return compressionType;
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
index a5d41bad6a8d8..7ad45249e3c3c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java
@@ -26,7 +26,7 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
class EntryCountEstimator {
// Prevent instantiation, this is a utility class with only static methods
@@ -61,7 +61,7 @@ static int estimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Posi
* @param maxEntries stop further estimation if the number of estimated entries exceeds this value
* @param maxSizeBytes the maximum size in bytes for the entries to be estimated
* @param readPosition the position in the ledger from where to start reading
- * @param ledgersInfo a map of ledger ID to {@link MLDataFormats.ManagedLedgerInfo.LedgerInfo} containing
+ * @param ledgersInfo a map of ledger ID to {@link ManagedLedgerInfo.LedgerInfo} containing
* metadata for ledgers
* @param lastLedgerId the ID of the last active ledger in the managed ledger
* @param lastLedgerTotalEntries the total number of entries in the last active ledger
@@ -69,7 +69,7 @@ static int estimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Posi
* @return the estimated number of entries that can be read
*/
static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
- NavigableMap
+ NavigableMap
ledgersInfo,
Long lastLedgerId, long lastLedgerTotalEntries,
long lastLedgerTotalSize) {
@@ -98,11 +98,11 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
long remainingBytesSize = maxSizeBytes;
long currentAvgSize = 0;
// Get a collection of ledger info starting from the read position
- Collection ledgersAfterReadPosition =
+ Collection ledgersAfterReadPosition =
ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();
// calculate the estimated entry count based on the remaining bytes and ledger metadata
- for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
+ for (ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
if (remainingBytesSize <= 0 || estimatedEntryCount >= maxEntries) {
// Stop processing if there are no more bytes remaining to allocate for entries
// or if the estimated entry count exceeds the maximum allowed entries
@@ -159,9 +159,9 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
// need to find the previous non-empty ledger to find the average size
if (currentAvgSize == 0) {
- Collection ledgersBeforeReadPosition =
+ Collection ledgersBeforeReadPosition =
ledgersInfo.headMap(readPosition.getLedgerId(), false).descendingMap().values();
- for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
+ for (ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
long ledgerTotalSize = ledgerInfo.getSize();
long ledgerTotalEntries = ledgerInfo.getEntries();
// Skip processing ledgers that have no entries or size
@@ -184,7 +184,7 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
}
private static Position adjustReadPosition(Position readPosition,
- NavigableMap
+ NavigableMap
ledgersInfo,
Long lastLedgerId, long lastLedgerTotalEntries) {
// Adjust the read position to ensure it falls within the valid range of available ledgers.
@@ -195,7 +195,7 @@ private static Position adjustReadPosition(Position readPosition,
}
long lastKey = ledgersInfo.lastKey();
if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) {
- Map.Entry lastEntry = ledgersInfo.lastEntry();
+ Map.Entry lastEntry = ledgersInfo.lastEntry();
if (lastEntry != null && lastEntry.getKey() == lastKey) {
return PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7009ac750fe15..4267016ac8383 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -32,7 +32,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;
-import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -60,8 +59,8 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
+import java.util.function.IntFunction;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
import java.util.stream.LongStream;
import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -94,15 +93,15 @@
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
+import org.apache.bookkeeper.mledger.proto.BatchedEntryDeletionIndexInfo;
+import org.apache.bookkeeper.mledger.proto.LongListMap;
+import org.apache.bookkeeper.mledger.proto.LongProperty;
+import org.apache.bookkeeper.mledger.proto.ManagedCursorInfo;
+import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.proto.MessageRange;
+import org.apache.bookkeeper.mledger.proto.NestedPositionInfo;
+import org.apache.bookkeeper.mledger.proto.PositionInfo;
+import org.apache.bookkeeper.mledger.proto.StringProperty;
import org.apache.bookkeeper.mledger.util.ManagedLedgerUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -212,6 +211,9 @@ public class ManagedCursorImpl implements ManagedCursor {
@Nullable protected final ConcurrentSkipListMap batchDeletedIndexes;
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+ // Reusable LightProto object for cursor position serialization (used only from persistPositionToLedger)
+ private final PositionInfo reusablePositionInfo = new PositionInfo();
+
private RateLimiter markDeleteLimiter;
// The cursor is considered "dirty" when there are mark-delete updates that are only applied in memory,
// because of the rate limiting.
@@ -428,11 +430,10 @@ private CompletableFuture computeCursorProperties(
return updateCursorPropertiesResult;
}
- ManagedCursorInfo copy = ManagedCursorInfo
- .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
- .clearCursorProperties()
- .addAllCursorProperties(buildStringPropertiesMap(newProperties))
- .build();
+ ManagedCursorInfo copy = new ManagedCursorInfo();
+ copy.copyFrom(ManagedCursorImpl.this.managedCursorInfo);
+ copy.clearCursorProperties();
+ copy.addAllCursorProperties(buildStringPropertiesMap(newProperties));
final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
@@ -561,7 +562,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
// Recover properties map
recoveredCursorProperties = new HashMap<>();
for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
- StringProperty property = info.getCursorProperties(i);
+ StringProperty property = info.getCursorPropertyAt(i);
recoveredCursorProperties.put(property.getName(), property.getValue());
}
}
@@ -573,7 +574,8 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
Position recoveredPosition = PositionFactory.create(info.getMarkDeleteLedgerId(),
info.getMarkDeleteEntryId());
if (info.getIndividualDeletedMessagesCount() > 0) {
- recoverIndividualDeletedMessages(info.getIndividualDeletedMessagesList());
+ recoverIndividualDeletedMessages(info.getIndividualDeletedMessagesCount(),
+ info::getIndividualDeletedMessageAt);
}
Map recoveredProperties = Collections.emptyMap();
@@ -581,7 +583,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
// Recover properties map
recoveredProperties = new HashMap<>();
for (int i = 0; i < info.getPropertiesCount(); i++) {
- LongProperty property = info.getProperties(i);
+ LongProperty property = info.getPropertyAt(i);
recoveredProperties.put(property.getName(), property.getValue());
}
}
@@ -656,10 +658,10 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
LedgerEntry entry = seq.nextElement();
mbean.addReadCursorLedgerSize(entry.getLength());
- PositionInfo positionInfo;
+ PositionInfo positionInfo = new PositionInfo();
try {
- positionInfo = PositionInfo.parseFrom(entry.getEntry());
- } catch (InvalidProtocolBufferException e) {
+ positionInfo.parseFrom(entry.getEntry());
+ } catch (Exception e) {
callback.operationFailed(new ManagedLedgerException(e));
return;
}
@@ -669,7 +671,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
// Recover properties map
recoveredProperties = new HashMap<>();
for (int i = 0; i < positionInfo.getPropertiesCount(); i++) {
- LongProperty property = positionInfo.getProperties(i);
+ LongProperty property = positionInfo.getPropertyAt(i);
recoveredProperties.put(property.getName(), property.getValue());
}
}
@@ -677,8 +679,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
Position position = PositionFactory.create(positionInfo.getLedgerId(), positionInfo.getEntryId());
recoverIndividualDeletedMessages(positionInfo);
if (getConfig().isDeletionAtBatchIndexLevelEnabled()
- && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
- recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
+ && positionInfo.getBatchedEntryDeletionIndexInfosCount() > 0) {
+ recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfosCount(),
+ positionInfo::getBatchedEntryDeletionIndexInfoAt);
}
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
@@ -696,13 +699,20 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
public void recoverIndividualDeletedMessages(PositionInfo positionInfo) {
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
- recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
+ recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesCount(),
+ positionInfo::getIndividualDeletedMessageAt);
} else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) {
- List rangeList = positionInfo.getIndividualDeletedMessageRangesList();
lock.writeLock().lock();
try {
- Map rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
- list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
+ Map rangeMap = new HashMap<>(positionInfo.getIndividualDeletedMessageRangesCount());
+ for (int i = 0; i < positionInfo.getIndividualDeletedMessageRangesCount(); i++) {
+ LongListMap list = positionInfo.getIndividualDeletedMessageRangeAt(i);
+ long[] values = new long[list.getValuesCount()];
+ for (int idx = 0; idx < values.length; idx++) {
+ values[idx] = list.getValueAt(idx);
+ }
+ rangeMap.put(list.getKey(), values);
+ }
// Guarantee compatability for the config "unackedRangesOpenCacheSetEnabled".
if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
individualDeletedMessages.build(rangeMap);
@@ -738,12 +748,10 @@ private List buildLongPropertiesMap(Map properties) {
if (ranges == null || ranges.length <= 0) {
return;
}
- org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap.Builder lmBuilder = LongListMap.newBuilder()
- .setKey(id);
+ LongListMap lm = new LongListMap().setKey(id);
for (long range : ranges) {
- lmBuilder.addValues(range);
+ lm.addValue(range);
}
- LongListMap lm = lmBuilder.build();
longListMap.add(lm);
serializedSize.add(lm.getSerializedSize());
});
@@ -751,13 +759,14 @@ private List buildLongPropertiesMap(Map properties) {
return longListMap;
}
- private void recoverIndividualDeletedMessages(List individualDeletedMessagesList) {
+ private void recoverIndividualDeletedMessages(int count, IntFunction accessor) {
lock.writeLock().lock();
try {
individualDeletedMessages.clear();
- individualDeletedMessagesList.forEach(messageRange -> {
- MLDataFormats.NestedPositionInfo lowerEndpoint = messageRange.getLowerEndpoint();
- MLDataFormats.NestedPositionInfo upperEndpoint = messageRange.getUpperEndpoint();
+ for (int i = 0; i < count; i++) {
+ MessageRange messageRange = accessor.apply(i);
+ NestedPositionInfo lowerEndpoint = messageRange.getLowerEndpoint();
+ NestedPositionInfo upperEndpoint = messageRange.getUpperEndpoint();
if (lowerEndpoint.getLedgerId() == upperEndpoint.getLedgerId()) {
individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(),
@@ -782,29 +791,29 @@ private void recoverIndividualDeletedMessages(List i
individualDeletedMessages.addOpenClosed(upperEndpoint.getLedgerId(), -1,
upperEndpoint.getLedgerId(), upperEndpoint.getEntryId());
}
- });
+ }
} finally {
lock.writeLock().unlock();
}
}
- private void recoverBatchDeletedIndexes (
- List batchDeletedIndexInfoList) {
+ private void recoverBatchDeletedIndexes(int count, IntFunction accessor) {
Objects.requireNonNull(batchDeletedIndexes);
lock.writeLock().lock();
try {
this.batchDeletedIndexes.clear();
- batchDeletedIndexInfoList.forEach(batchDeletedIndexInfo -> {
- if (batchDeletedIndexInfo.getDeleteSetCount() > 0) {
- long[] array = new long[batchDeletedIndexInfo.getDeleteSetCount()];
- for (int i = 0; i < batchDeletedIndexInfo.getDeleteSetList().size(); i++) {
- array[i] = batchDeletedIndexInfo.getDeleteSetList().get(i);
+ for (int i = 0; i < count; i++) {
+ BatchedEntryDeletionIndexInfo batchDeletedIndexInfo = accessor.apply(i);
+ if (batchDeletedIndexInfo.getDeleteSetsCount() > 0) {
+ long[] array = new long[batchDeletedIndexInfo.getDeleteSetsCount()];
+ for (int j = 0; j < array.length; j++) {
+ array[j] = batchDeletedIndexInfo.getDeleteSetAt(j);
}
this.batchDeletedIndexes.put(
PositionFactory.create(batchDeletedIndexInfo.getPosition().getLedgerId(),
batchDeletedIndexInfo.getPosition().getEntryId()), BitSet.valueOf(array));
}
- });
+ }
} finally {
lock.writeLock().unlock();
}
@@ -2967,18 +2976,18 @@ private void persistPositionMetaStore(long cursorsLedgerId, Position position, M
// When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
// hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
- ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
- .setCursorsLedgerId(cursorsLedgerId) //
- .setMarkDeleteLedgerId(position.getLedgerId()) //
- .setMarkDeleteEntryId(position.getEntryId()) //
- .setLastActive(lastActive); //
+ ManagedCursorInfo info = new ManagedCursorInfo()
+ .setCursorsLedgerId(cursorsLedgerId)
+ .setMarkDeleteLedgerId(position.getLedgerId())
+ .setMarkDeleteEntryId(position.getEntryId())
+ .setLastActive(lastActive);
info.addAllProperties(buildPropertiesMap(properties));
info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
- info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList());
+ info.addAllBatchedEntryDeletionIndexInfos(buildBatchEntryDeletionIndexInfoList());
}
}
@@ -2986,7 +2995,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, Position position, M
log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, position);
}
- ManagedCursorInfo cursorInfo = info.build();
+ ManagedCursorInfo cursorInfo = info;
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, cursorInfo, lastCursorLedgerStat,
new MetaStoreCallback() {
@Override
@@ -3341,7 +3350,7 @@ private static List buildPropertiesMap(Map propertie
List longProperties = new ArrayList<>();
properties.forEach((name, value) -> {
- LongProperty lp = LongProperty.newBuilder().setName(name).setValue(value).build();
+ LongProperty lp = new LongProperty().setName(name).setValue(value);
longProperties.add(lp);
});
@@ -3355,14 +3364,14 @@ private static List buildStringPropertiesMap(Map
List stringProperties = new ArrayList<>();
properties.forEach((name, value) -> {
- StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build();
+ StringProperty sp = new StringProperty().setName(name).setValue(value);
stringProperties.add(sp);
});
return stringProperties;
}
- private List buildIndividualDeletedMessageRanges() {
+ private List buildIndividualDeletedMessageRanges() {
lock.writeLock().lock();
try {
if (individualDeletedMessages.isEmpty()) {
@@ -3370,30 +3379,17 @@ private List buildIndividualDeletedMessageRanges() {
return Collections.emptyList();
}
- MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
- .newBuilder();
-
- MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange
- .newBuilder();
-
AtomicInteger acksSerializedSize = new AtomicInteger(0);
List rangeList = new ArrayList<>();
individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
- MLDataFormats.NestedPositionInfo lowerPosition = nestedPositionBuilder
+ MessageRange messageRange = new MessageRange();
+ messageRange.setLowerEndpoint()
.setLedgerId(lowerKey)
- .setEntryId(lowerValue)
- .build();
-
- MLDataFormats.NestedPositionInfo upperPosition = nestedPositionBuilder
+ .setEntryId(lowerValue);
+ messageRange.setUpperEndpoint()
.setLedgerId(upperKey)
- .setEntryId(upperValue)
- .build();
-
- MessageRange messageRange = messageRangeBuilder
- .setLowerEndpoint(lowerPosition)
- .setUpperEndpoint(upperPosition)
- .build();
+ .setEntryId(upperValue);
acksSerializedSize.addAndGet(messageRange.getSerializedSize());
rangeList.add(messageRange);
@@ -3409,31 +3405,25 @@ private List buildIndividualDeletedMessageRanges() {
}
}
- private List buildBatchEntryDeletionIndexInfoList() {
+ private List buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
if (batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
- MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
- .newBuilder();
- MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats
- .BatchedEntryDeletionIndexInfo.newBuilder();
- List result = new ArrayList<>();
+ List result = new ArrayList<>();
final var iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < getConfig().getMaxBatchDeletedIndexToPersist()) {
final var entry = iterator.next();
- nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
- nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
- batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
+ BatchedEntryDeletionIndexInfo batchDeletedIndexInfo = new BatchedEntryDeletionIndexInfo();
+ batchDeletedIndexInfo.setPosition()
+ .setLedgerId(entry.getKey().getLedgerId())
+ .setEntryId(entry.getKey().getEntryId());
long[] array = entry.getValue().toLongArray();
- List deleteSet = new ArrayList<>(array.length);
for (long l : array) {
- deleteSet.add(l);
+ batchDeletedIndexInfo.addDeleteSet(l);
}
- batchDeletedIndexInfoBuilder.clearDeleteSet();
- batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet);
- result.add(batchDeletedIndexInfoBuilder.build());
+ result.add(batchDeletedIndexInfo);
}
return result;
} finally {
@@ -3444,9 +3434,11 @@ private List buildBatchEntryDeletio
void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback,
boolean ignoreClosedStateAfterFailure) {
Position position = mdEntry.newPosition;
- Builder piBuilder = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
+ PositionInfo pi = reusablePositionInfo;
+ pi.clear();
+ pi.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId())
- .addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList())
+ .addAllBatchedEntryDeletionIndexInfos(buildBatchEntryDeletionIndexInfoList())
.addAllProperties(buildPropertiesMap(mdEntry.properties));
Map internalRanges = null;
@@ -3470,11 +3462,10 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}
}
if (internalRanges != null && !internalRanges.isEmpty()) {
- piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));
+ pi.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));
} else {
- piBuilder.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
+ pi.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
}
- PositionInfo pi = piBuilder.build();
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b8b5b78a62c0d..4e530a185cce1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -26,7 +26,6 @@
import com.google.common.collect.BoundType;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
-import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
@@ -72,11 +71,8 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
-import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
-import org.apache.bookkeeper.mledger.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
-import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
import org.apache.bookkeeper.mledger.MetadataCompressionConfig;
import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats;
import org.apache.bookkeeper.mledger.Position;
@@ -90,10 +86,14 @@
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
+import org.apache.bookkeeper.mledger.proto.KeyValue;
+import org.apache.bookkeeper.mledger.proto.LongProperty;
+import org.apache.bookkeeper.mledger.proto.ManagedCursorInfo;
+import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.proto.MessageRange;
+import org.apache.bookkeeper.mledger.proto.OffloadContext;
+import org.apache.bookkeeper.mledger.proto.PositionInfo;
import org.apache.bookkeeper.mledger.util.Errors;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -741,16 +741,17 @@ public CompletableFuture asyncExists(String ledgerName) {
}
@Override
- public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException {
+ public org.apache.bookkeeper.mledger.ManagedLedgerInfo getManagedLedgerInfo(String name)
+ throws InterruptedException, ManagedLedgerException {
class Result {
- ManagedLedgerInfo info = null;
+ org.apache.bookkeeper.mledger.ManagedLedgerInfo info = null;
ManagedLedgerException e = null;
}
final Result r = new Result();
final CountDownLatch latch = new CountDownLatch(1);
asyncGetManagedLedgerInfo(name, new ManagedLedgerInfoCallback() {
@Override
- public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
+ public void getInfoComplete(org.apache.bookkeeper.mledger.ManagedLedgerInfo info, Object ctx) {
r.info = info;
latch.countDown();
}
@@ -773,17 +774,18 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
@Override
public void asyncGetManagedLedgerInfo(String name, ManagedLedgerInfoCallback callback, Object ctx) {
store.getManagedLedgerInfo(name, false /* createIfMissing */,
- new MetaStoreCallback() {
+ new MetaStoreCallback() {
@Override
- public void operationComplete(MLDataFormats.ManagedLedgerInfo pbInfo, Stat stat) {
- ManagedLedgerInfo info = new ManagedLedgerInfo();
+ public void operationComplete(ManagedLedgerInfo pbInfo, Stat stat) {
+ org.apache.bookkeeper.mledger.ManagedLedgerInfo info =
+ new org.apache.bookkeeper.mledger.ManagedLedgerInfo();
info.version = stat.getVersion();
info.creationDate = DateFormatter.format(stat.getCreationTimestamp());
info.modificationDate = DateFormatter.format(stat.getModificationTimestamp());
- info.ledgers = new ArrayList<>(pbInfo.getLedgerInfoCount());
+ info.ledgers = new ArrayList<>(pbInfo.getLedgerInfosCount());
if (pbInfo.hasTerminatedPosition()) {
- info.terminatedPosition = new PositionInfo();
+ info.terminatedPosition = new org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo();
info.terminatedPosition.ledgerId = pbInfo.getTerminatedPosition().getLedgerId();
info.terminatedPosition.entryId = pbInfo.getTerminatedPosition().getEntryId();
}
@@ -791,21 +793,22 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo pbInfo, Stat stat)
if (pbInfo.getPropertiesCount() > 0) {
info.properties = new TreeMap();
for (int i = 0; i < pbInfo.getPropertiesCount(); i++) {
- MLDataFormats.KeyValue property = pbInfo.getProperties(i);
+ KeyValue property = pbInfo.getPropertyAt(i);
info.properties.put(property.getKey(), property.getValue());
}
}
- for (int i = 0; i < pbInfo.getLedgerInfoCount(); i++) {
- MLDataFormats.ManagedLedgerInfo.LedgerInfo pbLedgerInfo = pbInfo.getLedgerInfo(i);
- LedgerInfo ledgerInfo = new LedgerInfo();
+ for (int i = 0; i < pbInfo.getLedgerInfosCount(); i++) {
+ LedgerInfo pbLedgerInfo = pbInfo.getLedgerInfoAt(i);
+ org.apache.bookkeeper.mledger.ManagedLedgerInfo.LedgerInfo ledgerInfo =
+ new org.apache.bookkeeper.mledger.ManagedLedgerInfo.LedgerInfo();
ledgerInfo.ledgerId = pbLedgerInfo.getLedgerId();
ledgerInfo.entries = pbLedgerInfo.hasEntries() ? pbLedgerInfo.getEntries() : null;
ledgerInfo.size = pbLedgerInfo.hasSize() ? pbLedgerInfo.getSize() : null;
ledgerInfo.timestamp = pbLedgerInfo.hasTimestamp() ? pbLedgerInfo.getTimestamp() : null;
ledgerInfo.isOffloaded = pbLedgerInfo.hasOffloadContext();
if (pbLedgerInfo.hasOffloadContext()) {
- MLDataFormats.OffloadContext offloadContext = pbLedgerInfo.getOffloadContext();
+ OffloadContext offloadContext = pbLedgerInfo.getOffloadContext();
UUID uuid = new UUID(offloadContext.getUidMsb(), offloadContext.getUidLsb());
ledgerInfo.offloadedContextUuid = uuid.toString();
}
@@ -823,7 +826,7 @@ public void operationComplete(List cursorsList, Stat stat) {
CompletableFuture cursorFuture = new CompletableFuture<>();
cursorsFutures.add(cursorFuture);
store.asyncGetCursorInfo(name, cursorName,
- new MetaStoreCallback() {
+ new MetaStoreCallback() {
@Override
public void operationComplete(ManagedCursorInfo pbCursorInfo, Stat stat) {
CursorInfo cursorInfo = new CursorInfo();
@@ -835,7 +838,9 @@ public void operationComplete(ManagedCursorInfo pbCursorInfo, Stat stat) {
cursorInfo.cursorsLedgerId = pbCursorInfo.getCursorsLedgerId();
if (pbCursorInfo.hasMarkDeleteLedgerId()) {
- cursorInfo.markDelete = new PositionInfo();
+ cursorInfo.markDelete =
+ new org.apache.bookkeeper.mledger.ManagedLedgerInfo
+ .PositionInfo();
cursorInfo.markDelete.ledgerId = pbCursorInfo.getMarkDeleteLedgerId();
cursorInfo.markDelete.entryId = pbCursorInfo.getMarkDeleteEntryId();
}
@@ -843,7 +848,7 @@ public void operationComplete(ManagedCursorInfo pbCursorInfo, Stat stat) {
if (pbCursorInfo.getPropertiesCount() > 0) {
cursorInfo.properties = new TreeMap();
for (int i = 0; i < pbCursorInfo.getPropertiesCount(); i++) {
- LongProperty property = pbCursorInfo.getProperties(i);
+ LongProperty property = pbCursorInfo.getPropertyAt(i);
cursorInfo.properties.put(property.getName(), property.getValue());
}
}
@@ -852,7 +857,8 @@ public void operationComplete(ManagedCursorInfo pbCursorInfo, Stat stat) {
cursorInfo.individualDeletedMessages = new ArrayList<>();
for (int i = 0; i < pbCursorInfo
.getIndividualDeletedMessagesCount(); i++) {
- MessageRange range = pbCursorInfo.getIndividualDeletedMessages(i);
+ MessageRange range =
+ pbCursorInfo.getIndividualDeletedMessageAt(i);
MessageRangeInfo rangeInfo = new MessageRangeInfo();
rangeInfo.from.ledgerId = range.getLowerEndpoint().getLedgerId();
rangeInfo.from.entryId = range.getLowerEndpoint().getEntryId();
@@ -963,7 +969,7 @@ void deleteManagedLedger(String managedLedgerName, CompletableFuture {
// First delete all cursors resources
List> futures = info.cursors.entrySet().stream()
@@ -985,17 +991,19 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}, ctx);
}
- private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, ManagedLedgerInfo info,
+ private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName,
+ org.apache.bookkeeper.mledger.ManagedLedgerInfo info,
CompletableFuture mlConfigFuture,
DeleteLedgerCallback callback, Object ctx) {
- final CompletableFuture