Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions managed-ledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@
</dependency>


<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
Expand Down Expand Up @@ -168,18 +164,13 @@
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
<checkStaleness>true</checkStaleness>
</configuration>
<groupId>io.streamnative.lightproto</groupId>
<artifactId>lightproto-maven-plugin</artifactId>
<version>${lightproto-maven-plugin.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
<goal>generate</goal>
</goals>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.OffloadContext;
import org.apache.pulsar.common.policies.data.OffloadPolicies;

/**
Expand Down Expand Up @@ -198,7 +198,7 @@ CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata);

default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, OffloadContext ledgerContext,
Map<String, String> offloadDriverMetadata) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.jspecify.annotations.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.Data;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.CompressionType;

/**
* Configuration for a {@link ManagedLedgerFactory}.
Expand Down Expand Up @@ -134,7 +134,7 @@ public class ManagedLedgerFactoryConfig {
/**
* ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
*/
private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
private String managedLedgerInfoCompressionType = CompressionType.NONE.name();

/**
* ManagedLedgerInfo compression threshold. If the origin metadata size below configuration.
Expand All @@ -145,7 +145,7 @@ public class ManagedLedgerFactoryConfig {
/**
* ManagedCursorInfo compression type. If the compression type is null or invalid, don't compress data.
*/
private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
private String managedCursorInfoCompressionType = CompressionType.NONE.name();

/**
* ManagedCursorInfo compression threshold. If the origin metadata size below configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.CompressionType;
import org.apache.commons.lang3.StringUtils;

@Data
@AllArgsConstructor
@ToString
public class MetadataCompressionConfig {
MLDataFormats.CompressionType compressionType;
CompressionType compressionType;
long compressSizeThresholdInBytes;

public MetadataCompressionConfig(String compressionType) throws IllegalArgumentException {
Expand All @@ -41,15 +41,15 @@ public MetadataCompressionConfig(String compressionType, long compressThreshold)
}

public static MetadataCompressionConfig noCompression =
new MetadataCompressionConfig(MLDataFormats.CompressionType.NONE, 0);
new MetadataCompressionConfig(CompressionType.NONE, 0);

private MLDataFormats.CompressionType parseCompressionType(String value) throws IllegalArgumentException {
private CompressionType parseCompressionType(String value) throws IllegalArgumentException {
if (StringUtils.isEmpty(value)) {
return MLDataFormats.CompressionType.NONE;
return CompressionType.NONE;
}

MLDataFormats.CompressionType compressionType;
compressionType = MLDataFormats.CompressionType.valueOf(value);
CompressionType compressionType;
compressionType = CompressionType.valueOf(value);

return compressionType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;

class EntryCountEstimator {
// Prevent instantiation, this is a utility class with only static methods
Expand Down Expand Up @@ -61,15 +61,15 @@ static int estimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Posi
* @param maxEntries stop further estimation if the number of estimated entries exceeds this value
* @param maxSizeBytes the maximum size in bytes for the entries to be estimated
* @param readPosition the position in the ledger from where to start reading
* @param ledgersInfo a map of ledger ID to {@link MLDataFormats.ManagedLedgerInfo.LedgerInfo} containing
* @param ledgersInfo a map of ledger ID to {@link ManagedLedgerInfo.LedgerInfo} containing
* metadata for ledgers
* @param lastLedgerId the ID of the last active ledger in the managed ledger
* @param lastLedgerTotalEntries the total number of entries in the last active ledger
* @param lastLedgerTotalSize the total size in bytes of the last active ledger
* @return the estimated number of entries that can be read
*/
static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
NavigableMap<Long, ManagedLedgerInfo.LedgerInfo>
ledgersInfo,
Long lastLedgerId, long lastLedgerTotalEntries,
long lastLedgerTotalSize) {
Expand Down Expand Up @@ -98,11 +98,11 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
long remainingBytesSize = maxSizeBytes;
long currentAvgSize = 0;
// Get a collection of ledger info starting from the read position
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
Collection<ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();

// calculate the estimated entry count based on the remaining bytes and ledger metadata
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
for (ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
if (remainingBytesSize <= 0 || estimatedEntryCount >= maxEntries) {
// Stop processing if there are no more bytes remaining to allocate for entries
// or if the estimated entry count exceeds the maximum allowed entries
Expand Down Expand Up @@ -159,9 +159,9 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
// need to find the previous non-empty ledger to find the average size
if (currentAvgSize == 0) {
Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersBeforeReadPosition =
Collection<ManagedLedgerInfo.LedgerInfo> ledgersBeforeReadPosition =
ledgersInfo.headMap(readPosition.getLedgerId(), false).descendingMap().values();
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
for (ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
long ledgerTotalSize = ledgerInfo.getSize();
long ledgerTotalEntries = ledgerInfo.getEntries();
// Skip processing ledgers that have no entries or size
Expand All @@ -184,7 +184,7 @@ static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeByt
}

private static Position adjustReadPosition(Position readPosition,
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
NavigableMap<Long, ManagedLedgerInfo.LedgerInfo>
ledgersInfo,
Long lastLedgerId, long lastLedgerTotalEntries) {
// Adjust the read position to ensure it falls within the valid range of available ledgers.
Expand All @@ -195,7 +195,7 @@ private static Position adjustReadPosition(Position readPosition,
}
long lastKey = ledgersInfo.lastKey();
if (lastLedgerId == null && readPosition.getLedgerId() > lastKey) {
Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
Map.Entry<Long, ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
if (lastEntry != null && lastEntry.getKey() == lastKey) {
return PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
}
Expand Down
Loading
Loading