Extend CacheRuntime phase 2.4: add dataset related labels to nodes and support app pod affinity #5836
xliuqq wants to merge 2 commits into
Conversation
mark thin runtime reference not support cache runtime
Signed-off-by: xliuqq <xlzq1992@gmail.com>
Skipping CI for Draft Pull Request.
Code Review
This pull request refactors runtime status management by introducing the RuntimeStatusAccessor interface and unifies worker pod retrieval via the GetWorkerPods method. It also implements node affinity support for cache components and updates the cache engine's synchronization and shutdown processes. Feedback highlights a critical bug where an error from e.transform is ignored, resulting in potential panics. Additionally, the reviewer noted code duplication in the thin runtime package that should be resolved by exporting helper functions from the base package, and pointed out the need to restore test coverage for the refactored status retrieval logic.
I am having trouble creating individual review comments. Click here to see my feedback.
pkg/ddc/cache/engine/sync.go (54-56)
The error returned by e.transform is ignored because the err variable is immediately overwritten by the result of e.CheckAndUpdateRuntimeStatus. If transform fails, runtimeValue might be invalid, which could lead to panics or incorrect status updates in the subsequent call.
runtimeValue, err := e.transform(dataset, runtime, runtimeClass)
if err != nil {
	return err
}
// TODO: use different struct for input parameter to avoid fully transform
_, err = e.CheckAndUpdateRuntimeStatus(runtimeValue)
pkg/ddc/base/runtime.go (646-647)
Consider exporting this function as GetDDCRuntimeStatus. This would allow other packages, such as pkg/ddc/thin/referencedataset, to reuse this logic instead of duplicating the entire switch statement for DDC-based runtimes.
// GetDDCRuntimeStatus retrieves the runtime object based on runtime type for DDC-based runtimes
func GetDDCRuntimeStatus(client client.Client, runtimeType, name, namespace string) (*datav1alpha1.RuntimeStatus, error) {
pkg/ddc/base/runtime.go (630)
Update the call to use the exported function name if the suggestion to export getDDCRuntime is accepted.
status, err := GetDDCRuntimeStatus(client, runtimeType, name, namespace)
pkg/ddc/thin/referencedataset/runtime.go (49)
Instead of using a local duplicated version of the runtime status retrieval logic, use the (suggested) exported function from the base package. This improves maintainability by centralizing the mapping of runtime types to their status objects.
return base.GetDDCRuntimeStatus(e.Client, physicalRuntimeInfo.GetRuntimeType(),
pkg/ddc/thin/referencedataset/runtime.go (56-104)
This function is a duplicate of the logic in pkg/ddc/base/runtime.go. It should be removed in favor of using an exported helper from the base package to avoid code duplication and maintenance overhead when new runtimes are added.
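If the exported helper suggested above is adopted, the duplicated function could collapse into a thin delegation. A minimal sketch, assuming the receiver type and exact signature shown here (they are illustrative, not the PR's actual code):

// Sketch: delegate to the centralized helper in the base package instead of
// duplicating the runtime-type switch. Receiver name and parameter are assumed.
func (e *ReferenceDatasetEngine) getPhysicalDatasetRuntimeStatus(physicalRuntimeInfo base.RuntimeInfoInterface) (*datav1alpha1.RuntimeStatus, error) {
	return base.GetDDCRuntimeStatus(e.Client, physicalRuntimeInfo.GetRuntimeType(),
		physicalRuntimeInfo.GetName(), physicalRuntimeInfo.GetNamespace())
}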
pkg/ddc/base/runtime_test.go (1163-1382)
The tests for GetRuntimeStatus have been removed. While the function itself was refactored into GetRuntimeStatusAccessor, the test coverage should be maintained. Please update these tests to verify the new accessor and its implementations (DDCRuntimeStatusAccessor and CacheRuntimeStatusAccessor) instead of deleting them.
pkg/ddc/base/runtime.go (642)
The error message mentions runtimeInfo, but the function GetRuntimeStatusAccessor is intended to return a status accessor. Updating the message to 'fail to get runtime status accessor' would be more accurate.
return nil, fmt.Errorf("fail to get runtime status accessor for runtime type: %s", runtimeType)
Codecov Report
❌ Patch coverage is
Additional details and impacted files:

@@            Coverage Diff             @@
##           master    #5836      +/-   ##
==========================================
- Coverage   59.10%   58.90%    -0.20%
==========================================
  Files         480      480
  Lines       32511    32599      +88
==========================================
- Hits        19215    19202      -13
- Misses      11746    11844      +98
- Partials     1550     1553       +3

☔ View full report in Codecov by Sentry.
Signed-off-by: xliuqq <xlzq1992@gmail.com>
/gemini review
Code Review
This pull request introduces a unified interface for accessing runtime status and worker pods across various Fluid runtime types. Key changes include the addition of the RuntimeStatusAccessor interface, the implementation of GetWorkerPods for retrieving worker pod information, and the integration of node affinity tracking for CacheRuntime. The feedback suggests correcting the documentation for GetWorkerPods, handling "not found" errors gracefully when fetching Kubernetes resources to prevent reconciliation loops from failing prematurely, and ensuring RuntimeClassName is validated before use.
type RuntimeInfoInterface interface {
	// GetWorkerPods returns the worker object and selector for runtime workers.
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
if err != nil {
	return nil, err
}
If the worker StatefulSet is not found, GetWorkerPods should return an empty list and no error to avoid failing the reconciliation loop during initial setup. Using utils.IgnoreNotFound is a clean way to handle this.
Suggested change:
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
if err != nil {
	return nil, utils.IgnoreNotFound(err)
}
ds, err := kubeclient.GetDaemonset(s.client, component.Name, component.Namespace)
if err != nil {
	return nil, err
}
Handle the case where the DaemonSet is not found gracefully by returning nil, nil instead of an error. This prevents the status update from failing when the component hasn't been created yet.
Suggested change:
ds, err := kubeclient.GetDaemonset(s.client, component.Name, component.Namespace)
if err != nil {
	return nil, utils.IgnoreNotFound(err)
}
sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
if err != nil {
	return nil, err
}
Handle the case where the StatefulSet is not found gracefully by returning nil, nil instead of an error to avoid unnecessary reconciliation failures.
Suggested change:
sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
if err != nil {
	return nil, utils.IgnoreNotFound(err)
}
workers, err := kubeclient.GetStatefulSet(client, workerName, info.GetNamespace())
if err != nil {
	return nil, err
}
Handle the case where the worker StatefulSet is not found gracefully to avoid reconciliation errors during the initial setup phase.
Suggested change:
workers, err := kubeclient.GetStatefulSet(client, workerName, info.GetNamespace())
if err != nil {
	return nil, utils.IgnoreNotFound(err)
}
@@ -18,13 +18,15 @@ package engine

	dataset := ctx.Dataset
	runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName)
It's safer to check if RuntimeClassName is set before attempting to fetch the CacheRuntimeClass to provide a clearer error message if it's missing.
Suggested change:
if runtime.Spec.RuntimeClassName == "" {
	return fmt.Errorf("runtimeClassName is not set in CacheRuntime %s/%s", runtime.Namespace, runtime.Name)
}
runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName)
Review of PR #5836 (CacheRuntime phase 2.4: add dataset-related labels to nodes and support app pod affinity). The PR makes several well-structured changes:
1. A RuntimeStatusAccessor abstraction to unify DDC and Cache runtime status access for the affinity webhook.
2. A GetWorkerPods interface to decouple node labeling from direct StatefulSet access.
3. CacheRuntime status sync plus SyncScheduleInfoToCacheNodes integration.
4. Proper shutdown cleanup of dataset labels.
5. ThinRuntime reference now explicitly excludes CacheRuntime.
One blocking concern: CacheRuntimeInfo.GetWorkerPods duplicates the naming logic that already exists in base.RuntimeInfo.GetWorkerPods, using GetComponentName vs GetWorkerStatefulsetName. These two paths may produce different worker names depending on runtime type suffix conventions, creating a maintenance risk when conventions evolve.
Non-blocking: ctx.Dataset nil-safety in Sync, GetNodeAffinity API call overhead, the full-transform-as-input pattern, the lack of user-facing validation/docs for the ThinRuntime exclusion, and Shutdown TearDownWorkers error handling.
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
CacheRuntimeInfo.GetWorkerPods computes the worker StatefulSet name as GetComponentName(info.GetName(), common.ComponentTypeWorker), while base.RuntimeInfo.GetWorkerPods uses runtimeInfo.GetWorkerStatefulsetName(). These may produce different names depending on the runtime type suffix convention.
For CacheRuntime, the worker StatefulSet name is <name>-cache-worker (derived from the cache engine's naming convention via GetComponentName), but GetWorkerStatefulsetName() on the embedded RuntimeInfo may return a different suffix depending on how BuildRuntimeInfo was configured.
Since CacheRuntimeInfo embeds the base RuntimeInfoInterface and delegates all other methods, it's unclear which naming convention is authoritative. If the embedded RuntimeInfo was built with common.CacheRuntime as runtimeType, GetWorkerStatefulsetName() would also compute the suffix — but the two code paths use completely different logic to derive the name, creating a maintenance risk.
Recommendation: remove the duplicate GetWorkerPods implementation from CacheRuntimeInfo and let it delegate to the embedded base.RuntimeInfo.GetWorkerPods(), or at minimum ensure both paths produce the same name.
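A minimal sketch of the delegation option, assuming the embedded field is the RuntimeInfoInterface named in this review and that the method returns a worker pod list (both assumptions, since the actual signature is not shown here):

// Sketch: reuse the embedded base implementation so the worker StatefulSet name
// is always resolved through GetWorkerStatefulsetName(). The embedded field name
// and the return type ([]corev1.Pod) are assumptions for illustration.
func (info *CacheRuntimeInfo) GetWorkerPods(client client.Client) ([]corev1.Pod, error) {
	return info.RuntimeInfoInterface.GetWorkerPods(client)
}

Simply deleting the CacheRuntimeInfo method would achieve the same effect through Go's method promotion on the embedded interface, provided the embedded RuntimeInfo was built with the CacheRuntime naming convention.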
@@ -33,6 +35,11 @@ func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) {
	if err != nil {
		return err
Sync now uses ctx.Dataset directly to call e.transform(dataset, runtime, runtimeClass). If the ReconcileRequestContext is constructed without a Dataset (e.g., before the dataset is bound), this will panic or produce incorrect behavior. Other DDC engines typically check for nil dataset or handle the pre-binding case explicitly.
Consider adding a nil check for ctx.Dataset before proceeding with the transform and status sync, similar to how other runtime engines handle the unbound case.
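A minimal sketch of such a guard at the top of Sync (how the unbound case should be handled, and whether returning nil is the right behavior, are assumptions to be checked against the other engines):

// Sketch only: skip the transform-based status sync until the dataset is bound,
// so e.transform never sees a nil Dataset. Whether to return nil, requeue, or
// surface an error should match how other engines treat the unbound case.
if ctx.Dataset == nil {
	return nil
}
dataset := ctx.Dataset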
If not blocking, it will be fixed in the same issue "use different struct for input parameter to avoid fully transform". #5836 (comment)
func (s *StatefulSetManager) GetNodeAffinity(component *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error) {
	sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
GetNodeAffinity calls kubeclient.GetStatefulSet for every status update cycle. Since the StatefulSet spec (nodeSelector + affinity) rarely changes, this adds unnecessary API server load. Consider caching the affinity or reading it from the runtime value instead of re-fetching on every reconcile.
I will add an issue to track it. Does CacheRuntime support changing worker affinity? If so, the affinity should be generated according to the runtime and dataset spec.
_, err = e.CheckAndUpdateRuntimeStatus(runtimeValue)
if err != nil {
	return err
}
The TODO comment "use different struct for input parameter to avoid fully transform" is correct — calling e.transform() + CheckAndUpdateRuntimeStatus() on every Sync cycle forces a full value computation even when only the status needs updating. This is wasteful. While not blocking, it should be tracked with an issue link rather than just a TODO.
will add an issue to track it.
// getPhysicalDatasetRuntimeStatus get the runtime status of the physical dataset
// Note: This function only supports DDC-based runtimes (Alluxio, Jindo, etc.)
The comment "CacheRuntime is not supported because its status structure is incompatible with ThinRuntime" makes it clear that CacheRuntime cannot be used as a physical dataset runtime for ThinRuntime reference. However, there is no user-facing documentation or validation to prevent users from attempting this configuration. Without a webhook validation or clear docs, users may configure a ThinRuntime pointing to a CacheRuntime-backed Dataset and get a cryptic runtime error.
Consider adding: (1) a validation in the ThinRuntime webhook or controller that rejects CacheRuntime as physicalRuntimeType, or (2) documentation that explicitly lists supported runtime types for ThinRuntime reference.
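As a rough illustration of option (1), a guard along these lines could be placed wherever the physical runtime info is resolved in the ThinRuntime reference path (the runtime type constant, error wording, and placement are assumptions, not existing Fluid code):

// Sketch: fail fast with a descriptive error instead of a cryptic one at status-sync time.
// Constant name and message are assumed for illustration only.
if physicalRuntimeInfo.GetRuntimeType() == common.CacheRuntime {
	return fmt.Errorf("dataset %s/%s is backed by a CacheRuntime, which is not supported as the physical dataset of a ThinRuntime reference",
		physicalRuntimeInfo.GetNamespace(), physicalRuntimeInfo.GetName())
}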
will add an issue to track it.
cheyang left a comment
/lgtm
/approve
Non-blocking naming concern noted in review; addressing it in a separate follow-up is acceptable.
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: cheyang
The full list of commands accepted by this bot can be found here. The pull request process is described here.
Details: Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.



mark thin runtime reference not support cache runtime
Ⅰ. Describe what this PR does
add dataset related labels to nodes and support app pod affinity.
Ⅱ. Does this pull request fix one issue?
part of #5412
Ⅲ. List the added test cases (unit test/integration test) if any, please explain if no tests are needed.
Ⅳ. Describe how to verify it
Ⅴ. Special notes for reviews