Skip to content

Commit 6e4b8e8

Browse files
committed
[improve][ml] Migrate managed-ledger from protobuf to LightProto
Migrate the managed-ledger module from Google protobuf to LightProto for ManagedLedgerInfo and ManagedCursorInfo proto messages. Key changes: - Replace protobuf Builder pattern with direct LightProto mutable objects - Replace ByteString with byte[] for bytes fields - Replace static parseFrom() with instance parseFrom(ByteBuf, size) - Reuse LightProto objects in ManagedCursorImpl and MetaStoreImpl to reduce allocations on hot paths - Use indexed loops (getXxxCount() + getXxxAt(i)) for repeated fields instead of getXxxList() to avoid list copy overhead - Update all test files to use LightProto API
1 parent 2bd9dd0 commit 6e4b8e8

45 files changed

Lines changed: 673 additions & 667 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

managed-ledger/pom.xml

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@
4343
</dependency>
4444

4545

46-
<dependency>
47-
<groupId>com.google.protobuf</groupId>
48-
<artifactId>protobuf-java</artifactId>
49-
</dependency>
5046

5147
<dependency>
5248
<groupId>${project.groupId}</groupId>
@@ -168,18 +164,13 @@
168164
</executions>
169165
</plugin>
170166
<plugin>
171-
<groupId>org.xolstice.maven.plugins</groupId>
172-
<artifactId>protobuf-maven-plugin</artifactId>
173-
<version>${protobuf-maven-plugin.version}</version>
174-
<configuration>
175-
<protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
176-
<checkStaleness>true</checkStaleness>
177-
</configuration>
167+
<groupId>com.github.splunk.lightproto</groupId>
168+
<artifactId>lightproto-maven-plugin</artifactId>
169+
<version>${lightproto-maven-plugin.version}</version>
178170
<executions>
179171
<execution>
180-
<phase>generate-sources</phase>
181172
<goals>
182-
<goal>compile</goal>
173+
<goal>generate</goal>
183174
</goals>
184175
</execution>
185176
</executions>

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.bookkeeper.client.api.ReadHandle;
2727
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
2828
import org.apache.bookkeeper.common.annotation.InterfaceStability;
29-
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
29+
import org.apache.bookkeeper.mledger.proto.OffloadContext;
3030
import org.apache.pulsar.common.policies.data.OffloadPolicies;
3131

3232
/**
@@ -198,7 +198,7 @@ CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
198198
CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
199199
Map<String, String> offloadDriverMetadata);
200200

201-
default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
201+
default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, OffloadContext ledgerContext,
202202
Map<String, String> offloadDriverMetadata) {
203203
throw new UnsupportedOperationException();
204204
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
3636
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
3737
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
38-
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
38+
import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo;
3939
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
4040
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
4141
import org.jspecify.annotations.Nullable;

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import lombok.Data;
2222
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
2323
import org.apache.bookkeeper.common.annotation.InterfaceStability;
24-
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
24+
import org.apache.bookkeeper.mledger.proto.CompressionType;
2525

2626
/**
2727
* Configuration for a {@link ManagedLedgerFactory}.
@@ -134,7 +134,7 @@ public class ManagedLedgerFactoryConfig {
134134
/**
135135
* ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
136136
*/
137-
private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
137+
private String managedLedgerInfoCompressionType = CompressionType.NONE.name();
138138

139139
/**
140140
* ManagedLedgerInfo compression threshold. If the origin metadata size below configuration.
@@ -145,7 +145,7 @@ public class ManagedLedgerFactoryConfig {
145145
/**
146146
* ManagedCursorInfo compression type. If the compression type is null or invalid, don't compress data.
147147
*/
148-
private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
148+
private String managedCursorInfoCompressionType = CompressionType.NONE.name();
149149

