From 47b95a8ed01c1fa063969f221c0a7a85d94471f5 Mon Sep 17 00:00:00 2001 From: Shawn McKee Date: Tue, 17 Mar 2026 13:09:11 -0400 Subject: [PATCH 1/5] Fix SciTag WAN TPC marker generation and validation (cherry picked from commit 1b1914c6dcf36b3edd44879ab7732e4e8642bba3) Signed-off-by: Shawn McKee (cherry picked from commit cfa848240d5300fa03efd3cbc374038f079d83d4) Signed-off-by: Tigran Mkrtchyan --- .../dcache/webdav/DcacheResourceFactory.java | 78 ++++++---- .../webdav/DcacheResourceFactoryTest.java | 59 ++++++++ .../dcache/xrootd/door/XrootdTransfer.java | 35 +++-- .../AbstractMoverProtocolTransferService.java | 109 ++++++++++++++ .../classic/DefaultPostTransferService.java | 110 +++++++++++++- .../dcache/pool/movers/TransferLifeCycle.java | 135 ++++++++++++++++-- .../org/dcache/pool/classic/pool.xml | 1 + .../pool/movers/TransferLifeCycleTest.java | 48 ++++++- 8 files changed, 525 insertions(+), 50 deletions(-) create mode 100644 modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java diff --git a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java index 30aa97a5514..ea8e4dc148d 100644 --- a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java +++ b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java @@ -169,11 +169,32 @@ * This ResourceFactory exposes the dCache name space through the Milton WebDAV framework. */ public class DcacheResourceFactory - extends AbstractCellComponent - implements ResourceFactory, CellMessageReceiver, CellCommandListener, CellInfoProvider { + extends AbstractCellComponent + implements ResourceFactory, CellMessageReceiver, CellCommandListener, CellInfoProvider { private static final Logger LOGGER = - LoggerFactory.getLogger(DcacheResourceFactory.class); + LoggerFactory.getLogger(DcacheResourceFactory.class); + private static final Logger SCITAGS_LOGGER = + LoggerFactory.getLogger("org.dcache.scitags"); + + static Optional> findHeaderIgnoreCase(HttpServletRequest request, + String expectedHeaderName) { + Enumeration headerNames = request.getHeaderNames(); + if (headerNames != null) { + while (headerNames.hasMoreElements()) { + String actualHeaderName = headerNames.nextElement(); + if (actualHeaderName.equalsIgnoreCase(expectedHeaderName)) { + return Optional.of(Map.entry(actualHeaderName, + request.getHeader(actualHeaderName))); + } + } + } + + String headerValue = request.getHeader(expectedHeaderName); + return headerValue == null + ? Optional.empty() + : Optional.of(Map.entry(expectedHeaderName, headerValue)); + } private static final XMLOutputFactory XML_OUTPUT_FACTORY = XMLOutputFactory.newFactory(); @@ -1697,11 +1718,11 @@ private class HttpTransfer extends RedirectedTransfer { /** * The original request path that will be passed to pool for fall-back redirect. */ - private final String _requestPath; - private String _transferTag = ""; + private final String _requestPath; + private String _transferTag = ""; - private static final String HEADER_SCITAG = "SciTag"; - private static final String HEADER_TRANSFER_HEADER_SCITAG = "TransferHeaderSciTag"; + private static final String HEADER_SCITAG = "SciTag"; + private static final String HEADER_TRANSFER_HEADER_SCITAG = "TransferHeaderSciTag"; private static final String[] SCITAG_HEADERS = { HEADER_SCITAG, HEADER_TRANSFER_HEADER_SCITAG @@ -1722,34 +1743,43 @@ public HttpTransfer(PnfsHandler pnfs, Subject subject, private String readTransferTag(HttpServletRequest request) { // SciTag takes precedence because it is checked first. for (String header : SCITAG_HEADERS) { - String transferTag = request.getHeader(header); - if (transferTag != null && !transferTag.isBlank()) { - String trimmed = transferTag.trim(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} header found: {} (from client={})", - header, trimmed, request.getRemoteAddr()); + var matchedHeader = findHeaderIgnoreCase(request, header); + if (matchedHeader.isPresent()) { + String transferTag = matchedHeader.get().getValue(); + if (transferTag != null && !transferTag.isBlank()) { + return logSciTagsRequest(request, + matchedHeader.get().getKey() + "-header", + transferTag.trim()); } - return trimmed; } } String flowFromQuery = request.getParameter("scitag.flow"); if (flowFromQuery != null && !flowFromQuery.isBlank()) { - String trimmed = flowFromQuery.trim(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("scitag.flow query parameter found: {} (from client={})", - trimmed, request.getRemoteAddr()); - } - return trimmed; + return logSciTagsRequest(request, "scitag.flow-query", flowFromQuery.trim()); } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("No SciTag header/parameter found in request (client={})", - request.getRemoteAddr()); - } + logSciTagsRequest(request, "none", ""); return ""; } + private String logSciTagsRequest(HttpServletRequest request, String tagSource, + String transferTag) { + if (SCITAGS_LOGGER.isDebugEnabled()) { + SCITAGS_LOGGER.debug( + "scitags event=request protocol={} door={} remote={} method={} alias={} local={} tagSource={} transferTag={}", + request.isSecure() ? PROTOCOL_INFO_SSL_NAME : PROTOCOL_INFO_NAME, + getCellName() + '@' + getCellDomainName(), + request.getRemoteAddr(), + request.getMethod(), + request.getServerName(), + request.getLocalAddr(), + tagSource, + transferTag.isEmpty() ? "-" : transferTag); + } + return transferTag; + } + protected ProtocolInfo createProtocolInfo(InetSocketAddress address) { List wantedChecksums = _wantedChecksum == null ? Collections.emptyList() diff --git a/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java b/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java new file mode 100644 index 00000000000..9219219c2e4 --- /dev/null +++ b/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java @@ -0,0 +1,59 @@ +package org.dcache.webdav; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import org.junit.Test; + +public class DcacheResourceFactoryTest { + + @Test + public void shouldFindSciTagHeaderIgnoringCase() { + HttpServletRequest request = mock(HttpServletRequest.class); + given(request.getHeaderNames()).willReturn(Collections.enumeration(List.of("scitag"))); + given(request.getHeader("scitag")).willReturn("313"); + + Optional> header = + DcacheResourceFactory.findHeaderIgnoreCase(request, "SciTag"); + + assertThat(header.isPresent(), is(true)); + assertThat(header.get().getKey(), is("scitag")); + assertThat(header.get().getValue(), is("313")); + } + + @Test + public void shouldFindTransferHeaderSciTagIgnoringCase() { + HttpServletRequest request = mock(HttpServletRequest.class); + given(request.getHeaderNames()).willReturn( + Collections.enumeration(List.of("transferheaderscitag"))); + given(request.getHeader("transferheaderscitag")).willReturn("777"); + + Optional> header = + DcacheResourceFactory.findHeaderIgnoreCase(request, "TransferHeaderSciTag"); + + assertThat(header.isPresent(), is(true)); + assertThat(header.get().getKey(), is("transferheaderscitag")); + assertThat(header.get().getValue(), is("777")); + } + + @Test + public void shouldFallbackToServletHeaderLookup() { + HttpServletRequest request = mock(HttpServletRequest.class); + given(request.getHeaderNames()).willReturn(null); + given(request.getHeader("SciTag")).willReturn("313"); + + Optional> header = + DcacheResourceFactory.findHeaderIgnoreCase(request, "SciTag"); + + assertThat(header.isPresent(), is(true)); + assertThat(header.get().getKey(), is("SciTag")); + assertThat(header.get().getValue(), is("313")); + } +} \ No newline at end of file diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java index abbcc49e626..241ebdad43d 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java @@ -26,6 +26,7 @@ public class XrootdTransfer extends RedirectedTransfer { private static final Logger LOGGER = LoggerFactory.getLogger(XrootdTransfer.class); + private static final Logger SCITAGS_LOGGER = LoggerFactory.getLogger("org.dcache.scitags"); private UUID _uuid; private InetSocketAddress _doorAddress; @@ -43,15 +44,7 @@ public XrootdTransfer(PnfsHandler pnfs, Subject subject, this.restriction = requireNonNull(restriction); tpcInfo = new XrootdTpcInfo(opaque); _transferTag = opaque.getOrDefault("scitag.flow", ""); - if (LOGGER.isDebugEnabled()) { - if (!_transferTag.isEmpty()) { - LOGGER.debug("scitag.flow parameter found: {}", _transferTag); - } else if (opaque.containsKey("scitag.flow")) { - LOGGER.debug("scitag.flow parameter found but empty"); - } else { - LOGGER.debug("No scitag.flow parameter in this request"); - } - } + logSciTagsRequest(opaque); try { tpcInfo.setUid(Subjects.getUid(subject)); } catch (NoSuchElementException e) { @@ -96,6 +89,30 @@ protected synchronized ProtocolInfo createProtocolInfo() { return createXrootdProtocolInfo(); } + private void logSciTagsRequest(Map opaque) { + if (SCITAGS_LOGGER.isDebugEnabled()) { + String tagSource = !_transferTag.isEmpty() + ? "scitag.flow" + : opaque.containsKey("scitag.flow") ? "scitag.flow-empty" : "none"; + SCITAGS_LOGGER.debug( + "scitags event=request protocol=xrootd door={} remote={} tagSource={} transferTag={}", + getCellName() + '@' + getDomainName(), + formatAddress(getClientAddress()), + tagSource, + _transferTag.isEmpty() ? "-" : _transferTag); + } + } + + private String formatAddress(InetSocketAddress address) { + if (address == null) { + return "-"; + } + + return address.getAddress() == null + ? address.getHostString() + : address.getAddress().getHostAddress(); + } + @Override protected ProtocolInfo getProtocolInfoForPoolManager() { ProtocolInfo info = createProtocolInfo(); diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java index 73bf90beba4..8798a6aa97b 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java @@ -20,6 +20,7 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import diskCacheV111.util.CacheException; +import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.PoolIoFileMessage; import diskCacheV111.vehicles.ProtocolInfo; import dmg.cells.nucleus.AbstractCellComponent; @@ -27,17 +28,20 @@ import dmg.cells.nucleus.CellPath; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetSocketAddress; import java.io.SyncFailedException; import java.lang.reflect.InvocationTargetException; import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; import java.nio.file.StandardOpenOption; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.dcache.pool.movers.ChecksumMover; import org.dcache.pool.movers.Mover; import org.dcache.pool.movers.MoverProtocol; import org.dcache.pool.movers.MoverProtocolMover; +import org.dcache.pool.movers.TransferLifeCycle; import org.dcache.pool.repository.ReplicaDescriptor; import org.dcache.pool.repository.RepositoryChannel; import org.dcache.util.CDCExecutorServiceDecorator; @@ -52,12 +56,15 @@ public abstract class AbstractMoverProtocolTransferService private static final Logger LOGGER = LoggerFactory.getLogger(MoverMapTransferService.class); + private static final Logger SCITAGS_LOGGER = + LoggerFactory.getLogger("org.dcache.scitags"); private final ExecutorService _executor = new CDCExecutorServiceDecorator<>( Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat( getClass().getSimpleName() + "-transfer-service-%d").build())); private PostTransferService _postTransferService; + private TransferLifeCycle _transferLifeCycle; @Required @@ -65,6 +72,10 @@ public void setPostTransferService(PostTransferService postTransferService) { _postTransferService = postTransferService; } + public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) { + _transferLifeCycle = transferLifeCycle; + } + @Override public Mover createMover(ReplicaDescriptor handle, PoolIoFileMessage message, CellPath pathToDoor) @@ -155,10 +166,22 @@ public void run() { _completionHandler.completed(null, null); } catch (InterruptedException e) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=start abort reason=interrupted protocol={} pnfsid={} transferTag={} message={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + transferTag(), + formatError(e)); InterruptedException why = _explanation == null ? e : (InterruptedException) (new InterruptedException(_explanation).initCause(e)); _completionHandler.failed(why, null); } catch (Throwable t) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=start abort reason=execution-failed protocol={} pnfsid={} transferTag={} message={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + transferTag(), + formatError(t)); _completionHandler.failed(t, null); } } @@ -176,6 +199,7 @@ private void runMoverForRead(RepositoryChannel fileIoChannel) throws Exception { _mover.getMover() .runIO(_mover.getFileAttributes(), fileIoChannel, _mover.getProtocolInfo(), _mover.getIoMode()); + reportTransferStart(); } private void tryToSync(RepositoryChannel channel) throws IOException { @@ -195,11 +219,96 @@ private void runMoverForWrite(RepositoryChannel fileIoChannel) throws Exception _mover.getMover() .runIO(_mover.getFileAttributes(), fileIoChannel, _mover.getProtocolInfo(), _mover.getIoMode()); + reportTransferStart(); } finally { tryToSync(fileIoChannel); } } + private void reportTransferStart() { + if (_transferLifeCycle == null) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=start skip reason=no-transfer-lifecycle protocol={} pnfsid={} transferTag={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + transferTag()); + return; + } + + if (!(_mover.getProtocolInfo() instanceof IpProtocolInfo ipProtocolInfo)) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=start skip reason=non-ip-protocol protocol={} pnfsid={} transferTag={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + transferTag()); + return; + } + + InetSocketAddress remoteEndpoint = ipProtocolInfo.getSocketAddress(); + if (remoteEndpoint == null) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=start skip reason=no-remote-endpoint protocol={} pnfsid={} transferTag={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + transferTag()); + return; + } + + Optional localEndpoint = _mover.getLocalEndpoint(); + if (localEndpoint.isEmpty()) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=start skip reason=no-local-endpoint protocol={} pnfsid={} remote={} transferTag={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + formatEndpoint(remoteEndpoint), + transferTag()); + return; + } + + InetSocketAddress local = localEndpoint.get(); + SCITAGS_LOGGER.debug( + "scitags lifecycle=start invoke protocol={} pnfsid={} remote={} local={} transferTag={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + formatEndpoint(remoteEndpoint), + formatEndpoint(local), + transferTag()); + + _transferLifeCycle.onStart( + remoteEndpoint, + local, + _mover.getProtocolInfo(), + _mover.getSubject()); + } + + private String protocolName() { + return _mover.getProtocolInfo().getProtocol().toLowerCase(); + } + + private String transferTag() { + String transferTag = _mover.getProtocolInfo().getTransferTag(); + return transferTag == null || transferTag.isEmpty() ? "-" : transferTag; + } + + private String formatEndpoint(InetSocketAddress endpoint) { + if (endpoint == null) { + return "-"; + } + + if (endpoint.getAddress() != null) { + return endpoint.getAddress().getHostAddress() + ":" + endpoint.getPort(); + } + + return endpoint.getHostString() + ":" + endpoint.getPort(); + } + + private String formatError(Throwable t) { + String message = t.getMessage(); + return message == null || message.isEmpty() + ? t.getClass().getSimpleName() + : t.getClass().getSimpleName() + ':' + message; + } + private synchronized void setThread() throws InterruptedException { if (_needInterruption) { throw new InterruptedException("Thread interrupted before execution"); diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java index bc7c73cbb7b..1cd688948fd 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java @@ -30,8 +30,12 @@ import dmg.cells.nucleus.CellInfoProvider; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketException; import java.nio.channels.CompletionHandler; import java.nio.file.StandardOpenOption; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -47,6 +51,7 @@ import org.dcache.pool.statistics.SnapshotStatistics; import org.dcache.util.CDCExecutorServiceDecorator; import org.dcache.util.FireAndForgetTask; +import org.dcache.util.NetworkUtils; import org.dcache.vehicles.FileAttributes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +65,7 @@ public class DefaultPostTransferService extends AbstractCellComponent implements PostTransferService, CellInfoProvider { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPostTransferService.class); + private static final Logger SCITAGS_LOGGER = LoggerFactory.getLogger("org.dcache.scitags"); private final ExecutorService _executor = new CDCExecutorServiceDecorator<>( @@ -239,14 +245,110 @@ public void sendFinished(Mover mover, MoverInfoMessage moverInfoMessage) { finished.setReply(mover.getErrorCode(), mover.getErrorMessage()); } - mover.getLocalEndpoint().ifPresent(e -> - transferLifeCycle.onEnd(((IpProtocolInfo) mover.getProtocolInfo()).getSocketAddress(), - e, - moverInfoMessage)); + if (transferLifeCycle == null) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=no-transfer-lifecycle protocol={} pnfsid={} transferTag={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + transferTag(mover)); + } else if (!(mover.getProtocolInfo() instanceof IpProtocolInfo ipProtocolInfo)) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=non-ip-protocol protocol={} pnfsid={} transferTag={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + transferTag(mover)); + } else { + InetSocketAddress remoteEndpoint = ipProtocolInfo.getSocketAddress(); + if (remoteEndpoint == null) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=no-remote-endpoint protocol={} pnfsid={} transferTag={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + transferTag(mover)); + } else { + Optional localEndpoint = mover.getLocalEndpoint(); + String localEndpointSource = "mover"; + + if (localEndpoint.isEmpty()) { + localEndpoint = deriveLocalEndpoint(remoteEndpoint); + localEndpointSource = "derived"; + } + + if (localEndpoint.isEmpty()) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=no-local-endpoint protocol={} pnfsid={} remote={} transferTag={} connectionTime={} bytesTransferred={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + formatEndpoint(remoteEndpoint), + transferTag(mover), + moverInfoMessage.getConnectionTime(), + moverInfoMessage.getDataTransferred()); + } else { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end invoke protocol={} pnfsid={} remote={} local={} localSource={} transferTag={} connectionTime={} bytesTransferred={} errorCode={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + formatEndpoint(remoteEndpoint), + formatEndpoint(localEndpoint.get()), + localEndpointSource, + transferTag(mover), + moverInfoMessage.getConnectionTime(), + moverInfoMessage.getDataTransferred(), + mover.getErrorCode()); + transferLifeCycle.onEnd( + remoteEndpoint, + localEndpoint.get(), + moverInfoMessage); + } + } + } _door.notify(mover.getPathToDoor(), finished); } + private Optional deriveLocalEndpoint(InetSocketAddress remoteEndpoint) { + if (remoteEndpoint == null) { + return Optional.empty(); + } + + InetAddress remoteAddress = remoteEndpoint.getAddress(); + if (remoteAddress == null) { + return Optional.empty(); + } + + try { + InetAddress localAddress = NetworkUtils.getLocalAddress(remoteAddress); + return Optional.ofNullable(localAddress).map(address -> new InetSocketAddress(address, 0)); + } catch (SocketException e) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=local-endpoint-derivation-failed remote={} message={}", + formatEndpoint(remoteEndpoint), + e.getMessage()); + return Optional.empty(); + } + } + + private String protocolName(Mover mover) { + return mover.getProtocolInfo().getProtocol().toLowerCase(); + } + + private String transferTag(Mover mover) { + String transferTag = mover.getProtocolInfo().getTransferTag(); + return transferTag == null || transferTag.isEmpty() ? "-" : transferTag; + } + + private String formatEndpoint(InetSocketAddress endpoint) { + if (endpoint == null) { + return "-"; + } + + if (endpoint.getAddress() != null) { + return endpoint.getAddress().getHostAddress() + ":" + endpoint.getPort(); + } + + return endpoint.getHostString() + ":" + endpoint.getPort(); + } + public void shutdown() { MoreExecutors.shutdownAndAwaitTermination(_executor, 10, TimeUnit.SECONDS); } diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java index 1962f62d86b..a6a3798de7a 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java @@ -50,6 +50,7 @@ public class TransferLifeCycle { private final static Logger LOGGER = LoggerFactory.getLogger(TransferLifeCycle.class); + private final static Logger SCITAGS_LOGGER = LoggerFactory.getLogger("org.dcache.scitags"); private static final int MIN_VALID_TRANSFER_TAG = 64; private static final int MAX_VALID_TRANSFER_TAG = 65535; private static final int EXPERIMENT_ID_BIT_SHIFT = 6; @@ -91,26 +92,32 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p Subject subject) { if (!enabled) { + logSkippedMarker("disabled", src, dst, protocolInfo); return; } if (isExcludedTransfer(src, dst)) { + logSkippedMarker("excluded", src, dst, protocolInfo); return; } if (!needMarker(protocolInfo)) { + logSkippedMarker("unsupported-protocol", src, dst, protocolInfo); return; } - var optionalExpId = getExperimentId(protocolInfo, subject); - if (optionalExpId.isEmpty()) { + var optionalExpId = getExperimentId(protocolInfo, subject); + if (optionalExpId.isEmpty()) { + logSkippedMarker("no-experiment-id", src, dst, protocolInfo); return; - } + } - var data = new FlowMarkerBuilder() + int activityId = getActivity(protocolInfo); + + var data = new FlowMarkerBuilder() .withStartedAt(Instant.now()) .withExperimentId(optionalExpId.getAsInt()) - .withActivityId(getActivity(protocolInfo)) + .withActivityId(activityId) .wittApplication(getApplication(protocolInfo)) .withProtocol("tcp") .withAFI(toAFI(dst)) @@ -118,7 +125,10 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p .withSource(src) .build("start"); - sendToMultipleDestinations(toFireflyDestination.apply(src), data); + InetSocketAddress fireflyDestination = toFireflyDestination.apply(src); + sendToMultipleDestinations(fireflyDestination, data); + logMarkerEvent("start", src, dst, protocolInfo, optionalExpId.getAsInt(), activityId, + null, null, fireflyDestination); } /** @@ -134,27 +144,36 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage if (!enabled) { + logSkippedMarker("disabled", src, dst, protocolInfo); return; } if (isExcludedTransfer(src, dst)) { + logSkippedMarker("excluded", src, dst, protocolInfo); return; } if (!needMarker(protocolInfo)) { + logSkippedMarker("unsupported-protocol", src, dst, protocolInfo); return; } var optionalExpId = getExperimentId(protocolInfo, subject); if (optionalExpId.isEmpty()) { + logSkippedMarker("no-experiment-id", src, dst, protocolInfo); return; } + int activityId = getActivity(protocolInfo); + long transferDurationMillis = Math.max(1L, mover.getConnectionTime()); + Instant finishedAt = Instant.now(); + Instant startedAt = finishedAt.minusMillis(transferDurationMillis); + var data = new FlowMarkerBuilder() - .withStartedAt(Instant.now()) - .withFinishedAt(Instant.now()) + .withStartedAt(startedAt) + .withFinishedAt(finishedAt) .withExperimentId(optionalExpId.getAsInt()) - .withActivityId(getActivity(protocolInfo)) + .withActivityId(activityId) .wittApplication(getApplication(protocolInfo)) .withUsage(mover.getBytesRead(), mover.getBytesWritten()) .withProtocol("tcp") @@ -166,7 +185,10 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage } var firefly = data.build("end"); - sendToMultipleDestinations(toFireflyDestination.apply(src), firefly); + InetSocketAddress fireflyDestination = toFireflyDestination.apply(src); + sendToMultipleDestinations(fireflyDestination, firefly); + logMarkerEvent("end", src, dst, protocolInfo, optionalExpId.getAsInt(), activityId, + mover.getBytesRead(), mover.getBytesWritten(), fireflyDestination); } /** @@ -293,7 +315,7 @@ private String getApplication(ProtocolInfo protocolInfo) { * @return the experiment ID, or -1 if it cannot be determined */ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject) { - if (protocolInfo.getTransferTag() != null && !protocolInfo.getTransferTag().isEmpty()) { + if (hasTransferTag(protocolInfo)) { try { int transferTag = Integer.parseInt(protocolInfo.getTransferTag()); if (transferTag < MIN_VALID_TRANSFER_TAG || transferTag > MAX_VALID_TRANSFER_TAG) { @@ -335,7 +357,7 @@ private boolean isExcludedTransfer(InetSocketAddress src, InetSocketAddress dst) } private int getActivity(ProtocolInfo protocolInfo) { - if (!protocolInfo.getTransferTag().isEmpty()) { + if (hasTransferTag(protocolInfo)) { // scitag = exp_id << 6 | act_id return Integer.parseInt(protocolInfo.getTransferTag()) & ACTIVITY_ID_MASK; } else { @@ -343,6 +365,95 @@ private int getActivity(ProtocolInfo protocolInfo) { } } + private boolean hasTransferTag(ProtocolInfo protocolInfo) { + String transferTag = protocolInfo.getTransferTag(); + return transferTag != null && !transferTag.isEmpty(); + } + + private void logSkippedMarker(String reason, InetSocketAddress src, InetSocketAddress dst, + ProtocolInfo protocolInfo) { + if (SCITAGS_LOGGER.isDebugEnabled()) { + SCITAGS_LOGGER.debug( + "scitags event=marker-skip reason={} protocol={} source={} sourcePort={} destination={} destinationPort={} transferTag={}", + reason, + protocolInfo.getProtocol().toLowerCase(), + formatAddress(src), + formatPort(src), + formatAddress(dst), + formatPort(dst), + formatTransferTag(protocolInfo)); + } + } + + private void logMarkerEvent(String event, InetSocketAddress src, InetSocketAddress dst, + ProtocolInfo protocolInfo, int experimentId, int activityId, Double bytesRead, + Double bytesWritten, InetSocketAddress fireflyDestination) { + if (SCITAGS_LOGGER.isDebugEnabled()) { + String classification = hasTransferTag(protocolInfo) ? "transfer-tag" : "fqan"; + if (bytesRead == null || bytesWritten == null) { + SCITAGS_LOGGER.debug( + "scitags event=marker state={} protocol={} source={} sourcePort={} destination={} destinationPort={} transferTag={} experimentId={} activityId={} classifier={} fireflyDestination={} collector={}", + event, + protocolInfo.getProtocol().toLowerCase(), + formatAddress(src), + formatPort(src), + formatAddress(dst), + formatPort(dst), + formatTransferTag(protocolInfo), + experimentId, + activityId, + classification, + formatSocketAddress(fireflyDestination), + formatCollectorDestination()); + } else { + SCITAGS_LOGGER.debug( + "scitags event=marker state={} protocol={} source={} sourcePort={} destination={} destinationPort={} transferTag={} experimentId={} activityId={} classifier={} bytesRead={} bytesWritten={} fireflyDestination={} collector={}", + event, + protocolInfo.getProtocol().toLowerCase(), + formatAddress(src), + formatPort(src), + formatAddress(dst), + formatPort(dst), + formatTransferTag(protocolInfo), + experimentId, + activityId, + classification, + bytesRead, + bytesWritten, + formatSocketAddress(fireflyDestination), + formatCollectorDestination()); + } + } + } + + private String formatSocketAddress(InetSocketAddress address) { + return formatAddress(address) + ":" + formatPort(address); + } + + private String formatCollectorDestination() { + return fireflyCollector == null + ? "-" + : formatAddress(fireflyCollector) + ":" + fireflyCollector.getPort(); + } + + private String formatTransferTag(ProtocolInfo protocolInfo) { + String transferTag = protocolInfo.getTransferTag(); + return transferTag == null || transferTag.isEmpty() ? "-" : transferTag; + } + + private String formatAddress(InetSocketAddress address) { + if (address == null) { + return "-"; + } + + InetAddress inetAddress = address.getAddress(); + return inetAddress == null ? address.getHostString() : inetAddress.getHostAddress(); + } + + private String formatPort(InetSocketAddress address) { + return address == null ? "-" : Integer.toString(address.getPort()); + } + private String toAFI(InetSocketAddress dst) { var addr = dst.getAddress(); diff --git a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml index ce20e7258ae..e0093d71fe2 100644 --- a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml +++ b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml @@ -496,6 +496,7 @@ + diff --git a/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java b/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java index af1b35adcae..c2fdee14d6e 100644 --- a/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java +++ b/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java @@ -4,16 +4,23 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.MoverInfoMessage; import diskCacheV111.vehicles.ProtocolInfo; +import dmg.cells.nucleus.CellAddressCore; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.lang.reflect.Method; import java.util.OptionalInt; import javax.security.auth.Subject; import org.dcache.auth.FQANPrincipal; +import org.json.JSONObject; import org.junit.Before; import org.junit.Test; @@ -73,6 +80,45 @@ public void shouldNotSuppressMarkerWhenOnlyDestinationIsExcluded() throws Except assertTrue(sendsStartMarker("203.0.113.20", "10.20.20.20", "10.0.0.0/8")); } + @Test + public void shouldSetEndMarkerStartTimeFromTransferDuration() throws Exception { + try (DatagramSocket socket = new DatagramSocket(0, InetAddress.getByName("127.0.0.1"))) { + socket.setSoTimeout(700); + + TransferLifeCycle lifecycle = new TransferLifeCycle(); + lifecycle.setEnabled(true); + lifecycle.setVoMapping("atlas:2"); + lifecycle.setFireflyDestination("127.0.0.1:" + socket.getLocalPort()); + + long transferDuration = 2500; + MoverInfoMessage mover = new MoverInfoMessage(new CellAddressCore("pool", "test"), + new PnfsId("000000000001")); + mover.setTransferAttributes(1024, transferDuration, + new TestProtocolInfo("xrootd", "129")); + mover.setSubject(new Subject()); + mover.setBytesRead(1024); + mover.setBytesWritten(0); + + lifecycle.onEnd( + new InetSocketAddress("203.0.113.20", 42000), + new InetSocketAddress("198.51.100.55", 2880), + mover); + + var packet = new DatagramPacket(new byte[4096], 4096); + socket.receive(packet); + + String payload = new String(packet.getData(), 0, packet.getLength(), + StandardCharsets.UTF_8); + JSONObject lifecyclePayload = new JSONObject(payload.substring(payload.indexOf('{'))) + .getJSONObject("flow-lifecycle"); + + Instant startTime = Instant.parse(lifecyclePayload.getString("start-time")); + Instant endTime = Instant.parse(lifecyclePayload.getString("end-time")); + + assertTrue(endTime.isAfter(startTime)); + assertEquals(transferDuration, Duration.between(startTime, endTime).toMillis()); + } + } private OptionalInt resolveExperimentId(String transferTag, Subject subject) throws Exception { return (OptionalInt) getExperimentId.invoke(transferLifeCycle, new TestProtocolInfo("xrootd", transferTag), subject); From 3927f9c53f4d2c11b2f97ef7e709cfcd1dc002f2 Mon Sep 17 00:00:00 2001 From: Shawn McKee Date: Tue, 17 Mar 2026 14:10:23 -0400 Subject: [PATCH 2/5] xrootd: fix logSciTagsRequest called before cell address initialization XrootdTransfer.logSciTagsRequest() was called from the constructor, but Transfer._cellAddress is set later via setCellAddress() in XrootdDoor.createTransfer(). Calling getCellName() before _cellAddress is initialized throws IllegalStateException, crashing every xrootd OPEN when org.dcache.scitags debug logging is enabled. Move the logSciTagsRequest call to XrootdDoor.createTransfer(), after both setCellAddress() and setClientAddress() have been called, and widen logSciTagsRequest visibility to package-private. Signed-off-by: Stuart McKee (cherry picked from commit 5cef4f70803731ba1fc7eea817f5774d363f5da5) Signed-off-by: Tigran Mkrtchyan --- .../src/main/java/org/dcache/xrootd/door/XrootdDoor.java | 1 + .../src/main/java/org/dcache/xrootd/door/XrootdTransfer.java | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java index 864ce724506..427d959308f 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java @@ -511,6 +511,7 @@ public synchronized void finished(CacheException error) { transfer.setKafkaSender(_kafkaSender); transfer.setTriedHosts(tried); transfer.setProxiedTransfer(proxied); + transfer.logSciTagsRequest(opaque); return transfer; } diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java index 241ebdad43d..48058f847c0 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java @@ -44,7 +44,6 @@ public XrootdTransfer(PnfsHandler pnfs, Subject subject, this.restriction = requireNonNull(restriction); tpcInfo = new XrootdTpcInfo(opaque); _transferTag = opaque.getOrDefault("scitag.flow", ""); - logSciTagsRequest(opaque); try { tpcInfo.setUid(Subjects.getUid(subject)); } catch (NoSuchElementException e) { @@ -89,7 +88,7 @@ protected synchronized ProtocolInfo createProtocolInfo() { return createXrootdProtocolInfo(); } - private void logSciTagsRequest(Map opaque) { + void logSciTagsRequest(Map opaque) { if (SCITAGS_LOGGER.isDebugEnabled()) { String tagSource = !_transferTag.isEmpty() ? "scitag.flow" From beed60bcbe9f8a437d347c5807d06049fda0fea2 Mon Sep 17 00:00:00 2001 From: Shawn McKee Date: Wed, 18 Mar 2026 11:16:58 -0400 Subject: [PATCH 3/5] firefly: address PR 8044 review fixes Signed-off-by: Shawn McKee (cherry picked from commit db228398b8fe14414dacec3e990faddc1fb9708e) Signed-off-by: Tigran Mkrtchyan --- .../dcache/webdav/DcacheResourceFactory.java | 44 ++++++----- .../webdav/DcacheResourceFactoryTest.java | 18 ++--- .../AbstractMoverProtocolTransferService.java | 73 ------------------- .../dcache/pool/movers/TransferLifeCycle.java | 10 +-- .../pool/movers/TransferLifeCycleTest.java | 6 +- 5 files changed, 38 insertions(+), 113 deletions(-) diff --git a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java index ea8e4dc148d..9f2d905383d 100644 --- a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java +++ b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java @@ -177,23 +177,19 @@ public class DcacheResourceFactory private static final Logger SCITAGS_LOGGER = LoggerFactory.getLogger("org.dcache.scitags"); - static Optional> findHeaderIgnoreCase(HttpServletRequest request, + static Optional findHeaderIgnoreCase(HttpServletRequest request, String expectedHeaderName) { Enumeration headerNames = request.getHeaderNames(); if (headerNames != null) { while (headerNames.hasMoreElements()) { String actualHeaderName = headerNames.nextElement(); if (actualHeaderName.equalsIgnoreCase(expectedHeaderName)) { - return Optional.of(Map.entry(actualHeaderName, - request.getHeader(actualHeaderName))); + return Optional.ofNullable(request.getHeader(actualHeaderName)); } } } - String headerValue = request.getHeader(expectedHeaderName); - return headerValue == null - ? Optional.empty() - : Optional.of(Map.entry(expectedHeaderName, headerValue)); + return Optional.ofNullable(request.getHeader(expectedHeaderName)); } private static final XMLOutputFactory XML_OUTPUT_FACTORY = XMLOutputFactory.newFactory(); @@ -1741,35 +1737,38 @@ public HttpTransfer(PnfsHandler pnfs, Subject subject, } private String readTransferTag(HttpServletRequest request) { + String door = getCellName() + '@' + getCellDomainName(); + // SciTag takes precedence because it is checked first. for (String header : SCITAG_HEADERS) { - var matchedHeader = findHeaderIgnoreCase(request, header); - if (matchedHeader.isPresent()) { - String transferTag = matchedHeader.get().getValue(); - if (transferTag != null && !transferTag.isBlank()) { - return logSciTagsRequest(request, - matchedHeader.get().getKey() + "-header", - transferTag.trim()); - } + var transferTag = findHeaderIgnoreCase(request, header) + .map(String::trim) + .filter(tag -> !tag.isEmpty()); + if (transferTag.isPresent()) { + logSciTagsRequest(request, door, header + "-header", transferTag.get()); + return transferTag.get(); } } - String flowFromQuery = request.getParameter("scitag.flow"); - if (flowFromQuery != null && !flowFromQuery.isBlank()) { - return logSciTagsRequest(request, "scitag.flow-query", flowFromQuery.trim()); + var flowFromQuery = Optional.ofNullable(request.getParameter("scitag.flow")) + .map(String::trim) + .filter(tag -> !tag.isEmpty()); + if (flowFromQuery.isPresent()) { + logSciTagsRequest(request, door, "scitag.flow-query", flowFromQuery.get()); + return flowFromQuery.get(); } - logSciTagsRequest(request, "none", ""); + logSciTagsRequest(request, door, "none", ""); return ""; } - private String logSciTagsRequest(HttpServletRequest request, String tagSource, - String transferTag) { + private static void logSciTagsRequest(HttpServletRequest request, String door, + String tagSource, String transferTag) { if (SCITAGS_LOGGER.isDebugEnabled()) { SCITAGS_LOGGER.debug( "scitags event=request protocol={} door={} remote={} method={} alias={} local={} tagSource={} transferTag={}", request.isSecure() ? PROTOCOL_INFO_SSL_NAME : PROTOCOL_INFO_NAME, - getCellName() + '@' + getCellDomainName(), + door, request.getRemoteAddr(), request.getMethod(), request.getServerName(), @@ -1777,7 +1776,6 @@ private String logSciTagsRequest(HttpServletRequest request, String tagSource, tagSource, transferTag.isEmpty() ? "-" : transferTag); } - return transferTag; } protected ProtocolInfo createProtocolInfo(InetSocketAddress address) { diff --git a/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java b/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java index 9219219c2e4..8fbfc9afac6 100644 --- a/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java +++ b/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java @@ -7,7 +7,6 @@ import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import javax.servlet.http.HttpServletRequest; import org.junit.Test; @@ -20,12 +19,11 @@ public void shouldFindSciTagHeaderIgnoringCase() { given(request.getHeaderNames()).willReturn(Collections.enumeration(List.of("scitag"))); given(request.getHeader("scitag")).willReturn("313"); - Optional> header = + Optional header = DcacheResourceFactory.findHeaderIgnoreCase(request, "SciTag"); assertThat(header.isPresent(), is(true)); - assertThat(header.get().getKey(), is("scitag")); - assertThat(header.get().getValue(), is("313")); + assertThat(header.get(), is("313")); } @Test @@ -35,12 +33,11 @@ public void shouldFindTransferHeaderSciTagIgnoringCase() { Collections.enumeration(List.of("transferheaderscitag"))); given(request.getHeader("transferheaderscitag")).willReturn("777"); - Optional> header = + Optional header = DcacheResourceFactory.findHeaderIgnoreCase(request, "TransferHeaderSciTag"); assertThat(header.isPresent(), is(true)); - assertThat(header.get().getKey(), is("transferheaderscitag")); - assertThat(header.get().getValue(), is("777")); + assertThat(header.get(), is("777")); } @Test @@ -49,11 +46,10 @@ public void shouldFallbackToServletHeaderLookup() { given(request.getHeaderNames()).willReturn(null); given(request.getHeader("SciTag")).willReturn("313"); - Optional> header = + Optional header = DcacheResourceFactory.findHeaderIgnoreCase(request, "SciTag"); assertThat(header.isPresent(), is(true)); - assertThat(header.get().getKey(), is("SciTag")); - assertThat(header.get().getValue(), is("313")); + assertThat(header.get(), is("313")); } -} \ No newline at end of file +} diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java index 8798a6aa97b..019a9ada2b2 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java @@ -20,7 +20,6 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import diskCacheV111.util.CacheException; -import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.PoolIoFileMessage; import diskCacheV111.vehicles.ProtocolInfo; import dmg.cells.nucleus.AbstractCellComponent; @@ -28,13 +27,11 @@ import dmg.cells.nucleus.CellPath; import java.io.IOException; import java.io.InterruptedIOException; -import java.net.InetSocketAddress; import java.io.SyncFailedException; import java.lang.reflect.InvocationTargetException; import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; import java.nio.file.StandardOpenOption; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.dcache.pool.movers.ChecksumMover; @@ -199,7 +196,6 @@ private void runMoverForRead(RepositoryChannel fileIoChannel) throws Exception { _mover.getMover() .runIO(_mover.getFileAttributes(), fileIoChannel, _mover.getProtocolInfo(), _mover.getIoMode()); - reportTransferStart(); } private void tryToSync(RepositoryChannel channel) throws IOException { @@ -219,68 +215,11 @@ private void runMoverForWrite(RepositoryChannel fileIoChannel) throws Exception _mover.getMover() .runIO(_mover.getFileAttributes(), fileIoChannel, _mover.getProtocolInfo(), _mover.getIoMode()); - reportTransferStart(); } finally { tryToSync(fileIoChannel); } } - private void reportTransferStart() { - if (_transferLifeCycle == null) { - SCITAGS_LOGGER.debug( - "scitags lifecycle=start skip reason=no-transfer-lifecycle protocol={} pnfsid={} transferTag={}", - protocolName(), - _mover.getFileAttributes().getPnfsId(), - transferTag()); - return; - } - - if (!(_mover.getProtocolInfo() instanceof IpProtocolInfo ipProtocolInfo)) { - SCITAGS_LOGGER.debug( - "scitags lifecycle=start skip reason=non-ip-protocol protocol={} pnfsid={} transferTag={}", - protocolName(), - _mover.getFileAttributes().getPnfsId(), - transferTag()); - return; - } - - InetSocketAddress remoteEndpoint = ipProtocolInfo.getSocketAddress(); - if (remoteEndpoint == null) { - SCITAGS_LOGGER.debug( - "scitags lifecycle=start skip reason=no-remote-endpoint protocol={} pnfsid={} transferTag={}", - protocolName(), - _mover.getFileAttributes().getPnfsId(), - transferTag()); - return; - } - - Optional localEndpoint = _mover.getLocalEndpoint(); - if (localEndpoint.isEmpty()) { - SCITAGS_LOGGER.debug( - "scitags lifecycle=start skip reason=no-local-endpoint protocol={} pnfsid={} remote={} transferTag={}", - protocolName(), - _mover.getFileAttributes().getPnfsId(), - formatEndpoint(remoteEndpoint), - transferTag()); - return; - } - - InetSocketAddress local = localEndpoint.get(); - SCITAGS_LOGGER.debug( - "scitags lifecycle=start invoke protocol={} pnfsid={} remote={} local={} transferTag={}", - protocolName(), - _mover.getFileAttributes().getPnfsId(), - formatEndpoint(remoteEndpoint), - formatEndpoint(local), - transferTag()); - - _transferLifeCycle.onStart( - remoteEndpoint, - local, - _mover.getProtocolInfo(), - _mover.getSubject()); - } - private String protocolName() { return _mover.getProtocolInfo().getProtocol().toLowerCase(); } @@ -290,18 +229,6 @@ private String transferTag() { return transferTag == null || transferTag.isEmpty() ? "-" : transferTag; } - private String formatEndpoint(InetSocketAddress endpoint) { - if (endpoint == null) { - return "-"; - } - - if (endpoint.getAddress() != null) { - return endpoint.getAddress().getHostAddress() + ":" + endpoint.getPort(); - } - - return endpoint.getHostString() + ":" + endpoint.getPort(); - } - private String formatError(Throwable t) { String message = t.getMessage(); return message == null || message.isEmpty() diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java index a6a3798de7a..e94c9c0c197 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java @@ -165,9 +165,9 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage } int activityId = getActivity(protocolInfo); - long transferDurationMillis = Math.max(1L, mover.getConnectionTime()); - Instant finishedAt = Instant.now(); - Instant startedAt = finishedAt.minusMillis(transferDurationMillis); + long transferDurationMillis = Math.max(0L, mover.getConnectionTime()); + Instant startedAt = Instant.ofEpochMilli(mover.getTimestamp()); + Instant finishedAt = startedAt.plusMillis(transferDurationMillis); var data = new FlowMarkerBuilder() .withStartedAt(startedAt) @@ -185,8 +185,8 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage } var firefly = data.build("end"); - InetSocketAddress fireflyDestination = toFireflyDestination.apply(src); - sendToMultipleDestinations(fireflyDestination, firefly); + InetSocketAddress fireflyDestination = toFireflyDestination.apply(src); + sendToMultipleDestinations(fireflyDestination, firefly); logMarkerEvent("end", src, dst, protocolInfo, optionalExpId.getAsInt(), activityId, mover.getBytesRead(), mover.getBytesWritten(), fireflyDestination); } diff --git a/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java b/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java index c2fdee14d6e..9300b2af3a6 100644 --- a/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java +++ b/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java @@ -99,6 +99,9 @@ public void shouldSetEndMarkerStartTimeFromTransferDuration() throws Exception { mover.setBytesRead(1024); mover.setBytesWritten(0); + Instant expectedStart = Instant.ofEpochMilli(mover.getTimestamp()); + Instant expectedEnd = expectedStart.plusMillis(transferDuration); + lifecycle.onEnd( new InetSocketAddress("203.0.113.20", 42000), new InetSocketAddress("198.51.100.55", 2880), @@ -115,7 +118,8 @@ public void shouldSetEndMarkerStartTimeFromTransferDuration() throws Exception { Instant startTime = Instant.parse(lifecyclePayload.getString("start-time")); Instant endTime = Instant.parse(lifecyclePayload.getString("end-time")); - assertTrue(endTime.isAfter(startTime)); + assertEquals(expectedStart, startTime); + assertEquals(expectedEnd, endTime); assertEquals(transferDuration, Duration.between(startTime, endTime).toMillis()); } } From ca6eb88ed703542daac53c6ab330e1d9a69a4711 Mon Sep 17 00:00:00 2001 From: Shawn McKee Date: Fri, 20 Mar 2026 10:16:05 -0400 Subject: [PATCH 4/5] firefly: address latest PR 8044 inline feedback Signed-off-by: Shawn McKee (cherry picked from commit d0be76403d48774ad6fc2f015d904137f96a87b0) Signed-off-by: Tigran Mkrtchyan --- .../main/java/org/dcache/xrootd/door/XrootdTransfer.java | 6 ++++-- .../pool/classic/AbstractMoverProtocolTransferService.java | 7 +++---- .../dcache/pool/classic/DefaultPostTransferService.java | 6 ++++-- .../java/org/dcache/pool/movers/TransferLifeCycle.java | 3 ++- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java index 48058f847c0..5389263f64e 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java @@ -1,5 +1,6 @@ package org.dcache.xrootd.door; +import static com.google.common.net.InetAddresses.toUriString; import static java.util.Objects.requireNonNull; import diskCacheV111.util.FsPath; @@ -107,9 +108,10 @@ private String formatAddress(InetSocketAddress address) { return "-"; } - return address.getAddress() == null + var inetAddress = address.getAddress(); + return inetAddress == null ? address.getHostString() - : address.getAddress().getHostAddress(); + : toUriString(inetAddress); } @Override diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java index 019a9ada2b2..89a632bfaf8 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java @@ -230,10 +230,9 @@ private String transferTag() { } private String formatError(Throwable t) { - String message = t.getMessage(); - return message == null || message.isEmpty() - ? t.getClass().getSimpleName() - : t.getClass().getSimpleName() + ':' + message; + return t instanceof Exception + ? Exceptions.messageOrClassName((Exception) t) + : t.getClass().getName(); } private synchronized void setThread() throws InterruptedException { diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java index 1cd688948fd..bcfc2333daf 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java @@ -17,6 +17,7 @@ */ package org.dcache.pool.classic; +import static com.google.common.net.InetAddresses.toUriString; import static org.dcache.util.Exceptions.messageOrClassName; import com.google.common.base.Throwables; @@ -342,8 +343,9 @@ private String formatEndpoint(InetSocketAddress endpoint) { return "-"; } - if (endpoint.getAddress() != null) { - return endpoint.getAddress().getHostAddress() + ":" + endpoint.getPort(); + InetAddress endpointAddress = endpoint.getAddress(); + if (endpointAddress != null) { + return toUriString(endpointAddress) + ":" + endpoint.getPort(); } return endpoint.getHostString() + ":" + endpoint.getPort(); diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java index e94c9c0c197..13b88df7fb4 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.net.InetAddresses.forString; +import static com.google.common.net.InetAddresses.toUriString; import com.google.common.base.Splitter; import com.google.common.net.HostAndPort; @@ -447,7 +448,7 @@ private String formatAddress(InetSocketAddress address) { } InetAddress inetAddress = address.getAddress(); - return inetAddress == null ? address.getHostString() : inetAddress.getHostAddress(); + return inetAddress == null ? address.getHostString() : toUriString(inetAddress); } private String formatPort(InetSocketAddress address) { From 37bb8eba8ec2aafc679c99d513e2bd3f48678bab Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Sat, 21 Mar 2026 10:19:00 +0100 Subject: [PATCH 5/5] pool: fix failing unit tests Fix NPE triggered by changes in commit cfa848240d5300fa03efd3cbc374038f079d83d4 Target: master Require-book: no Require-notes: no (cherry picked from commit 8d870cfbe399343adf2a5dcc2582657bba12ec95) Signed-off-by: Tigran Mkrtchyan --- .../AbstractMoverProtocolTransferServiceTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/modules/dcache/src/test/java/org/dcache/pool/classic/AbstractMoverProtocolTransferServiceTest.java b/modules/dcache/src/test/java/org/dcache/pool/classic/AbstractMoverProtocolTransferServiceTest.java index fe4f62e8526..b1534d6817f 100644 --- a/modules/dcache/src/test/java/org/dcache/pool/classic/AbstractMoverProtocolTransferServiceTest.java +++ b/modules/dcache/src/test/java/org/dcache/pool/classic/AbstractMoverProtocolTransferServiceTest.java @@ -33,6 +33,16 @@ public void setUp() throws Exception { moverProtocolMover = mock(MoverProtocolMover.class); when(moverProtocolMover.openChannel()).thenReturn(channel); + var protocolInfo = mock(ProtocolInfo.class); + when(moverProtocolMover.getProtocolInfo()).thenReturn(protocolInfo); + when(protocolInfo.getProtocol()).thenReturn("http"); + + var fileAttributes = FileAttributes.of() + .pnfsId("0000123456789012345678901234567890FF") + .build(); + + when(moverProtocolMover.getFileAttributes()).thenReturn(fileAttributes); + transferService = new AbstractMoverProtocolTransferService() { @Override protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception {