Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,33 @@ default CompletableFuture<List<GetResult>> findByIndex(
new MetadataStoreException("Secondary index queries not supported by this store"));
}

/**
* Stream all direct children of {@code parentPath} together with their values.
*
* <p>This is the value-bearing counterpart to {@link #getChildren} — same semantics for
* what counts as a "child" (one hierarchical level below {@code parentPath}, no
* descendants), but each record carries the value and {@link Stat} alongside the path.
* Results are delivered to {@code consumer} as they become available so callers don't
* have to materialize a potentially-large list in memory.
*
* <p>The consumer's {@link ScanConsumer#onNext} is invoked for each child, then either
* {@link ScanConsumer#onCompleted} (success) or {@link ScanConsumer#onError} (failure)
* exactly once. The returned future completes when the scan terminates and mirrors the
* terminal callback — callers may rely on either.
*
* <p>Backends with a native range-scan primitive (Oxia, RocksDB, in-memory NavigableMap)
* issue a single store-side scan. Other backends fall back to {@link #getChildren} +
* sequential {@link #get}, at the cost of one extra round trip per child.
*
* @param parentPath path whose direct children should be streamed
* @param consumer callback that receives records, completion, or an error
* @return a future that completes when the scan terminates
*/
default CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
return CompletableFuture.failedFuture(
new MetadataStoreException("scanChildren not supported by this store"));
}

/**
* Returns the default metadata cache config.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.api;

/**
* Streaming consumer for {@link MetadataStore#scanChildren} results.
*
* <p>The store invokes {@link #onNext} for each child record (in key order), and then either
* {@link #onCompleted} (success) or {@link #onError} (failure) exactly once. Implementations must
* be safe to invoke from a metadata-store internal thread; back-pressure is the consumer's
* responsibility (long blocking work in {@code onNext} can stall the scan).
*/
public interface ScanConsumer {

/**
* Called once per record. The result's {@link Stat#getPath()} carries the full key.
*
* @param result a child record under the requested parent path
*/
void onNext(GetResult result);

/**
* Called at most once when the scan fails. After this call no further callbacks are made.
*
* @param throwable the cause of the failure
*/
void onError(Throwable throwable);

/**
* Called at most once when the scan finishes without error. After this call no further
* callbacks are made.
*/
void onCompleted();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -529,6 +531,55 @@ public CompletableFuture<List<GetResult>> findByIndex(
return storeFindByIndex(scanPathPrefix, indexName, secondaryKey, fallbackFilter);
}

@Override
public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
if (isClosed()) {
CompletableFuture<Void> failed = alreadyClosedFailedFuture();
failed.whenComplete((__, ex) -> {
if (ex != null) {
consumer.onError(ex);
}
});
return failed;
}
if (parentPath == null) {
MetadataStoreException ex = new MetadataStoreException("parentPath must be non-null");
consumer.onError(ex);
return FutureUtil.failedFuture(ex);
}
return storeScanChildren(parentPath, consumer);
}

/**
* Backend hook for {@link #scanChildren}. The default implementation lists the parent's
* children with {@link #getChildrenFromStore} and fetches each value sequentially with
* {@link #storeGet}. Backends with a native range-scan primitive (Oxia, RocksDB,
* in-memory NavigableMap) override this method for a single store-side scan.
*/
protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
CompletableFuture<Void> result = new CompletableFuture<>();
getChildrenFromStore(parentPath).thenCompose(children -> {
CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
for (String child : children) {
String childPath = parentPath.equals("/") ? "/" + child : parentPath + "/" + child;
chain = chain.thenCompose(__ -> storeGet(childPath))
.thenAccept(opt -> opt.ifPresent(consumer::onNext));
}
return chain;
}).whenComplete((v, ex) -> {
if (ex != null) {
Throwable cause = ex instanceof CompletionException && ex.getCause() != null
? ex.getCause() : ex;
consumer.onError(cause);
result.completeExceptionally(cause);
} else {
consumer.onCompleted();
result.complete(null);
}
});
return result;
}

protected CompletableFuture<List<GetResult>> storeFindByIndex(
String scanPathPrefix, String indexName, String secondaryKey,
Predicate<GetResult> fallbackFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -309,6 +310,16 @@ public CompletableFuture<List<GetResult>> findByIndex(
};
}

@Override
public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
return switch (migrationState.getPhase()) {
case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.scanChildren(parentPath, consumer);
case COMPLETED ->
targetStore.scanChildren(parentPath, consumer);
};
}