150150
/**
151151
* ManagedCursorInfo compression threshold. If the origin metadata size below configuration.

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/MetadataCompressionConfig.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
import lombok.AllArgsConstructor;
2222
import lombok.Data;
2323
import lombok.ToString;
24-
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
24+
import org.apache.bookkeeper.mledger.proto.CompressionType;
2525
import org.apache.commons.lang3.StringUtils;
2626

2727
@Data
2828
@AllArgsConstructor
2929
@ToString
3030
public class MetadataCompressionConfig {
31-
MLDataFormats.CompressionType compressionType;
31+
CompressionType compressionType;
3232
long compressSizeThresholdInBytes;
3333

3434
public MetadataCompressionConfig(String compressionType) throws IllegalArgumentException {
@@ -41,15 +41,15 @@ public MetadataCompressionConfig(String compressionType, long compressThreshold)
4141
}
4242

4343
public static MetadataCompressionConfig noCompression =
44-
new MetadataCompressionConfig(MLDataFormats.CompressionType.NONE, 0);
44+
new MetadataCompressionConfig(CompressionType.NONE, 0);
4545

46-
private MLDataFormats.CompressionType parseCompressionType(String value) throws IllegalArgumentException {
46+
private CompressionType parseCompressionType(String value) throws IllegalArgumentException {
4747
if (StringUtils.isEmpty(value)) {
48-
return MLDataFormats.CompressionType.NONE;
48+
return CompressionType.NONE;
4949
}
5050

51-
MLDataFormats.CompressionType compressionType;
52-
compressionType = MLDataFormats.CompressionType.valueOf(value);
51+
CompressionType compressionType;
52+
compressionType = CompressionType.valueOf(value);
5353

5454
return compressionType;
5555
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.bookkeeper.client.LedgerHandle;
2727
import org.apache.bookkeeper.mledger.Position;
2828
import org.apache.bookkeeper.mledger.PositionFactory;
29-
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
29+
import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
3030

3131
class EntryCountEstimator {
3232
// Prevent instantiation, this is a utility class with only static methods
@@ -61,15 +61,15 @@ static int estimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Posi
6161
* @param maxEntries stop further estimation if the number of estimated entries exceeds this value
6262
* @param maxSizeBytes the maximum size in bytes for the entries to be estimated
6363
* @param readPosition the position in the ledger from where to start reading
64-
* @param ledgersInfo a map of ledger ID to {@link MLDataFormats.ManagedLedgerInfo.LedgerInfo} containing
64+
* @param ledgersInfo a map of ledger ID to {@link ManagedLedgerInfo.LedgerInfo} containing
6565
* metadata for ledgers
6666
* @param lastLedgerId the ID of the last active ledger in the managed ledger
6767
* @param lastLedgerTotalEntries the total number of entries in the last active ledger
6868
* @param lastLedgerTotalSize the total size in bytes of the last active ledger
6969
* @return the estimated number of entries that can be read
7070
*/
7171
static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
72-
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
72+
NavigableMap<Long, ManagedLedgerInfo.LedgerInfo>
7373
ledgersInfo,
7474
Long lastLedgerId, long lastLedgerTotalEntries,
7575
long lastLedgerTotalSize) {
@@ -98,11 +98,11 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
9898
long remainingBytesSize = maxSizeBytes;
9999
long currentAvgSize = 0;
100100
// Get a collection of ledger info starting from the read position
101-
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
101+
Collection<ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
102102
ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();
103103

104104
// calculate the estimated entry count based on the remaining bytes and ledger metadata
105-
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
105+
for (ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
106106
if (remainingBytesSize <= 0 || estimatedEntryCount >= maxEntries) {
107107
// Stop processing if there are no more bytes remaining to allocate for entries
108108
// or if the estimated entry count exceeds the maximum allowed entries
@@ -159,9 +159,9 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
159159
if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
160160
// need to find the previous non-empty ledger to find the average size
161161
if (currentAvgSize == 0) {
162-
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersBeforeReadPosition =
162+
Collection<ManagedLedgerInfo.LedgerInfo> ledgersBeforeReadPosition =
163163
ledgersInfo.headMap(readPosition.getLedgerId(), false).descendingMap().values();
164-
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
164+
for (ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
165165
long ledgerTotalSize = ledgerInfo.getSize();
166166
long ledgerTotalEntries = ledgerInfo.getEntries();
167167
// Skip processing ledgers that have no entries or size
@@ -184,7 +184,7 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
184184
}
185185

186186
private static Position adjustReadPosition(Position readPosition,
187-
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
187+
NavigableMap<Long, ManagedLedgerInfo.LedgerInfo>
188188
ledgersInfo,
189189
Long lastLedgerId, long lastLedgerTotalEntries) {
190190
// Adjust the read position to ensure it falls within the valid range of available ledgers.
@@ -195,7 +195,7 @@ private static Position adjustReadPosition(Position readPosition,
195195
}
196196
long lastKey = ledgersInfo.lastKey();
197197
if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) {
198-
Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
198+
Map.Entry<Long, ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
199199
if (lastEntry != null && lastEntry.getKey() == lastKey) {
200200
return PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
201201
}

0 commit comments

Comments
 (0)