diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index ea28a246b5..5f0bcbde50 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -770,4 +770,30 @@ CompletableFuture registerProducerOffsets( * @since 0.9 */ CompletableFuture deleteProducerOffsets(String producerId); + + /** + * Get the health status of the cluster asynchronously. + * + *

<p>The returned {@link ClusterHealth} contains replica statistics and an overall {@link + * ClusterHealthStatus}: + * + *

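<ul> + * <li>{@code GREEN}: the only status treated as fully healthy; every replica is expected to be in sync and every leader active. + * <li>{@code YELLOW} / {@code RED}: the cluster is degraded (some replicas are out of sync and/or some leaders are inactive); the exact distinction between the two is decided by the server. + * <li>{@code UNKNOWN}: the server did not report a recognised health status. + * </ul> + * + *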

This API is designed for the situation like a Kubernetes readiness-probe gate during + * rolling upgrades: only proceed to the next pod when the status is {@code GREEN}, ensuring all + * replicas have fully recovered before the next server is restarted. + * + * @return a {@link CompletableFuture} that completes with the cluster health information. + * @since 1.0 + */ + CompletableFuture getClusterHealth(); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/ClusterHealth.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/ClusterHealth.java new file mode 100644 index 0000000000..8f751bc169 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/ClusterHealth.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.Objects; + +/** + * Cluster health information returned by {@link Admin#getClusterHealth()}. 
+ * + * @since 1.0 + */ +@PublicEvolving +public final class ClusterHealth { + + private final int numReplicas; + private final int inSyncReplicas; + private final int numLeaderReplicas; + private final int activeLeaderReplicas; + private final ClusterHealthStatus status; + + public ClusterHealth( + int numReplicas, + int inSyncReplicas, + int numLeaderReplicas, + int activeLeaderReplicas, + ClusterHealthStatus status) { + this.numReplicas = numReplicas; + this.inSyncReplicas = inSyncReplicas; + this.numLeaderReplicas = numLeaderReplicas; + this.activeLeaderReplicas = activeLeaderReplicas; + this.status = Objects.requireNonNull(status, "status"); + } + + public int getNumReplicas() { + return numReplicas; + } + + public int getInSyncReplicas() { + return inSyncReplicas; + } + + public int getNumLeaderReplicas() { + return numLeaderReplicas; + } + + public int getActiveLeaderReplicas() { + return activeLeaderReplicas; + } + + public ClusterHealthStatus getStatus() { + return status; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ClusterHealth)) { + return false; + } + ClusterHealth that = (ClusterHealth) o; + return numReplicas == that.numReplicas + && inSyncReplicas == that.inSyncReplicas + && numLeaderReplicas == that.numLeaderReplicas + && activeLeaderReplicas == that.activeLeaderReplicas + && status == that.status; + } + + @Override + public int hashCode() { + return Objects.hash( + numReplicas, inSyncReplicas, numLeaderReplicas, activeLeaderReplicas, status); + } + + @Override + public String toString() { + return "ClusterHealth{" + + "numReplicas=" + + numReplicas + + ", inSyncReplicas=" + + inSyncReplicas + + ", numLeaderReplicas=" + + numLeaderReplicas + + ", activeLeaderReplicas=" + + activeLeaderReplicas + + ", status=" + + status + + '}'; + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/ClusterHealthStatus.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/ClusterHealthStatus.java new file mode 100644 index 0000000000..531ebf0eb7 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/ClusterHealthStatus.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Health status of the Fluss cluster. 
+ * + * @since 1.0 + */ +@PublicEvolving +public enum ClusterHealthStatus { + GREEN, + YELLOW, + RED, + UNKNOWN +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index a1d429c99a..2524e0266a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -67,6 +67,7 @@ import org.apache.fluss.rpc.messages.DropAclsRequest; import org.apache.fluss.rpc.messages.DropDatabaseRequest; import org.apache.fluss.rpc.messages.DropTableRequest; +import org.apache.fluss.rpc.messages.GetClusterHealthRequest; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest; import org.apache.fluss.rpc.messages.GetLakeSnapshotRequest; @@ -870,6 +871,12 @@ private static void handleListOffsetsResponse( } } + @Override + public CompletableFuture getClusterHealth() { + return gateway.getClusterHealth(new GetClusterHealthRequest()) + .thenApply(ClientRpcMessageUtils::toClusterHealth); + } + @VisibleForTesting public AdminGateway getAdminGateway() { return gateway; diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 0bd67da17d..3f6e2443a9 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -17,6 +17,8 @@ package org.apache.fluss.client.utils; +import org.apache.fluss.client.admin.ClusterHealth; +import org.apache.fluss.client.admin.ClusterHealthStatus; import org.apache.fluss.client.admin.OffsetSpec; import org.apache.fluss.client.admin.ProducerOffsetsResult; import org.apache.fluss.client.lookup.LookupBatch; @@ -51,6 +53,7 @@ import 
org.apache.fluss.rpc.messages.AlterTableRequest; import org.apache.fluss.rpc.messages.CreatePartitionRequest; import org.apache.fluss.rpc.messages.DropPartitionRequest; +import org.apache.fluss.rpc.messages.GetClusterHealthResponse; import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenResponse; import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse; import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse; @@ -829,4 +832,26 @@ public static GetTableStatsRequest makeGetTableStatsRequest(List bu .collect(Collectors.toList()); return new GetTableStatsRequest().setTableId(tableId).addAllBucketsReqs(pbBuckets); } + + public static ClusterHealth toClusterHealth(GetClusterHealthResponse resp) { + return new ClusterHealth( + resp.getNumReplicas(), + resp.getInSyncReplicas(), + resp.getNumLeaderReplicas(), + resp.getActiveLeaderReplicas(), + toClusterHealthStatus(resp.getStatus())); + } + + private static ClusterHealthStatus toClusterHealthStatus(int pbStatus) { + switch (pbStatus) { + case 0: + return ClusterHealthStatus.GREEN; + case 1: + return ClusterHealthStatus.YELLOW; + case 2: + return ClusterHealthStatus.RED; + default: + return ClusterHealthStatus.UNKNOWN; + } + } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index da737dd07c..c46064b0e6 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -2383,4 +2383,55 @@ public CompletableFuture listOffsets( .isInstanceOf(NetworkException.class) .hasMessageContaining("connection timed out"); } + + @Test + void testClusterHealthDuringRollingUpgrade() throws Exception { + TablePath tablePath = TablePath.of("test_db", "health_test_table"); + TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(DEFAULT_SCHEMA).distributedBy(3, 
"id").build(); + long tableId = createTable(tablePath, tableDescriptor, true); + waitAllReplicasReady(tableId, 3); + + // Phase 1: Cluster is healthy — status should be GREEN. + ClusterHealth health = admin.getClusterHealth().get(); + assertThat(health.getStatus()).isEqualTo(ClusterHealthStatus.GREEN); + assertThat(health.getNumReplicas()).isEqualTo(health.getInSyncReplicas()); + assertThat(health.getNumLeaderReplicas()).isEqualTo(health.getActiveLeaderReplicas()); + + // Phase 2: Stop one tablet server (simulate server crash during rolling upgrade). + int stoppedServerId = 0; + FLUSS_CLUSTER_EXTENSION.stopTabletServer(stoppedServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(2); + + for (int bucket = 0; bucket < 3; bucket++) { + TableBucket tb = new TableBucket(tableId, bucket); + FLUSS_CLUSTER_EXTENSION.waitUntilReplicaShrinkFromIsr(tb, stoppedServerId); + } + + // Status should not be GREEN (YELLOW or RED depending on leader placement). + ClusterHealth duringDown = admin.getClusterHealth().get(); + assertThat(duringDown.getStatus()).isNotEqualTo(ClusterHealthStatus.GREEN); + assertThat(duringDown.getInSyncReplicas()).isLessThan(duringDown.getNumReplicas()); + + // Phase 3: Restart the server. + FLUSS_CLUSTER_EXTENSION.startTabletServer(stoppedServerId); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3); + + // Phase 4: Wait for recovery — status should return to GREEN. 
+ for (int bucket = 0; bucket < 3; bucket++) { + TableBucket tb = new TableBucket(tableId, bucket); + FLUSS_CLUSTER_EXTENSION.waitUntilReplicaExpandToIsr(tb, stoppedServerId); + } + + waitUntil( + () -> admin.getClusterHealth().get().getStatus() == ClusterHealthStatus.GREEN, + Duration.ofMinutes(1), + "Cluster should return to GREEN after server restart"); + + ClusterHealth afterRecovery = admin.getClusterHealth().get(); + assertThat(afterRecovery.getStatus()).isEqualTo(ClusterHealthStatus.GREEN); + assertThat(afterRecovery.getNumReplicas()).isEqualTo(afterRecovery.getInSyncReplicas()); + assertThat(afterRecovery.getNumLeaderReplicas()) + .isEqualTo(afterRecovery.getActiveLeaderReplicas()); + } } diff --git a/fluss-dist/pom.xml b/fluss-dist/pom.xml index 117f0a6469..96bbb38157 100644 --- a/fluss-dist/pom.xml +++ b/fluss-dist/pom.xml @@ -39,6 +39,13 @@ provided + + org.apache.fluss + fluss-client + ${project.version} + provided + + org.apache.fluss diff --git a/fluss-dist/src/main/assemblies/bin.xml b/fluss-dist/src/main/assemblies/bin.xml index 3c1d879f08..0d0b6dbb19 100644 --- a/fluss-dist/src/main/assemblies/bin.xml +++ b/fluss-dist/src/main/assemblies/bin.xml @@ -56,6 +56,22 @@ 0644 + + + ../fluss-client/target/fluss-client-${project.version}.jar + lib/ + fluss-client-${project.version}.jar + 0644 + + + + + target/fluss-dist-${project.version}.jar + lib/ + fluss-dist-${project.version}.jar + 0644 + + src/main/resources/server.yaml diff --git a/fluss-dist/src/main/java/org/apache/fluss/dist/ClusterHealthReadinessCheck.java b/fluss-dist/src/main/java/org/apache/fluss/dist/ClusterHealthReadinessCheck.java new file mode 100644 index 0000000000..c19c012e98 --- /dev/null +++ b/fluss-dist/src/main/java/org/apache/fluss/dist/ClusterHealthReadinessCheck.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.dist; + +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.admin.ClusterHealth; +import org.apache.fluss.client.admin.ClusterHealthStatus; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.GlobalConfiguration; +import org.apache.fluss.exception.UnsupportedVersionException; +import org.apache.fluss.utils.ExceptionUtils; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Readiness check CLI tool for Fluss rolling upgrades. + * + *

<p>Queries the Coordinator's Cluster Health API to determine whether the cluster is healthy + * enough for the next pod to restart. + * + *

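<p>Exit codes: + * + * <ul> + * <li>{@code 0}: ready, the cluster status is {@code GREEN}. + * <li>{@code 1}: not ready, the status is not {@code GREEN}, or the Coordinator could not be reached or timed out. + * <li>{@code 2}: the server does not support the Cluster Health API. + * <li>{@code 3}: usage error, or the configuration could not be loaded. + * </ul>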

+ */ +public class ClusterHealthReadinessCheck { + + public static final int EXIT_READY = 0; + public static final int EXIT_NOT_READY = 1; + public static final int EXIT_API_UNSUPPORTED = 2; + public static final int EXIT_ERROR = 3; + + private static final long DEFAULT_TIMEOUT_MS = 5000; + + public static void main(String[] args) { + int exitCode = run(args); + System.exit(exitCode); + } + + static int run(String[] args) { + String configDir = null; + String bootstrapServers = null; + long timeoutMs = DEFAULT_TIMEOUT_MS; + + for (int i = 0; i < args.length; i++) { + switch (args[i]) { + case "--configDir": + if (i + 1 < args.length) { + configDir = args[++i]; + } + break; + case "--timeoutMs": + if (i + 1 < args.length) { + timeoutMs = Long.parseLong(args[++i]); + } + break; + case "--bootstrapServers": + if (i + 1 < args.length) { + bootstrapServers = args[++i]; + } + break; + default: + break; + } + } + + if (configDir == null) { + System.err.println( + "Usage: ClusterHealthReadinessCheck --configDir [--timeoutMs ] [--bootstrapServers ]"); + return EXIT_ERROR; + } + + Configuration conf; + try { + conf = GlobalConfiguration.loadConfiguration(configDir, null); + } catch (Exception e) { + System.err.println("[readiness-check] Failed to load configuration: " + e.getMessage()); + return EXIT_ERROR; + } + + if (bootstrapServers != null) { + conf.setString("bootstrap.servers", bootstrapServers); + } + + try (org.apache.fluss.client.Connection connection = + ConnectionFactory.createConnection(conf)) { + Admin admin = connection.getAdmin(); + + ClusterHealth health = queryCoordinator(admin, timeoutMs); + return evaluateHealth(health); + + } catch (ApiUnsupportedException e) { + System.err.println("[readiness-check] API unsupported: " + e.getMessage()); + return EXIT_API_UNSUPPORTED; + + } catch (TimeoutException e) { + System.err.println( + "[readiness-check] Timeout connecting to Coordinator, treating as not ready: " + + e.getMessage()); + return EXIT_NOT_READY; + + } 
catch (Exception e) { + System.err.println( + "[readiness-check] Cannot reach Coordinator, treating as not ready: " + + e.getMessage()); + return EXIT_NOT_READY; + } + } + + private static ClusterHealth queryCoordinator(Admin admin, long timeoutMs) throws Exception { + try { + return admin.getClusterHealth().get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + if (ExceptionUtils.findThrowable(cause, UnsupportedVersionException.class) + .isPresent()) { + throw new ApiUnsupportedException(cause.getMessage()); + } + throw e; + } + } + + private static int evaluateHealth(ClusterHealth health) { + ClusterHealthStatus status = health.getStatus(); + System.out.println( + "[readiness-check] status=" + + status + + " numReplicas=" + + health.getNumReplicas() + + " inSyncReplicas=" + + health.getInSyncReplicas() + + " numLeaderReplicas=" + + health.getNumLeaderReplicas() + + " activeLeaderReplicas=" + + health.getActiveLeaderReplicas()); + return status == ClusterHealthStatus.GREEN ? EXIT_READY : EXIT_NOT_READY; + } + + private static final class ApiUnsupportedException extends Exception { + ApiUnsupportedException(String message) { + super(message); + } + } +} diff --git a/fluss-dist/src/main/resources/bin/readiness-check.sh b/fluss-dist/src/main/resources/bin/readiness-check.sh new file mode 100644 index 0000000000..c9cdd30da8 --- /dev/null +++ b/fluss-dist/src/main/resources/bin/readiness-check.sh @@ -0,0 +1,249 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# ============================================================================== +# Fluss Readiness Check Script +# ============================================================================== +# +# Two-step readiness probe for Kubernetes StatefulSet rolling upgrades: +# +# Step 1 — Local TCP port check: verify this TabletServer process is alive +# and has bound its RPC port. Fast, no external dependency. +# +# Step 2 — Cluster health check: query the Coordinator's Cluster Health API +# and pass only if status is GREEN. +# YELLOW/RED/UNKNOWN means recovery is incomplete — block the upgrade. +# +# Both steps must pass for the pod to be marked Ready. 
+# +# Exit codes (from ClusterHealthReadinessCheck.java): +# 0 = Ready (status GREEN) +# 1 = Not ready (status YELLOW, RED, UNKNOWN, or Coordinator unreachable) +# 2 = API unsupported → shell handles grace period then TCP fallback +# +# Environment variables (set by helm template or container spec): +# FLUSS_HOME - Fluss installation directory +# FLUSS_CONF_DIR - Configuration directory (default: $FLUSS_HOME/conf) +# READINESS_TIMEOUT_MS - Timeout for Health API call (default: 5000) +# READINESS_TCP_HOST - Host for TCP check (default: $POD_IP or 127.0.0.1) +# READINESS_TCP_PORT - Port for TCP check (default: 9124) +# READINESS_BOOTSTRAP_SERVERS - Coordinator bootstrap address +# READINESS_GRACE_SECS - Grace period when API is unsupported (default: 60) +# ============================================================================== + +set -o pipefail + +# ---- Configuration ---- + +FLUSS_HOME="${FLUSS_HOME:-/opt/fluss}" +FLUSS_CONF_DIR="${FLUSS_CONF_DIR:-${FLUSS_HOME}/conf}" +TIMEOUT_MS="${READINESS_TIMEOUT_MS:-5000}" +TCP_HOST="${READINESS_TCP_HOST:-${POD_IP:-127.0.0.1}}" +TCP_PORT="${READINESS_TCP_PORT:-9124}" +BOOTSTRAP_SERVERS="${READINESS_BOOTSTRAP_SERVERS:-}" +GRACE_SECS="${READINESS_GRACE_SECS:-60}" + +# Marker files for tracking state across probe invocations +MARKER_DIR="/tmp/fluss-readiness" +FIRST_READY_MARKER="${MARKER_DIR}/first-ready" +API_UNSUPPORTED_SINCE="${MARKER_DIR}/api-unsupported-since" + +mkdir -p "${MARKER_DIR}" + +# ---- Helper Functions ---- + +# Step 1: TCP port check (local liveness) +check_tcp() { + local host="${TCP_HOST}" + local port="${1:-${TCP_PORT}}" + (echo > /dev/tcp/"${host}"/"${port}") >/dev/null 2>&1 + return $? 
+} + +# Step 2: Run the Java ClusterHealthReadinessCheck CLI tool (cluster health check) +run_recovery_check() { + local conf_dir="$1" + local timeout_ms="$2" + + # Construct classpath (same logic as config.sh) + local classpath="" + local fluss_server_jar="" + while IFS= read -r -d '' jarfile; do + if [[ "$jarfile" =~ .*/fluss-server[^/]*.jar$ ]]; then + fluss_server_jar="$jarfile" + elif [[ -z "$classpath" ]]; then + classpath="$jarfile" + else + classpath="${classpath}:${jarfile}" + fi + done < <(find "${FLUSS_HOME}/lib" ! -type d -name '*.jar' -print0 | sort -z) + + if [[ -n "$fluss_server_jar" ]]; then + classpath="${classpath}:${fluss_server_jar}" + fi + + if [[ -z "$classpath" ]]; then + echo "[readiness-check] ERROR: No jars found in ${FLUSS_HOME}/lib" + return 3 + fi + + # Find Java + local java_cmd="java" + if [[ -n "${JAVA_HOME}" && -x "${JAVA_HOME}/bin/java" ]]; then + java_cmd="${JAVA_HOME}/bin/java" + fi + + # Build optional --bootstrapServers argument + local bootstrap_arg="" + if [[ -n "${BOOTSTRAP_SERVERS}" ]]; then + bootstrap_arg="--bootstrapServers ${BOOTSTRAP_SERVERS}" + fi + + # Run the check (suppress JVM startup noise, only care about exit code + output) + local output + output=$("${java_cmd}" \ + -XX:+IgnoreUnrecognizedVMOptions \ + -Xmx64m \ + -classpath "${classpath}" \ + org.apache.fluss.dist.ClusterHealthReadinessCheck \ + --configDir "${conf_dir}" \ + --timeoutMs "${timeout_ms}" \ + ${bootstrap_arg} 2>&1) + local exit_code=$? + + # Log output to container's main process stderr (visible in kubectl logs) + if [[ -n "$output" ]]; then + echo "$output" >&2 + if [[ -w /proc/1/fd/2 ]]; then + echo "[readiness-probe] $output" > /proc/1/fd/2 + fi + fi + + return $exit_code +} + +# Record when API unsupported was first detected +mark_api_unsupported() { + if [[ ! 
-f "${API_UNSUPPORTED_SINCE}" ]]; then + date +%s > "${API_UNSUPPORTED_SINCE}" + fi +} + +# Clear API-unsupported marker +clear_api_unsupported() { + rm -f "${API_UNSUPPORTED_SINCE}" +} + +# Check if grace period for API-unsupported has elapsed +is_grace_period_elapsed() { + if [[ ! -f "${API_UNSUPPORTED_SINCE}" ]]; then + return 1 # not elapsed (no marker) + fi + local since + since=$(cat "${API_UNSUPPORTED_SINCE}") + local now + now=$(date +%s) + local elapsed=$(( now - since )) + if [[ $elapsed -ge ${GRACE_SECS} ]]; then + return 0 # elapsed + fi + return 1 # not elapsed yet +} + +# ---- Main Logic ---- + +# Helper: log to container's main process (visible in kubectl logs) +log_to_main() { + echo "$1" >&2 + if [[ -w /proc/1/fd/2 ]]; then + echo "$1" > /proc/1/fd/2 + fi +} + +# ---- Step 1: Local TCP port check ---- +# If the local TS process hasn't bound its port yet, fail immediately. +# No need to query the Coordinator for cluster state. +if ! check_tcp; then + exit 1 +fi + +# ---- Step 2: Cluster health check ---- + +# First-boot detection: +# On the very first startup (no previous data), there's nothing to recover. +# We use a marker file: once the cluster has been "ready" at least once, we +# know subsequent probes after a restart are "upgrade" scenarios that need +# the full recovery gate. +if [[ ! -f "${FIRST_READY_MARKER}" ]]; then + run_recovery_check "${FLUSS_CONF_DIR}" "${TIMEOUT_MS}" + local_exit=$? 
+ + case $local_exit in + 0) + log_to_main "[readiness-check] First boot: GREEN, marking ready" + touch "${FIRST_READY_MARKER}" + clear_api_unsupported + exit 0 + ;; + 1) + log_to_main "[readiness-check] First boot: not ready (exit 1)" + exit 1 + ;; + 2) + log_to_main "[readiness-check] First boot: API unsupported, TCP-only fallback" + touch "${FIRST_READY_MARKER}" + exit 0 + ;; + *) + log_to_main "[readiness-check] Config error, falling back to TCP-only" + exit 0 + ;; + esac +fi + +# Upgrade/restart scenario (marker file exists = has been ready before) +run_recovery_check "${FLUSS_CONF_DIR}" "${TIMEOUT_MS}" +local_exit=$? + +case $local_exit in + 0) + log_to_main "[readiness-check] Upgrade check: PASSED (exit 0)" + clear_api_unsupported + exit 0 + ;; + 1) + log_to_main "[readiness-check] Upgrade check: BLOCKED (exit 1) — waiting for recovery" + clear_api_unsupported + exit 1 + ;; + 2) + mark_api_unsupported + if is_grace_period_elapsed; then + log_to_main "[readiness-check] API unsupported for >${GRACE_SECS}s, TCP fallback" + exit 0 + fi + log_to_main "[readiness-check] API unsupported, waiting (grace period)" + exit 1 + ;; + *) + log_to_main "[readiness-check] Config error (exit $local_exit), treating as not ready" + exit 1 + ;; +esac diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java index 7082265456..b8454c713e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java @@ -317,4 +317,9 @@ public void close() { public CompletableFuture getReadableLakeSnapshot(TablePath tablePath) { throw new UnsupportedOperationException("Not implemented in TestAdminAdapter"); } + + @Override + public CompletableFuture getClusterHealth() { + 
throw new UnsupportedOperationException("Not implemented in TestAdminAdapter"); + } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java index 272d1b4a11..94045435c0 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java @@ -22,6 +22,8 @@ import org.apache.fluss.rpc.messages.DatabaseExistsResponse; import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest; import org.apache.fluss.rpc.messages.DescribeClusterConfigsResponse; +import org.apache.fluss.rpc.messages.GetClusterHealthRequest; +import org.apache.fluss.rpc.messages.GetClusterHealthResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; import org.apache.fluss.rpc.messages.GetDatabaseInfoResponse; import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; @@ -192,4 +194,7 @@ CompletableFuture listPartitionInfos( @RPC(api = ApiKeys.DESCRIBE_CLUSTER_CONFIGS) CompletableFuture describeClusterConfigs( DescribeClusterConfigsRequest request); + + @RPC(api = ApiKeys.GET_CLUSTER_HEALTH) + CompletableFuture getClusterHealth(GetClusterHealthRequest request); } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 92a4680eef..dd164d7cc8 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -103,7 +103,8 @@ public enum ApiKeys { DROP_KV_SNAPSHOT_LEASE(1058, 0, 0, PUBLIC), GET_TABLE_STATS(1059, 0, 0, PUBLIC), ALTER_DATABASE(1060, 0, 0, PUBLIC), - SCAN_KV(1061, 0, 0, PUBLIC); + SCAN_KV(1061, 0, 0, PUBLIC), + GET_CLUSTER_HEALTH(1062, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git 
a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index e8381215fc..7b86bc9158 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -1369,4 +1369,14 @@ message PbLiteralValue { optional int64 timestamp_millis_value = 11; // Epoch millis optional int32 timestamp_nano_of_millis_value = 12; // Nano of millis optional bytes decimal_bytes = 13; // Serialized decimal (non-compact mode) +} + +message GetClusterHealthRequest { } + +message GetClusterHealthResponse { + required int32 num_replicas = 1; + required int32 in_sync_replicas = 2; + required int32 num_leader_replicas = 3; + required int32 active_leader_replicas = 4; + required int32 status = 5; // PbClusterHealthStatus: GREEN=0, YELLOW=1, RED=2, UNKNOWN=3 } \ No newline at end of file diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 3517680932..a408fa5fd5 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -167,6 +167,12 @@ public CompletableFuture notifyKvSnapshotOffset( return null; } + @Override + public CompletableFuture + getClusterHealth(org.apache.fluss.rpc.messages.GetClusterHealthRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture notifyLakeTableOffset( NotifyLakeTableOffsetRequest request) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index f7b6de9eb8..cc20737b9f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -45,6 +45,8 @@ import org.apache.fluss.rpc.messages.DatabaseExistsResponse; import 
org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest; import org.apache.fluss.rpc.messages.DescribeClusterConfigsResponse; +import org.apache.fluss.rpc.messages.GetClusterHealthRequest; +import org.apache.fluss.rpc.messages.GetClusterHealthResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; import org.apache.fluss.rpc.messages.GetDatabaseInfoResponse; import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; @@ -574,6 +576,18 @@ public CompletableFuture describeClusterConfigs( new DescribeClusterConfigsResponse().addAllConfigs(toPbConfigEntries(configs))); } + @Override + public CompletableFuture getClusterHealth( + GetClusterHealthRequest request) { + GetClusterHealthResponse resp = new GetClusterHealthResponse(); + resp.setNumReplicas(0) + .setInSyncReplicas(0) + .setNumLeaderReplicas(0) + .setActiveLeaderReplicas(0) + .setStatus(3); // UNKNOWN + return CompletableFuture.completedFuture(resp); + } + protected MetadataResponse processMetadataRequest( MetadataRequest request, String listenerName, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index 92dfc5ee5c..34fdb63869 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -106,6 +106,13 @@ public class CoordinatorContext { */ private final Map> replicasOnOffline = new HashMap<>(); + /** + * Tracks leader buckets that are currently inactive — a leader is inactive from the moment we + * send NotifyLeaderAndIsr until the target server responds successfully confirming it is the + * leader. Also inactive when leader == NO_LEADER. + */ + private final Set inactiveLeaderBuckets = new HashSet<>(); + /** A mapping from tabletServers to server tag. 
*/ private final Map serverTags = new HashMap<>(); @@ -218,6 +225,32 @@ public void removeOfflineBucketInServer(int serverId) { replicasOnOffline.remove(serverId); } + // ---- Inactive leader tracking (for Cluster Health API) ---- + + public void markLeaderInactive(TableBucket bucket) { + inactiveLeaderBuckets.add(bucket); + } + + public void markLeadersInactive(Collection buckets) { + inactiveLeaderBuckets.addAll(buckets); + } + + public void markLeaderActive(TableBucket bucket) { + inactiveLeaderBuckets.remove(bucket); + } + + public boolean isLeaderActive(TableBucket bucket) { + return !inactiveLeaderBuckets.contains(bucket); + } + + public Set getInactiveLeaderBuckets() { + return Collections.unmodifiableSet(inactiveLeaderBuckets); + } + + public void removeFromInactiveLeaders(Set buckets) { + inactiveLeaderBuckets.removeAll(buckets); + } + public Map allTables() { return tablePathById; } @@ -656,10 +689,16 @@ public void removeTable(long tableId) { tablesToBeDeleted.remove(tableId); Map> assignment = tableAssignments.remove(tableId); if (assignment != null) { - // remove leadership info for each bucket from the context + Set removedBuckets = new HashSet<>(); assignment .keySet() - .forEach(bucket -> bucketLeaderAndIsr.remove(new TableBucket(tableId, bucket))); + .forEach( + bucket -> { + TableBucket tb = new TableBucket(tableId, bucket); + bucketLeaderAndIsr.remove(tb); + removedBuckets.add(tb); + }); + removeFromInactiveLeaders(removedBuckets); } TablePath tablePath = tablePathById.remove(tableId); @@ -673,16 +712,20 @@ public void removePartition(TablePartition tablePartition) { partitionsToBeDeleted.remove(tablePartition); Map> assignment = partitionAssignments.remove(tablePartition); if (assignment != null) { - // remove leadership info for each bucket from the context + Set removedBuckets = new HashSet<>(); assignment .keySet() .forEach( - bucket -> - bucketLeaderAndIsr.remove( - new TableBucket( - tablePartition.getTableId(), - 
tablePartition.getPartitionId(), - bucket))); + bucket -> { + TableBucket tb = + new TableBucket( + tablePartition.getTableId(), + tablePartition.getPartitionId(), + bucket); + bucketLeaderAndIsr.remove(tb); + removedBuckets.add(tb); + }); + removeFromInactiveLeaders(removedBuckets); } PhysicalTablePath physicalTablePath = @@ -717,6 +760,7 @@ private void clearTablesState() { partitionAssignments.clear(); bucketLeaderAndIsr.clear(); replicasOnOffline.clear(); + inactiveLeaderBuckets.clear(); bucketStates.clear(); replicaStates.clear(); tablePathById.clear(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 96f39ef08a..e859d6a782 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -964,6 +964,7 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent( // get the server that receives the response int serverId = notifyLeaderAndIsrResponseReceivedEvent.getResponseServerId(); Set offlineReplicas = new HashSet<>(); + List succeededBuckets = new ArrayList<>(); // get all the results for each bucket List notifyLeaderAndIsrResultForBuckets = notifyLeaderAndIsrResponseReceivedEvent.getNotifyLeaderAndIsrResultForBuckets(); @@ -974,10 +975,17 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent( offlineReplicas.add( new TableBucketReplica( notifyLeaderAndIsrResultForBucket.getTableBucket(), serverId)); + } else { + succeededBuckets.add(notifyLeaderAndIsrResultForBucket.getTableBucket()); + } + } + for (TableBucket tb : succeededBuckets) { + Optional laiOpt = coordinatorContext.getBucketLeaderAndIsr(tb); + if (laiOpt.isPresent() && laiOpt.get().leader() == serverId) { + coordinatorContext.markLeaderActive(tb); } } if (!offlineReplicas.isEmpty()) { - 
// trigger replicas to offline onReplicaBecomeOffline(offlineReplicas); } @@ -1130,6 +1138,7 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent // process dead tablet server LOG.info("Tablet server failure callback for {}.", tabletServerId); coordinatorContext.removeOfflineBucketInServer(tabletServerId); + coordinatorContext.removeLiveTabletServer(tabletServerId); coordinatorContext.shuttingDownTabletServers().remove(tabletServerId); coordinatorChannelManager.removeTabletServer(tabletServerId); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 3441053671..555a88519e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -35,11 +35,13 @@ import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; import org.apache.fluss.rpc.protocol.ApiError; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent; import org.apache.fluss.server.coordinator.event.EventManager; import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; import org.apache.fluss.server.entity.DeleteReplicaResultForBucket; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; +import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import org.apache.fluss.server.metadata.BucketMetadata; import org.apache.fluss.server.metadata.PartitionMetadata; import org.apache.fluss.server.metadata.TableMetadata; @@ -409,10 +411,19 @@ private void sendNotifyLeaderAndIsrRequest(int coordinatorEpoch) { notifyRequestEntry : notifyLeaderAndIsrRequestMap.entrySet()) { // send request for each tablet server 
Integer serverId = notifyRequestEntry.getKey(); + Set buckets = notifyRequestEntry.getValue().keySet(); NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest = makeNotifyLeaderAndIsrRequest( coordinatorEpoch, notifyRequestEntry.getValue().values()); + for (Map.Entry entry : + notifyRequestEntry.getValue().entrySet()) { + int leader = entry.getValue().getLeader(); + if (leader == serverId || leader == LeaderAndIsr.NO_LEADER) { + coordinatorContext.markLeaderInactive(entry.getKey()); + } + } + coordinatorChannelManager.sendBucketLeaderAndIsrRequest( serverId, notifyLeaderAndIsrRequest, @@ -422,12 +433,20 @@ private void sendNotifyLeaderAndIsrRequest(int coordinatorEpoch) { "Failed to send notify leader and isr request to tablet server {}.", serverId, throwable); - // todo: in FLUSS-55886145, we will introduce a sender thread to send - // the request, and retry if encounter any error; It may happens that - // the tablet server is offline and will always got error. But, - // coordinator will remove the sender for the tablet server and mark all - // replica in the tablet server as offline. so, in here, if encounter - // any error, we just ignore it. + // Treat all buckets as failed — clears pending state and triggers + // re-election via onReplicaBecomeOffline. 
+ List failedResults = + new ArrayList<>(); + ApiError sendError = + new ApiError( + Errors.UNKNOWN_SERVER_ERROR, throwable.getMessage()); + for (TableBucket tb : buckets) { + failedResults.add( + new NotifyLeaderAndIsrResultForBucket(tb, sendError)); + } + eventManager.put( + new NotifyLeaderAndIsrResponseReceivedEvent( + failedResults, serverId)); return; } // put the response receive event into the event manager diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index e96e36f454..ecb4475edf 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -98,6 +98,8 @@ import org.apache.fluss.rpc.messages.DropPartitionResponse; import org.apache.fluss.rpc.messages.DropTableRequest; import org.apache.fluss.rpc.messages.DropTableResponse; +import org.apache.fluss.rpc.messages.GetClusterHealthRequest; +import org.apache.fluss.rpc.messages.GetClusterHealthResponse; import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest; import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; @@ -166,6 +168,7 @@ import org.apache.fluss.server.utils.ServerRpcMessageUtils; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; @@ -291,6 +294,62 @@ public CoordinatorService( this.coordinatorLeaderElection = coordinatorLeaderElection; } + @Override + public CompletableFuture getClusterHealth( + GetClusterHealthRequest request) { + AccessContextEvent event = + new 
AccessContextEvent<>(CoordinatorService::computeClusterHealth); + eventManagerSupplier.get().put(event); + return event.getResultFuture(); + } + + @VisibleForTesting + static GetClusterHealthResponse computeClusterHealth(CoordinatorContext ctx) { + GetClusterHealthResponse response = new GetClusterHealthResponse(); + + int numReplicas = 0; + int inSyncReplicas = 0; + int numLeaderReplicas = 0; + int activeLeaderReplicas = 0; + Set inactiveLeaders = ctx.getInactiveLeaderBuckets(); + + for (TableBucket tb : ctx.getAllBuckets()) { + List assignment = ctx.getAssignment(tb); + numReplicas += assignment.size(); + numLeaderReplicas++; + + Optional laiOpt = ctx.getBucketLeaderAndIsr(tb); + if (laiOpt.isPresent()) { + LeaderAndIsr lai = laiOpt.get(); + inSyncReplicas += lai.isr().size(); + if (lai.leader() != LeaderAndIsr.NO_LEADER + && ctx.getLiveTabletServers().containsKey(lai.leader()) + && !inactiveLeaders.contains(tb)) { + activeLeaderReplicas++; + } + } + } + + // PbClusterHealthStatus: GREEN=0, YELLOW=1, RED=2, UNKNOWN=3 + int status; + if (numLeaderReplicas == 0) { + status = 0; // GREEN + } else if (activeLeaderReplicas < numLeaderReplicas) { + status = 2; // RED + } else if (inSyncReplicas < numReplicas) { + status = 1; // YELLOW + } else { + status = 0; // GREEN + } + + response.setNumReplicas(numReplicas); + response.setInSyncReplicas(inSyncReplicas); + response.setNumLeaderReplicas(numLeaderReplicas); + response.setActiveLeaderReplicas(activeLeaderReplicas); + response.setStatus(status); + return response; + } + @Override public String name() { return "coordinator"; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/ClusterHealthTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/ClusterHealthTest.java new file mode 100644 index 0000000000..7700551670 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/ClusterHealthTest.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.cluster.Endpoint; +import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.rpc.messages.GetClusterHealthResponse; +import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZkEpoch; +import org.apache.fluss.server.zk.data.LeaderAndIsr; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CoordinatorService#computeClusterHealth}. 
*/ +class ClusterHealthTest { + + private CoordinatorContext ctx; + + @BeforeEach + void setUp() { + ctx = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); + ctx.setLiveTabletServers( + Arrays.asList(makeServerInfo(0), makeServerInfo(1), makeServerInfo(2))); + } + + @Test + void testEmptyCluster() { + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getNumReplicas()).isEqualTo(0); + assertThat(resp.getInSyncReplicas()).isEqualTo(0); + assertThat(resp.getNumLeaderReplicas()).isEqualTo(0); + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(0); + assertThat(resp.getStatus()).isEqualTo(0 /* GREEN */); + } + + @Test + void testGreenAllInSyncAllLeadersActive() { + TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1, 2)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(0, 1, Arrays.asList(0, 1, 2), Collections.emptyList(), 0, 1)); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getNumReplicas()).isEqualTo(3); + assertThat(resp.getInSyncReplicas()).isEqualTo(3); + assertThat(resp.getNumLeaderReplicas()).isEqualTo(1); + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(1); + assertThat(resp.getStatus()).isEqualTo(0 /* GREEN */); + } + + @Test + void testYellowIsrIncompleteButLeadersActive() { + TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1, 2)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(0, 1, Arrays.asList(0, 1), Collections.emptyList(), 0, 1)); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getNumReplicas()).isEqualTo(3); + assertThat(resp.getInSyncReplicas()).isEqualTo(2); + assertThat(resp.getNumLeaderReplicas()).isEqualTo(1); + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(1); + assertThat(resp.getStatus()).isEqualTo(1 /* YELLOW */); + } + + @Test + void testRedLeaderInactive() { + 
TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1, 2)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(0, 1, Arrays.asList(0, 1, 2), Collections.emptyList(), 0, 1)); + ctx.markLeaderInactive(tb); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getNumReplicas()).isEqualTo(3); + assertThat(resp.getInSyncReplicas()).isEqualTo(3); + assertThat(resp.getNumLeaderReplicas()).isEqualTo(1); + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(0); + assertThat(resp.getStatus()).isEqualTo(2 /* RED */); + } + + @Test + void testRedNoLeader() { + TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1)); + ctx.putBucketLeaderAndIsr( + tb, + new LeaderAndIsr( + LeaderAndIsr.NO_LEADER, + 1, + Arrays.asList(0, 1), + Collections.emptyList(), + 0, + 1)); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(0); + assertThat(resp.getStatus()).isEqualTo(2 /* RED */); + } + + @Test + void testRedLeaderOnDeadServer() { + TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1, 5)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(5, 1, Arrays.asList(0, 1, 5), Collections.emptyList(), 0, 1)); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(0); + assertThat(resp.getStatus()).isEqualTo(2 /* RED */); + } + + @Test + void testNoLeaderAndIsrCountsAsRedAndZeroIsr() { + TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1)); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getNumReplicas()).isEqualTo(2); + assertThat(resp.getInSyncReplicas()).isEqualTo(0); + assertThat(resp.getNumLeaderReplicas()).isEqualTo(1); + 
assertThat(resp.getActiveLeaderReplicas()).isEqualTo(0); + assertThat(resp.getStatus()).isEqualTo(2 /* RED */); + } + + @Test + void testTransitionRedToGreenAfterMarkActive() { + TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(0, 1, Arrays.asList(0, 1), Collections.emptyList(), 0, 1)); + ctx.markLeaderInactive(tb); + + assertThat(CoordinatorService.computeClusterHealth(ctx).getStatus()).isEqualTo(2 /* RED */); + + ctx.markLeaderActive(tb); + + assertThat(CoordinatorService.computeClusterHealth(ctx).getStatus()) + .isEqualTo(0 /* GREEN */); + } + + @Test + void testMultipleBucketsMixed() { + TableBucket tb1 = new TableBucket(1L, 0); + TableBucket tb2 = new TableBucket(1L, 1); + TableBucket tb3 = new TableBucket(2L, 0); + + ctx.updateBucketReplicaAssignment(tb1, Arrays.asList(0, 1)); + ctx.updateBucketReplicaAssignment(tb2, Arrays.asList(1, 2)); + ctx.updateBucketReplicaAssignment(tb3, Arrays.asList(0, 2)); + + ctx.putBucketLeaderAndIsr( + tb1, new LeaderAndIsr(0, 1, Arrays.asList(0, 1), Collections.emptyList(), 0, 1)); + ctx.putBucketLeaderAndIsr( + tb2, + new LeaderAndIsr( + 1, 1, Collections.singletonList(1), Collections.emptyList(), 0, 1)); + ctx.putBucketLeaderAndIsr( + tb3, new LeaderAndIsr(0, 1, Arrays.asList(0, 2), Collections.emptyList(), 0, 1)); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getNumReplicas()).isEqualTo(6); + assertThat(resp.getInSyncReplicas()).isEqualTo(5); + assertThat(resp.getNumLeaderReplicas()).isEqualTo(3); + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(3); + assertThat(resp.getStatus()).isEqualTo(1 /* YELLOW */); + } + + @Test + void testPartitionedTableBucket() { + TableBucket tb = new TableBucket(1L, 100L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1)); + ctx.putBucketLeaderAndIsr( + tb, + new LeaderAndIsr( + 0, 1, Collections.singletonList(0), 
Collections.emptyList(), 0, 1)); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getNumReplicas()).isEqualTo(2); + assertThat(resp.getInSyncReplicas()).isEqualTo(1); + assertThat(resp.getNumLeaderReplicas()).isEqualTo(1); + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(1); + assertThat(resp.getStatus()).isEqualTo(1 /* YELLOW */); + } + + @Test + void testInactiveLeaderClearedOnTableRemoval() { + TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(0, 1, Arrays.asList(0, 1), Collections.emptyList(), 0, 1)); + ctx.markLeaderInactive(tb); + + assertThat(ctx.getInactiveLeaderBuckets()).contains(tb); + + ctx.removeTable(1L); + + assertThat(ctx.getInactiveLeaderBuckets()).doesNotContain(tb); + } + + @Test + void testRedWithInactiveLeaderAndIncompleteIsr() { + TableBucket tb1 = new TableBucket(1L, 0); + TableBucket tb2 = new TableBucket(1L, 1); + + ctx.updateBucketReplicaAssignment(tb1, Arrays.asList(0, 1)); + ctx.updateBucketReplicaAssignment(tb2, Arrays.asList(0, 1)); + + ctx.putBucketLeaderAndIsr( + tb1, + new LeaderAndIsr( + 0, 1, Collections.singletonList(0), Collections.emptyList(), 0, 1)); + ctx.putBucketLeaderAndIsr( + tb2, new LeaderAndIsr(1, 1, Arrays.asList(0, 1), Collections.emptyList(), 0, 1)); + + ctx.markLeaderInactive(tb2); + + GetClusterHealthResponse resp = CoordinatorService.computeClusterHealth(ctx); + + assertThat(resp.getNumReplicas()).isEqualTo(4); + assertThat(resp.getInSyncReplicas()).isEqualTo(3); + assertThat(resp.getActiveLeaderReplicas()).isEqualTo(1); + assertThat(resp.getStatus()).isEqualTo(2 /* RED */); + } + + @Test + void testInactiveLeaderClearedOnPartitionRemoval() { + TableBucket tb = new TableBucket(1L, 100L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(0, 1, Arrays.asList(0, 1), 
Collections.emptyList(), 0, 1)); + ctx.markLeaderInactive(tb); + + assertThat(ctx.getInactiveLeaderBuckets()).contains(tb); + + ctx.removePartition(new TablePartition(1L, 100L)); + + assertThat(ctx.getInactiveLeaderBuckets()).doesNotContain(tb); + } + + @Test + void testFollowerNotificationDoesNotMarkInactive() { + // Simulate sendNotifyLeaderAndIsrRequest: when NotifyLeaderAndIsr is sent + // to a server for a bucket where it is a follower, the bucket should NOT + // be marked inactive. Only the server that is the leader gets marked. + TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(0, 1, Arrays.asList(0, 1), Collections.emptyList(), 0, 1)); + + // Sending to server 1 (follower): leader=0, serverId=1 + // Follower does not match leader, so markLeaderInactive is NOT called. + assertThat(ctx.isLeaderActive(tb)).isTrue(); + assertThat(CoordinatorService.computeClusterHealth(ctx).getStatus()) + .isEqualTo(0 /* GREEN */); + + // Sending to server 0 (leader): leader=0, serverId=0 — matches, so mark inactive. + ctx.markLeaderInactive(tb); + assertThat(ctx.isLeaderActive(tb)).isFalse(); + assertThat(CoordinatorService.computeClusterHealth(ctx).getStatus()).isEqualTo(2 /* RED */); + } + + @Test + void testLeaderChangedBetweenSendAndResponseStaysInactive() { + // Simulate processNotifyLeaderAndIsrResponseReceivedEvent: if the leader + // changed between send and response, the responding server is no longer + // the leader, so the bucket must stay inactive. 
+ TableBucket tb = new TableBucket(1L, 0); + ctx.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1, 2)); + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(0, 1, Arrays.asList(0, 1, 2), Collections.emptyList(), 0, 1)); + ctx.markLeaderInactive(tb); + + // Simulate: leader changed from server 0 to server 1 before server 0 responds + ctx.putBucketLeaderAndIsr( + tb, new LeaderAndIsr(1, 2, Arrays.asList(0, 1, 2), Collections.emptyList(), 0, 1)); + + // Server 0 responds successfully — but it's no longer the leader + int respondingServerId = 0; + ctx.getBucketLeaderAndIsr(tb) + .ifPresent( + lai -> { + if (lai.leader() == respondingServerId) { + ctx.markLeaderActive(tb); + } + }); + + // Bucket must stay inactive because the responding server (0) != current leader (1) + assertThat(ctx.isLeaderActive(tb)).isFalse(); + assertThat(CoordinatorService.computeClusterHealth(ctx).getStatus()).isEqualTo(2 /* RED */); + + // Now server 1 (the actual leader) responds → bucket becomes active + int actualLeaderServerId = 1; + ctx.getBucketLeaderAndIsr(tb) + .ifPresent( + lai -> { + if (lai.leader() == actualLeaderServerId) { + ctx.markLeaderActive(tb); + } + }); + + assertThat(ctx.isLeaderActive(tb)).isTrue(); + assertThat(CoordinatorService.computeClusterHealth(ctx).getStatus()) + .isEqualTo(0 /* GREEN */); + } + + private static ServerInfo makeServerInfo(int id) { + return new ServerInfo( + id, + "RACK" + id, + Endpoint.fromListenersString("CLIENT://host" + id + ":9124"), + ServerType.TABLET_SERVER); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java index 2b788bdb48..87930b41a6 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java @@ -19,6 +19,7 @@ import 
org.apache.fluss.config.ConfigOptions; import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; @@ -28,6 +29,9 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED; import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR; @@ -69,6 +73,78 @@ void testGetLakeTableCount() { assertThat(context.getLakeTableCount()).isEqualTo(2); } + // ---- Inactive Leader Tracking Tests ---- + + @Test + void testMarkLeaderInactive() { + CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); + TableBucket tb1 = new TableBucket(1L, 0); + TableBucket tb2 = new TableBucket(1L, 1); + + assertThat(context.isLeaderActive(tb1)).isTrue(); + assertThat(context.getInactiveLeaderBuckets()).isEmpty(); + + context.markLeaderInactive(tb1); + context.markLeaderInactive(tb2); + + assertThat(context.isLeaderActive(tb1)).isFalse(); + assertThat(context.isLeaderActive(tb2)).isFalse(); + assertThat(context.getInactiveLeaderBuckets()).containsExactlyInAnyOrder(tb1, tb2); + } + + @Test + void testMarkLeaderActive() { + CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); + TableBucket tb1 = new TableBucket(1L, 0); + TableBucket tb2 = new TableBucket(1L, 1); + + context.markLeadersInactive(Arrays.asList(tb1, tb2)); + + context.markLeaderActive(tb1); + assertThat(context.isLeaderActive(tb1)).isTrue(); + assertThat(context.isLeaderActive(tb2)).isFalse(); + + context.markLeaderActive(tb2); + assertThat(context.getInactiveLeaderBuckets()).isEmpty(); + } + + @Test + void testMarkLeaderActiveForNonExistentBucket() { + CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); + TableBucket tb1 = new TableBucket(1L, 
0); + + // Should not throw + context.markLeaderActive(tb1); + assertThat(context.isLeaderActive(tb1)).isTrue(); + } + + @Test + void testRemoveFromInactiveLeaders() { + CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); + TableBucket tb1 = new TableBucket(1L, 0); + TableBucket tb2 = new TableBucket(1L, 1); + TableBucket tb3 = new TableBucket(2L, 0); + + context.markLeadersInactive(Arrays.asList(tb1, tb2, tb3)); + + Set toRemove = new HashSet<>(Arrays.asList(tb1, tb3)); + context.removeFromInactiveLeaders(toRemove); + + assertThat(context.isLeaderActive(tb1)).isTrue(); + assertThat(context.isLeaderActive(tb2)).isFalse(); + assertThat(context.isLeaderActive(tb3)).isTrue(); + } + + @Test + void testGetInactiveLeaderBucketsReturnsUnmodifiableSet() { + CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); + TableBucket tb1 = new TableBucket(1L, 0); + context.markLeaderInactive(tb1); + + Set inactive = context.getInactiveLeaderBuckets(); + assertThat(inactive).isUnmodifiable(); + } + private TableInfo createTableInfo(long tableId, TablePath tablePath, boolean isLake) { TableDescriptor tableDescriptor = TableDescriptor.builder() diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index 98e295f9a1..f11b8737b9 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -461,6 +461,12 @@ public CompletableFuture describeClusterConfigs( throw new UnsupportedOperationException(); } + @Override + public CompletableFuture + getClusterHealth(org.apache.fluss.rpc.messages.GetClusterHealthRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture registerProducerOffsets( RegisterProducerOffsetsRequest 
request) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index e54987f134..a21c8ec2e0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -366,6 +366,12 @@ public CompletableFuture describeClusterConfigs( throw new UnsupportedOperationException(); } + @Override + public CompletableFuture + getClusterHealth(org.apache.fluss.rpc.messages.GetClusterHealthRequest request) { + throw new UnsupportedOperationException(); + } + public int pendingRequestSize() { return requests.size(); } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 1df67094f1..dc4ccd1d0b 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -436,7 +436,9 @@ org.apache.fluss.predicate.* org.apache.fluss.lake.source.* - + + org.apache.fluss.dist.ClusterHealthReadinessCheck + org.apache.fluss.dist.ClusterHealthReadinessCheck.* org.apache.fluss.dist.DummyClass org.apache.fluss.flink.DummyClass120 org.apache.fluss.lake.batch.ArrowRecordBatch diff --git a/helm/templates/sts-coordinator.yaml b/helm/templates/sts-coordinator.yaml index 918bf2d06a..1d4dd3e9eb 100644 --- a/helm/templates/sts-coordinator.yaml +++ b/helm/templates/sts-coordinator.yaml @@ -25,6 +25,8 @@ metadata: spec: serviceName: coordinator-server-hs replicas: {{ .Values.coordinator.numberOfReplicas }} + updateStrategy: + type: RollingUpdate selector: matchLabels: {{- include "fluss.selectorLabels" . 
| nindent 6 }} @@ -122,7 +124,7 @@ spec: echo "bind.listeners: ${BIND_LISTENERS}" >> $FLUSS_HOME/conf/server.yaml && \ echo "advertised.listeners: ${ADVERTISED_LISTENERS}" >> $FLUSS_HOME/conf/server.yaml && \ - bin/coordinator-server.sh start-foreground + exec bin/coordinator-server.sh start-foreground livenessProbe: failureThreshold: 100 timeoutSeconds: 1 diff --git a/helm/templates/sts-tablet.yaml b/helm/templates/sts-tablet.yaml index 6232544de3..c0863f9c60 100644 --- a/helm/templates/sts-tablet.yaml +++ b/helm/templates/sts-tablet.yaml @@ -25,6 +25,8 @@ metadata: spec: serviceName: tablet-server-hs replicas: {{ .Values.tablet.numberOfReplicas }} + updateStrategy: + type: RollingUpdate selector: matchLabels: {{- include "fluss.selectorLabels" . | nindent 6 }} @@ -119,7 +121,7 @@ spec: echo "bind.listeners: ${BIND_LISTENERS}" >> $FLUSS_HOME/conf/server.yaml && \ echo "advertised.listeners: ${ADVERTISED_LISTENERS}" >> $FLUSS_HOME/conf/server.yaml && \ - bin/tablet-server.sh start-foreground + exec bin/tablet-server.sh start-foreground livenessProbe: failureThreshold: 100 timeoutSeconds: 1 @@ -128,12 +130,21 @@ spec: tcpSocket: port: {{ .Values.listeners.client.port }} readinessProbe: - failureThreshold: 100 - timeoutSeconds: 1 - initialDelaySeconds: 10 - periodSeconds: 3 - tcpSocket: - port: {{ .Values.listeners.client.port }} + failureThreshold: {{ .Values.tablet.readinessProbe.failureThreshold | default 200 }} + timeoutSeconds: {{ .Values.tablet.readinessProbe.timeoutSeconds | default 10 }} + initialDelaySeconds: {{ .Values.tablet.readinessProbe.initialDelaySeconds | default 15 }} + periodSeconds: {{ .Values.tablet.readinessProbe.periodSeconds | default 5 }} + exec: + command: + - /bin/bash + - -c + - | + export FLUSS_SERVER_ID=${POD_NAME##*-} + export READINESS_TIMEOUT_MS={{ .Values.tablet.readinessProbe.rpcTimeoutMs | default 5000 }} + export READINESS_TCP_PORT={{ .Values.listeners.client.port }} + export 
READINESS_BOOTSTRAP_SERVERS="coordinator-server-hs.${POD_NAMESPACE}.svc.cluster.local:{{ .Values.listeners.client.port }}" + export READINESS_GRACE_SECS={{ .Values.tablet.readinessProbe.gracePeriodSecs | default 60 }} + exec $FLUSS_HOME/bin/readiness-check.sh resources: {{- toYaml .Values.resources.tabletServer | nindent 12 }} volumeMounts: diff --git a/helm/values.yaml b/helm/values.yaml index e14877d110..a8332be2dc 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -43,6 +43,23 @@ tablet: enabled: false size: 1Gi storageClass: + # Readiness probe configuration for rolling upgrade gate. + # The probe calls the Coordinator's Cluster Health API to verify that + # cluster status is GREEN before marking the pod Ready. If the + # Coordinator does not support the API (older version), the probe + # automatically falls back to TCP-only after the grace period. + readinessProbe: + # Timeout in ms for the RPC call to Coordinator. + rpcTimeoutMs: 5000 + # Grace period (seconds) before falling back to TCP-only when the + # Coordinator does not support the Cluster Health API (e.g., mixed-version + # upgrade). After this period, TCP port reachability is sufficient. + gracePeriodSecs: 60 + # Standard Kubernetes probe parameters (tuned for recovery checks). + failureThreshold: 200 + timeoutSeconds: 10 + initialDelaySeconds: 15 + periodSeconds: 5 extraVolumes: [] extraVolumeMounts: [] initContainers: [] diff --git a/website/docs/install-deploy/deploying-with-helm.md b/website/docs/install-deploy/deploying-with-helm.md index 32c4e03ce7..3a67ea4e59 100644 --- a/website/docs/install-deploy/deploying-with-helm.md +++ b/website/docs/install-deploy/deploying-with-helm.md @@ -771,6 +771,47 @@ helm upgrade fluss ./helm -f values-new.yaml The StatefulSets support rolling updates. When you update the configuration, pods will be restarted one by one to maintain availability. 
+#### Cluster Health Readiness Probe + +The TabletServer readiness probe performs a two-step check to ensure safe rolling upgrades: + +1. **Local TCP port check** — confirms the TabletServer process is up and listening. +2. **Cluster health check** — calls the Coordinator's Cluster Health API to confirm cluster status is GREEN (all replicas + in-sync, all leaders active) across the cluster. + +Only when both steps pass is the pod marked as Ready, allowing the StatefulSet controller to proceed to the next pod. +If the Coordinator does not support the Cluster Health API (e.g., during a mixed-version upgrade from an older version), +the probe automatically falls back to TCP-only after the grace period. + +You can tune the probe parameters in your values: + +```yaml +tablet: + readinessProbe: + # Timeout in ms for the RPC call to Coordinator (default: 5000) + rpcTimeoutMs: 5000 + # Grace period (seconds) before falling back to TCP-only when the + # Coordinator does not support the Cluster Health API (default: 60) + gracePeriodSecs: 60 + # Standard Kubernetes probe parameters (tuned for recovery checks) + failureThreshold: 200 + timeoutSeconds: 10 + initialDelaySeconds: 15 + periodSeconds: 5 +``` + +| Parameter | Default | Description | +|--------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------| +| `rpcTimeoutMs` | `5000` | Timeout in milliseconds for the RPC call to the Coordinator. | +| `gracePeriodSecs` | `60` | When the Coordinator does not support the Cluster Health API (older version), the probe falls back to TCP-only after this grace period. | +| `failureThreshold` | `200` | Max consecutive probe failures before a Ready pod is marked as unready. With `periodSeconds=5`, that is ~17 minutes (200 × 5 s) of consecutive failures; a restarted pod stays unready until its first successful probe. | +| `periodSeconds` | `5` | How often the probe runs, in seconds. 
| + +:::note +The CoordinatorServer does not need the Cluster Health API probe — it does not host data replicas, so a simple TCP check is sufficient. +The Coordinator should be upgraded **after** all TabletServers are fully upgraded and recovered. +::: + ## Custom Container Images ### Building Custom Images @@ -805,7 +846,7 @@ image: ### Health Checks -The chart includes liveness and readiness probes: +The chart includes liveness and readiness probes. The liveness probes and the CoordinatorServer readiness probe use TCP socket checks: ```yaml livenessProbe: @@ -823,6 +864,9 @@ readinessProbe: failureThreshold: 100 ``` +For TabletServers, the readiness probe additionally checks the Cluster Health API so that rolling upgrades only proceed when the cluster is healthy. +See [Cluster Health Readiness Probe](#cluster-health-readiness-probe) for details. + ### Logs Access logs from different components: diff --git a/website/docs/maintenance/operations/upgrading.md b/website/docs/maintenance/operations/upgrading.md index 2b75557b63..1eb768a465 100644 --- a/website/docs/maintenance/operations/upgrading.md +++ b/website/docs/maintenance/operations/upgrading.md @@ -57,6 +57,44 @@ To upgrade the `TabletServers`, follow these steps one-by-one for each `TabletSe ./fluss-$FLUSS_VERSION$/bin/tablet-server.sh start ``` +**Wait for the cluster to recover before upgrading the next TabletServer** + +After restarting a TabletServer, you should wait for the cluster to fully recover before proceeding to the next one. +Upgrading too quickly can cause multiple servers to be in an unrecovered state simultaneously, which may break min-ISR guarantees +and affect data availability. 
+ +You can use the **Cluster Health API** to monitor the cluster's health status: + +```java +Admin admin = connection.getAdmin(); + +ClusterHealth health = admin.getClusterHealth().get(); +System.out.println("Status: " + health.getStatus()); +System.out.println("Replicas: " + health.getInSyncReplicas() + "/" + health.getNumReplicas()); +System.out.println("Leaders: " + health.getActiveLeaderReplicas() + "/" + health.getNumLeaderReplicas()); + +if (health.getStatus() == ClusterHealthStatus.GREEN) { + // Safe to proceed with the next TabletServer +} +``` + +The API returns the following health metrics: + +| Field | Meaning | +|-------|---------| +| `status` | Cluster health: GREEN (all replicas in-sync, all leaders active), YELLOW (all leaders active, some followers not in-sync), RED (some leaders not active) | +| `numReplicas` | Total number of assigned replicas across all buckets | +| `inSyncReplicas` | Total number of in-sync replicas across all buckets | +| `numLeaderReplicas` | Total number of leader slots (one per bucket) | +| `activeLeaderReplicas` | Number of active leaders (leader alive and acknowledged) | + +Wait until the status is GREEN before upgrading the next TabletServer. GREEN means all replicas are in-sync and all leaders are active — fully recovered. + +:::tip +For Kubernetes deployments, the TabletServer readiness probe in the Helm chart uses the Cluster Health API to automate this check. +See [Deploying with Helm Charts — Cluster Health Readiness Probe](docs/install-deploy/deploying-with-helm.md#cluster-health-readiness-probe) for details. +::: + ### Upgrade the CoordinatorServer After all `TabletServers` have been upgraded, you can proceed to upgrade the `CoordinatorServer` by following these steps: