Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -770,4 +770,30 @@ CompletableFuture<RegisterResult> registerProducerOffsets(
* @since 0.9
*/
CompletableFuture<Void> deleteProducerOffsets(String producerId);

/**
* Get the health status of the cluster asynchronously.
*
* <p>The returned {@link ClusterHealth} contains replica statistics and an overall {@link
* ClusterHealthStatus}:
*
* <ul>
* <li>{@link ClusterHealthStatus#GREEN} — all replicas are in-sync and all leaders are
* active. The cluster is fully healthy.
* <li>{@link ClusterHealthStatus#YELLOW} — all leaders are active, but some replicas have not
* yet rejoined the in-sync replica set (ISR).
* <li>{@link ClusterHealthStatus#RED} — one or more leader replicas have not yet been
* confirmed active (e.g., leader election or KV snapshot recovery is still in progress).
* <li>{@link ClusterHealthStatus#UNKNOWN} — the Coordinator was unable to determine cluster
* health (e.g., the server does not support this API).
* </ul>
*
* <p>This API is designed for the situation like a Kubernetes readiness-probe gate during
* rolling upgrades: only proceed to the next pod when the status is {@code GREEN}, ensuring all
* replicas have fully recovered before the next server is restarted.
*
* @return a {@link CompletableFuture} that completes with the cluster health information.
* @since 1.0
*/
CompletableFuture<ClusterHealth> getClusterHealth();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.admin;

import org.apache.fluss.annotation.PublicEvolving;

import java.util.Objects;

/**
* Cluster health information returned by {@link Admin#getClusterHealth()}.
*
* @since 1.0
*/
@PublicEvolving
public final class ClusterHealth {

private final int numReplicas;
private final int inSyncReplicas;
private final int numLeaderReplicas;
private final int activeLeaderReplicas;
private final ClusterHealthStatus status;

public ClusterHealth(
int numReplicas,
int inSyncReplicas,
int numLeaderReplicas,
int activeLeaderReplicas,
ClusterHealthStatus status) {
this.numReplicas = numReplicas;
this.inSyncReplicas = inSyncReplicas;
this.numLeaderReplicas = numLeaderReplicas;
this.activeLeaderReplicas = activeLeaderReplicas;
this.status = Objects.requireNonNull(status, "status");
}

public int getNumReplicas() {
return numReplicas;
}

public int getInSyncReplicas() {
return inSyncReplicas;
}

public int getNumLeaderReplicas() {
return numLeaderReplicas;
}

public int getActiveLeaderReplicas() {
return activeLeaderReplicas;
}

public ClusterHealthStatus getStatus() {
return status;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ClusterHealth)) {
return false;
}
ClusterHealth that = (ClusterHealth) o;
return numReplicas == that.numReplicas
&& inSyncReplicas == that.inSyncReplicas
&& numLeaderReplicas == that.numLeaderReplicas
&& activeLeaderReplicas == that.activeLeaderReplicas
&& status == that.status;
}

@Override
public int hashCode() {
return Objects.hash(
numReplicas, inSyncReplicas, numLeaderReplicas, activeLeaderReplicas, status);
}

