From 04cd2df4b75d181e3e5764dfbf2e7f8f4b2f18f4 Mon Sep 17 00:00:00 2001 From: Chris Green Date: Fri, 20 Mar 2026 15:30:38 -0500 Subject: [PATCH] pool: enhance migration pool choice logic Motivation: The pool destination choice/prioritization should take into account accessibility, existing load, hostname, and more when replicating to mitigate hot file issues. Modification: Create `PoolListByPoolMgrQuery` and utilize in `MigrationModule.reportFileRequest` to ensure that destination pools are accessible (at least) to the file request that triggered the migration. Result: Hot file migration destination pools are now prioritized, and match network and protocol requirements of the file request that triggered the migration. Acked-by: Dmitry Litvintsev Patch: https://rb.dcache.org/r/14629/diff/raw Commit: Target: master Request: 11.2 Require-book: no Require-notes: yes --- docs/dev/hotfile-replication.md | 173 ++++++++ .../pool/classic/FileRequestMonitor.java | 6 +- .../java/org/dcache/pool/classic/PoolV4.java | 3 +- .../pool/migration/MigrationModule.java | 60 ++- .../migration/PoolListByPoolMgrQuery.java | 177 ++++++++ .../pool/migration/MigrationModuleTest.java | 131 +++++- .../migration/PoolListByPoolMgrQueryTest.java | 413 ++++++++++++++++++ 7 files changed, 950 insertions(+), 13 deletions(-) create mode 100644 docs/dev/hotfile-replication.md create mode 100644 modules/dcache/src/main/java/org/dcache/pool/migration/PoolListByPoolMgrQuery.java create mode 100644 modules/dcache/src/test/java/org/dcache/pool/migration/PoolListByPoolMgrQueryTest.java diff --git a/docs/dev/hotfile-replication.md b/docs/dev/hotfile-replication.md new file mode 100644 index 00000000000..5422b8c3d5a --- /dev/null +++ b/docs/dev/hotfile-replication.md @@ -0,0 +1,173 @@ +# Hot File Replication + +Hot file replication automatically creates additional replicas of files that are being read +frequently (i.e. "hot" files), distributing load across the pool fleet. + +## How It Works + +### Protocol-Agnostic Design + +All dCache read protocols (DCap, NFS/pNFS, WebDAV, xrootd, FTP, HTTP, pool-to-pool) send a +`PoolIoFileMessage` to the pool when a client requests a file. The pool dispatches that message +to `PoolV4.ioFile()`, which is therefore a single, protocol-independent entry point for every +read request. Hot file monitoring is implemented at that point: + +``` +Any Door (DCap / NFS / WebDAV / xrootd / FTP / HTTP / …) + → sends PoolIoFileMessage + → Pool.messageArrived(PoolIoFileMessage) + → PoolV4.ioFile() ← monitoring happens here + → queues mover (protocol-specific) + → FileRequestMonitor.reportFileRequest(pnfsId, currentCount, protocolInfo) +``` + +Because the counting and triggering happen in `PoolV4.ioFile()`, no protocol-specific code +changes are required to benefit from this feature. + +### Request Counting + +`PoolV4.ioFile()` reads the concurrent mover count for the file from `IoQueueManager`: + +```java +long requestCount = _ioQueue.numberOfRequestsFor(message.getPnfsId()); +_fileRequestMonitor.reportFileRequest(message.getPnfsId(), requestCount, + message.getProtocolInfo()); +``` + +When `requestCount` reaches or exceeds the configured threshold, +`MigrationModule.reportFileRequest()` creates a migration job named +`hotfile-` that replicates the file to additional pools. + +### Pool Selection + +The migration job selects target pools by querying PoolManager via `PoolMgrQueryPoolsMsg`, +deriving `protocolUnit` from the triggering request's `ProtocolInfo` (e.g., `"DCap/3"`) and +`netUnitName` from the client's IP address when available (e.g., `"192.168.1.10"`). When the +client IP is not available (non-IP protocol or unknown), an empty string is used for `netUnitName`, +which causes PoolManager to match any network unit. When `ProtocolInfo` is null (e.g., for +internal pool-to-pool transfers), `protocolUnit` falls back to `"*/*"` and `netUnitName` to `""` +so that selection is based solely on the file's storage group and pool-group read preferences. + +`PoolMgrQueryPoolsMsg.getPools()` returns a `List[]` where index 0 is the highest +read-preference level. `PoolListByPoolMgrQuery` selects **only** the first non-empty +preference level, so the file is always replicated to the best available pools: + +```java +// Only take the first non-empty preference level (highest read preference) +for (int i = 0; i < poolLists.length; i++) { + List poolList = poolLists[i]; + if (poolList != null && !poolList.isEmpty()) { + selectedPools = poolList; + break; + } +} +``` + +Prior to this, all preference levels were flattened into a union, causing files to be +replicated to pools from lower-preference groups (e.g. flush pools) instead of the intended +read-only pools. + +### Job Housekeeping + +To prevent unbounded memory growth, `MigrationModule` keeps at most 50 hotfile jobs. When a +new job would exceed that limit, the oldest jobs that have reached a terminal state +(`FINISHED`, `CANCELLED`, `FAILED`) are pruned first. + +## Configuration + +| Property | Default | Description | +|---|---|---| +| `pool.hotfile.replication.enable` | `false` | Enable/disable hot file monitoring. **Must be `true` to activate.** | +| `pool.migration.hotfile.threshold` | `50` | Number of concurrent read movers required to trigger replication | +| `pool.migration.hotfile.replicas` | `1` | Number of additional replicas to create | +| `pool.migration.concurrency.default` | `1` | Number of files the migration job migrates concurrently | + +Example (`dcache.conf` or pool layout file): + +```ini +pool.hotfile.replication.enable = true +pool.migration.hotfile.threshold = 3 +pool.migration.hotfile.replicas = 3 +pool.migration.concurrency.default = 1 +``` + +> **Note:** The feature is disabled by default. A pool restart is required after any +> configuration change. + +## Key Source Files + +| File | Role | +|---|---| +| `modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java` | Entry point; checks enable flag, calls `FileRequestMonitor` | +| `modules/dcache/src/main/java/org/dcache/pool/migration/MigrationModule.java` | Implements `FileRequestMonitor`; counts requests, creates and manages migration jobs | +| `modules/dcache/src/main/java/org/dcache/pool/migration/PoolListByPoolMgrQuery.java` | Queries PoolManager for eligible target pools; selects highest-preference level only | +| `modules/dcache/src/test/java/org/dcache/pool/classic/HotfileMonitoringTest.java` | Spring-context integration test for enable/disable behaviour | +| `modules/dcache/src/test/java/org/dcache/pool/migration/MigrationModuleTest.java` | Unit tests for `reportFileRequest`, threshold, housekeeping | +| `modules/dcache/src/test/java/org/dcache/pool/migration/PoolListByPoolMgrQueryTest.java` | Unit tests for pool selection, preference-level handling, unknown net unit, and wildcard protocol | +| `skel/share/defaults/pool.properties` | Canonical defaults for all `pool.hotfile.*` and `pool.migration.hotfile.*` properties | + +## Diagnostics + +### Log Messages + +With the default log level the following INFO messages are emitted by `MigrationModule`: + +``` +Hot file monitoring: pnfsId=, requests=, threshold= +Hot file detected! Triggering replication for pnfsId= +Created migration job with id hotfile- for pnfsId with replicas and concurrency +Starting migration job hotfile- for pnfsId +Successfully started migration job hotfile- for pnfsId +Job hotfile- already exists with state +``` + +`PoolV4` emits at INFO: + +``` +PoolV4.ioFile: Received IO request for pnfsId=, hotFileEnabled=, monitorSet= +PoolV4.ioFile: Calling reportFileRequest for pnfsId=, count= +``` + +And at ERROR if the monitor is not wired: + +``` +PoolV4.ioFile: Hot file replication enabled but FileRequestMonitor is NULL! +``` + +`PoolListByPoolMgrQuery` emits at DEBUG when a preference level is selected: + +``` +Selected preference level with pools for : [pool1, pool2, …] +``` + +### Runtime Log Level Adjustment + +``` +# In the pool's admin shell +log set org.dcache.pool.classic.PoolV4 DEBUG +log set org.dcache.pool.migration.MigrationModule DEBUG +``` + +### Checking Job and Replica Status + +```bash +# List active migration jobs on all pools +ssh -p admin@ '\s migration ls' + +# Inspect a specific job +ssh -p admin@ '\s migration info hotfile-' + +# List replicas of a file +ssh -p admin@ '\sl rep ls ' +``` + +### Interpreting Absence of Log Messages + +| Symptom | Likely Cause | +|---|---| +| No `PoolV4.ioFile` messages | IO requests are not reaching the pool, or the feature is disabled (`hotFileEnabled=false`) | +| `monitorSet=false` in PoolV4 log | `FileRequestMonitor` not wired — check Spring context startup errors | +| `requests` stays at 1 | IoQueue not counting movers correctly | +| "Hot file detected" but no job created | Exception during job creation — check ERROR lines for a stack trace | +| Job created but not started | `MigrationModule` not started — run `migration start` in the admin interface | +| "Job already exists" repeating | Previous job is stuck in a non-terminal state — inspect job state | diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/FileRequestMonitor.java b/modules/dcache/src/main/java/org/dcache/pool/classic/FileRequestMonitor.java index 379394bf6c9..7afb16a8403 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/FileRequestMonitor.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/FileRequestMonitor.java @@ -1,6 +1,8 @@ package org.dcache.pool.classic; import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.ProtocolInfo; +import javax.annotation.Nullable; /** * Abstract interface for monitoring file requests in the pool. @@ -12,7 +14,9 @@ public interface FileRequestMonitor { * * @param pnfsId the file identifier * @param numberOfRequests the number of requests for this file + * @param protocolInfo the protocol info of the request, may be {@code null} if unknown */ - void reportFileRequest(PnfsId pnfsId, long numberOfRequests); + void reportFileRequest(PnfsId pnfsId, long numberOfRequests, + @Nullable ProtocolInfo protocolInfo); } diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java b/modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java index e380f14acbc..ef6f2cde897 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java @@ -756,7 +756,8 @@ private void ioFile(CellMessage envelope, PoolIoFileMessage message) { message.getPnfsId()); if (_hotFileReplicationEnabled) { _fileRequestMonitor.reportFileRequest(message.getPnfsId(), - _ioQueue.numberOfRequestsFor(message.getPnfsId())); + _ioQueue.numberOfRequestsFor(message.getPnfsId()), + message.getProtocolInfo()); } message.setSucceeded(); } catch (OutOfDateCacheException e) { diff --git a/modules/dcache/src/main/java/org/dcache/pool/migration/MigrationModule.java b/modules/dcache/src/main/java/org/dcache/pool/migration/MigrationModule.java index a49935cd9ea..d0b2d1c66a9 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/migration/MigrationModule.java +++ b/modules/dcache/src/main/java/org/dcache/pool/migration/MigrationModule.java @@ -12,7 +12,9 @@ import diskCacheV111.util.PnfsId; import diskCacheV111.util.RetentionPolicy; import diskCacheV111.vehicles.PoolManagerGetPoolMonitor; +import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.PoolManagerPoolInformation; +import diskCacheV111.vehicles.ProtocolInfo; import dmg.cells.nucleus.CellCommandListener; import dmg.cells.nucleus.CellInfoProvider; import dmg.cells.nucleus.CellLifeCycleAware; @@ -25,6 +27,7 @@ import dmg.util.command.Option; import java.io.PrintWriter; import java.io.StringWriter; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,6 +43,7 @@ import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.dcache.cells.CellStub; import org.dcache.pool.PoolDataBeanProvider; @@ -57,6 +61,7 @@ import org.dcache.util.expression.TypeMismatchException; import org.dcache.util.expression.UnknownIdentifierException; import org.dcache.util.pool.CostModuleTagProvider; +import org.dcache.vehicles.FileAttributes; import org.parboiled.Parboiled; import org.parboiled.parserunners.ReportingParseRunner; import org.parboiled.support.ParsingResult; @@ -1244,7 +1249,8 @@ public Object messageArrived(CellMessage envelope, PoolMigrationJobCancelMessage * new job. */ @Override - public synchronized void reportFileRequest(PnfsId pnfsId, long numberOfRequests) { + public synchronized void reportFileRequest(PnfsId pnfsId, long numberOfRequests, + ProtocolInfo protocolInfo) { if (numberOfRequests < hotFileThreshold) { return; } @@ -1268,11 +1274,28 @@ public synchronized void reportFileRequest(PnfsId pnfsId, long numberOfRequests) _context.getPoolManagerStub(), Collections.singletonList(_context.getPoolName())); sourceList.refresh(); + + // Get file attributes from repository for pool selection + CacheEntry cacheEntry; + try { + cacheEntry = _context.getRepository().getEntry(pnfsId); + } catch (Exception e) { + LOGGER.warn("Failed to get cache entry for {}: {}", pnfsId, e.getMessage(), e); + return; + } + FileAttributes fileAttributes = cacheEntry.getFileAttributes(); + + String protocolUnit = deriveProtocolUnit(protocolInfo); + String netUnitName = deriveNetUnitName(protocolInfo); + Collection excluded = new HashSet<>(); excluded.add(Pattern.compile(Pattern.quote(_context.getPoolName()))); RefreshablePoolList basePoolList = new PoolListFilter( - new PoolListByPoolGroupOfPool(_context.getPoolManagerStub(), - _context.getPoolName()), + new PoolListByPoolMgrQuery(_context.getPoolManagerStub(), + pnfsId, + fileAttributes, + protocolUnit, + netUnitName), excluded, FALSE_EXPRESSION, Collections.emptySet(), @@ -1454,6 +1477,37 @@ public boolean isActive(PnfsId id) { return _context.isActive(id); } + /** + * Returns the PSU protocol-unit string for the given request, e.g. {@code "DCap/3"} or + * {@code "xrootd/2.1"}. When {@code protocolInfo} is {@code null} (e.g. an internal + * pool-to-pool transfer), returns the PSU wildcard that matches any protocol unit. + */ + private static String deriveProtocolUnit(@Nullable ProtocolInfo protocolInfo) { + if (protocolInfo == null) { + return "*/*"; + } + String unit = protocolInfo.getProtocol() + "/" + protocolInfo.getMajorVersion(); + return protocolInfo.getMinorVersion() != 0 + ? unit + "." + protocolInfo.getMinorVersion() + : unit; + } + + /** + * Returns the client IP address string for use as the PSU net-unit name, e.g. + * {@code "10.0.0.5"}. Returns an empty string whenever the address is unavailable + * (non-IP protocol, null socket address, or null {@code protocolInfo}), which causes + * the PSU to match any network unit. + */ + private static String deriveNetUnitName(@Nullable ProtocolInfo protocolInfo) { + if (!(protocolInfo instanceof IpProtocolInfo)) { + return ""; + } + InetSocketAddress addr = ((IpProtocolInfo) protocolInfo).getSocketAddress(); + return (addr != null && addr.getAddress() != null) + ? addr.getAddress().getHostAddress() + : ""; + } + // Hot file replication parameters public int getNumReplicas() { return hotFileReplicaCount; diff --git a/modules/dcache/src/main/java/org/dcache/pool/migration/PoolListByPoolMgrQuery.java b/modules/dcache/src/main/java/org/dcache/pool/migration/PoolListByPoolMgrQuery.java new file mode 100644 index 00000000000..f1e19ee864b --- /dev/null +++ b/modules/dcache/src/main/java/org/dcache/pool/migration/PoolListByPoolMgrQuery.java @@ -0,0 +1,177 @@ +package org.dcache.pool.migration; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import diskCacheV111.poolManager.PoolSelectionUnit.DirectionType; +import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.PoolMgrQueryPoolsMsg; +import diskCacheV111.vehicles.PoolManagerGetPoolsByNameMessage; +import diskCacheV111.vehicles.PoolManagerPoolInformation; +import java.util.List; +import org.dcache.cells.AbstractMessageCallback; +import org.dcache.cells.CellStub; +import org.dcache.namespace.FileAttribute; +import org.dcache.vehicles.FileAttributes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * RefreshablePoolList implementation that queries PoolManager for eligible pools using + * PoolMgrQueryPoolsMsg, which performs full selection matching including read preferences and file + * metadata. + */ +class PoolListByPoolMgrQuery + extends AbstractMessageCallback + implements RefreshablePoolList { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PoolListByPoolMgrQuery.class); + + private final CellStub _poolManager; + private final PnfsId _pnfsId; + private final String _protocolUnit; + private final String _netUnitName; + private final FileAttributes _fileAttributes; + + private ImmutableList _pools = ImmutableList.of(); + private ImmutableList _offlinePools = ImmutableList.of(); + private boolean _isValid; + + /** + * Creates a new pool list that queries PoolManager for eligible pools. + * + * @param poolManager the PoolManager cell stub + * @param pnfsId the PNFS ID of the file + * @param fileAttributes the file attributes for selection + * @param protocolUnit the protocol unit (e.g., "DCap/3") + * @param netUnitName the network unit name (IP address of the client, or empty string if unknown) + */ + public PoolListByPoolMgrQuery(CellStub poolManager, + PnfsId pnfsId, + FileAttributes fileAttributes, + String protocolUnit, + String netUnitName) { + _poolManager = requireNonNull(poolManager); + _pnfsId = requireNonNull(pnfsId); + _fileAttributes = requireNonNull(fileAttributes); + _protocolUnit = requireNonNull(protocolUnit); + _netUnitName = requireNonNull(netUnitName); + } + + @Override + public synchronized boolean isValid() { + return _isValid; + } + + @Override + public synchronized ImmutableList getOfflinePools() { + return _offlinePools; + } + + @Override + public synchronized ImmutableList getPools() { + return _pools; + } + + @Override + public void refresh() { + // Ensure we have STORAGEINFO attribute + if (!_fileAttributes.isDefined(FileAttribute.STORAGEINFO)) { + LOGGER.warn("FileAttributes for {} missing STORAGEINFO, cannot query PoolManager", + _pnfsId); + synchronized (this) { + _isValid = false; + } + return; + } + + PoolMgrQueryPoolsMsg msg = new PoolMgrQueryPoolsMsg( + DirectionType.READ, + _protocolUnit, + _netUnitName, + _fileAttributes); + + CellStub.addCallback( + _poolManager.send(msg), + this, + MoreExecutors.directExecutor()); + } + + @Override + public void success(PoolMgrQueryPoolsMsg message) { + List[] poolLists = message.getPools(); + if (poolLists == null || poolLists.length == 0) { + synchronized (this) { + _pools = ImmutableList.of(); + _offlinePools = ImmutableList.of(); + _isValid = true; + } + return; + } + + // Use only the first non-empty preference level (highest preference) + // Each preference level corresponds to pools with different read preferences + // We only want pools from the most preferred level, not a union of all levels + List selectedPools = null; + for (int i = 0; i < poolLists.length; i++) { + List poolList = poolLists[i]; + if (poolList != null && !poolList.isEmpty()) { + selectedPools = poolList; + LOGGER.debug("Selected preference level {} with {} pools for {}: {}", + i, poolList.size(), _pnfsId, poolList); + break; + } + } + + // Query PoolManager for full pool information (including cost) + if (selectedPools == null || selectedPools.isEmpty()) { + synchronized (this) { + _pools = ImmutableList.of(); + _offlinePools = ImmutableList.of(); + _isValid = true; + } + return; + } + + PoolManagerGetPoolsByNameMessage poolInfoMsg = + new PoolManagerGetPoolsByNameMessage(selectedPools); + + CellStub.addCallback( + _poolManager.send(poolInfoMsg), + new AbstractMessageCallback() { + @Override + public void success(PoolManagerGetPoolsByNameMessage msg) { + synchronized (PoolListByPoolMgrQuery.this) { + _pools = ImmutableList.copyOf(msg.getPools()); + _offlinePools = ImmutableList.copyOf(msg.getOfflinePools()); + _isValid = true; + } + } + + @Override + public void failure(int rc, Object error) { + LOGGER.error("Failed to get pool information from PoolManager ({})", error); + synchronized (PoolListByPoolMgrQuery.this) { + _isValid = false; + } + } + }, + MoreExecutors.directExecutor()); + } + + @Override + public void failure(int rc, Object error) { + LOGGER.error("Failed to query pool manager for eligible pools ({})", error); + synchronized (this) { + _isValid = false; + } + } + + @Override + public String toString() { + return String.format("PoolMgrQuery(%s, %d pools)", _protocolUnit, _pools.size()); + } +} + diff --git a/modules/dcache/src/test/java/org/dcache/pool/migration/MigrationModuleTest.java b/modules/dcache/src/test/java/org/dcache/pool/migration/MigrationModuleTest.java index 1b325640b67..5e9b7abb408 100644 --- a/modules/dcache/src/test/java/org/dcache/pool/migration/MigrationModuleTest.java +++ b/modules/dcache/src/test/java/org/dcache/pool/migration/MigrationModuleTest.java @@ -1,10 +1,13 @@ package org.dcache.pool.migration; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import org.dcache.cells.CellStub; +import org.dcache.namespace.FileAttribute; import org.dcache.pool.migration.json.MigrationData; import org.dcache.pool.repository.CacheEntry; import org.dcache.pool.repository.ReplicaState; @@ -14,6 +17,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import org.junit.Before; @@ -28,11 +32,16 @@ import static org.mockito.Mockito.when; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import diskCacheV111.poolManager.CostModule; import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.PoolIoFileMessage; import diskCacheV111.vehicles.PoolManagerGetPoolMonitor; +import diskCacheV111.vehicles.PoolManagerGetPoolsByNameMessage; +import diskCacheV111.vehicles.PoolMgrQueryPoolsMsg; +import diskCacheV111.vehicles.ProtocolInfo; import dmg.cells.nucleus.CellAddressCore; import dmg.cells.nucleus.CellPath; @@ -101,9 +110,12 @@ public void setUp() { @Test public void testReportFileRequestBelowThreshold() throws Exception { PnfsId pnfsId = new PnfsId("0000A1B2C3D4E5F6"); + ProtocolInfo protocolInfo = mock(ProtocolInfo.class); + when(protocolInfo.getProtocol()).thenReturn("DCap"); + when(protocolInfo.getMajorVersion()).thenReturn(3); when(message.getPnfsId()).thenReturn(pnfsId); module.setThreshold(5L); - module.reportFileRequest(pnfsId, 1L); // below threshold + module.reportFileRequest(pnfsId, 1L, protocolInfo); // below threshold // Should not create a migration job assertFalse(module.hasJob("hotfile-" + pnfsId)); } @@ -111,13 +123,16 @@ public void testReportFileRequestBelowThreshold() throws Exception { @Test public void testReportFileRequestAboveThreshold() throws Exception { PnfsId pnfsId = new PnfsId("0000A1B2C3D4E5F6"); + ProtocolInfo protocolInfo = mock(ProtocolInfo.class); + when(protocolInfo.getProtocol()).thenReturn("DCap"); + when(protocolInfo.getMajorVersion()).thenReturn(3); when(entry.getFileAttributes()).thenReturn(fileAttributes); when(entry.getLastAccessTime()).thenReturn(0L); when(entry.getPnfsId()).thenReturn(pnfsId); when(message.getPnfsId()).thenReturn(pnfsId); when(repository.getEntry(pnfsId)).thenReturn(entry); module.setThreshold(5L); - module.reportFileRequest(pnfsId, 10L); // above threshold + module.reportFileRequest(pnfsId, 10L, protocolInfo); // above threshold // Should create a migration job assertTrue(module.hasJob("hotfile-" + pnfsId)); } @@ -148,13 +163,21 @@ public void testHotfileJobHousekeeping() throws Exception { module.setThreshold(0L); // Always trigger + ProtocolInfo protocolInfo = mock(ProtocolInfo.class); + when(protocolInfo.getProtocol()).thenReturn("DCap"); + when(protocolInfo.getMajorVersion()).thenReturn(3); + + // Setup FileAttributes for hot file migration + when(entry.getFileAttributes()).thenReturn(fileAttributes); + when(entry.getLastAccessTime()).thenReturn(0L); + // Create 50 jobs for (int i = 0; i < 50; i++) { PnfsId pnfsId = new PnfsId(String.format("0000000000000000000000%02d", i)); when(message.getPnfsId()).thenReturn(pnfsId); when(repository.getEntry(pnfsId)).thenReturn(entry); when(entry.getPnfsId()).thenReturn(pnfsId); - module.reportFileRequest(pnfsId, 1L); + module.reportFileRequest(pnfsId, 1L, protocolInfo); Thread.sleep(1); // Ensure unique timestamps } @@ -171,7 +194,7 @@ public void testHotfileJobHousekeeping() throws Exception { when(repository.getEntry(pnfsId)).thenReturn(entry); when(entry.getPnfsId()).thenReturn(pnfsId); when(repository.iterator()).thenReturn(Collections.singletonList(pnfsId).iterator()); - module.reportFileRequest(pnfsId, 1L); + module.reportFileRequest(pnfsId, 1L, protocolInfo); Thread.sleep(1); } @@ -210,13 +233,21 @@ public void testHotfileJobHousekeepingExclusions() throws Exception { module.setThreshold(0L); + ProtocolInfo protocolInfo = mock(ProtocolInfo.class); + when(protocolInfo.getProtocol()).thenReturn("DCap"); + when(protocolInfo.getMajorVersion()).thenReturn(3); + + // Setup FileAttributes for hot file migration + when(entry.getFileAttributes()).thenReturn(fileAttributes); + when(entry.getLastAccessTime()).thenReturn(0L); + // 1. Create 50 jobs and cancel them (Terminal). for (int i = 0; i < 50; i++) { PnfsId pnfsId = new PnfsId(String.format("0000000000000000000001%02d", i)); when(message.getPnfsId()).thenReturn(pnfsId); when(repository.getEntry(pnfsId)).thenReturn(entry); when(entry.getPnfsId()).thenReturn(pnfsId); - module.reportFileRequest(pnfsId, 1L); + module.reportFileRequest(pnfsId, 1L, protocolInfo); Thread.sleep(1); } module.cancelAll(); @@ -229,7 +260,7 @@ public void testHotfileJobHousekeepingExclusions() throws Exception { when(repository.getEntry(pnfsId)).thenReturn(entry); when(entry.getPnfsId()).thenReturn(pnfsId); when(repository.iterator()).thenReturn(Collections.singletonList(pnfsId).iterator()); - module.reportFileRequest(pnfsId, 1L); + module.reportFileRequest(pnfsId, 1L, protocolInfo); Thread.sleep(1); } // Total: 55 (50 Finished, 5 Running). @@ -242,7 +273,7 @@ public void testHotfileJobHousekeepingExclusions() throws Exception { when(message.getPnfsId()).thenReturn(pnfsId); when(repository.getEntry(pnfsId)).thenReturn(entry); when(entry.getPnfsId()).thenReturn(pnfsId); - module.reportFileRequest(pnfsId, 1L); + module.reportFileRequest(pnfsId, 1L, protocolInfo); // Cancel this specific job MigrationModule.MigrationCancelCommand cmd = module.new MigrationCancelCommand(); cmd.id = "hotfile-" + pnfsId; @@ -255,7 +286,7 @@ public void testHotfileJobHousekeepingExclusions() throws Exception { when(repository.getEntry(pnfsId2)).thenReturn(entry); when(entry.getPnfsId()).thenReturn(pnfsId2); when(repository.iterator()).thenReturn(Collections.singletonList(pnfsId2).iterator()); - module.reportFileRequest(pnfsId2, 1L); + module.reportFileRequest(pnfsId2, 1L, protocolInfo); // Analysis: // Start: 50 Terminal, 5 Running. @@ -267,4 +298,88 @@ public void testHotfileJobHousekeepingExclusions() throws Exception { // Total = 56. assertEquals(56, module.getDataObject().getJobInfo().length); } + + /** + * Helper: configures mocks so that the PoolMgrQueryPoolsMsg sent during + * {@code reportFileRequest} is captured and returned for assertion. + * + *

