From 2a17d179d24d185e3ec55c06f8309ca3a63983e1 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Wed, 1 Apr 2026 17:22:14 +0800 Subject: [PATCH] Add IP Address of Subtask Running Checkpoint to Flink WebU --- .../src/app/interfaces/job-checkpoint.ts | 1 + .../job-checkpoints-subtask.component.html | 2 + .../job-checkpoints-subtask.component.ts | 23 +++-- .../DefaultCheckpointStatsTracker.java | 3 +- .../runtime/checkpoint/PendingCheckpoint.java | 12 ++- .../runtime/checkpoint/SubtaskStateStats.java | 13 ++- ...TaskCheckpointStatisticDetailsHandler.java | 3 +- .../SubtaskCheckpointStatistics.java | 19 ++++- .../checkpoint/CompletedCheckpointTest.java | 2 +- .../DefaultCheckpointStatsTrackerTest.java | 12 +-- .../PendingCheckpointStatsTest.java | 3 +- .../checkpoint/PendingCheckpointTest.java | 84 +++++++++++++++++++ .../checkpoint/SubtaskStateStatsTest.java | 4 +- .../checkpoint/TaskStateStatsTest.java | 6 +- ...pointStatisticsWithSubtaskDetailsTest.java | 3 +- 15 files changed, 164 insertions(+), 26 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts index 5979a7d0cd064..0f4f50961c8bb 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts @@ -188,6 +188,7 @@ export interface CompletedSubTaskCheckpointStatistics { start_delay: number; unaligned_checkpoint: boolean; aborted: boolean; + ip: string | null; } export interface PendingSubTaskCheckpointStatistics {} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html index 2647ae83a2ac0..9c4f730cff346 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.html @@ -144,6 +144,7 @@ ID + IP Address Acknowledged End to End Duration @@ -166,6 +167,7 @@ {{ subTask['index'] }} + {{ subTask['ip'] || '-' }} {{ subTask['ack_timestamp'] | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts index d80d1003111e7..10ae807026b91 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/subtask/job-checkpoints-subtask.component.ts @@ -48,13 +48,13 @@ import { NzTableModule, NzTableSortFn } from 'ng-zorro-antd/table'; import { JobLocalService } from '../../job-local.service'; function createSortFn( - selector: (item: CompletedSubTaskCheckpointStatistics) => number | boolean + selector: (item: CompletedSubTaskCheckpointStatistics) => number | boolean | string | null ): NzTableSortFn { - // FIXME This type-asserts that pre / next are a specific subtype. - return (pre, next) => - selector(pre as CompletedSubTaskCheckpointStatistics) > selector(next as CompletedSubTaskCheckpointStatistics) - ? 1 - : -1; + return (pre, next) => { + const a = selector(pre as CompletedSubTaskCheckpointStatistics) ?? ''; + const b = selector(next as CompletedSubTaskCheckpointStatistics) ?? ''; + return a > b ? 1 : -1; + }; } @Component({ @@ -77,6 +77,17 @@ export class JobCheckpointsSubtaskComponent implements OnInit, OnChanges, OnDest public mapOfSubtask: Map = new Map(); public readonly sortAckTimestampFn = createSortFn(item => item.ack_timestamp); + public readonly sortIPAddressFn: NzTableSortFn = (a, b) => { + const ipA = (a as CompletedSubTaskCheckpointStatistics).ip ?? ''; + const ipB = (b as CompletedSubTaskCheckpointStatistics).ip ?? ''; + const normalize = (ip: string): string => + ip + .split('.') + .map(seg => seg.padStart(3, '0')) + .join('.'); + + return normalize(ipA) > normalize(ipB) ? 1 : normalize(ipA) < normalize(ipB) ? -1 : 0; + }; public readonly sortEndToEndDurationFn = createSortFn(item => item.end_to_end_duration); public readonly sortCheckpointedSizeFn = createSortFn(item => item.checkpointed_size); public readonly sortStateSizeFn = createSortFn(item => item.state_size); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java index d9c63d9d52f43..9552877392c19 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java @@ -567,7 +567,8 @@ public void reportIncompleteStats( metrics.getAlignmentDurationNanos() / 1_000_000, metrics.getCheckpointStartDelayNanos() / 1_000_000, metrics.getUnalignedCheckpoint(), - false)); + false, + null)); dirty = true; } } finally { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index c1b428c201309..cc8e780d30c0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -427,6 +428,14 @@ public TaskAcknowledgeResult acknowledgeTask( long checkpointStartDelayMillis = metrics.getCheckpointStartDelayNanos() / 1_000_000; + String taskManagerIp = null; + if (vertex.getCurrentExecutionAttempt() != null) { + TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); + if (location != null) { + taskManagerIp = location.address().getHostAddress(); + } + } + SubtaskStateStats subtaskStateStats = new SubtaskStateStats( vertex.getParallelSubtaskIndex(), @@ -440,7 +449,8 @@ public TaskAcknowledgeResult acknowledgeTask( alignmentDurationMillis, checkpointStartDelayMillis, metrics.getUnalignedCheckpoint(), - true); + true, + taskManagerIp); LOG.trace( "Checkpoint {} stats for {}: size={}Kb, duration={}ms, sync part={}ms, async part={}ms", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java index 3908dbbcd3588..8c82d689e3fb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java @@ -66,8 +66,10 @@ public class SubtaskStateStats implements Serializable { /** Is the checkpoint completed by this subtask. */ private final boolean completed; + private final String ip; + SubtaskStateStats(int subtaskIndex, long ackTimestamp) { - this(subtaskIndex, ackTimestamp, 0, 0, 0, 0, 0, 0, 0, 0, false, true); + this(subtaskIndex, ackTimestamp, 0, 0, 0, 0, 0, 0, 0, 0, false, true, null); } SubtaskStateStats( @@ -82,8 +84,8 @@ public class SubtaskStateStats implements Serializable { long alignmentDuration, long checkpointStartDelay, boolean unalignedCheckpoint, - boolean completed) { - + boolean completed, + String ip) { checkArgument(subtaskIndex >= 0, "Negative subtask index"); this.subtaskIndex = subtaskIndex; checkArgument(checkpointedSize >= 0, "Negative incremental state size"); @@ -99,6 +101,7 @@ public class SubtaskStateStats implements Serializable { this.checkpointStartDelay = checkpointStartDelay; this.unalignedCheckpoint = unalignedCheckpoint; this.completed = completed; + this.ip = ip; } public int getSubtaskIndex() { @@ -194,4 +197,8 @@ public boolean getUnalignedCheckpoint() { public boolean isCompleted() { return completed; } + + public String getIp() { + return ip; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java index 473c3f52c920e..b49707c426561 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java @@ -220,7 +220,8 @@ private static List createSubtaskCheckpointStatisti subtask.getAlignmentDuration()), subtask.getCheckpointStartDelay(), subtask.getUnalignedCheckpoint(), - !subtask.isCompleted())); + !subtask.isCompleted(), + subtask.getIp())); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java index 613916eb3d762..deb0f0f765f69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java @@ -111,6 +111,8 @@ public static final class CompletedSubtaskCheckpointStatistics public static final String FIELD_NAME_CHECKPOINTED_SIZE = "checkpointed_size"; + public static final String FIELD_NAME_IP_ADDRESS = "ip"; + /** * The accurate name of this field should be 'checkpointed_data_size', keep it as before to * not break backwards compatibility for old web UI. @@ -156,6 +158,9 @@ public static final class CompletedSubtaskCheckpointStatistics @JsonProperty(FIELD_NAME_ABORTED) private final boolean aborted; + @JsonProperty(value = FIELD_NAME_IP_ADDRESS, required = false) + private final String ip; + @JsonCreator public CompletedSubtaskCheckpointStatistics( @JsonProperty(FIELD_NAME_INDEX) int index, @@ -167,7 +172,8 @@ public CompletedSubtaskCheckpointStatistics( @JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment alignment, @JsonProperty(FIELD_NAME_START_DELAY) long startDelay, @JsonProperty(FIELD_NAME_UNALIGNED_CHECKPOINT) boolean unalignedCheckpoint, - @JsonProperty(FIELD_NAME_ABORTED) boolean aborted) { + @JsonProperty(FIELD_NAME_ABORTED) boolean aborted, + @JsonProperty(FIELD_NAME_IP_ADDRESS) String ip) { super(index, "completed"); this.ackTimestamp = ackTimestamp; this.duration = duration; @@ -178,6 +184,7 @@ public CompletedSubtaskCheckpointStatistics( this.startDelay = startDelay; this.unalignedCheckpoint = unalignedCheckpoint; this.aborted = aborted; + this.ip = ip; } public long getAckTimestamp() { @@ -216,6 +223,10 @@ public boolean isAborted() { return aborted; } + public String getIp() { + return ip; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -233,7 +244,8 @@ public boolean equals(Object o) { && Objects.equals(alignment, that.alignment) && startDelay == that.startDelay && unalignedCheckpoint == that.unalignedCheckpoint - && aborted == that.aborted; + && aborted == that.aborted + && Objects.equals(ip, that.ip); } @Override @@ -247,7 +259,8 @@ public int hashCode() { alignment, startDelay, unalignedCheckpoint, - aborted); + aborted, + ip); } /** Duration of the checkpoint. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 8fe8dc19ed27f..463e7f4a02325 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -415,7 +415,7 @@ void testIsJavaSerializable() throws Exception { 44L, true, new SubtaskStateStats( - 123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true), + 123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true, null), null); CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java index b3bbe4b3d9b02..fc051e4ef9825 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java @@ -673,13 +673,14 @@ public void addEvent(EventBuilder eventBuilder) { subtasksByVertex); pending.reportSubtaskStats( - jobVertexID0, new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true)); + jobVertexID0, + new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true, null)); pending.reportSubtaskStats( jobVertexID0, - new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true)); + new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true, null)); pending.reportSubtaskStats( jobVertexID1, - new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true)); + new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true, null)); // Complete checkpoint => new snapshot tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984)); return reportedSpansOut; @@ -984,7 +985,8 @@ public > G gauge(String name, G gauge) { ignored, ignored, false, - true); + true, + null); assertThat(pending.reportSubtaskStats(jobVertexID, subtaskStats)).isTrue(); @@ -1071,7 +1073,7 @@ private SubtaskStateStats createSubtaskStats(int index) { } private SubtaskStateStats createSubtaskStats(int index, boolean unaligned) { - return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true); + return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true, null); } private void reportRestoredCheckpoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java index e55cbb6fca748..9181cee3363dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java @@ -284,6 +284,7 @@ private SubtaskStateStats createSubtaskStats(int index, boolean unalignedCheckpo Integer.MAX_VALUE + (long) index, Integer.MAX_VALUE + (long) index, unalignedCheckpoint, - true); + true, + null); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 4dbc22dc34a90..54972ef746049 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -28,10 +28,12 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer; import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo; @@ -39,6 +41,7 @@ import org.apache.flink.runtime.state.TestingStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.concurrent.Executors; @@ -49,11 +52,13 @@ import java.io.IOException; import java.lang.reflect.Field; +import java.net.InetAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -509,6 +514,85 @@ void testReportTaskFinishedOperators() throws IOException { .contains(ACK_TASKS.get(0).getVertex()); } + @Test + void testAcknowledgeTaskCapturesTaskManagerIp() throws Exception { + final String expectedIp = "10.0.0.1"; + final ExecutionAttemptID ipTestAttemptId = createExecutionAttemptId(); + final JobVertexID jobVertexId = new JobVertexID(); + + ExecutionJobVertex ejv = mock(ExecutionJobVertex.class); + when(ejv.getOperatorIDs()) + .thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID))); + + TaskManagerLocation location = + new TaskManagerLocation( + ResourceID.generate(), InetAddress.getByName(expectedIp), 6121); + + Execution currentAttempt = mock(Execution.class); + ExecutionVertex vertex = mock(ExecutionVertex.class); + when(vertex.getMaxParallelism()).thenReturn(MAX_PARALLELISM); + when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(PARALLELISM); + when(vertex.getJobVertex()).thenReturn(ejv); + when(vertex.getJobvertexId()).thenReturn(jobVertexId); + when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("test-task (0/1)"); + when(vertex.getCurrentExecutionAttempt()).thenReturn(currentAttempt); + when(vertex.getCurrentAssignedResourceLocation()).thenReturn(location); + + Execution execution = mock(Execution.class); + when(execution.getAttemptId()).thenReturn(ipTestAttemptId); + when(execution.getVertex()).thenReturn(vertex); + + Map taskStatsCounts = new HashMap<>(); + taskStatsCounts.put(jobVertexId, PARALLELISM); + PendingCheckpointStats pendingStats = + new PendingCheckpointStats( + 0, + 1, + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + taskStatsCounts); + + CheckpointPlan plan = + new DefaultCheckpointPlan( + Collections.emptyList(), + Collections.singletonList(execution), + Collections.singletonList(vertex), + Collections.emptyList(), + Collections.emptyList(), + true); + + final Path checkpointDir = new Path(TempDirUtils.newFolder(tmpFolder).toURI()); + final FsCheckpointStorageLocation storageLocation = + new FsCheckpointStorageLocation( + LocalFileSystem.getSharedInstance(), + checkpointDir, + checkpointDir, + checkpointDir, + CheckpointStorageLocationReference.getDefault(), + 1024, + 4096); + + PendingCheckpoint checkpoint = + new PendingCheckpoint( + new JobID(), + 0, + 1, + plan, + Collections.emptyList(), + Collections.emptyList(), + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + new CompletableFuture<>(), + pendingStats, + new CompletableFuture<>()); + checkpoint.setCheckpointTargetLocation(storageLocation); + + checkpoint.acknowledgeTask(ipTestAttemptId, null, new CheckpointMetrics()); + + assertThat(pendingStats.getLatestAcknowledgedSubtaskStats()).isNotNull(); + assertThat(pendingStats.getLatestAcknowledgedSubtaskStats().getIp()).isEqualTo(expectedIp); + } + // ------------------------------------------------------------------------ private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java index 30970fcd9ee00..8ffb750b08f77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java @@ -52,7 +52,8 @@ public void test(boolean serialize) throws Exception { Integer.MAX_VALUE + 6L, Integer.MAX_VALUE + 7L, false, - true); + true, + "10.0.0.1"); stats = serialize ? CommonTestUtils.createCopySerializable(stats) : stats; @@ -63,6 +64,7 @@ public void test(boolean serialize) throws Exception { assertThat(stats.getAsyncCheckpointDuration()).isEqualTo(Integer.MAX_VALUE + 4L); assertThat(stats.getAlignmentDuration()).isEqualTo(Integer.MAX_VALUE + 6L); assertThat(stats.getCheckpointStartDelay()).isEqualTo(Integer.MAX_VALUE + 7L); + assertThat(stats.getIp()).isEqualTo("10.0.0.1"); // Check duration helper long ackTimestamp = stats.getAckTimestamp(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java index e754e33117423..5a1f86dd44109 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java @@ -74,7 +74,8 @@ private void test(boolean serialize) throws Exception { rand.nextInt(128), rand.nextInt(128), false, - true); + true, + null); stateSize += subtasks[i].getStateSize(); processedData += subtasks[i].getProcessedData(); @@ -94,7 +95,8 @@ private void test(boolean serialize) throws Exception { assertThat( taskStats.reportSubtaskStats( - new SubtaskStateStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true))) + new SubtaskStateStats( + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true, null))) .isFalse(); taskStats = serialize ? CommonTestUtils.createCopySerializable(taskStats) : taskStats; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java index 89b8acfa52c81..67d8020d8c811 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java @@ -73,7 +73,8 @@ protected TaskCheckpointStatisticsWithSubtaskDetails getTestResponseInstance() .CheckpointAlignment(2L, 4L, 5L, 3L), 42L, true, - false)); + false, + "192.168.1.100")); return new TaskCheckpointStatisticsWithSubtaskDetails( 4L,