@Override
public String toString() {
return "ClusterHealth{"
+ "numReplicas="
+ numReplicas
+ ", inSyncReplicas="
+ inSyncReplicas
+ ", numLeaderReplicas="
+ numLeaderReplicas
+ ", activeLeaderReplicas="
+ activeLeaderReplicas
+ ", status="
+ status
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.admin;

import org.apache.fluss.annotation.PublicEvolving;

/**
* Health status of the Fluss cluster.
*
* @since 1.0
*/
@PublicEvolving
public enum ClusterHealthStatus {
GREEN,
YELLOW,
RED,
UNKNOWN
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.fluss.rpc.messages.DropAclsRequest;
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
import org.apache.fluss.rpc.messages.DropTableRequest;
import org.apache.fluss.rpc.messages.GetClusterHealthRequest;
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
import org.apache.fluss.rpc.messages.GetLakeSnapshotRequest;
Expand Down Expand Up @@ -870,6 +871,12 @@ private static void handleListOffsetsResponse(
}
}

@Override
public CompletableFuture<ClusterHealth> getClusterHealth() {
return gateway.getClusterHealth(new GetClusterHealthRequest())
.thenApply(ClientRpcMessageUtils::toClusterHealth);
}

@VisibleForTesting
public AdminGateway getAdminGateway() {
return gateway;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.fluss.client.utils;

import org.apache.fluss.client.admin.ClusterHealth;
import org.apache.fluss.client.admin.ClusterHealthStatus;
import org.apache.fluss.client.admin.OffsetSpec;
import org.apache.fluss.client.admin.ProducerOffsetsResult;
import org.apache.fluss.client.lookup.LookupBatch;
Expand Down Expand Up @@ -51,6 +53,7 @@
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.CreatePartitionRequest;
import org.apache.fluss.rpc.messages.DropPartitionRequest;
import org.apache.fluss.rpc.messages.GetClusterHealthResponse;
import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse;
Expand Down Expand Up @@ -829,4 +832,26 @@ public static GetTableStatsRequest makeGetTableStatsRequest(List<TableBucket> bu
.collect(Collectors.toList());
return new GetTableStatsRequest().setTableId(tableId).addAllBucketsReqs(pbBuckets);
}

public static ClusterHealth toClusterHealth(GetClusterHealthResponse resp) {
return new ClusterHealth(
resp.getNumReplicas(),
resp.getInSyncReplicas(),
resp.getNumLeaderReplicas(),
resp.getActiveLeaderReplicas(),
toClusterHealthStatus(resp.getStatus()));
}

private static ClusterHealthStatus toClusterHealthStatus(int pbStatus) {
switch (pbStatus) {
case 0:
return ClusterHealthStatus.GREEN;
case 1:
return ClusterHealthStatus.YELLOW;
case 2:
return ClusterHealthStatus.RED;
default:
return ClusterHealthStatus.UNKNOWN;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2383,4 +2383,55 @@ public CompletableFuture<ListOffsetsResponse> listOffsets(
.isInstanceOf(NetworkException.class)
.hasMessageContaining("connection timed out");
}

@Test
void testClusterHealthDuringRollingUpgrade() throws Exception {
TablePath tablePath = TablePath.of("test_db", "health_test_table");
TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(DEFAULT_SCHEMA).distributedBy(3, "id").build();
long tableId = createTable(tablePath, tableDescriptor, true);
waitAllReplicasReady(tableId, 3);

// Phase 1: Cluster is healthy — status should be GREEN.
ClusterHealth health = admin.getClusterHealth().get();
assertThat(health.getStatus()).isEqualTo(ClusterHealthStatus.GREEN);
assertThat(health.getNumReplicas()).isEqualTo(health.getInSyncReplicas());
assertThat(health.getNumLeaderReplicas()).isEqualTo(health.getActiveLeaderReplicas());

// Phase 2: Stop one tablet server (simulate server crash during rolling upgrade).
int stoppedServerId = 0;
FLUSS_CLUSTER_EXTENSION.stopTabletServer(stoppedServerId);
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(2);

for (int bucket = 0; bucket < 3; bucket++) {
TableBucket tb = new TableBucket(tableId, bucket);
FLUSS_CLUSTER_EXTENSION.waitUntilReplicaShrinkFromIsr(tb, stoppedServerId);
}

// Status should not be GREEN (YELLOW or RED depending on leader placement).
ClusterHealth duringDown = admin.getClusterHealth().get();
assertThat(duringDown.getStatus()).isNotEqualTo(ClusterHealthStatus.GREEN);
assertThat(duringDown.getInSyncReplicas()).isLessThan(duringDown.getNumReplicas());

// Phase 3: Restart the server.
FLUSS_CLUSTER_EXTENSION.startTabletServer(stoppedServerId);
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);

// Phase 4: Wait for recovery — status should return to GREEN.
for (int bucket = 0; bucket < 3; bucket++) {
TableBucket tb = new TableBucket(tableId, bucket);
FLUSS_CLUSTER_EXTENSION.waitUntilReplicaExpandToIsr(tb, stoppedServerId);
}

waitUntil(
() -> admin.getClusterHealth().get().getStatus() == ClusterHealthStatus.GREEN,
Duration.ofMinutes(1),
"Cluster should return to GREEN after server restart");

ClusterHealth afterRecovery = admin.getClusterHealth().get();
assertThat(afterRecovery.getStatus()).isEqualTo(ClusterHealthStatus.GREEN);
assertThat(afterRecovery.getNumReplicas()).isEqualTo(afterRecovery.getInSyncReplicas());
assertThat(afterRecovery.getNumLeaderReplicas())
.isEqualTo(afterRecovery.getActiveLeaderReplicas());
}
}
7 changes: 7 additions & 0 deletions fluss-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-client</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- filesystem plugin -->
<dependency>
<groupId>org.apache.fluss</groupId>
Expand Down
16 changes: 16 additions & 0 deletions fluss-dist/src/main/assemblies/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@
<fileMode>0644</fileMode>
</file>

<!-- copy fluss-client fat jar (needed by ClusterHealthReadinessCheck CLI) -->
<file>
<source>../fluss-client/target/fluss-client-${project.version}.jar</source>
<outputDirectory>lib/</outputDirectory>
<destName>fluss-client-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>

<!-- copy fluss-dist jar (contains ClusterHealthReadinessCheck CLI) -->
<file>
<source>target/fluss-dist-${project.version}.jar</source>
<outputDirectory>lib/</outputDirectory>
<destName>fluss-dist-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>

<!-- copy the config file -->
<file>
<source>src/main/resources/server.yaml</source>
Expand Down
Loading