Requires a per-test {@code pnfsId} that has not been used before, so that + * {@code reportFileRequest} does not short-circuit on an existing job. + */ + private PoolMgrQueryPoolsMsg reportFileRequestAndCaptureQuery( + PnfsId pnfsId, ProtocolInfo protocolInfo) throws Exception { + when(fileAttributes.isDefined(FileAttribute.STORAGEINFO)).thenReturn(true); + when(entry.getFileAttributes()).thenReturn(fileAttributes); + when(entry.getPnfsId()).thenReturn(pnfsId); + when(repository.getEntry(pnfsId)).thenReturn(entry); + + // Return non-null futures so CellStub.addCallback does not NPE. + when(poolManagerStub.send(any(PoolManagerGetPoolsByNameMessage.class))) + .thenReturn(SettableFuture.create()); + PoolMgrQueryPoolsMsg[] captured = new PoolMgrQueryPoolsMsg[1]; + when(poolManagerStub.send(any(PoolMgrQueryPoolsMsg.class))).thenAnswer(inv -> { + captured[0] = inv.getArgument(0); + return SettableFuture.create(); + }); + + module.setThreshold(0L); + module.reportFileRequest(pnfsId, 1L, protocolInfo); + + assertNotNull("PoolMgrQueryPoolsMsg should have been sent to PoolManager", captured[0]); + return captured[0]; + } + + @Test + public void testReportFileRequestUsesWildcardProtocolWhenProtocolInfoNull() throws Exception { + PoolMgrQueryPoolsMsg msg = reportFileRequestAndCaptureQuery( + new PnfsId("0000000000000000000000A1"), null); + + assertEquals("*/*", msg.getProtocolUnitName()); + assertEquals("", msg.getNetUnitName()); + } + + @Test + public void testReportFileRequestDerivesProtocolFromNonIpProtocolInfo() throws Exception { + ProtocolInfo protocolInfo = mock(ProtocolInfo.class); + when(protocolInfo.getProtocol()).thenReturn("xrootd"); + when(protocolInfo.getMajorVersion()).thenReturn(2); + // getMinorVersion() returns 0 by default → no ".minor" suffix + + PoolMgrQueryPoolsMsg msg = reportFileRequestAndCaptureQuery( + new PnfsId("0000000000000000000000A2"), protocolInfo); + + assertEquals("xrootd/2", msg.getProtocolUnitName()); + assertEquals("", msg.getNetUnitName()); + } + + @Test + public void testReportFileRequestDerivesNetUnitFromIpProtocolInfo() throws Exception { + IpProtocolInfo protocolInfo = mock(IpProtocolInfo.class); + when(protocolInfo.getProtocol()).thenReturn("DCap"); + when(protocolInfo.getMajorVersion()).thenReturn(3); + // minor = 0 → no ".minor" suffix + InetAddress addr = InetAddress.getByAddress(new byte[]{10, 0, 0, 5}); + when(protocolInfo.getSocketAddress()).thenReturn(new InetSocketAddress(addr, 22125)); + + PoolMgrQueryPoolsMsg msg = reportFileRequestAndCaptureQuery( + new PnfsId("0000000000000000000000A3"), protocolInfo); + + assertEquals("DCap/3", msg.getProtocolUnitName()); + assertEquals("10.0.0.5", msg.getNetUnitName()); + } + + @Test + public void testReportFileRequestUsesEmptyNetUnitWhenSocketAddressNull() throws Exception { + IpProtocolInfo protocolInfo = mock(IpProtocolInfo.class); + when(protocolInfo.getProtocol()).thenReturn("NFS"); + when(protocolInfo.getMajorVersion()).thenReturn(4); + when(protocolInfo.getMinorVersion()).thenReturn(1); // → "NFS/4.1" + when(protocolInfo.getSocketAddress()).thenReturn(null); + + PoolMgrQueryPoolsMsg msg = reportFileRequestAndCaptureQuery( + new PnfsId("0000000000000000000000A4"), protocolInfo); + + assertEquals("NFS/4.1", msg.getProtocolUnitName()); + assertEquals("", msg.getNetUnitName()); + } } diff --git a/modules/dcache/src/test/java/org/dcache/pool/migration/PoolListByPoolMgrQueryTest.java b/modules/dcache/src/test/java/org/dcache/pool/migration/PoolListByPoolMgrQueryTest.java new file mode 100644 index 00000000000..042ee6b828f --- /dev/null +++ b/modules/dcache/src/test/java/org/dcache/pool/migration/PoolListByPoolMgrQueryTest.java @@ -0,0 +1,413 @@ +package org.dcache.pool.migration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.SettableFuture; +import org.mockito.InOrder; +import diskCacheV111.poolManager.PoolSelectionUnit.DirectionType; +import diskCacheV111.pools.PoolCostInfo; +import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.PoolMgrQueryPoolsMsg; +import diskCacheV111.vehicles.PoolManagerGetPoolsByNameMessage; +import diskCacheV111.vehicles.PoolManagerPoolInformation; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import org.dcache.cells.CellStub; +import org.dcache.namespace.FileAttribute; +import org.dcache.pool.classic.IoQueueManager; +import org.dcache.vehicles.FileAttributes; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class PoolListByPoolMgrQueryTest { + + private CellStub poolManager; + private FileAttributes fileAttributes; + private PnfsId pnfsId; + + @Before + public void setUp() { + poolManager = mock(CellStub.class); + fileAttributes = mock(FileAttributes.class); + pnfsId = new PnfsId("0000A1B2C3D4E5F6"); + + // Mock FileAttributes to have STORAGEINFO defined + when(fileAttributes.isDefined(FileAttribute.STORAGEINFO)).thenReturn(true); + } + + @Test + public void testRefreshWithValidResponse() throws Exception { + // Setup + PoolListByPoolMgrQuery poolList = new PoolListByPoolMgrQuery( + poolManager, pnfsId, fileAttributes, "DCap/3", "127.0.0.1"); + + InOrder inOrder = inOrder(poolManager); + + // Mock the send method to return SettableFuture + SettableFuture future = SettableFuture.create(); + when(poolManager.send(any(PoolMgrQueryPoolsMsg.class))).thenReturn(future); + + // Setup the first message response (PoolMgrQueryPoolsMsg) + @SuppressWarnings("unchecked") + List[] poolLists = new List[2]; + poolLists[0] = Arrays.asList("pool1", "pool2"); + poolLists[1] = Arrays.asList("pool3"); + + // Mock the second send for PoolManagerGetPoolsByNameMessage + SettableFuture poolInfoFuture = SettableFuture.create(); + + // Setup mock to return different futures based on message type + when(poolManager.send(any(PoolManagerGetPoolsByNameMessage.class))) + .thenReturn(poolInfoFuture); + + // Call refresh + poolList.refresh(); + + // Verify that send was called and capture the message + ArgumentCaptor queryMsgCaptor = ArgumentCaptor.forClass( + PoolMgrQueryPoolsMsg.class); + inOrder.verify(poolManager).send(queryMsgCaptor.capture()); + + // Verify the query message parameters + PoolMgrQueryPoolsMsg queryMsg = queryMsgCaptor.getValue(); + assertEquals(DirectionType.READ, queryMsg.getAccessType()); + assertEquals("DCap/3", queryMsg.getProtocolUnitName()); + assertEquals("127.0.0.1", queryMsg.getNetUnitName()); + assertEquals(fileAttributes, queryMsg.getFileAttributes()); + + // Simulate the callback being invoked with a success response + PoolMgrQueryPoolsMsg response = new PoolMgrQueryPoolsMsg( + DirectionType.READ, "DCap/3", "127.0.0.1", fileAttributes); + response.setPoolList(poolLists); + response.setSucceeded(); + + // Complete the future to trigger the callback + future.set(response); + + // Verify that second send was called + ArgumentCaptor poolInfoCaptor = + ArgumentCaptor.forClass(PoolManagerGetPoolsByNameMessage.class); + inOrder.verify(poolManager, timeout(1000)).send(poolInfoCaptor.capture()); + + // Now simulate the pool info response + List pools = new ArrayList<>(); + pools.add(new PoolManagerPoolInformation("pool1", + new PoolCostInfo("pool1", IoQueueManager.DEFAULT_QUEUE), 0.5)); + pools.add(new PoolManagerPoolInformation("pool2", + new PoolCostInfo("pool2", IoQueueManager.DEFAULT_QUEUE), 0.3)); + + PoolManagerGetPoolsByNameMessage poolInfoResponse = new PoolManagerGetPoolsByNameMessage( + Arrays.asList("pool1", "pool2")); + poolInfoResponse.setPools(pools); + List offlinePools = new ArrayList<>(); + offlinePools.add("pool4"); + poolInfoResponse.setOfflinePools(offlinePools); + poolInfoResponse.setSucceeded(); + + poolInfoFuture.set(poolInfoResponse); + + // Verify the pool list is valid and has correct pools + // Should only have pools from preference level 0 (pool1, pool2), not from level 1 (pool3) + // We use a small wait or check in a loop if necessary, but directExecutor should make it immediate. + assertTrue(poolList.isValid()); + assertEquals(2, poolList.getPools().size()); + assertEquals("pool1", poolList.getPools().get(0).getName()); + assertEquals(1, poolList.getOfflinePools().size()); + assertEquals("pool4", poolList.getOfflinePools().get(0)); + } + + @Test + public void testRefreshWithEmptyResponse() throws Exception { + PoolListByPoolMgrQuery poolList = new PoolListByPoolMgrQuery( + poolManager, pnfsId, fileAttributes, "DCap/3", "127.0.0.1"); + + SettableFuture future = SettableFuture.create(); + when(poolManager.send(any(PoolMgrQueryPoolsMsg.class))).thenReturn(future); + + poolList.refresh(); + + // Simulate empty response + PoolMgrQueryPoolsMsg response = new PoolMgrQueryPoolsMsg( + DirectionType.READ, "DCap/3", "127.0.0.1", fileAttributes); + @SuppressWarnings("unchecked") + List[] emptyList = new List[0]; + response.setPoolList(emptyList); + response.setSucceeded(); + + future.set(response); + + assertTrue(poolList.isValid()); + assertEquals(0, poolList.getPools().size()); + } + + @Test + public void testRefreshWithMissingStorageInfo() { + when(fileAttributes.isDefined(FileAttribute.STORAGEINFO)).thenReturn(false); + + PoolListByPoolMgrQuery poolList = new PoolListByPoolMgrQuery( + poolManager, pnfsId, fileAttributes, "DCap/3", "127.0.0.1"); + + poolList.refresh(); + + assertFalse(poolList.isValid()); + assertEquals(0, poolList.getPools().size()); + } + + @Test + public void testRefreshWithUnknownNetUnitName() throws Exception { + PoolListByPoolMgrQuery poolList = new PoolListByPoolMgrQuery( + poolManager, pnfsId, fileAttributes, "DCap/3", ""); + + SettableFuture future = SettableFuture.create(); + when(poolManager.send(any(PoolMgrQueryPoolsMsg.class))).thenReturn(future); + + poolList.refresh(); + + ArgumentCaptor queryMsgCaptor = ArgumentCaptor.forClass( + PoolMgrQueryPoolsMsg.class); + verify(poolManager).send(queryMsgCaptor.capture()); + + PoolMgrQueryPoolsMsg queryMsg = queryMsgCaptor.getValue(); + assertEquals("", queryMsg.getNetUnitName()); + } + + /** + * Test that only the highest preference level is selected when multiple pool groups have + * different read preferences. This reproduces the test stand scenario where: + * - flushPools (pool6-10) have readpref=5 (lower preference) + * - readOnlyPools (pool2-5) have readpref=10 (higher preference) + * - pool1 is in both groups + * Expected: Only pools from the highest preference group (readOnlyPools) should be selected. + */ + @Test + public void testSelectsOnlyHighestPreferenceLevel() throws Exception { + // Setup: Two pool groups with different read preferences + // Pool1 is in both groups, pool2-5 only in high-pref group, pool6-10 only in low-pref group + PoolListByPoolMgrQuery poolList = new PoolListByPoolMgrQuery( + poolManager, pnfsId, fileAttributes, "DCap/3", ""); + + InOrder inOrder = inOrder(poolManager); + + SettableFuture queryFuture = SettableFuture.create(); + when(poolManager.send(any(PoolMgrQueryPoolsMsg.class))).thenReturn(queryFuture); + + // Mock the second send for PoolManagerGetPoolsByNameMessage + SettableFuture poolInfoFuture = SettableFuture.create(); + when(poolManager.send(any(PoolManagerGetPoolsByNameMessage.class))) + .thenReturn(poolInfoFuture); + + // Call refresh + poolList.refresh(); + + inOrder.verify(poolManager).send(any(PoolMgrQueryPoolsMsg.class)); + + // Simulate PoolManager response with two preference levels + // Level 0 (highest): readOnlyPools with readpref=10 (pool1, pool2, pool3, pool4, pool5) + // Level 1 (lower): flushPools with readpref=5 (pool1, pool6, pool7, pool8, pool9, pool10) + @SuppressWarnings("unchecked") + List[] poolLists = new List[2]; + poolLists[0] = Arrays.asList("pool1", "pool2", "pool3", "pool4", "pool5"); // High pref + poolLists[1] = Arrays.asList("pool1", "pool6", "pool7", "pool8", "pool9", "pool10"); // Low pref + + PoolMgrQueryPoolsMsg response = new PoolMgrQueryPoolsMsg( + DirectionType.READ, "DCap/3", "", fileAttributes); + response.setPoolList(poolLists); + response.setSucceeded(); + + // Complete the future to trigger the callback + queryFuture.set(response); + + // Verify that only the pools from preference level 0 were requested + ArgumentCaptor poolInfoCaptor = + ArgumentCaptor.forClass(PoolManagerGetPoolsByNameMessage.class); + inOrder.verify(poolManager, timeout(1000)).send(poolInfoCaptor.capture()); + + Collection requestedPools = poolInfoCaptor.getValue().getPoolNames(); + assertEquals("Should only request pools from highest preference level", + 5, requestedPools.size()); + assertTrue("Should include pool1 from high-pref group", requestedPools.contains("pool1")); + assertTrue("Should include pool2 from high-pref group", requestedPools.contains("pool2")); + assertTrue("Should include pool3 from high-pref group", requestedPools.contains("pool3")); + assertTrue("Should include pool4 from high-pref group", requestedPools.contains("pool4")); + assertTrue("Should include pool5 from high-pref group", requestedPools.contains("pool5")); + assertFalse("Should NOT include pool6 from low-pref group", requestedPools.contains("pool6")); + assertFalse("Should NOT include pool7 from low-pref group", requestedPools.contains("pool7")); + assertFalse("Should NOT include pool8 from low-pref group", requestedPools.contains("pool8")); + assertFalse("Should NOT include pool9 from low-pref group", requestedPools.contains("pool9")); + assertFalse("Should NOT include pool10 from low-pref group", requestedPools.contains("pool10")); + + // Complete the pool info request + List pools = new ArrayList<>(); + for (String poolName : requestedPools) { + pools.add(new PoolManagerPoolInformation(poolName, + new PoolCostInfo(poolName, IoQueueManager.DEFAULT_QUEUE), 0.5)); + } + + PoolManagerGetPoolsByNameMessage poolInfoResponse = new PoolManagerGetPoolsByNameMessage( + requestedPools); + poolInfoResponse.setPools(pools); + poolInfoResponse.setOfflinePools(new ArrayList<>()); + poolInfoResponse.setSucceeded(); + + poolInfoFuture.set(poolInfoResponse); + + // Verify the final pool list contains only high-preference pools + assertTrue(poolList.isValid()); + assertEquals("Should have 5 pools from high-preference level", + 5, poolList.getPools().size()); + } + + /** + * Test that the first non-empty preference level is selected when the highest level is empty. + */ + @Test + public void testSelectsFirstNonEmptyPreferenceLevel() throws Exception { + PoolListByPoolMgrQuery poolList = new PoolListByPoolMgrQuery( + poolManager, pnfsId, fileAttributes, "DCap/3", ""); + + InOrder inOrder = inOrder(poolManager); + + SettableFuture queryFuture = SettableFuture.create(); + when(poolManager.send(any(PoolMgrQueryPoolsMsg.class))).thenReturn(queryFuture); + + SettableFuture poolInfoFuture = SettableFuture.create(); + when(poolManager.send(any(PoolManagerGetPoolsByNameMessage.class))) + .thenReturn(poolInfoFuture); + + poolList.refresh(); + + inOrder.verify(poolManager).send(any(PoolMgrQueryPoolsMsg.class)); + + // Simulate response with empty first level, pools in second level + @SuppressWarnings("unchecked") + List[] poolLists = new List[3]; + poolLists[0] = new ArrayList<>(); // Empty highest preference + poolLists[1] = Arrays.asList("pool3"); // First non-empty level + poolLists[2] = Arrays.asList("pool1", "pool2"); // Lower preference (should be ignored) + + PoolMgrQueryPoolsMsg response = new PoolMgrQueryPoolsMsg( + DirectionType.READ, "DCap/3", "", fileAttributes); + response.setPoolList(poolLists); + response.setSucceeded(); + + queryFuture.set(response); + + // Verify that only pool3 was requested (from first non-empty level) + ArgumentCaptor poolInfoCaptor = + ArgumentCaptor.forClass(PoolManagerGetPoolsByNameMessage.class); + inOrder.verify(poolManager, timeout(1000)).send(poolInfoCaptor.capture()); + + Collection requestedPools = poolInfoCaptor.getValue().getPoolNames(); + assertEquals(1, requestedPools.size()); + assertTrue("pool3 should be requested", requestedPools.contains("pool3")); + + // Complete the pool info request + List pools = new ArrayList<>(); + pools.add(new PoolManagerPoolInformation("pool3", + new PoolCostInfo("pool3", IoQueueManager.DEFAULT_QUEUE), 0.5)); + + PoolManagerGetPoolsByNameMessage poolInfoResponse = new PoolManagerGetPoolsByNameMessage( + requestedPools); + poolInfoResponse.setPools(pools); + poolInfoResponse.setOfflinePools(new ArrayList<>()); + poolInfoResponse.setSucceeded(); + + poolInfoFuture.set(poolInfoResponse); + + assertTrue(poolList.isValid()); + assertEquals(1, poolList.getPools().size()); + assertEquals("pool3", poolList.getPools().get(0).getName()); + } + + @Test + public void testSelectsPoolsWithWildcardProtocol() throws Exception { + // Verifies end-to-end pool selection when both protocol and net unit use their + // default values ("*/*" and ""), which is the case when ProtocolInfo is null. + PoolListByPoolMgrQuery poolList = new PoolListByPoolMgrQuery( + poolManager, pnfsId, fileAttributes, "*/*", ""); + + InOrder inOrder = inOrder(poolManager); + + SettableFuture queryFuture = SettableFuture.create(); + when(poolManager.send(any(PoolMgrQueryPoolsMsg.class))).thenReturn(queryFuture); + + SettableFuture poolInfoFuture = SettableFuture.create(); + when(poolManager.send(any(PoolManagerGetPoolsByNameMessage.class))) + .thenReturn(poolInfoFuture); + + poolList.refresh(); + + // Verify the wildcard values are forwarded to PoolManager unchanged. + ArgumentCaptor queryMsgCaptor = ArgumentCaptor.forClass( + PoolMgrQueryPoolsMsg.class); + inOrder.verify(poolManager).send(queryMsgCaptor.capture()); + PoolMgrQueryPoolsMsg queryMsg = queryMsgCaptor.getValue(); + assertEquals(DirectionType.READ, queryMsg.getAccessType()); + assertEquals("*/*", queryMsg.getProtocolUnitName()); + assertEquals("", queryMsg.getNetUnitName()); + + // Simulate PoolManager response with two preference levels. + @SuppressWarnings("unchecked") + List[] poolLists = new List[2]; + poolLists[0] = Arrays.asList("pool1", "pool2"); + poolLists[1] = Arrays.asList("pool3"); + + PoolMgrQueryPoolsMsg response = new PoolMgrQueryPoolsMsg( + DirectionType.READ, "*/*", "", fileAttributes); + response.setPoolList(poolLists); + response.setSucceeded(); + queryFuture.set(response); + + // Only the highest-preference pools should be requested. + ArgumentCaptor poolInfoCaptor = + ArgumentCaptor.forClass(PoolManagerGetPoolsByNameMessage.class); + inOrder.verify(poolManager, timeout(1000)).send(poolInfoCaptor.capture()); + + Collection requestedPools = poolInfoCaptor.getValue().getPoolNames(); + assertEquals(2, requestedPools.size()); + assertTrue(requestedPools.contains("pool1")); + assertTrue(requestedPools.contains("pool2")); + assertFalse("pool3 from lower-preference level should be excluded", + requestedPools.contains("pool3")); + + // Complete the pool info request. + List pools = new ArrayList<>(); + pools.add(new PoolManagerPoolInformation("pool1", + new PoolCostInfo("pool1", IoQueueManager.DEFAULT_QUEUE), 0.5)); + pools.add(new PoolManagerPoolInformation("pool2", + new PoolCostInfo("pool2", IoQueueManager.DEFAULT_QUEUE), 0.3)); + PoolManagerGetPoolsByNameMessage poolInfoResponse = new PoolManagerGetPoolsByNameMessage( + Arrays.asList("pool1", "pool2")); + poolInfoResponse.setPools(pools); + poolInfoResponse.setOfflinePools(new ArrayList<>()); + poolInfoResponse.setSucceeded(); + poolInfoFuture.set(poolInfoResponse); + + assertTrue(poolList.isValid()); + assertEquals(2, poolList.getPools().size()); + assertTrue(poolList.getPools().stream().anyMatch(p -> p.getName().equals("pool1"))); + assertTrue(poolList.getPools().stream().anyMatch(p -> p.getName().equals("pool2"))); + } + + @Test + public void testToString() { + PoolListByPoolMgrQuery poolList = new PoolListByPoolMgrQuery( + poolManager, pnfsId, fileAttributes, "DCap/3", "127.0.0.1"); + + String result = poolList.toString(); + assertTrue(result.contains("DCap/3")); + assertTrue(result.contains("0 pools")); + } +} +