diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index 19e50ed699842..b2d9080094327 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -287,6 +287,33 @@ default CompletableFuture> findByIndex( new MetadataStoreException("Secondary index queries not supported by this store")); } + /** + * Stream all direct children of {@code parentPath} together with their values. + * + *

This is the value-bearing counterpart to {@link #getChildren} — same semantics for + * what counts as a "child" (one hierarchical level below {@code parentPath}, no + * descendants), but each record carries the value and {@link Stat} alongside the path. + * Results are delivered to {@code consumer} as they become available so callers don't + * have to materialize a potentially-large list in memory. + * + *

The consumer's {@link ScanConsumer#onNext} is invoked for each child, then either + * {@link ScanConsumer#onCompleted} (success) or {@link ScanConsumer#onError} (failure) + * exactly once. The returned future completes when the scan terminates and mirrors the + * terminal callback — callers may rely on either. + * + *

Backends with a native range-scan primitive (Oxia, RocksDB, in-memory NavigableMap) + * issue a single store-side scan. Other backends fall back to {@link #getChildren} + + * sequential {@link #get}, at the cost of one extra round trip per child. + * + * @param parentPath path whose direct children should be streamed + * @param consumer callback that receives records, completion, or an error + * @return a future that completes when the scan terminates + */ + default CompletableFuture scanChildren(String parentPath, ScanConsumer consumer) { + return CompletableFuture.failedFuture( + new MetadataStoreException("scanChildren not supported by this store")); + } + /** * Returns the default metadata cache config. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java new file mode 100644 index 0000000000000..24ce6c15ac5cf --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.api; + +/** + * Streaming consumer for {@link MetadataStore#scanChildren} results. + * + *

The store invokes {@link #onNext} for each child record (in key order), and then either + * {@link #onCompleted} (success) or {@link #onError} (failure) exactly once. Implementations must + * be safe to invoke from a metadata-store internal thread; back-pressure is the consumer's + * responsibility (long blocking work in {@code onNext} can stall the scan). + */ +public interface ScanConsumer { + + /** + * Called once per record. The result's {@link Stat#getPath()} carries the full key. + * + * @param result a child record under the requested parent path + */ + void onNext(GetResult result); + + /** + * Called at most once when the scan fails. After this call no further callbacks are made. + * + * @param throwable the cause of the failure + */ + void onError(Throwable throwable); + + /** + * Called at most once when the scan finishes without error. After this call no further + * callbacks are made. + */ + void onCompleted(); +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 6442ece216e8f..02086898e9f96 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -41,6 +41,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -69,6 +70,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.ScanConsumer; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -529,6 +531,55 @@ public CompletableFuture> findByIndex( return storeFindByIndex(scanPathPrefix, indexName, secondaryKey, fallbackFilter); } + @Override + public CompletableFuture scanChildren(String parentPath, ScanConsumer consumer) { + if (isClosed()) { + CompletableFuture failed = alreadyClosedFailedFuture(); + failed.whenComplete((__, ex) -> { + if (ex != null) { + consumer.onError(ex); + } + }); + return failed; + } + if (parentPath == null) { + MetadataStoreException ex = new MetadataStoreException("parentPath must be non-null"); + consumer.onError(ex); + return FutureUtil.failedFuture(ex); + } + return storeScanChildren(parentPath, consumer); + } + + /** + * Backend hook for {@link #scanChildren}. The default implementation lists the parent's + * children with {@link #getChildrenFromStore} and fetches each value sequentially with + * {@link #storeGet}. Backends with a native range-scan primitive (Oxia, RocksDB, + * in-memory NavigableMap) override this method for a single store-side scan. + */ + protected CompletableFuture storeScanChildren(String parentPath, ScanConsumer consumer) { + CompletableFuture result = new CompletableFuture<>(); + getChildrenFromStore(parentPath).thenCompose(children -> { + CompletableFuture chain = CompletableFuture.completedFuture(null); + for (String child : children) { + String childPath = parentPath.equals("/") ? "/" + child : parentPath + "/" + child; + chain = chain.thenCompose(__ -> storeGet(childPath)) + .thenAccept(opt -> opt.ifPresent(consumer::onNext)); + } + return chain; + }).whenComplete((v, ex) -> { + if (ex != null) { + Throwable cause = ex instanceof CompletionException && ex.getCause() != null + ? ex.getCause() : ex; + consumer.onError(cause); + result.completeExceptionally(cause); + } else { + consumer.onCompleted(); + result.complete(null); + } + }); + return result; + } + protected CompletableFuture> storeFindByIndex( String scanPathPrefix, String indexName, String secondaryKey, Predicate fallbackFilter) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java index 7096588701274..bcc6b7226da41 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java @@ -51,6 +51,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreLifecycle; import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.ScanConsumer; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -309,6 +310,16 @@ public CompletableFuture> findByIndex( }; } + @Override + public CompletableFuture scanChildren(String parentPath, ScanConsumer consumer) { + return switch (migrationState.getPhase()) { + case NOT_STARTED, PREPARATION, COPYING, FAILED -> + sourceStore.scanChildren(parentPath, consumer); + case COMPLETED -> + targetStore.scanChildren(parentPath, consumer); + }; + } + @Override public CompletableFuture put(String path, byte[] value, Optional expectedVersion, EnumSet options, Map secondaryIndexes) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java index f409115ed9c42..de988b6b1c898 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java @@ -38,6 +38,7 @@ import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.ScanConsumer; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -60,6 +61,7 @@ public enum OperationType { EXISTS, PUT, DELETE, + SCAN_CHILDREN, } @Data @@ -155,6 +157,17 @@ public CompletableFuture deleteRecursive(String path) { return store.deleteRecursive(path); } + @Override + public CompletableFuture scanChildren(String parentPath, ScanConsumer consumer) { + Optional ex = programmedFailure(OperationType.SCAN_CHILDREN, parentPath); + if (ex.isPresent()) { + consumer.onError(ex.get()); + return FutureUtil.failedFuture(ex.get()); + } + + return store.scanChildren(parentPath, consumer); + } + @Override public void registerListener(Consumer listener) { store.registerListener(listener); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 4679d410edd98..7c7bd31e29b41 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -45,6 +45,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreProvider; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.ScanConsumer; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -121,6 +122,39 @@ public CompletableFuture> storeGet(String path) { } } + @Override + protected CompletableFuture storeScanChildren(String parentPath, ScanConsumer consumer) { + // Snapshot the immediate children under the lock, then dispatch outside it so a slow + // consumer can't stall other store operations. + List snapshot = new ArrayList<>(); + synchronized (map) { + String firstKey = parentPath.equals("/") ? "/" : parentPath + "/"; + String lastKey = parentPath.equals("/") ? "0" : parentPath + "0"; + map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> { + // Filter to direct children only — paths with no further "/" beyond the + // parent's level. Same scoping `getChildrenFromStore` applies. + int relStart = firstKey.length(); + if (key.indexOf('/', relStart) >= 0) { + return; + } + snapshot.add(new GetResult( + value.data, + new Stat(key, value.version, value.createdTimestamp, value.modifiedTimestamp, + value.isEphemeral(), true))); + }); + } + try { + for (GetResult r : snapshot) { + consumer.onNext(r); + } + consumer.onCompleted(); + return CompletableFuture.completedFuture(null); + } catch (Throwable t) { + consumer.onError(t); + return FutureUtil.failedFuture(t); + } + } + @Override public CompletableFuture> getChildrenFromStore(String path) { if (!isValidPath(path)) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 4ad7e3de2fd4c..3a7d7abe5d25a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -53,6 +53,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreProvider; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.ScanConsumer; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.rocksdb.ColumnFamilyDescriptor; @@ -406,6 +407,85 @@ public CompletableFuture> storeGet(String path) { } } + @Override + protected CompletableFuture storeScanChildren(String parentPath, ScanConsumer consumer) { + // Native iterator-based scan over the parent's key range, with the same direct-child + // filter getChildrenFromStore applies. Snapshot under the read lock then dispatch + // outside it. + List snapshot = new ArrayList<>(); + try { + dbStateLock.readLock().lock(); + if (isClosed()) { + CompletableFuture failed = alreadyClosedFailedFuture(); + failed.whenComplete((__, ex) -> { + if (ex != null) { + consumer.onError(ex); + } + }); + return failed; + } + String firstKey = parentPath.equals("/") ? "/" : parentPath + "/"; + String lastKey = parentPath.equals("/") ? "0" : parentPath + "0"; + byte[] endBytes = toBytes(lastKey); + try (RocksIterator iterator = db.newIterator(optionDontCache)) { + for (iterator.seek(toBytes(firstKey)); iterator.isValid(); iterator.next()) { + byte[] keyBytes = iterator.key(); + if (compareUnsigned(keyBytes, endBytes) >= 0) { + break; + } + String currentPath = toString(keyBytes); + // Direct children only. + if (currentPath.indexOf('/', firstKey.length()) >= 0) { + continue; + } + byte[] value = iterator.value(); + if (value == null) { + continue; + } + MetaValue metaValue = MetaValue.parse(value); + if (metaValue.ephemeral && metaValue.owner != instanceId) { + // Ephemeral record left behind by a different session; skip. + continue; + } + snapshot.add(new GetResult(metaValue.getData(), + new Stat(currentPath, + metaValue.getVersion(), + metaValue.getCreatedTimestamp(), + metaValue.getModifiedTimestamp(), + metaValue.ephemeral, + metaValue.getOwner() == instanceId))); + } + } + } catch (Throwable e) { + MetadataStoreException ex = MetadataStoreException.wrap(e); + consumer.onError(ex); + return FutureUtil.failedFuture(ex); + } finally { + dbStateLock.readLock().unlock(); + } + try { + for (GetResult r : snapshot) { + consumer.onNext(r); + } + consumer.onCompleted(); + return CompletableFuture.completedFuture(null); + } catch (Throwable t) { + consumer.onError(t); + return FutureUtil.failedFuture(t); + } + } + + private static int compareUnsigned(byte[] a, byte[] b) { + int len = Math.min(a.length, b.length); + for (int i = 0; i < len; i++) { + int diff = (a[i] & 0xFF) - (b[i] & 0xFF); + if (diff != 0) { + return diff; + } + } + return a.length - b.length; + } + @Override public CompletableFuture> getChildrenFromStore(String path) { log.debug().attr("path", path).attr("instanceId", instanceId).log("getChildrenFromStore"); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index 13cefa126236e..21bbba615d4cf 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -51,6 +51,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.ScanConsumer; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; @@ -216,6 +217,41 @@ protected CompletableFuture storePut( return doStorePut(path, data, optExpectedVersion, options, secondaryIndexes); } + @Override + protected CompletableFuture storeScanChildren(String parentPath, ScanConsumer consumer) { + // Oxia's hierarchical sort makes [parentPath + "/", parentPath + "//") the canonical + // range covering exactly the immediate children — same convention getChildrenFromStore + // uses with `client.list(...)`. + String firstKey = parentPath.endsWith("/") ? parentPath : parentPath + "/"; + String lastKey = firstKey + "/"; + CompletableFuture done = new CompletableFuture<>(); + try { + client.rangeScan(firstKey, lastKey, new io.oxia.client.api.RangeScanConsumer() { + @Override + public void onNext(io.oxia.client.api.GetResult result) { + consumer.onNext(new GetResult(result.value(), + convertStat(result.key(), result.version()))); + } + + @Override + public void onError(Throwable throwable) { + consumer.onError(throwable); + done.completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + consumer.onCompleted(); + done.complete(null); + } + }); + } catch (Throwable t) { + consumer.onError(t); + done.completeExceptionally(t); + } + return done; + } + @Override protected CompletableFuture> storeFindByIndex( String scanPathPrefix, String indexName, String secondaryKey, diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java index d55ad072dff42..fee0955744c7e 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java @@ -146,6 +146,7 @@ public Object[][] zkImplementations() { return filterImplementations("ZooKeeper", "MockZooKeeper"); } + protected Object[][] filterImplementations(String... providers) { Set providersSet = Set.of(providers); return Arrays.stream(allImplementations()) diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java new file mode 100644 index 0000000000000..37eea03713cc2 --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import lombok.Cleanup; +import org.apache.pulsar.metadata.api.GetResult; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.ScanConsumer; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.testng.annotations.Test; + +public class MetadataStoreScanChildrenTest extends BaseMetadataStoreTest { + + /** Records all callbacks for assertion in tests. */ + private static final class CollectingConsumer implements ScanConsumer { + final List records = new ArrayList<>(); + final AtomicReference error = new AtomicReference<>(); + final CountDownLatch done = new CountDownLatch(1); + + @Override + public void onNext(GetResult result) { + records.add(result); + } + + @Override + public void onError(Throwable throwable) { + error.set(throwable); + done.countDown(); + } + + @Override + public void onCompleted() { + done.countDown(); + } + + void awaitDone() throws InterruptedException { + assertTrue(done.await(30, TimeUnit.SECONDS), "scan did not terminate within 30s"); + } + } + + @Test(dataProvider = "impl") + public void streamsAllChildrenWithValues(String provider, Supplier urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create( + urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String parent = newKey(); + Set expectedNames = Set.of("a", "b", "c"); + for (String name : expectedNames) { + store.put(parent + "/" + name, name.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join(); + } + + CollectingConsumer consumer = new CollectingConsumer(); + store.scanChildren(parent, consumer).join(); + consumer.awaitDone(); + + assertEquals(consumer.records.size(), 3); + Set seenPaths = new HashSet<>(); + Set seenValues = new HashSet<>(); + for (GetResult r : consumer.records) { + seenPaths.add(r.getStat().getPath()); + seenValues.add(new String(r.getValue(), StandardCharsets.UTF_8)); + } + assertEquals(seenPaths, Set.of(parent + "/a", parent + "/b", parent + "/c")); + assertEquals(seenValues, expectedNames); + } + + @Test(dataProvider = "impl") + public void parentWithNoChildrenCompletesEmpty(String provider, Supplier urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create( + urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String parent = newKey(); + + CollectingConsumer consumer = new CollectingConsumer(); + store.scanChildren(parent, consumer).join(); + consumer.awaitDone(); + + assertEquals(consumer.records.size(), 0); + } + + @Test(dataProvider = "impl") + public void doesNotEmitDescendantsBeyondImmediateChildren(String provider, Supplier urlSupplier) + throws Exception { + // scanChildren is hierarchy-aware: deeper paths (children of children) are NOT emitted. + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create( + urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String parent = newKey(); + String child = parent + "/child"; + String grandchild = child + "/inner"; + + store.put(child, "C".getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join(); + store.put(grandchild, "G".getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join(); + + CollectingConsumer consumer = new CollectingConsumer(); + store.scanChildren(parent, consumer).join(); + consumer.awaitDone(); + + assertEquals(consumer.records.size(), 1); + assertEquals(consumer.records.get(0).getStat().getPath(), child); + } + + @Test(dataProvider = "impl") + public void closedStoreRejectsScan(String provider, Supplier urlSupplier) throws Exception { + MetadataStoreExtended store = MetadataStoreExtended.create( + urlSupplier.get(), MetadataStoreConfig.builder().build()); + store.close(); + + CollectingConsumer consumer = new CollectingConsumer(); + CompletionException ex = expectThrows(CompletionException.class, + () -> store.scanChildren("/anything", consumer).join()); + assertTrue(ex.getCause() instanceof MetadataStoreException.AlreadyClosedException, + "expected AlreadyClosedException, got: " + ex.getCause()); + } +}