From b4be480007b91217ee3b0a222efa39eb45deca13 Mon Sep 17 00:00:00 2001
From: xliuqq
Date: Fri, 1 May 2026 00:12:53 +0800
Subject: [PATCH 1/2] add dataset related labels to nodes and support app pod
 affinity

mark that the thin runtime reference dataset does not support CacheRuntime

Signed-off-by: xliuqq
---
 pkg/ddc/base/runtime.go                       | 101 +++++---
 pkg/ddc/base/runtime_conventions_test.go      |   3 +-
 pkg/ddc/base/runtime_test.go                  | 221 ------------------
 pkg/ddc/cache/component/component_manager.go  |   2 +
 pkg/ddc/cache/component/daemonset_manager.go  |  12 +
 .../cache/component/statefulset_manager.go    |  18 +-
 pkg/ddc/cache/engine/runtime.go               |  30 ++-
 pkg/ddc/cache/engine/shutdown.go              |   6 +
 pkg/ddc/cache/engine/status.go                |   6 +
 pkg/ddc/cache/engine/sync.go                  |  29 ++-
 pkg/ddc/thin/referencedataset/runtime.go      |  59 ++++-
 pkg/utils/dataset/lifecycle/node.go           |  12 +-
 pkg/utils/kubeclient/metadata.go              |   3 +-
 pkg/utils/kubeclient/statefulset.go           |  13 +-
 .../node_affinity_with_cache.go               |  14 +-
 15 files changed, 248 insertions(+), 281 deletions(-)

diff --git a/pkg/ddc/base/runtime.go b/pkg/ddc/base/runtime.go
index d82af8ffa2d..b7b8083eaed 100644
--- a/pkg/ddc/base/runtime.go
+++ b/pkg/ddc/base/runtime.go
@@ -24,7 +24,9 @@ import (
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
 	"github.com/fluid-cloudnative/fluid/pkg/common"
 	"github.com/fluid-cloudnative/fluid/pkg/utils"
+	"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
 	"github.com/pkg/errors"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -52,8 +54,6 @@ type Conventions interface {
 
 	GetDatasetNumLabelName() string
 
-	GetWorkerStatefulsetName() string
-
 	GetExclusiveLabelValue() string
 }
 
@@ -63,6 +63,9 @@ type Conventions interface {
 type RuntimeInfoInterface interface {
 	Conventions
 
+	// GetWorkerPods lists the worker pods of the runtime by the worker workload's label selector.
+	GetWorkerPods(client client.Client) ([]corev1.Pod, error)
+
 	GetTieredStoreInfo() TieredStoreInfo
 
 	GetName() string
@@ -375,6 +378,21 @@ func (info *RuntimeInfo) SetupFuseCleanPolicy(policy datav1alpha1.FuseCleanPolic
 	info.fuse.CleanPolicy = policy
 }
 
+func (info *RuntimeInfo) GetWorkerPods(client client.Client) ([]corev1.Pod, error) {
+	workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
+	if err != nil {
+		return nil, err
+	}
+	workerSelector, err := metav1.LabelSelectorAsSelector(workers.Spec.Selector)
+	if err != nil {
+		return nil, err
+	}
+
+	workerPods, err := kubeclient.GetPodsForStatefulSet(client, workers, workerSelector)
+
+	return workerPods, err
+}
+
 func (info *RuntimeInfo) GetFuseCleanPolicy() datav1alpha1.FuseCleanPolicy {
 	return info.fuse.CleanPolicy
 }
@@ -598,66 +616,97 @@ func GetRuntimeInfo(reader client.Reader, name, namespace string) (runtimeInfo R
 	return runtimeInfo, err
 }
 
-// GetRuntimeStatus gets the runtime status according to the runtime type, name, and namespace.
-// This function is primarily responsible for retrieving the current status of a specific runtime
-// based on its type from the Kubernetes cluster.
-//
-// Parameters:
-// - client (client.Client): The Kubernetes client used to interact with the API server.
-// - runtimeType (string): The type of the runtime (e.g., AlluxioRuntime, JindoRuntime, GooseFSRuntime).
-// - name (string): The name of the runtime.
-// - namespace (string): The namespace where the runtime is located.
-//
-// Returns:
-// - status (*datav1alpha1.RuntimeStatus): The status of the requested runtime.
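// Illustrative sketch (not part of this patch) — how a caller might consume the new
// RuntimeInfoInterface.GetWorkerPods method added above. countReadyWorkers is a
// hypothetical helper; it assumes the corev1 and client imports already present in
// pkg/ddc/base/runtime.go.
func countReadyWorkers(c client.Client, info RuntimeInfoInterface) (ready int, err error) {
	pods, err := info.GetWorkerPods(c)
	if err != nil {
		return 0, err
	}
	for i := range pods {
		for _, cond := range pods[i].Status.Conditions {
			// a worker counts as ready once its PodReady condition is True
			if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
				ready++
				break
			}
		}
	}
	return ready, nil
}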
-// - err (error): Returns an error if the runtime status cannot be retrieved or the runtime type is unsupported, otherwise returns nil.
-func GetRuntimeStatus(client client.Client, runtimeType, name, namespace string) (status *datav1alpha1.RuntimeStatus, err error) {
+// RuntimeStatusAccessor provides a unified interface to access common status fields across different runtime types
+type RuntimeStatusAccessor interface {
+	// GetCacheAffinity returns the cache affinity from the runtime status
+	GetCacheAffinity() (*corev1.NodeAffinity, error)
+}
+
+// GetRuntimeStatusAccessor returns a unified status accessor for the given runtime
+func GetRuntimeStatusAccessor(client client.Client, runtimeType, name, namespace string) (RuntimeStatusAccessor, error) {
+	switch runtimeType {
+	case common.AlluxioRuntime, common.JindoRuntime, common.GooseFSRuntime,
+		common.JuiceFSRuntime, common.EFCRuntime, common.ThinRuntime, common.VineyardRuntime:
+		status, err := getDDCRuntime(client, runtimeType, name, namespace)
+		if err != nil {
+			return nil, err
+		}
+		return &DDCRuntimeStatusAccessor{status: status}, nil
+	case common.CacheRuntime:
+		runtime, err := utils.GetCacheRuntime(client, name, namespace)
+		if err != nil {
+			return nil, err
+		}
+		return &CacheRuntimeStatusAccessor{status: &runtime.Status}, nil
+	default:
+		return nil, fmt.Errorf("fail to get runtimeInfo for runtime type: %s", runtimeType)
+	}
+}
+
+// getDDCRuntime retrieves the runtime object based on runtime type for DDC-based runtimes
+func getDDCRuntime(client client.Client, runtimeType, name, namespace string) (*datav1alpha1.RuntimeStatus, error) {
 	switch runtimeType {
 	case common.AlluxioRuntime:
 		runtime, err := utils.GetAlluxioRuntime(client, name, namespace)
 		if err != nil {
-			return status, err
+			return nil, err
 		}
 		return &runtime.Status, nil
 	case common.JindoRuntime:
 		runtime, err := utils.GetJindoRuntime(client, name, namespace)
 		if err != nil {
-			return status, err
+			return nil, err
 		}
 		return &runtime.Status, nil
 	case common.GooseFSRuntime:
 		runtime, err := utils.GetGooseFSRuntime(client, name, namespace)
 		if err != nil {
-			return status, err
+			return nil, err
 		}
 		return &runtime.Status, nil
 	case common.JuiceFSRuntime:
 		runtime, err := utils.GetJuiceFSRuntime(client, name, namespace)
 		if err != nil {
-			return status, err
+			return nil, err
 		}
 		return &runtime.Status, nil
 	case common.EFCRuntime:
 		runtime, err := utils.GetEFCRuntime(client, name, namespace)
 		if err != nil {
-			return status, err
+			return nil, err
 		}
 		return &runtime.Status, nil
 	case common.ThinRuntime:
 		runtime, err := utils.GetThinRuntime(client, name, namespace)
 		if err != nil {
-			return status, err
+			return nil, err
 		}
 		return &runtime.Status, nil
 	case common.VineyardRuntime:
 		runtime, err := utils.GetVineyardRuntime(client, name, namespace)
 		if err != nil {
-			return status, err
+			return nil, err
 		}
 		return &runtime.Status, nil
-	// TODO: how to handle with cache runtime? (currently used in app pod affinity scene)
 	default:
-		err = fmt.Errorf("fail to get runtimeInfo for runtime type: %s", runtimeType)
-		return nil, err
+		return nil, fmt.Errorf("unsupported DDC runtime type: %s", runtimeType)
 	}
 }
+
+// DDCRuntimeStatusAccessor implements RuntimeStatusAccessor for DDC-based runtimes (Alluxio, Jindo, GooseFS, etc.)
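// Illustrative sketch (not part of this patch) — the accessor lets callers read cache
// affinity without switching on the concrete status type (RuntimeStatus vs.
// CacheRuntimeStatus). cacheAffinityTerms is a hypothetical helper in package base:
func cacheAffinityTerms(cli client.Client, runtimeType, name, namespace string) ([]corev1.NodeSelectorTerm, error) {
	accessor, err := GetRuntimeStatusAccessor(cli, runtimeType, name, namespace)
	if err != nil {
		return nil, err
	}
	affinity, err := accessor.GetCacheAffinity()
	if err != nil || affinity == nil || affinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		return nil, err
	}
	// the required terms are what the webhook turns into scheduling terms for app pods
	return affinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, nil
}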
+type DDCRuntimeStatusAccessor struct { + status *datav1alpha1.RuntimeStatus +} + +func (d *DDCRuntimeStatusAccessor) GetCacheAffinity() (*corev1.NodeAffinity, error) { + return d.status.CacheAffinity, nil +} + +// CacheRuntimeStatusAccessor implements RuntimeStatusAccessor for CacheRuntime +type CacheRuntimeStatusAccessor struct { + status *datav1alpha1.CacheRuntimeStatus +} + +func (c *CacheRuntimeStatusAccessor) GetCacheAffinity() (*corev1.NodeAffinity, error) { + return c.status.CacheAffinity, nil +} diff --git a/pkg/ddc/base/runtime_conventions_test.go b/pkg/ddc/base/runtime_conventions_test.go index 7b35255f2b9..f9a69c30908 100644 --- a/pkg/ddc/base/runtime_conventions_test.go +++ b/pkg/ddc/base/runtime_conventions_test.go @@ -28,8 +28,9 @@ var _ = Describe("RuntimeInfo.GetWorkerStatefulsetName", func() { DescribeTable("returns correct statefulset name", func(runtimeName, runtimeType, suffix string) { info, err := BuildRuntimeInfo(runtimeName, testNamespace, runtimeType) + realInfo := info.(*RuntimeInfo) Expect(err).NotTo(HaveOccurred()) - Expect(info.GetWorkerStatefulsetName()).To(Equal(runtimeName + suffix)) + Expect(realInfo.GetWorkerStatefulsetName()).To(Equal(runtimeName + suffix)) }, Entry("JindoRuntime uses jindofs suffix", "mydata", common.JindoRuntime, "-jindofs-worker"), Entry("JindoCacheEngineImpl uses jindofs suffix", "cache", common.JindoCacheEngineImpl, "-jindofs-worker"), diff --git a/pkg/ddc/base/runtime_test.go b/pkg/ddc/base/runtime_test.go index a1a544eb7ef..9925d75a5fd 100644 --- a/pkg/ddc/base/runtime_test.go +++ b/pkg/ddc/base/runtime_test.go @@ -1160,227 +1160,6 @@ var _ = Describe("GetRuntimeInfo", func() { } }) -var _ = Describe("GetRuntimeStatus", func() { - s := runtime.NewScheme() - - alluxioRuntime := v1alpha1.AlluxioRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "alluxio", - Namespace: "default", - }, - } - - goosefsRuntime := v1alpha1.GooseFSRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "goosefs", - Namespace: "default", - }, - } - - jindoRuntime := v1alpha1.JindoRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "jindo", - Namespace: "default", - }, - } - - juicefsRuntime := v1alpha1.JuiceFSRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "juice", - Namespace: "default", - }, - } - - efcRuntime := v1alpha1.EFCRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "efc", - Namespace: "default", - }, - } - - thinRuntime := v1alpha1.ThinRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "thin", - Namespace: "default", - }, - } - - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.AlluxioRuntime{}) - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.GooseFSRuntime{}) - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.JindoRuntime{}) - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.JuiceFSRuntime{}) - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.EFCRuntime{}) - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.ThinRuntime{}) - s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.Dataset{}) - - _ = v1.AddToScheme(s) - alluxioRuntimeObjs := []runtime.Object{} - goosefsRuntimeObjs := []runtime.Object{} - jindoRuntimeObjs := []runtime.Object{} - juicefsRuntimeObjs := []runtime.Object{} - efcRuntimeObjs := []runtime.Object{} - thinRuntimeObjs := []runtime.Object{} - - alluxioRuntimeObjs = append(alluxioRuntimeObjs, &alluxioRuntime) - goosefsRuntimeObjs = append(goosefsRuntimeObjs, &goosefsRuntime) - jindoRuntimeObjs = append(jindoRuntimeObjs, &jindoRuntime) - juicefsRuntimeObjs = append(juicefsRuntimeObjs, &juicefsRuntime) - efcRuntimeObjs = 
append(efcRuntimeObjs, &efcRuntime) - thinRuntimeObjs = append(thinRuntimeObjs, &thinRuntime) - type args struct { - client client.Client - name string - namespace string - runtimeType string - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "alluxio_test", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, alluxioRuntimeObjs...), - name: "alluxio", - namespace: "default", - runtimeType: common.AlluxioRuntime, - }, - wantErr: false, - }, - { - name: "alluxio_test_error", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, alluxioRuntimeObjs...), - name: "alluxio-error", - namespace: "default", - runtimeType: common.AlluxioRuntime, - }, - wantErr: true, - }, - { - name: "goosefs_test", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, goosefsRuntimeObjs...), - name: "goosefs", - namespace: "default", - runtimeType: common.GooseFSRuntime, - }, - wantErr: false, - }, - { - name: "goosefs_test_error", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, goosefsRuntimeObjs...), - name: "goosefs-error", - namespace: "default", - runtimeType: common.GooseFSRuntime, - }, - wantErr: true, - }, - { - name: "jindo_test", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, jindoRuntimeObjs...), - name: "jindo", - namespace: "default", - runtimeType: common.JindoRuntime, - }, - wantErr: false, - }, - { - name: "jindo_test_error", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, jindoRuntimeObjs...), - name: "jindo-error", - namespace: "default", - runtimeType: common.JindoRuntime, - }, - wantErr: true, - }, - { - name: "juicefs_test", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, juicefsRuntimeObjs...), - name: "juice", - namespace: "default", - runtimeType: common.JuiceFSRuntime, - }, - wantErr: false, - }, - { - name: "juicefs_test_error", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, juicefsRuntimeObjs...), - name: "juice-error", - namespace: "default", - runtimeType: common.JuiceFSRuntime, - }, - wantErr: true, - }, - { - name: "efc_test", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, efcRuntimeObjs...), - name: "efc", - namespace: "default", - runtimeType: common.EFCRuntime, - }, - wantErr: false, - }, - { - name: "efc_test_error", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, efcRuntimeObjs...), - name: "efc-error", - namespace: "default", - runtimeType: common.EFCRuntime, - }, - wantErr: true, - }, - { - name: "thin_test", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, thinRuntimeObjs...), - name: "thin", - namespace: "default", - runtimeType: common.ThinRuntime, - }, - wantErr: false, - }, - { - name: "thin_test_error", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, thinRuntimeObjs...), - name: "thin-error", - namespace: "default", - runtimeType: common.ThinRuntime, - }, - wantErr: true, - }, - { - name: "default_error", - args: args{ - client: fakeutils.NewFakeClientWithScheme(s, thinRuntimeObjs...), - name: "thin-not-exit", - namespace: "default", - runtimeType: "thin-not-exit", - }, - wantErr: true, - }, - } - for _, tt := range tests { - It(tt.name, func() { - _, err := GetRuntimeStatus(tt.args.client, tt.args.runtimeType, tt.args.name, tt.args.namespace) - if tt.wantErr { - Expect(err).To(HaveOccurred()) - } else { - Expect(err).NotTo(HaveOccurred()) - } - }) - } -}) - var _ = Describe("GetSyncRetryDuration", func() { It("should get default sync retry duration", func() { _, err := getSyncRetryDuration() diff 
--git a/pkg/ddc/cache/component/component_manager.go b/pkg/ddc/cache/component/component_manager.go
index 958b2fb651c..b83916d0945 100644
--- a/pkg/ddc/cache/component/component_manager.go
+++ b/pkg/ddc/cache/component/component_manager.go
@@ -21,6 +21,7 @@ import (
 	"github.com/fluid-cloudnative/fluid/api/v1alpha1"
 	"github.com/fluid-cloudnative/fluid/pkg/common"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -28,6 +29,7 @@ import (
 type ComponentManager interface {
 	Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error
 	ConstructComponentStatus(todo context.Context, value *common.CacheRuntimeComponentValue) (v1alpha1.RuntimeComponentStatus, error)
+	GetNodeAffinity(value *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error)
 }
 
 func NewComponentHelper(workloadType metav1.TypeMeta, client client.Client) ComponentManager {
diff --git a/pkg/ddc/cache/component/daemonset_manager.go b/pkg/ddc/cache/component/daemonset_manager.go
index 9ce5d9e25f4..d7961b168e2 100644
--- a/pkg/ddc/cache/component/daemonset_manager.go
+++ b/pkg/ddc/cache/component/daemonset_manager.go
@@ -23,7 +23,9 @@ import (
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
 	"github.com/fluid-cloudnative/fluid/pkg/common"
 	"github.com/fluid-cloudnative/fluid/pkg/utils"
+	"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
 	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -47,6 +49,16 @@ func (s *DaemonSetManager) Reconciler(ctx context.Context, component *common.Cac
 	return reconcileService(ctx, s.client, component)
 }
 
+func (s *DaemonSetManager) GetNodeAffinity(component *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error) {
+	ds, err := kubeclient.GetDaemonset(s.client, component.Name, component.Namespace)
+	if err != nil {
+		return nil, err
+	}
+
+	affinity := kubeclient.MergeNodeSelectorAndNodeAffinity(ds.Spec.Template.Spec.NodeSelector, ds.Spec.Template.Spec.Affinity)
+	return affinity, nil
+}
+
 func (s *DaemonSetManager) reconcileDaemonSet(ctx context.Context, component *common.CacheRuntimeComponentValue) error {
 	logger := log.FromContext(ctx)
 	logger.Info("start to reconciling ds workload")
diff --git a/pkg/ddc/cache/component/statefulset_manager.go b/pkg/ddc/cache/component/statefulset_manager.go
index 9bfa1cfce61..76ee4bb6d68 100644
--- a/pkg/ddc/cache/component/statefulset_manager.go
+++ b/pkg/ddc/cache/component/statefulset_manager.go
@@ -23,7 +23,9 @@ import (
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
 	"github.com/fluid-cloudnative/fluid/pkg/common"
 	"github.com/fluid-cloudnative/fluid/pkg/utils"
+	"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
 	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -47,12 +49,21 @@ func (s *StatefulSetManager) Reconciler(ctx context.Context, component *common.C
 	return reconcileService(ctx, s.client, component)
 }
 
+func (s *StatefulSetManager) GetNodeAffinity(component *common.CacheRuntimeComponentValue) (*corev1.NodeAffinity, error) {
+	sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
+	if err != nil {
+		return nil, err
+	}
+
+	affinity := kubeclient.MergeNodeSelectorAndNodeAffinity(sts.Spec.Template.Spec.NodeSelector, sts.Spec.Template.Spec.Affinity)
+	return affinity, nil
+}
+
 func (s *StatefulSetManager) reconcileStatefulSet(ctx context.Context, component *common.CacheRuntimeComponentValue) error {
 	logger := log.FromContext(ctx)
 	logger.Info("start to reconciling sts workload")
-	sts := &appsv1.StatefulSet{}
-	err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, sts)
+	sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
 	if err != nil && !apierrors.IsNotFound(err) {
 		return err
 	}
@@ -114,8 +125,7 @@ func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, compo
 	logger := log.FromContext(ctx)
 	logger.Info("start to ConstructComponentStatus")
 
-	sts := &appsv1.StatefulSet{}
-	err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, sts)
+	sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace)
 	if err != nil {
 		logger.Error(err, fmt.Sprintf("failed to get component: %s/%s", component.Namespace, component.Name))
 		return datav1alpha1.RuntimeComponentStatus{}, err
diff --git a/pkg/ddc/cache/engine/runtime.go b/pkg/ddc/cache/engine/runtime.go
index 64dc55f1abd..b926899adb7 100644
--- a/pkg/ddc/cache/engine/runtime.go
+++ b/pkg/ddc/cache/engine/runtime.go
@@ -18,13 +18,40 @@ package engine
 import (
 	"context"
+
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
+	"github.com/fluid-cloudnative/fluid/pkg/common"
 	"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
 	"github.com/fluid-cloudnative/fluid/pkg/utils"
+	"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
 	"github.com/fluid-cloudnative/fluid/pkg/utils/testutil"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
+// CacheRuntimeInfo is a wrapper for RuntimeInfoInterface with override methods.
+type CacheRuntimeInfo struct {
+	base.RuntimeInfoInterface
+}
+
+func (info *CacheRuntimeInfo) GetWorkerPods(client client.Client) ([]corev1.Pod, error) {
+	workerName := GetComponentName(info.GetName(), common.ComponentTypeWorker)
+	workers, err := kubeclient.GetStatefulSet(client, workerName, info.GetNamespace())
+	if err != nil {
+		return nil, err
+	}
+	workerSelector, err := metav1.LabelSelectorAsSelector(workers.Spec.Selector)
+	if err != nil {
+		return nil, err
+	}
+
+	workerPods, err := kubeclient.GetPodsForStatefulSet(client, workers, workerSelector)
+
+	return workerPods, err
+}
+
 // getRuntime get the current runtime
 func (e *CacheEngine) getRuntime() (*datav1alpha1.CacheRuntime, error) {
 	key := types.NamespacedName{
@@ -67,11 +94,12 @@ func (e *CacheEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) {
 			base.WithMetadataList(base.GetMetadataListFromAnnotation(runtime)),
 			base.WithAnnotations(runtime.Annotations),
 		}
-		e.runtimeInfo, err = base.BuildRuntimeInfo(e.name, e.namespace, e.runtimeType, opts...)
+		runtimeInfo, err := base.BuildRuntimeInfo(e.name, e.namespace, e.runtimeType, opts...)
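// Illustrative sketch (not part of this patch) — the gist of
// kubeclient.MergeNodeSelectorAndNodeAffinity used by the component managers above:
// fold a plain nodeSelector map into required node affinity terms. This is a
// hypothetical reimplementation; the real helper must also merge with any affinity
// already present on the pod template.
func mergeNodeSelectorIntoAffinitySketch(nodeSelector map[string]string) *corev1.NodeAffinity {
	if len(nodeSelector) == 0 {
		return nil
	}
	reqs := make([]corev1.NodeSelectorRequirement, 0, len(nodeSelector))
	for k, v := range nodeSelector {
		// each nodeSelector entry becomes a required "In" match expression
		reqs = append(reqs, corev1.NodeSelectorRequirement{
			Key:      k,
			Operator: corev1.NodeSelectorOpIn,
			Values:   []string{v},
		})
	}
	return &corev1.NodeAffinity{
		RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
			NodeSelectorTerms: []corev1.NodeSelectorTerm{{MatchExpressions: reqs}},
		},
	}
}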
 		if err != nil {
 			return e.runtimeInfo, err
 		}
+		e.runtimeInfo = &CacheRuntimeInfo{runtimeInfo}
 
 		// Setup Fuse Deploy Mode
 		e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Client.NodeSelector)
 	}
diff --git a/pkg/ddc/cache/engine/shutdown.go b/pkg/ddc/cache/engine/shutdown.go
index e75324bf1eb..59fc1716f11 100644
--- a/pkg/ddc/cache/engine/shutdown.go
+++ b/pkg/ddc/cache/engine/shutdown.go
@@ -27,6 +27,7 @@ func (e *CacheEngine) Shutdown() (err error) {
 		return err
 	}
 	helper := ctrl.BuildHelper(info, e.Client, e.Log)
+	// remove fuse label in worker nodes
 	count, err := helper.CleanUpFuse()
 	if err != nil {
 		e.Log.Error(err, "Err in cleaning fuse")
@@ -34,5 +35,10 @@
 	}
 	e.Log.Info("clean up fuse count", "n", count)
 
+	// remove dataset related labels in worker nodes
+	err = helper.TearDownWorkers(info)
+	if err != nil {
+		return err
+	}
 	return nil
 }
diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go
index 0b71e8327a3..8e097e6a0a8 100644
--- a/pkg/ddc/cache/engine/status.go
+++ b/pkg/ddc/cache/engine/status.go
@@ -71,6 +71,12 @@ func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRunti
 	}
 	status.Worker = workerStatus
 
+	// Worker Affinity
+	affinity, err := manager.GetNodeAffinity(componentValue)
+	if err != nil {
+		return false, err
+	}
+	status.CacheAffinity = affinity
 	return ready, err
 }
 func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (err error) {
diff --git a/pkg/ddc/cache/engine/sync.go b/pkg/ddc/cache/engine/sync.go
index dd2d16ca2e6..dd527360e15 100644
--- a/pkg/ddc/cache/engine/sync.go
+++ b/pkg/ddc/cache/engine/sync.go
@@ -18,13 +18,15 @@ package engine
 import (
 	"context"
+	"os"
+	"reflect"
+	"time"
+
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
 	cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
+	"github.com/fluid-cloudnative/fluid/pkg/utils/dataset/lifecycle"
 	"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"os"
-	"reflect"
-	"time"
 )
 
 func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) {
@@ -33,6 +35,11 @@ func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) {
 	if err != nil {
 		return err
 	}
+	dataset := ctx.Dataset
+	runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName)
+	if err != nil {
+		return err
+	}
 
 	err = e.syncRuntimeValueConfigMap(runtime)
 	if err != nil {
@@ -44,12 +51,26 @@ func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) {
 	// handle ufs change
 
 	// sync runtime status
+	runtimeValue, err := e.transform(dataset, runtime, runtimeClass)
+	// TODO: use different struct for input parameter to avoid fully transform
+	_, err = e.CheckAndUpdateRuntimeStatus(runtimeValue)
+	if err != nil {
+		return err
+	}
 
 	// handle runtime spec change
 
 	// sync metadata
 
-	// SyncScheduleInfoToCacheNodes
+	// add dataset related labels for worker nodes
+	info, err := e.getRuntimeInfo()
+	if err != nil {
+		return err
+	}
+	err = lifecycle.SyncScheduleInfoToCacheNodes(info, e.Client)
+	if err != nil {
+		return err
+	}
 	return nil
 }
diff --git a/pkg/ddc/thin/referencedataset/runtime.go b/pkg/ddc/thin/referencedataset/runtime.go
index 3955634c15a..ed79168d31c 100644
--- a/pkg/ddc/thin/referencedataset/runtime.go
+++ b/pkg/ddc/thin/referencedataset/runtime.go
@@ -22,8 +22,10 @@ import (
 	"net/http"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
"sigs.k8s.io/controller-runtime/pkg/client" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/utils" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -31,6 +33,8 @@ import ( ) // getPhysicalDatasetRuntimeStatus get the runtime status of the physical dataset +// Note: This function only supports DDC-based runtimes (Alluxio, Jindo, etc.) +// CacheRuntime is not supported because its status structure is incompatible with ThinRuntime. func (e *ReferenceDatasetEngine) getPhysicalDatasetRuntimeStatus() (status *datav1alpha1.RuntimeStatus, err error) { physicalRuntimeInfo, err := e.getPhysicalRuntimeInfo() if err != nil { @@ -42,10 +46,63 @@ func (e *ReferenceDatasetEngine) getPhysicalDatasetRuntimeStatus() (status *data return nil, nil } - return base.GetRuntimeStatus(e.Client, physicalRuntimeInfo.GetRuntimeType(), + return getRuntimeStatus(e.Client, physicalRuntimeInfo.GetRuntimeType(), physicalRuntimeInfo.GetName(), physicalRuntimeInfo.GetNamespace()) } +// getRuntimeStatus gets the runtime status according to the runtime type, name, and namespace. +// This is a private function used only within the thin runtime package for status synchronization. +// It returns the raw RuntimeStatus which is incompatible with CacheRuntime's CacheRuntimeStatus. +func getRuntimeStatus(client client.Client, runtimeType, name, namespace string) (status *datav1alpha1.RuntimeStatus, err error) { + switch runtimeType { + case common.AlluxioRuntime: + runtime, err := utils.GetAlluxioRuntime(client, name, namespace) + if err != nil { + return status, err + } + return &runtime.Status, nil + case common.JindoRuntime: + runtime, err := utils.GetJindoRuntime(client, name, namespace) + if err != nil { + return status, err + } + return &runtime.Status, nil + case common.GooseFSRuntime: + runtime, err := utils.GetGooseFSRuntime(client, name, namespace) + if err != nil { + return status, err + } + return &runtime.Status, nil + case common.JuiceFSRuntime: + runtime, err := utils.GetJuiceFSRuntime(client, name, namespace) + if err != nil { + return status, err + } + return &runtime.Status, nil + case common.EFCRuntime: + runtime, err := utils.GetEFCRuntime(client, name, namespace) + if err != nil { + return status, err + } + return &runtime.Status, nil + case common.ThinRuntime: + runtime, err := utils.GetThinRuntime(client, name, namespace) + if err != nil { + return status, err + } + return &runtime.Status, nil + case common.VineyardRuntime: + runtime, err := utils.GetVineyardRuntime(client, name, namespace) + if err != nil { + return status, err + } + return &runtime.Status, nil + default: + err = fmt.Errorf("%s is not supported as physical runtime for ThinRuntime with reference dataset", runtimeType) + return nil, err + } +} + // getRuntime get the current runtime func (e *ReferenceDatasetEngine) getRuntime() (*datav1alpha1.ThinRuntime, error) { key := types.NamespacedName{ diff --git a/pkg/utils/dataset/lifecycle/node.go b/pkg/utils/dataset/lifecycle/node.go index a241368811c..3f2d2a871b0 100644 --- a/pkg/utils/dataset/lifecycle/node.go +++ b/pkg/utils/dataset/lifecycle/node.go @@ -85,17 +85,7 @@ func SyncScheduleInfoToCacheNodes(runtimeInfo base.RuntimeInfoInterface, client // getDesiredNodesWithScheduleInfo retrieves the desired cache nodes with schedule info func getDesiredNodesWithScheduleInfo(runtimeInfo base.RuntimeInfoInterface, client client.Client) ([]string, 
error) {
-	workers, err := kubeclient.GetStatefulSet(client, runtimeInfo.GetWorkerStatefulsetName(), runtimeInfo.GetNamespace())
-	if err != nil {
-		return nil, err
-	}
-
-	workerSelector, err := metav1.LabelSelectorAsSelector(workers.Spec.Selector)
-	if err != nil {
-		return nil, err
-	}
-
-	workerPods, err := kubeclient.GetPodsForStatefulSet(client, workers, workerSelector)
+	workerPods, err := runtimeInfo.GetWorkerPods(client)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/utils/kubeclient/metadata.go b/pkg/utils/kubeclient/metadata.go
index 54896e70b3d..791841ef3a9 100644
--- a/pkg/utils/kubeclient/metadata.go
+++ b/pkg/utils/kubeclient/metadata.go
@@ -22,7 +22,6 @@ import (
 
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -32,7 +31,7 @@ import (
 func compareOwnerRefMatcheWithExpected(c client.Client, controllerRef *metav1.OwnerReference,
 	namespace string,
-	target runtime.Object) (matched bool, err error) {
+	target client.Object) (matched bool, err error) {
 	kind := target.GetObjectKind()
 	controllerObject, err := resolveControllerRef(c, controllerRef, namespace, kind, target.DeepCopyObject().(client.Object))
diff --git a/pkg/utils/kubeclient/statefulset.go b/pkg/utils/kubeclient/statefulset.go
index 72e36f167dd..bfdf288f98f 100644
--- a/pkg/utils/kubeclient/statefulset.go
+++ b/pkg/utils/kubeclient/statefulset.go
@@ -18,11 +18,12 @@ package kubeclient
 
 import (
 	"context"
-	"k8s.io/client-go/util/retry"
 	"reflect"
 	"regexp"
 	"strconv"
 
+	"k8s.io/client-go/util/retry"
+
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -66,11 +67,11 @@ func GetStatefulSet(c client.Client, name string, namespace string) (master *app
 }
 
 // GetPodsForStatefulSet gets pods of the specified statefulset
-func GetPodsForStatefulSet(c client.Client, sts *appsv1.StatefulSet, selector labels.Selector) (pods []v1.Pod, err error) {
+func GetPodsForStatefulSet(c client.Client, sts client.Object, selector labels.Selector) (pods []v1.Pod, err error) {
 	podList := &v1.PodList{}
 	err = c.List(context.TODO(), podList, &client.ListOptions{
-		Namespace:     sts.Namespace,
+		Namespace:     sts.GetNamespace(),
 		LabelSelector: selector,
 	})
@@ -125,9 +126,9 @@ func getParentName(pod *v1.Pod) string {
 	return parent
 }
 
-// isMemberOf tests if pod is a member of statefulset sts.
-func isMemberOf(sts *appsv1.StatefulSet, pod *v1.Pod) bool {
-	return getParentName(pod) == sts.Name
+// isMemberOf tests if the pod belongs to the given workload by comparing the pod's parent name with the object's name.
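// Illustrative sketch (not part of this patch) — StatefulSet pods are named
// "<parent>-<ordinal>", so membership can be derived from the pod name alone. This is
// a self-contained, hypothetical illustration of the idea behind getParentName; the
// package's real implementation may differ in details.
var stsPodNameRegexSketch = regexp.MustCompile(`^(.*)-([0-9]+)$`)

func parentNameOfSketch(podName string) string {
	m := stsPodNameRegexSketch.FindStringSubmatch(podName)
	if m == nil {
		return "" // not a StatefulSet-style pod name
	}
	return m[1]
}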
+func isMemberOf(obj client.Object, pod *v1.Pod) bool {
+	return getParentName(pod) == obj.GetName()
 }
 
 // GetPhaseFromStatefulset gets the phase from statefulset
diff --git a/pkg/webhook/plugins/nodeaffinitywithcache/node_affinity_with_cache.go b/pkg/webhook/plugins/nodeaffinitywithcache/node_affinity_with_cache.go
index 69216badba2..4389f630da1 100644
--- a/pkg/webhook/plugins/nodeaffinitywithcache/node_affinity_with_cache.go
+++ b/pkg/webhook/plugins/nodeaffinitywithcache/node_affinity_with_cache.go
@@ -142,7 +142,7 @@ func (p *NodeAffinityWithCache) getTieredLocalityPreferredSchedulingTerms(prefer
 	}
 
 	// get runtime worker node affinity
-	status, err := base.GetRuntimeStatus(p.client, runtimeInfo.GetRuntimeType(), runtimeInfo.GetName(), runtimeInfo.GetNamespace())
+	statusAccessor, err := base.GetRuntimeStatusAccessor(p.client, runtimeInfo.GetRuntimeType(), runtimeInfo.GetName(), runtimeInfo.GetNamespace())
 	if err != nil {
 		return preferredSchedulingTerms, err
 	}
@@ -156,7 +156,10 @@ func (p *NodeAffinityWithCache) getTieredLocalityPreferredSchedulingTerms(prefer
 	}
 
 	// customized locality
-	statusAffinity := status.CacheAffinity
+	statusAffinity, err := statusAccessor.GetCacheAffinity()
+	if err != nil {
+		return preferredSchedulingTerms, err
+	}
 	if statusAffinity != nil && statusAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
 		terms := getPreferredSchedulingTermsFromRequired(statusAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, preferredLocality)
 		preferredSchedulingTerms = append(preferredSchedulingTerms, terms...)
@@ -175,7 +178,7 @@ func (p *NodeAffinityWithCache) getTieredLocalityNodeSelectorTerms(runtimeInfos
 	}
 
 	// get runtime worker node affinity
-	status, err := base.GetRuntimeStatus(p.client, runtimeInfo.GetRuntimeType(), runtimeInfo.GetName(), runtimeInfo.GetNamespace())
+	statusAccessor, err := base.GetRuntimeStatusAccessor(p.client, runtimeInfo.GetRuntimeType(), runtimeInfo.GetName(), runtimeInfo.GetNamespace())
 	if err != nil {
 		return requiredSchedulingTerms, err
 	}
@@ -188,7 +191,10 @@ func (p *NodeAffinityWithCache) getTieredLocalityNodeSelectorTerms(runtimeInfos
 	}
 
 	// customized locality
-	cacheAffinity := status.CacheAffinity
+	cacheAffinity, err := statusAccessor.GetCacheAffinity()
+	if err != nil {
+		return requiredSchedulingTerms, err
+	}
 	// only RequiredDuringSchedulingIgnoredDuringExecution, not considering PreferredDuringSchedulingIgnoredDuringExecution
 	if cacheAffinity != nil && cacheAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
 		terms := getNodeSelectorTermsFromRequired(cacheAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, requireLocalityNames)

From de280867e4927987c7f0b021806e721e15c51c83 Mon Sep 17 00:00:00 2001
From: xliuqq
Date: Thu, 7 May 2026 14:14:44 +0800
Subject: [PATCH 2/2] fix test and lint

Signed-off-by: xliuqq
---
 pkg/ddc/base/runtime.go                       |   8 +-
 pkg/ddc/base/runtime_test.go                  | 173 ++++++++++++++++++
 pkg/ddc/base/validate_test.go                 |  16 +-
 .../cache/component/statefulset_manager.go    |   4 +-
 pkg/ddc/cache/engine/sync.go                  |   3 +
 pkg/ddc/thin/referencedataset/runtime.go      |  57 +-----
 6 files changed, 193 insertions(+), 68 deletions(-)

diff --git a/pkg/ddc/base/runtime.go b/pkg/ddc/base/runtime.go
index b7b8083eaed..7cf713a5b74 100644
--- a/pkg/ddc/base/runtime.go
+++ b/pkg/ddc/base/runtime.go
@@ -627,7 +627,7 @@ func GetRuntimeStatusAccessor(client client.Client, runtimeType, name, namespace
 	switch runtimeType {
 	case common.AlluxioRuntime, common.JindoRuntime, common.GooseFSRuntime,
 		common.JuiceFSRuntime, common.EFCRuntime, common.ThinRuntime, common.VineyardRuntime:
common.JuiceFSRuntime, common.EFCRuntime, common.ThinRuntime, common.VineyardRuntime: - status, err := getDDCRuntime(client, runtimeType, name, namespace) + status, err := GetDDCRuntimeStatus(client, runtimeType, name, namespace) if err != nil { return nil, err } @@ -639,12 +639,12 @@ func GetRuntimeStatusAccessor(client client.Client, runtimeType, name, namespace } return &CacheRuntimeStatusAccessor{status: &runtime.Status}, nil default: - return nil, fmt.Errorf("fail to get runtimeInfo for runtime type: %s", runtimeType) + return nil, fmt.Errorf("fail to get runtime status accessor for runtime type: %s", runtimeType) } } -// getDDCRuntime retrieves the runtime object based on runtime type for DDC-based runtimes -func getDDCRuntime(client client.Client, runtimeType, name, namespace string) (*datav1alpha1.RuntimeStatus, error) { +// GetDDCRuntimeStatus retrieves the runtime object based on runtime type for DDC-based runtimes +func GetDDCRuntimeStatus(client client.Client, runtimeType, name, namespace string) (*datav1alpha1.RuntimeStatus, error) { switch runtimeType { case common.AlluxioRuntime: runtime, err := utils.GetAlluxioRuntime(client, name, namespace) diff --git a/pkg/ddc/base/runtime_test.go b/pkg/ddc/base/runtime_test.go index 9925d75a5fd..6a2d5b44adf 100644 --- a/pkg/ddc/base/runtime_test.go +++ b/pkg/ddc/base/runtime_test.go @@ -29,6 +29,7 @@ import ( fakeutils "github.com/fluid-cloudnative/fluid/pkg/utils/fake" v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -1231,3 +1232,175 @@ var _ = Describe("PermitSync", func() { Expect(permit).To(BeTrue(), "expect permit after retry duration elapsed") }) }) + +var _ = Describe("GetDDCRuntimeStatus", func() { + It("should get AlluxioRuntime status successfully", func() { + runtime := &v1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-alluxio", + Namespace: "default", + }, + Status: v1alpha1.RuntimeStatus{ + MasterPhase: v1alpha1.RuntimePhaseReady, + }, + } + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme, runtime) + + status, err := GetDDCRuntimeStatus(client, common.AlluxioRuntime, "test-alluxio", "default") + Expect(err).NotTo(HaveOccurred()) + Expect(status).NotTo(BeNil()) + Expect(status.MasterPhase).To(Equal(v1alpha1.RuntimePhaseReady)) + }) + + It("should get JindoRuntime status successfully", func() { + runtime := &v1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-jindo", + Namespace: "default", + }, + Status: v1alpha1.RuntimeStatus{ + MasterPhase: v1alpha1.RuntimePhaseReady, + }, + } + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme, runtime) + + status, err := GetDDCRuntimeStatus(client, common.JindoRuntime, "test-jindo", "default") + Expect(err).NotTo(HaveOccurred()) + Expect(status).NotTo(BeNil()) + Expect(status.MasterPhase).To(Equal(v1alpha1.RuntimePhaseReady)) + }) + + It("should get JuiceFSRuntime status successfully", func() { + runtime := &v1alpha1.JuiceFSRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-juicefs", + Namespace: "default", + }, + Status: v1alpha1.RuntimeStatus{ + MasterPhase: v1alpha1.RuntimePhaseReady, + }, + } + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme, runtime) + + status, err := GetDDCRuntimeStatus(client, common.JuiceFSRuntime, "test-juicefs", "default") + Expect(err).NotTo(HaveOccurred()) + Expect(status).NotTo(BeNil()) + 
Expect(status.MasterPhase).To(Equal(v1alpha1.RuntimePhaseReady)) + }) + + It("should return error for unsupported runtime type", func() { + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme) + + _, err := GetDDCRuntimeStatus(client, "unsupported-runtime", "test", "default") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("unsupported DDC runtime type")) + }) + + It("should return error when runtime not found", func() { + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme) + + _, err := GetDDCRuntimeStatus(client, common.AlluxioRuntime, "non-existent", "default") + Expect(err).To(HaveOccurred()) + }) +}) + +var _ = Describe("GetRuntimeStatusAccessor", func() { + It("should get DDCRuntimeStatusAccessor for AlluxioRuntime", func() { + runtime := &v1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-alluxio", + Namespace: "default", + }, + Status: v1alpha1.RuntimeStatus{ + CacheAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "test-key", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"value1"}, + }, + }, + }, + }, + }, + }, + }, + } + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme, runtime) + + accessor, err := GetRuntimeStatusAccessor(client, common.AlluxioRuntime, "test-alluxio", "default") + Expect(err).NotTo(HaveOccurred()) + Expect(accessor).NotTo(BeNil()) + + // Test that it's a DDCRuntimeStatusAccessor + ddcAccessor, ok := accessor.(*DDCRuntimeStatusAccessor) + Expect(ok).To(BeTrue()) + Expect(ddcAccessor).NotTo(BeNil()) + + // Test GetCacheAffinity + affinity, err := accessor.GetCacheAffinity() + Expect(err).NotTo(HaveOccurred()) + Expect(affinity).NotTo(BeNil()) + Expect(affinity.RequiredDuringSchedulingIgnoredDuringExecution).NotTo(BeNil()) + }) + + It("should get CacheRuntimeStatusAccessor for CacheRuntime", func() { + runtime := &v1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cache", + Namespace: "default", + }, + Status: v1alpha1.CacheRuntimeStatus{ + CacheAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "cache-key", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"cache-value"}, + }, + }, + }, + }, + }, + }, + }, + } + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme, runtime) + + accessor, err := GetRuntimeStatusAccessor(client, common.CacheRuntime, "test-cache", "default") + Expect(err).NotTo(HaveOccurred()) + Expect(accessor).NotTo(BeNil()) + + // Test that it's a CacheRuntimeStatusAccessor + cacheAccessor, ok := accessor.(*CacheRuntimeStatusAccessor) + Expect(ok).To(BeTrue()) + Expect(cacheAccessor).NotTo(BeNil()) + + // Test GetCacheAffinity + affinity, err := accessor.GetCacheAffinity() + Expect(err).NotTo(HaveOccurred()) + Expect(affinity).NotTo(BeNil()) + Expect(affinity.RequiredDuringSchedulingIgnoredDuringExecution).NotTo(BeNil()) + }) + + It("should return error for unsupported runtime type", func() { + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme) + + _, err := GetRuntimeStatusAccessor(client, "unsupported-type", "test", "default") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("fail to get runtime status accessor")) + }) 
+ + It("should return error when runtime not found", func() { + client := fakeutils.NewFakeClientWithScheme(v1alpha1.UnitTestScheme) + + _, err := GetRuntimeStatusAccessor(client, common.AlluxioRuntime, "non-existent", "default") + Expect(err).To(HaveOccurred()) + }) +}) diff --git a/pkg/ddc/base/validate_test.go b/pkg/ddc/base/validate_test.go index 3d8ec2e5273..5c145fc59e1 100644 --- a/pkg/ddc/base/validate_test.go +++ b/pkg/ddc/base/validate_test.go @@ -22,6 +22,7 @@ import ( fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -75,6 +76,10 @@ type mockRuntimeInfoForValidate struct { placementModeSet bool } +func (m *mockRuntimeInfoForValidate) GetWorkerPods(client client.Client) ([]corev1.Pod, error) { + return nil, nil +} + // Methods used by ValidateRuntimeInfo func (m *mockRuntimeInfoForValidate) GetOwnerDatasetUID() string { return m.ownerDatasetUID } func (m *mockRuntimeInfoForValidate) IsPlacementModeSet() bool { return m.placementModeSet } @@ -86,12 +91,11 @@ func (m *mockRuntimeInfoForValidate) GetLabelNameForDisk() string { return " func (m *mockRuntimeInfoForValidate) GetLabelNameForTotal() string { return "" } // GetCommonLabelName returns the common label name. -func (m *mockRuntimeInfoForValidate) GetCommonLabelName() string { return "" } -func (m *mockRuntimeInfoForValidate) GetFuseLabelName() string { return "" } -func (m *mockRuntimeInfoForValidate) GetRuntimeLabelName() string { return "" } -func (m *mockRuntimeInfoForValidate) GetDatasetNumLabelName() string { return "" } -func (m *mockRuntimeInfoForValidate) GetWorkerStatefulsetName() string { return "" } -func (m *mockRuntimeInfoForValidate) GetExclusiveLabelValue() string { return "" } +func (m *mockRuntimeInfoForValidate) GetCommonLabelName() string { return "" } +func (m *mockRuntimeInfoForValidate) GetFuseLabelName() string { return "" } +func (m *mockRuntimeInfoForValidate) GetRuntimeLabelName() string { return "" } +func (m *mockRuntimeInfoForValidate) GetDatasetNumLabelName() string { return "" } +func (m *mockRuntimeInfoForValidate) GetExclusiveLabelValue() string { return "" } // RuntimeInfoInterface methods (stub implementations) func (m *mockRuntimeInfoForValidate) GetTieredStoreInfo() TieredStoreInfo { return TieredStoreInfo{} } diff --git a/pkg/ddc/cache/component/statefulset_manager.go b/pkg/ddc/cache/component/statefulset_manager.go index 76ee4bb6d68..d8027886ad8 100644 --- a/pkg/ddc/cache/component/statefulset_manager.go +++ b/pkg/ddc/cache/component/statefulset_manager.go @@ -63,7 +63,7 @@ func (s *StatefulSetManager) reconcileStatefulSet(ctx context.Context, component logger := log.FromContext(ctx) logger.Info("start to reconciling sts workload") - sts, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace) + _, err := kubeclient.GetStatefulSet(s.client, component.Name, component.Namespace) if err != nil && !apierrors.IsNotFound(err) { return err } @@ -72,7 +72,7 @@ func (s *StatefulSetManager) reconcileStatefulSet(ctx context.Context, component return nil } // create the stateful set - sts = s.constructStatefulSet(component) + sts := s.constructStatefulSet(component) err = s.client.Create(ctx, sts) if err != nil { return err diff --git a/pkg/ddc/cache/engine/sync.go b/pkg/ddc/cache/engine/sync.go index dd527360e15..07e6a24e0bf 100644 --- a/pkg/ddc/cache/engine/sync.go +++ b/pkg/ddc/cache/engine/sync.go 
@@ -52,6 +52,9 @@ func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) {
 
 	// sync runtime status
 	runtimeValue, err := e.transform(dataset, runtime, runtimeClass)
+	if err != nil {
+		return err
+	}
 	// TODO: use different struct for input parameter to avoid fully transform
 	_, err = e.CheckAndUpdateRuntimeStatus(runtimeValue)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/ddc/thin/referencedataset/runtime.go b/pkg/ddc/thin/referencedataset/runtime.go
index ed79168d31c..42926c31695 100644
--- a/pkg/ddc/thin/referencedataset/runtime.go
+++ b/pkg/ddc/thin/referencedataset/runtime.go
@@ -22,10 +22,8 @@ import (
 	"net/http"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
-	"github.com/fluid-cloudnative/fluid/pkg/common"
 	"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
 	"github.com/fluid-cloudnative/fluid/pkg/utils"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -46,63 +44,10 @@ func (e *ReferenceDatasetEngine) getPhysicalDatasetRuntimeStatus() (status *data
 		return nil, nil
 	}
 
-	return getRuntimeStatus(e.Client, physicalRuntimeInfo.GetRuntimeType(),
+	return base.GetDDCRuntimeStatus(e.Client, physicalRuntimeInfo.GetRuntimeType(),
 		physicalRuntimeInfo.GetName(), physicalRuntimeInfo.GetNamespace())
 }
 
-// getRuntimeStatus gets the runtime status according to the runtime type, name, and namespace.
-// This is a private function used only within the thin runtime package for status synchronization.
-// It returns the raw RuntimeStatus which is incompatible with CacheRuntime's CacheRuntimeStatus.
-func getRuntimeStatus(client client.Client, runtimeType, name, namespace string) (status *datav1alpha1.RuntimeStatus, err error) {
-	switch runtimeType {
-	case common.AlluxioRuntime:
-		runtime, err := utils.GetAlluxioRuntime(client, name, namespace)
-		if err != nil {
-			return status, err
-		}
-		return &runtime.Status, nil
-	case common.JindoRuntime:
-		runtime, err := utils.GetJindoRuntime(client, name, namespace)
-		if err != nil {
-			return status, err
-		}
-		return &runtime.Status, nil
-	case common.GooseFSRuntime:
-		runtime, err := utils.GetGooseFSRuntime(client, name, namespace)
-		if err != nil {
-			return status, err
-		}
-		return &runtime.Status, nil
-	case common.JuiceFSRuntime:
-		runtime, err := utils.GetJuiceFSRuntime(client, name, namespace)
-		if err != nil {
-			return status, err
-		}
-		return &runtime.Status, nil
-	case common.EFCRuntime:
-		runtime, err := utils.GetEFCRuntime(client, name, namespace)
-		if err != nil {
-			return status, err
-		}
-		return &runtime.Status, nil
-	case common.ThinRuntime:
-		runtime, err := utils.GetThinRuntime(client, name, namespace)
-		if err != nil {
-			return status, err
-		}
-		return &runtime.Status, nil
-	case common.VineyardRuntime:
-		runtime, err := utils.GetVineyardRuntime(client, name, namespace)
-		if err != nil {
-			return status, err
-		}
-		return &runtime.Status, nil
-	default:
-		err = fmt.Errorf("%s is not supported as physical runtime for ThinRuntime with reference dataset", runtimeType)
-		return nil, err
-	}
-}
-
 // getRuntime get the current runtime
 func (e *ReferenceDatasetEngine) getRuntime() (*datav1alpha1.ThinRuntime, error) {
 	key := types.NamespacedName{
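// Illustrative sketch (not part of this patch series) — the end-to-end effect of
// PATCH 1/2 in miniature: list worker pods via RuntimeInfoInterface.GetWorkerPods,
// collect their node names, and label those nodes so the nodeaffinitywithcache
// webhook can later steer app pods onto them. Hypothetical, simplified code; the
// real logic lives in lifecycle.SyncScheduleInfoToCacheNodes. Assumes the usual
// context, corev1, types, client, and base imports.
func syncScheduleInfoSketch(ctx context.Context, c client.Client, info base.RuntimeInfoInterface) error {
	pods, err := info.GetWorkerPods(c)
	if err != nil {
		return err
	}
	for i := range pods {
		nodeName := pods[i].Spec.NodeName
		if nodeName == "" {
			continue // worker not scheduled yet
		}
		node := &corev1.Node{}
		if err := c.Get(ctx, types.NamespacedName{Name: nodeName}, node); err != nil {
			return err
		}
		patched := node.DeepCopy()
		if patched.Labels == nil {
			patched.Labels = map[string]string{}
		}
		// the common label is one of the "dataset related labels" this series syncs to nodes
		patched.Labels[info.GetCommonLabelName()] = "true"
		if err := c.Patch(ctx, patched, client.MergeFrom(node)); err != nil {
			return err
		}
	}
	return nil
}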