@Override
public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion,
EnumSet<CreateOption> options, Map<String, String> secondaryIndexes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand All @@ -60,6 +61,7 @@ public enum OperationType {
EXISTS,
PUT,
DELETE,
SCAN_CHILDREN,
}

@Data
Expand Down Expand Up @@ -155,6 +157,17 @@ public CompletableFuture<Void> deleteRecursive(String path) {
return store.deleteRecursive(path);
}

@Override
public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
Optional<MetadataStoreException> ex = programmedFailure(OperationType.SCAN_CHILDREN, parentPath);
if (ex.isPresent()) {
consumer.onError(ex.get());
return FutureUtil.failedFuture(ex.get());
}

return store.scanChildren(parentPath, consumer);
}

@Override
public void registerListener(Consumer<Notification> listener) {
store.registerListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -121,6 +122,39 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {
}
}

@Override
protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
// Snapshot the immediate children under the lock, then dispatch outside it so a slow
// consumer can't stall other store operations.
List<GetResult> snapshot = new ArrayList<>();
synchronized (map) {
String firstKey = parentPath.equals("/") ? "/" : parentPath + "/";
String lastKey = parentPath.equals("/") ? "0" : parentPath + "0";
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
// Filter to direct children only — paths with no further "/" beyond the
// parent's level. Same scoping `getChildrenFromStore` applies.
int relStart = firstKey.length();
if (key.indexOf('/', relStart) >= 0) {
return;
}
snapshot.add(new GetResult(
value.data,
new Stat(key, value.version, value.createdTimestamp, value.modifiedTimestamp,
value.isEphemeral(), true)));
});
}
try {
for (GetResult r : snapshot) {
consumer.onNext(r);
}
consumer.onCompleted();
return CompletableFuture.completedFuture(null);
} catch (Throwable t) {
consumer.onError(t);
return FutureUtil.failedFuture(t);
}
}

@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
if (!isValidPath(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.rocksdb.ColumnFamilyDescriptor;
Expand Down Expand Up @@ -406,6 +407,85 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {
}
}

@Override
protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
// Native iterator-based scan over the parent's key range, with the same direct-child
// filter getChildrenFromStore applies. Snapshot under the read lock then dispatch
// outside it.
List<GetResult> snapshot = new ArrayList<>();
try {
dbStateLock.readLock().lock();
if (isClosed()) {
CompletableFuture<Void> failed = alreadyClosedFailedFuture();
failed.whenComplete((__, ex) -> {
if (ex != null) {
consumer.onError(ex);
}
});
return failed;
}
String firstKey = parentPath.equals("/") ? "/" : parentPath + "/";
String lastKey = parentPath.equals("/") ? "0" : parentPath + "0";
byte[] endBytes = toBytes(lastKey);
try (RocksIterator iterator = db.newIterator(optionDontCache)) {
for (iterator.seek(toBytes(firstKey)); iterator.isValid(); iterator.next()) {
byte[] keyBytes = iterator.key();
if (compareUnsigned(keyBytes, endBytes) >= 0) {
break;
}
String currentPath = toString(keyBytes);
// Direct children only.
if (currentPath.indexOf('/', firstKey.length()) >= 0) {
continue;
}
byte[] value = iterator.value();
if (value == null) {
continue;
}
MetaValue metaValue = MetaValue.parse(value);
if (metaValue.ephemeral && metaValue.owner != instanceId) {
// Ephemeral record left behind by a different session; skip.
continue;
}
snapshot.add(new GetResult(metaValue.getData(),
new Stat(currentPath,
metaValue.getVersion(),
metaValue.getCreatedTimestamp(),
metaValue.getModifiedTimestamp(),
metaValue.ephemeral,
metaValue.getOwner() == instanceId)));
}
}
} catch (Throwable e) {
MetadataStoreException ex = MetadataStoreException.wrap(e);
consumer.onError(ex);
return FutureUtil.failedFuture(ex);
} finally {
dbStateLock.readLock().unlock();
}
try {
for (GetResult r : snapshot) {
consumer.onNext(r);
}
consumer.onCompleted();
return CompletableFuture.completedFuture(null);
} catch (Throwable t) {
consumer.onError(t);
return FutureUtil.failedFuture(t);
}
}

private static int compareUnsigned(byte[] a, byte[] b) {
int len = Math.min(a.length, b.length);
for (int i = 0; i < len; i++) {
int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
if (diff != 0) {
return diff;
}
}
return a.length - b.length;
}

@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
log.debug().attr("path", path).attr("instanceId", instanceId).log("getChildrenFromStore");
Expand Down
Loading
Loading