diff --git a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java index 30aa97a5514..9f2d905383d 100644 --- a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java +++ b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java @@ -169,11 +169,28 @@ * This ResourceFactory exposes the dCache name space through the Milton WebDAV framework. */ public class DcacheResourceFactory - extends AbstractCellComponent - implements ResourceFactory, CellMessageReceiver, CellCommandListener, CellInfoProvider { + extends AbstractCellComponent + implements ResourceFactory, CellMessageReceiver, CellCommandListener, CellInfoProvider { private static final Logger LOGGER = - LoggerFactory.getLogger(DcacheResourceFactory.class); + LoggerFactory.getLogger(DcacheResourceFactory.class); + private static final Logger SCITAGS_LOGGER = + LoggerFactory.getLogger("org.dcache.scitags"); + + static Optional findHeaderIgnoreCase(HttpServletRequest request, + String expectedHeaderName) { + Enumeration headerNames = request.getHeaderNames(); + if (headerNames != null) { + while (headerNames.hasMoreElements()) { + String actualHeaderName = headerNames.nextElement(); + if (actualHeaderName.equalsIgnoreCase(expectedHeaderName)) { + return Optional.ofNullable(request.getHeader(actualHeaderName)); + } + } + } + + return Optional.ofNullable(request.getHeader(expectedHeaderName)); + } private static final XMLOutputFactory XML_OUTPUT_FACTORY = XMLOutputFactory.newFactory(); @@ -1697,11 +1714,11 @@ private class HttpTransfer extends RedirectedTransfer { /** * The original request path that will be passed to pool for fall-back redirect. */ - private final String _requestPath; - private String _transferTag = ""; + private final String _requestPath; + private String _transferTag = ""; - private static final String HEADER_SCITAG = "SciTag"; - private static final String HEADER_TRANSFER_HEADER_SCITAG = "TransferHeaderSciTag"; + private static final String HEADER_SCITAG = "SciTag"; + private static final String HEADER_TRANSFER_HEADER_SCITAG = "TransferHeaderSciTag"; private static final String[] SCITAG_HEADERS = { HEADER_SCITAG, HEADER_TRANSFER_HEADER_SCITAG @@ -1720,36 +1737,47 @@ public HttpTransfer(PnfsHandler pnfs, Subject subject, } private String readTransferTag(HttpServletRequest request) { + String door = getCellName() + '@' + getCellDomainName(); + // SciTag takes precedence because it is checked first. for (String header : SCITAG_HEADERS) { - String transferTag = request.getHeader(header); - if (transferTag != null && !transferTag.isBlank()) { - String trimmed = transferTag.trim(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} header found: {} (from client={})", - header, trimmed, request.getRemoteAddr()); - } - return trimmed; + var transferTag = findHeaderIgnoreCase(request, header) + .map(String::trim) + .filter(tag -> !tag.isEmpty()); + if (transferTag.isPresent()) { + logSciTagsRequest(request, door, header + "-header", transferTag.get()); + return transferTag.get(); } } - String flowFromQuery = request.getParameter("scitag.flow"); - if (flowFromQuery != null && !flowFromQuery.isBlank()) { - String trimmed = flowFromQuery.trim(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("scitag.flow query parameter found: {} (from client={})", - trimmed, request.getRemoteAddr()); - } - return trimmed; + var flowFromQuery = Optional.ofNullable(request.getParameter("scitag.flow")) + .map(String::trim) + .filter(tag -> !tag.isEmpty()); + if (flowFromQuery.isPresent()) { + logSciTagsRequest(request, door, "scitag.flow-query", flowFromQuery.get()); + return flowFromQuery.get(); } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("No SciTag header/parameter found in request (client={})", - request.getRemoteAddr()); - } + logSciTagsRequest(request, door, "none", ""); return ""; } + private static void logSciTagsRequest(HttpServletRequest request, String door, + String tagSource, String transferTag) { + if (SCITAGS_LOGGER.isDebugEnabled()) { + SCITAGS_LOGGER.debug( + "scitags event=request protocol={} door={} remote={} method={} alias={} local={} tagSource={} transferTag={}", + request.isSecure() ? PROTOCOL_INFO_SSL_NAME : PROTOCOL_INFO_NAME, + door, + request.getRemoteAddr(), + request.getMethod(), + request.getServerName(), + request.getLocalAddr(), + tagSource, + transferTag.isEmpty() ? "-" : transferTag); + } + } + protected ProtocolInfo createProtocolInfo(InetSocketAddress address) { List wantedChecksums = _wantedChecksum == null ? Collections.emptyList() diff --git a/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java b/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java new file mode 100644 index 00000000000..8fbfc9afac6 --- /dev/null +++ b/modules/dcache-webdav/src/test/java/org/dcache/webdav/DcacheResourceFactoryTest.java @@ -0,0 +1,55 @@ +package org.dcache.webdav; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import org.junit.Test; + +public class DcacheResourceFactoryTest { + + @Test + public void shouldFindSciTagHeaderIgnoringCase() { + HttpServletRequest request = mock(HttpServletRequest.class); + given(request.getHeaderNames()).willReturn(Collections.enumeration(List.of("scitag"))); + given(request.getHeader("scitag")).willReturn("313"); + + Optional header = + DcacheResourceFactory.findHeaderIgnoreCase(request, "SciTag"); + + assertThat(header.isPresent(), is(true)); + assertThat(header.get(), is("313")); + } + + @Test + public void shouldFindTransferHeaderSciTagIgnoringCase() { + HttpServletRequest request = mock(HttpServletRequest.class); + given(request.getHeaderNames()).willReturn( + Collections.enumeration(List.of("transferheaderscitag"))); + given(request.getHeader("transferheaderscitag")).willReturn("777"); + + Optional header = + DcacheResourceFactory.findHeaderIgnoreCase(request, "TransferHeaderSciTag"); + + assertThat(header.isPresent(), is(true)); + assertThat(header.get(), is("777")); + } + + @Test + public void shouldFallbackToServletHeaderLookup() { + HttpServletRequest request = mock(HttpServletRequest.class); + given(request.getHeaderNames()).willReturn(null); + given(request.getHeader("SciTag")).willReturn("313"); + + Optional header = + DcacheResourceFactory.findHeaderIgnoreCase(request, "SciTag"); + + assertThat(header.isPresent(), is(true)); + assertThat(header.get(), is("313")); + } +} diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java index 864ce724506..427d959308f 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java @@ -511,6 +511,7 @@ public synchronized void finished(CacheException error) { transfer.setKafkaSender(_kafkaSender); transfer.setTriedHosts(tried); transfer.setProxiedTransfer(proxied); + transfer.logSciTagsRequest(opaque); return transfer; } diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java index abbcc49e626..5389263f64e 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdTransfer.java @@ -1,5 +1,6 @@ package org.dcache.xrootd.door; +import static com.google.common.net.InetAddresses.toUriString; import static java.util.Objects.requireNonNull; import diskCacheV111.util.FsPath; @@ -26,6 +27,7 @@ public class XrootdTransfer extends RedirectedTransfer { private static final Logger LOGGER = LoggerFactory.getLogger(XrootdTransfer.class); + private static final Logger SCITAGS_LOGGER = LoggerFactory.getLogger("org.dcache.scitags"); private UUID _uuid; private InetSocketAddress _doorAddress; @@ -43,15 +45,6 @@ public XrootdTransfer(PnfsHandler pnfs, Subject subject, this.restriction = requireNonNull(restriction); tpcInfo = new XrootdTpcInfo(opaque); _transferTag = opaque.getOrDefault("scitag.flow", ""); - if (LOGGER.isDebugEnabled()) { - if (!_transferTag.isEmpty()) { - LOGGER.debug("scitag.flow parameter found: {}", _transferTag); - } else if (opaque.containsKey("scitag.flow")) { - LOGGER.debug("scitag.flow parameter found but empty"); - } else { - LOGGER.debug("No scitag.flow parameter in this request"); - } - } try { tpcInfo.setUid(Subjects.getUid(subject)); } catch (NoSuchElementException e) { @@ -96,6 +89,31 @@ protected synchronized ProtocolInfo createProtocolInfo() { return createXrootdProtocolInfo(); } + void logSciTagsRequest(Map opaque) { + if (SCITAGS_LOGGER.isDebugEnabled()) { + String tagSource = !_transferTag.isEmpty() + ? "scitag.flow" + : opaque.containsKey("scitag.flow") ? "scitag.flow-empty" : "none"; + SCITAGS_LOGGER.debug( + "scitags event=request protocol=xrootd door={} remote={} tagSource={} transferTag={}", + getCellName() + '@' + getDomainName(), + formatAddress(getClientAddress()), + tagSource, + _transferTag.isEmpty() ? "-" : _transferTag); + } + } + + private String formatAddress(InetSocketAddress address) { + if (address == null) { + return "-"; + } + + var inetAddress = address.getAddress(); + return inetAddress == null + ? address.getHostString() + : toUriString(inetAddress); + } + @Override protected ProtocolInfo getProtocolInfoForPoolManager() { ProtocolInfo info = createProtocolInfo(); diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java index 73bf90beba4..89a632bfaf8 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java @@ -38,6 +38,7 @@ import org.dcache.pool.movers.Mover; import org.dcache.pool.movers.MoverProtocol; import org.dcache.pool.movers.MoverProtocolMover; +import org.dcache.pool.movers.TransferLifeCycle; import org.dcache.pool.repository.ReplicaDescriptor; import org.dcache.pool.repository.RepositoryChannel; import org.dcache.util.CDCExecutorServiceDecorator; @@ -52,12 +53,15 @@ public abstract class AbstractMoverProtocolTransferService private static final Logger LOGGER = LoggerFactory.getLogger(MoverMapTransferService.class); + private static final Logger SCITAGS_LOGGER = + LoggerFactory.getLogger("org.dcache.scitags"); private final ExecutorService _executor = new CDCExecutorServiceDecorator<>( Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat( getClass().getSimpleName() + "-transfer-service-%d").build())); private PostTransferService _postTransferService; + private TransferLifeCycle _transferLifeCycle; @Required @@ -65,6 +69,10 @@ public void setPostTransferService(PostTransferService postTransferService) { _postTransferService = postTransferService; } + public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) { + _transferLifeCycle = transferLifeCycle; + } + @Override public Mover createMover(ReplicaDescriptor handle, PoolIoFileMessage message, CellPath pathToDoor) @@ -155,10 +163,22 @@ public void run() { _completionHandler.completed(null, null); } catch (InterruptedException e) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=start abort reason=interrupted protocol={} pnfsid={} transferTag={} message={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + transferTag(), + formatError(e)); InterruptedException why = _explanation == null ? e : (InterruptedException) (new InterruptedException(_explanation).initCause(e)); _completionHandler.failed(why, null); } catch (Throwable t) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=start abort reason=execution-failed protocol={} pnfsid={} transferTag={} message={}", + protocolName(), + _mover.getFileAttributes().getPnfsId(), + transferTag(), + formatError(t)); _completionHandler.failed(t, null); } } @@ -200,6 +220,21 @@ private void runMoverForWrite(RepositoryChannel fileIoChannel) throws Exception } } + private String protocolName() { + return _mover.getProtocolInfo().getProtocol().toLowerCase(); + } + + private String transferTag() { + String transferTag = _mover.getProtocolInfo().getTransferTag(); + return transferTag == null || transferTag.isEmpty() ? "-" : transferTag; + } + + private String formatError(Throwable t) { + return t instanceof Exception + ? Exceptions.messageOrClassName((Exception) t) + : t.getClass().getName(); + } + private synchronized void setThread() throws InterruptedException { if (_needInterruption) { throw new InterruptedException("Thread interrupted before execution"); diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java index bc7c73cbb7b..bcfc2333daf 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java @@ -17,6 +17,7 @@ */ package org.dcache.pool.classic; +import static com.google.common.net.InetAddresses.toUriString; import static org.dcache.util.Exceptions.messageOrClassName; import com.google.common.base.Throwables; @@ -30,8 +31,12 @@ import dmg.cells.nucleus.CellInfoProvider; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketException; import java.nio.channels.CompletionHandler; import java.nio.file.StandardOpenOption; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -47,6 +52,7 @@ import org.dcache.pool.statistics.SnapshotStatistics; import org.dcache.util.CDCExecutorServiceDecorator; import org.dcache.util.FireAndForgetTask; +import org.dcache.util.NetworkUtils; import org.dcache.vehicles.FileAttributes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +66,7 @@ public class DefaultPostTransferService extends AbstractCellComponent implements PostTransferService, CellInfoProvider { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPostTransferService.class); + private static final Logger SCITAGS_LOGGER = LoggerFactory.getLogger("org.dcache.scitags"); private final ExecutorService _executor = new CDCExecutorServiceDecorator<>( @@ -239,14 +246,111 @@ public void sendFinished(Mover mover, MoverInfoMessage moverInfoMessage) { finished.setReply(mover.getErrorCode(), mover.getErrorMessage()); } - mover.getLocalEndpoint().ifPresent(e -> - transferLifeCycle.onEnd(((IpProtocolInfo) mover.getProtocolInfo()).getSocketAddress(), - e, - moverInfoMessage)); + if (transferLifeCycle == null) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=no-transfer-lifecycle protocol={} pnfsid={} transferTag={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + transferTag(mover)); + } else if (!(mover.getProtocolInfo() instanceof IpProtocolInfo ipProtocolInfo)) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=non-ip-protocol protocol={} pnfsid={} transferTag={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + transferTag(mover)); + } else { + InetSocketAddress remoteEndpoint = ipProtocolInfo.getSocketAddress(); + if (remoteEndpoint == null) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=no-remote-endpoint protocol={} pnfsid={} transferTag={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + transferTag(mover)); + } else { + Optional localEndpoint = mover.getLocalEndpoint(); + String localEndpointSource = "mover"; + + if (localEndpoint.isEmpty()) { + localEndpoint = deriveLocalEndpoint(remoteEndpoint); + localEndpointSource = "derived"; + } + + if (localEndpoint.isEmpty()) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=no-local-endpoint protocol={} pnfsid={} remote={} transferTag={} connectionTime={} bytesTransferred={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + formatEndpoint(remoteEndpoint), + transferTag(mover), + moverInfoMessage.getConnectionTime(), + moverInfoMessage.getDataTransferred()); + } else { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end invoke protocol={} pnfsid={} remote={} local={} localSource={} transferTag={} connectionTime={} bytesTransferred={} errorCode={}", + protocolName(mover), + mover.getFileAttributes().getPnfsId(), + formatEndpoint(remoteEndpoint), + formatEndpoint(localEndpoint.get()), + localEndpointSource, + transferTag(mover), + moverInfoMessage.getConnectionTime(), + moverInfoMessage.getDataTransferred(), + mover.getErrorCode()); + transferLifeCycle.onEnd( + remoteEndpoint, + localEndpoint.get(), + moverInfoMessage); + } + } + } _door.notify(mover.getPathToDoor(), finished); } + private Optional deriveLocalEndpoint(InetSocketAddress remoteEndpoint) { + if (remoteEndpoint == null) { + return Optional.empty(); + } + + InetAddress remoteAddress = remoteEndpoint.getAddress(); + if (remoteAddress == null) { + return Optional.empty(); + } + + try { + InetAddress localAddress = NetworkUtils.getLocalAddress(remoteAddress); + return Optional.ofNullable(localAddress).map(address -> new InetSocketAddress(address, 0)); + } catch (SocketException e) { + SCITAGS_LOGGER.debug( + "scitags lifecycle=end skip reason=local-endpoint-derivation-failed remote={} message={}", + formatEndpoint(remoteEndpoint), + e.getMessage()); + return Optional.empty(); + } + } + + private String protocolName(Mover mover) { + return mover.getProtocolInfo().getProtocol().toLowerCase(); + } + + private String transferTag(Mover mover) { + String transferTag = mover.getProtocolInfo().getTransferTag(); + return transferTag == null || transferTag.isEmpty() ? "-" : transferTag; + } + + private String formatEndpoint(InetSocketAddress endpoint) { + if (endpoint == null) { + return "-"; + } + + InetAddress endpointAddress = endpoint.getAddress(); + if (endpointAddress != null) { + return toUriString(endpointAddress) + ":" + endpoint.getPort(); + } + + return endpoint.getHostString() + ":" + endpoint.getPort(); + } + public void shutdown() { MoreExecutors.shutdownAndAwaitTermination(_executor, 10, TimeUnit.SECONDS); } diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java index 1962f62d86b..13b88df7fb4 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/TransferLifeCycle.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.net.InetAddresses.forString; +import static com.google.common.net.InetAddresses.toUriString; import com.google.common.base.Splitter; import com.google.common.net.HostAndPort; @@ -50,6 +51,7 @@ public class TransferLifeCycle { private final static Logger LOGGER = LoggerFactory.getLogger(TransferLifeCycle.class); + private final static Logger SCITAGS_LOGGER = LoggerFactory.getLogger("org.dcache.scitags"); private static final int MIN_VALID_TRANSFER_TAG = 64; private static final int MAX_VALID_TRANSFER_TAG = 65535; private static final int EXPERIMENT_ID_BIT_SHIFT = 6; @@ -91,26 +93,32 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p Subject subject) { if (!enabled) { + logSkippedMarker("disabled", src, dst, protocolInfo); return; } if (isExcludedTransfer(src, dst)) { + logSkippedMarker("excluded", src, dst, protocolInfo); return; } if (!needMarker(protocolInfo)) { + logSkippedMarker("unsupported-protocol", src, dst, protocolInfo); return; } - var optionalExpId = getExperimentId(protocolInfo, subject); - if (optionalExpId.isEmpty()) { + var optionalExpId = getExperimentId(protocolInfo, subject); + if (optionalExpId.isEmpty()) { + logSkippedMarker("no-experiment-id", src, dst, protocolInfo); return; - } + } - var data = new FlowMarkerBuilder() + int activityId = getActivity(protocolInfo); + + var data = new FlowMarkerBuilder() .withStartedAt(Instant.now()) .withExperimentId(optionalExpId.getAsInt()) - .withActivityId(getActivity(protocolInfo)) + .withActivityId(activityId) .wittApplication(getApplication(protocolInfo)) .withProtocol("tcp") .withAFI(toAFI(dst)) @@ -118,7 +126,10 @@ public void onStart(InetSocketAddress src, InetSocketAddress dst, ProtocolInfo p .withSource(src) .build("start"); - sendToMultipleDestinations(toFireflyDestination.apply(src), data); + InetSocketAddress fireflyDestination = toFireflyDestination.apply(src); + sendToMultipleDestinations(fireflyDestination, data); + logMarkerEvent("start", src, dst, protocolInfo, optionalExpId.getAsInt(), activityId, + null, null, fireflyDestination); } /** @@ -134,27 +145,36 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage if (!enabled) { + logSkippedMarker("disabled", src, dst, protocolInfo); return; } if (isExcludedTransfer(src, dst)) { + logSkippedMarker("excluded", src, dst, protocolInfo); return; } if (!needMarker(protocolInfo)) { + logSkippedMarker("unsupported-protocol", src, dst, protocolInfo); return; } var optionalExpId = getExperimentId(protocolInfo, subject); if (optionalExpId.isEmpty()) { + logSkippedMarker("no-experiment-id", src, dst, protocolInfo); return; } + int activityId = getActivity(protocolInfo); + long transferDurationMillis = Math.max(0L, mover.getConnectionTime()); + Instant startedAt = Instant.ofEpochMilli(mover.getTimestamp()); + Instant finishedAt = startedAt.plusMillis(transferDurationMillis); + var data = new FlowMarkerBuilder() - .withStartedAt(Instant.now()) - .withFinishedAt(Instant.now()) + .withStartedAt(startedAt) + .withFinishedAt(finishedAt) .withExperimentId(optionalExpId.getAsInt()) - .withActivityId(getActivity(protocolInfo)) + .withActivityId(activityId) .wittApplication(getApplication(protocolInfo)) .withUsage(mover.getBytesRead(), mover.getBytesWritten()) .withProtocol("tcp") @@ -166,7 +186,10 @@ public void onEnd(InetSocketAddress src, InetSocketAddress dst, MoverInfoMessage } var firefly = data.build("end"); - sendToMultipleDestinations(toFireflyDestination.apply(src), firefly); + InetSocketAddress fireflyDestination = toFireflyDestination.apply(src); + sendToMultipleDestinations(fireflyDestination, firefly); + logMarkerEvent("end", src, dst, protocolInfo, optionalExpId.getAsInt(), activityId, + mover.getBytesRead(), mover.getBytesWritten(), fireflyDestination); } /** @@ -293,7 +316,7 @@ private String getApplication(ProtocolInfo protocolInfo) { * @return the experiment ID, or -1 if it cannot be determined */ private OptionalInt getExperimentId(ProtocolInfo protocolInfo, Subject subject) { - if (protocolInfo.getTransferTag() != null && !protocolInfo.getTransferTag().isEmpty()) { + if (hasTransferTag(protocolInfo)) { try { int transferTag = Integer.parseInt(protocolInfo.getTransferTag()); if (transferTag < MIN_VALID_TRANSFER_TAG || transferTag > MAX_VALID_TRANSFER_TAG) { @@ -335,7 +358,7 @@ private boolean isExcludedTransfer(InetSocketAddress src, InetSocketAddress dst) } private int getActivity(ProtocolInfo protocolInfo) { - if (!protocolInfo.getTransferTag().isEmpty()) { + if (hasTransferTag(protocolInfo)) { // scitag = exp_id << 6 | act_id return Integer.parseInt(protocolInfo.getTransferTag()) & ACTIVITY_ID_MASK; } else { @@ -343,6 +366,95 @@ private int getActivity(ProtocolInfo protocolInfo) { } } + private boolean hasTransferTag(ProtocolInfo protocolInfo) { + String transferTag = protocolInfo.getTransferTag(); + return transferTag != null && !transferTag.isEmpty(); + } + + private void logSkippedMarker(String reason, InetSocketAddress src, InetSocketAddress dst, + ProtocolInfo protocolInfo) { + if (SCITAGS_LOGGER.isDebugEnabled()) { + SCITAGS_LOGGER.debug( + "scitags event=marker-skip reason={} protocol={} source={} sourcePort={} destination={} destinationPort={} transferTag={}", + reason, + protocolInfo.getProtocol().toLowerCase(), + formatAddress(src), + formatPort(src), + formatAddress(dst), + formatPort(dst), + formatTransferTag(protocolInfo)); + } + } + + private void logMarkerEvent(String event, InetSocketAddress src, InetSocketAddress dst, + ProtocolInfo protocolInfo, int experimentId, int activityId, Double bytesRead, + Double bytesWritten, InetSocketAddress fireflyDestination) { + if (SCITAGS_LOGGER.isDebugEnabled()) { + String classification = hasTransferTag(protocolInfo) ? "transfer-tag" : "fqan"; + if (bytesRead == null || bytesWritten == null) { + SCITAGS_LOGGER.debug( + "scitags event=marker state={} protocol={} source={} sourcePort={} destination={} destinationPort={} transferTag={} experimentId={} activityId={} classifier={} fireflyDestination={} collector={}", + event, + protocolInfo.getProtocol().toLowerCase(), + formatAddress(src), + formatPort(src), + formatAddress(dst), + formatPort(dst), + formatTransferTag(protocolInfo), + experimentId, + activityId, + classification, + formatSocketAddress(fireflyDestination), + formatCollectorDestination()); + } else { + SCITAGS_LOGGER.debug( + "scitags event=marker state={} protocol={} source={} sourcePort={} destination={} destinationPort={} transferTag={} experimentId={} activityId={} classifier={} bytesRead={} bytesWritten={} fireflyDestination={} collector={}", + event, + protocolInfo.getProtocol().toLowerCase(), + formatAddress(src), + formatPort(src), + formatAddress(dst), + formatPort(dst), + formatTransferTag(protocolInfo), + experimentId, + activityId, + classification, + bytesRead, + bytesWritten, + formatSocketAddress(fireflyDestination), + formatCollectorDestination()); + } + } + } + + private String formatSocketAddress(InetSocketAddress address) { + return formatAddress(address) + ":" + formatPort(address); + } + + private String formatCollectorDestination() { + return fireflyCollector == null + ? "-" + : formatAddress(fireflyCollector) + ":" + fireflyCollector.getPort(); + } + + private String formatTransferTag(ProtocolInfo protocolInfo) { + String transferTag = protocolInfo.getTransferTag(); + return transferTag == null || transferTag.isEmpty() ? "-" : transferTag; + } + + private String formatAddress(InetSocketAddress address) { + if (address == null) { + return "-"; + } + + InetAddress inetAddress = address.getAddress(); + return inetAddress == null ? address.getHostString() : toUriString(inetAddress); + } + + private String formatPort(InetSocketAddress address) { + return address == null ? "-" : Integer.toString(address.getPort()); + } + private String toAFI(InetSocketAddress dst) { var addr = dst.getAddress(); diff --git a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml index ce20e7258ae..e0093d71fe2 100644 --- a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml +++ b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml @@ -496,6 +496,7 @@ + diff --git a/modules/dcache/src/test/java/org/dcache/pool/classic/AbstractMoverProtocolTransferServiceTest.java b/modules/dcache/src/test/java/org/dcache/pool/classic/AbstractMoverProtocolTransferServiceTest.java index fe4f62e8526..b1534d6817f 100644 --- a/modules/dcache/src/test/java/org/dcache/pool/classic/AbstractMoverProtocolTransferServiceTest.java +++ b/modules/dcache/src/test/java/org/dcache/pool/classic/AbstractMoverProtocolTransferServiceTest.java @@ -33,6 +33,16 @@ public void setUp() throws Exception { moverProtocolMover = mock(MoverProtocolMover.class); when(moverProtocolMover.openChannel()).thenReturn(channel); + var protocolInfo = mock(ProtocolInfo.class); + when(moverProtocolMover.getProtocolInfo()).thenReturn(protocolInfo); + when(protocolInfo.getProtocol()).thenReturn("http"); + + var fileAttributes = FileAttributes.of() + .pnfsId("0000123456789012345678901234567890FF") + .build(); + + when(moverProtocolMover.getFileAttributes()).thenReturn(fileAttributes); + transferService = new AbstractMoverProtocolTransferService() { @Override protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception { diff --git a/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java b/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java index af1b35adcae..9300b2af3a6 100644 --- a/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java +++ b/modules/dcache/src/test/java/org/dcache/pool/movers/TransferLifeCycleTest.java @@ -4,16 +4,23 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.MoverInfoMessage; import diskCacheV111.vehicles.ProtocolInfo; +import dmg.cells.nucleus.CellAddressCore; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.lang.reflect.Method; import java.util.OptionalInt; import javax.security.auth.Subject; import org.dcache.auth.FQANPrincipal; +import org.json.JSONObject; import org.junit.Before; import org.junit.Test; @@ -73,6 +80,49 @@ public void shouldNotSuppressMarkerWhenOnlyDestinationIsExcluded() throws Except assertTrue(sendsStartMarker("203.0.113.20", "10.20.20.20", "10.0.0.0/8")); } + @Test + public void shouldSetEndMarkerStartTimeFromTransferDuration() throws Exception { + try (DatagramSocket socket = new DatagramSocket(0, InetAddress.getByName("127.0.0.1"))) { + socket.setSoTimeout(700); + + TransferLifeCycle lifecycle = new TransferLifeCycle(); + lifecycle.setEnabled(true); + lifecycle.setVoMapping("atlas:2"); + lifecycle.setFireflyDestination("127.0.0.1:" + socket.getLocalPort()); + + long transferDuration = 2500; + MoverInfoMessage mover = new MoverInfoMessage(new CellAddressCore("pool", "test"), + new PnfsId("000000000001")); + mover.setTransferAttributes(1024, transferDuration, + new TestProtocolInfo("xrootd", "129")); + mover.setSubject(new Subject()); + mover.setBytesRead(1024); + mover.setBytesWritten(0); + + Instant expectedStart = Instant.ofEpochMilli(mover.getTimestamp()); + Instant expectedEnd = expectedStart.plusMillis(transferDuration); + + lifecycle.onEnd( + new InetSocketAddress("203.0.113.20", 42000), + new InetSocketAddress("198.51.100.55", 2880), + mover); + + var packet = new DatagramPacket(new byte[4096], 4096); + socket.receive(packet); + + String payload = new String(packet.getData(), 0, packet.getLength(), + StandardCharsets.UTF_8); + JSONObject lifecyclePayload = new JSONObject(payload.substring(payload.indexOf('{'))) + .getJSONObject("flow-lifecycle"); + + Instant startTime = Instant.parse(lifecyclePayload.getString("start-time")); + Instant endTime = Instant.parse(lifecyclePayload.getString("end-time")); + + assertEquals(expectedStart, startTime); + assertEquals(expectedEnd, endTime); + assertEquals(transferDuration, Duration.between(startTime, endTime).toMillis()); + } + } private OptionalInt resolveExperimentId(String transferTag, Subject subject) throws Exception { return (OptionalInt) getExperimentId.invoke(transferLifeCycle, new TestProtocolInfo("xrootd", transferTag), subject);