From 9c1baadff70d64379a4221a8cb3095d092e1ced9 Mon Sep 17 00:00:00 2001 From: Chris Green Date: Thu, 19 Mar 2026 14:07:14 -0500 Subject: [PATCH] pool: exclude p2p requests from numberOfRequestsFor count Motivation: We wish to prevent internal P2P traffic from contributing to the file request count used by the hot-file replication mechanism, and thereby avoid potential replication feedback loops. Modification: Enhance `MoverRequestScheduler.numberOfRequestsFor` to filter out pool-to-pool (P2P) transfers. Result: Tests show that P2P requests are excluded from the file request count for the purposes of triggering hot file migration. Acked-by: Tigran Mkrtchyan, Dmitry Litvintsev Patch: https://rb.dcache.org/r/14628/diff/raw Commit: Target: master Request: 11.2 Require-book: no Require-notes: yes --- .../pool/classic/MoverRequestScheduler.java | 4 +- .../pool/classic/IoQueueManagerTest.java | 52 +++++++++++++++ .../classic/MoverRequestSchedulerTest.java | 66 +++++++++++++++++++ 3 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 modules/dcache/src/test/java/org/dcache/pool/classic/IoQueueManagerTest.java create mode 100644 modules/dcache/src/test/java/org/dcache/pool/classic/MoverRequestSchedulerTest.java diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java b/modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java index 9d5ae76d18d..d2afac883d2 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java @@ -59,7 +59,9 @@ public class MoverRequestScheduler { public long numberOfRequestsFor(PnfsId pnfsId) { return _jobs.values().stream() - .filter((pr) -> pr.getMover().getFileAttributes().getPnfsId().equals(pnfsId)).count(); + .filter((pr) -> pr.getMover().getFileAttributes().getPnfsId().equals(pnfsId)) + .filter((pr) -> !pr.getMover().isPoolToPoolTransfer()) + .count(); } /** diff --git a/modules/dcache/src/test/java/org/dcache/pool/classic/IoQueueManagerTest.java b/modules/dcache/src/test/java/org/dcache/pool/classic/IoQueueManagerTest.java new file mode 100644 index 00000000000..762a1c18375 --- /dev/null +++ b/modules/dcache/src/test/java/org/dcache/pool/classic/IoQueueManagerTest.java @@ -0,0 +1,52 @@ +package org.dcache.pool.classic; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import diskCacheV111.util.PnfsId; +import org.dcache.pool.movers.Mover; +import org.dcache.util.IoPriority; +import org.dcache.vehicles.FileAttributes; +import org.junit.Before; +import org.junit.Test; + +public class IoQueueManagerTest { + + private IoQueueManager ioQueueManager; + private PnfsId pnfsId = new PnfsId("000000000000000000000000000000000001"); + + @Before + public void setUp() { + ioQueueManager = new IoQueueManager(); + } + + @Test + public void shouldSumRequestsAcrossQueuesExcludingP2P() throws Exception { + // Add a regular mover to the default queue + addMover(IoQueueManager.DEFAULT_QUEUE, false); + + // Add a P2P mover to the p2p queue + addMover(IoQueueManager.P2P_QUEUE_NAME, true); + + // Add another regular mover to a custom queue + ioQueueManager.setQueues(new String[]{"custom"}); + addMover("custom", false); + + // Total should be 2 (1 from default, 1 from custom, 0 from p2p) + assertEquals(2, ioQueueManager.numberOfRequestsFor(pnfsId)); + } + + private void addMover(String queueName, boolean isP2P) throws Exception { + Mover mover = mock(Mover.class); + FileAttributes attributes = new FileAttributes(); + attributes.setPnfsId(pnfsId); + when(mover.getFileAttributes()).thenReturn(attributes); + when(mover.isPoolToPoolTransfer()).thenReturn(isP2P); + + MoverSupplier supplier = mock(MoverSupplier.class); + when(supplier.createMover()).thenReturn(mover); + + ioQueueManager.getOrCreateMover(queueName, "door-" + queueName + "-" + System.nanoTime(), supplier, IoPriority.REGULAR); + } +} diff --git a/modules/dcache/src/test/java/org/dcache/pool/classic/MoverRequestSchedulerTest.java b/modules/dcache/src/test/java/org/dcache/pool/classic/MoverRequestSchedulerTest.java new file mode 100644 index 00000000000..5b34e4efc38 --- /dev/null +++ b/modules/dcache/src/test/java/org/dcache/pool/classic/MoverRequestSchedulerTest.java @@ -0,0 +1,66 @@ +package org.dcache.pool.classic; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import diskCacheV111.util.PnfsId; +import org.dcache.pool.movers.Mover; +import org.dcache.util.IoPriority; +import org.dcache.vehicles.FileAttributes; +import org.junit.Before; +import org.junit.Test; +import java.util.concurrent.atomic.AtomicInteger; + +public class MoverRequestSchedulerTest { + + private MoverRequestScheduler scheduler; + private PnfsId pnfsId = new PnfsId("000000000000000000000000000000000001"); + + @Before + public void setUp() { + scheduler = new MoverRequestScheduler("test-queue", 0, MoverRequestScheduler.Order.FIFO); + } + + @Test + public void shouldCountNonP2PRequests() throws Exception { + addMover(pnfsId, false); + addMover(pnfsId, false); + + assertEquals(2, scheduler.numberOfRequestsFor(pnfsId)); + } + + @Test + public void shouldNotCountP2PRequests() throws Exception { + addMover(pnfsId, true); + addMover(pnfsId, false); + + assertEquals(1, scheduler.numberOfRequestsFor(pnfsId)); + } + + @Test + public void shouldReturnZeroWhenNoRequests() { + assertEquals(0, scheduler.numberOfRequestsFor(pnfsId)); + } + + @Test + public void shouldNotCountRequestsForDifferentPnfsId() throws Exception { + PnfsId otherPnfsId = new PnfsId("000000000000000000000000000000000002"); + addMover(otherPnfsId, false); + + assertEquals(0, scheduler.numberOfRequestsFor(pnfsId)); + } + + private void addMover(PnfsId pnfsId, boolean isP2P) throws Exception { + Mover mover = mock(Mover.class); + FileAttributes attributes = new FileAttributes(); + attributes.setPnfsId(pnfsId); + when(mover.getFileAttributes()).thenReturn(attributes); + when(mover.isPoolToPoolTransfer()).thenReturn(isP2P); + + MoverSupplier supplier = mock(MoverSupplier.class); + when(supplier.createMover()).thenReturn(mover); + + scheduler.getOrCreateMover(supplier, "door-" + System.nanoTime(), IoPriority.REGULAR); + } +}