From ffd58e944628fa4639d2f1cae6dddecec445c0b7 Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 3 May 2026 01:48:02 +0530 Subject: [PATCH 1/6] test: add thin transform ginkgo coverage Signed-off-by: Harsh --- pkg/ddc/thin/health_check_test.go | 189 +++++++++- pkg/ddc/thin/master_test.go | 179 +++++++++ pkg/ddc/thin/shutdown_test.go | 4 + pkg/ddc/thin/transform.go | 5 +- pkg/ddc/thin/transform_config_test.go | 100 ++++++ pkg/ddc/thin/transform_test.go | 500 +++++++++++++++++--------- 6 files changed, 803 insertions(+), 174 deletions(-) diff --git a/pkg/ddc/thin/health_check_test.go b/pkg/ddc/thin/health_check_test.go index 7b9e1594f6f..9302f01b027 100644 --- a/pkg/ddc/thin/health_check_test.go +++ b/pkg/ddc/thin/health_check_test.go @@ -18,7 +18,6 @@ package thin import ( "context" - . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -265,6 +264,117 @@ var _ = Describe("ThinEngine Health Check", Label("pkg.ddc.thin.health_check_tes Expect(err).To(HaveOccurred()) }) }) + + Context("when a fuse-only runtime is healthy", func() { + var ( + client client.Client + engine ThinEngine + ) + + BeforeEach(func() { + healthyFuse := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: healthCheckTestSpark + "-fuse", + Namespace: healthCheckTestNamespace, + }, + Status: appsv1.DaemonSetStatus{ + NumberUnavailable: 0, + NumberReady: 1, + NumberAvailable: 1, + }, + } + runtimeObj := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: healthCheckTestSpark, + Namespace: healthCheckTestNamespace, + }, + Spec: datav1alpha1.ThinRuntimeSpec{}, + Status: datav1alpha1.RuntimeStatus{ + CacheStates: map[common.CacheStateName]string{common.Cached: "true"}, + }, + } + datasetObj := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: healthCheckTestSpark, + Namespace: healthCheckTestNamespace, + }, + Status: datav1alpha1.DatasetStatus{}, + } + + client = fake.NewFakeClientWithScheme(testScheme, healthyFuse, runtimeObj, datasetObj) + engine = ThinEngine{ + Client: client, + Log: fake.NullLogger(), + namespace: healthCheckTestNamespace, + name: healthCheckTestSpark, + runtime: runtimeObj, + } + }) + + JustBeforeEach(func() { + runtimeInfo, err := base.BuildRuntimeInfo(engine.name, engine.namespace, common.ThinRuntime) + Expect(err).NotTo(HaveOccurred()) + engine.Helper = ctrl.BuildHelper(runtimeInfo, client, engine.Log) + }) + + It("skips worker checks and updates the dataset to bound", func() { + err := engine.CheckRuntimeHealthy() + Expect(err).NotTo(HaveOccurred()) + + updatedRuntime, err := engine.getRuntime() + Expect(err).NotTo(HaveOccurred()) + Expect(updatedRuntime.Status.FuseNumberReady).To(Equal(int32(1))) + Expect(updatedRuntime.Status.FuseNumberAvailable).To(Equal(int32(1))) + + updatedDataset, err := utils.GetDataset(client, healthCheckTestSpark, healthCheckTestNamespace) + Expect(err).NotTo(HaveOccurred()) + Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.BoundDatasetPhase)) + }) + }) + + Context("when checking a fuse-only runtime without a fuse daemonset", func() { + var ( + client client.Client + engine ThinEngine + ) + + BeforeEach(func() { + runtimeObj := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: healthCheckTestSpark, + Namespace: healthCheckTestNamespace, + }, + Spec: datav1alpha1.ThinRuntimeSpec{}, + } + datasetObj := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: healthCheckTestSpark, + Namespace: healthCheckTestNamespace, + }, + } + + client = fake.NewFakeClientWithScheme(testScheme, runtimeObj, datasetObj) + engine = ThinEngine{ + Client: client, + Log: fake.NullLogger(), + namespace: healthCheckTestNamespace, + name: healthCheckTestSpark, + runtime: runtimeObj, + } + }) + + JustBeforeEach(func() { + runtimeInfo, err := base.BuildRuntimeInfo(engine.name, engine.namespace, common.ThinRuntime) + Expect(err).NotTo(HaveOccurred()) + engine.Helper = ctrl.BuildHelper(runtimeInfo, client, engine.Log) + }) + + It("returns the fuse check error", func() { + err := engine.CheckRuntimeHealthy() + Expect(err).To(HaveOccurred()) + }) + }) + }) Describe("checkFuseHealthy", func() { @@ -419,4 +529,81 @@ var _ = Describe("ThinEngine Health Check", Label("pkg.ddc.thin.health_check_tes }) }) }) + + Describe("CheckAndUpdateRuntimeStatus", func() { + It("initializes fuse-only runtime status and strips mount options", func() { + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: healthCheckTestSpark, + Namespace: healthCheckTestNamespace, + }, + Status: datav1alpha1.DatasetStatus{ + Mounts: []datav1alpha1.Mount{{ + MountPoint: "s3://bucket/data", + Options: map[string]string{"endpoint": "test"}, + EncryptOptions: []datav1alpha1.EncryptOption{{ + Name: "secret", + }}, + }}, + }, + } + + runtimeObj := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: healthCheckTestSpark, + Namespace: healthCheckTestNamespace, + CreationTimestamp: metav1.Now(), + }, + Spec: datav1alpha1.ThinRuntimeSpec{ + Fuse: datav1alpha1.ThinFuseSpec{}, + }, + Status: datav1alpha1.RuntimeStatus{ + FusePhase: datav1alpha1.RuntimePhaseNone, + }, + } + + client := fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj) + engine := ThinEngine{ + Client: client, + Log: fake.NullLogger(), + name: healthCheckTestSpark, + namespace: healthCheckTestNamespace, + runtime: runtimeObj, + } + + ready, err := engine.CheckAndUpdateRuntimeStatus() + + Expect(err).NotTo(HaveOccurred()) + Expect(ready).To(BeTrue()) + + updatedRuntime, err := engine.getRuntime() + Expect(err).NotTo(HaveOccurred()) + Expect(updatedRuntime.Status.FusePhase).To(Equal(datav1alpha1.RuntimePhaseReady)) + Expect(updatedRuntime.Status.ValueFileConfigmap).To(Equal(engine.getHelmValuesConfigMapName())) + Expect(updatedRuntime.Status.SetupDuration).NotTo(BeEmpty()) + Expect(updatedRuntime.Status.CacheStates).To(HaveKeyWithValue(common.Cached, "N/A")) + Expect(updatedRuntime.Status.Mounts).To(HaveLen(1)) + Expect(updatedRuntime.Status.Mounts[0].Options).To(BeNil()) + Expect(updatedRuntime.Status.Mounts[0].EncryptOptions).To(BeNil()) + + _, cond := utils.GetRuntimeCondition(updatedRuntime.Status.Conditions, datav1alpha1.RuntimeFusesInitialized) + Expect(cond).NotTo(BeNil()) + }) + }) + + Describe("ThinEngine.getDataSetFileNum", func() { + It("returns an empty count when no running fuse pod can be found", func() { + engine := &ThinEngine{ + Client: fake.NewFakeClientWithScheme(testScheme), + Log: fake.NullLogger(), + name: healthCheckTestName, + namespace: healthCheckTestNamespace, + } + + count, err := engine.getDataSetFileNum() + + Expect(err).To(HaveOccurred()) + Expect(count).To(BeEmpty()) + }) + }) }) diff --git a/pkg/ddc/thin/master_test.go b/pkg/ddc/thin/master_test.go index 2ff13742367..538b2a6a29c 100644 --- a/pkg/ddc/thin/master_test.go +++ b/pkg/ddc/thin/master_test.go @@ -17,13 +17,21 @@ package thin import ( + "context" + "errors" + . "github.com/agiledragon/gomonkey/v2" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/fluid-cloudnative/fluid/pkg/utils/helm" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ) var _ = Describe("Master Tests", func() { @@ -123,5 +131,176 @@ var _ = Describe("Master Tests", func() { Expect(err).To(BeNil()) }) }) + + }) + + Describe("ThinEngine.generateThinValueFile", func() { + It("generates values when the runtime profile lookup falls back to nil", func() { + dataset, runtimeObj, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{ + Image: "runtime-fuse", + } + profile = nil + + client := fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj) + engine := mockThinEngineForTests(dataset, runtimeObj, profile) + engine.Client = client + engine.runtime = runtimeObj + + generatedProfile, err := engine.getThinRuntimeProfile() + Expect(err).To(HaveOccurred()) + Expect(generatedProfile).To(BeNil()) + + valueFile, err := engine.generateThinValueFile(runtimeObj, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(valueFile).To(BeAnExistingFile()) + + configMap := &corev1.ConfigMap{} + err = engine.Client.Get(context.TODO(), types.NamespacedName{ + Name: engine.getHelmValuesConfigMapName(), + Namespace: engine.namespace, + }, configMap) + Expect(err).NotTo(HaveOccurred()) + Expect(configMap.Labels).To(HaveKeyWithValue(common.LabelAnnotationDatasetId, dataset.Labels[common.LabelAnnotationDatasetId])) + Expect(configMap.Data).To(HaveKey("data")) + }) + + It("skips storing runtime helm values when runtime config map generation is disabled", func() { + dataset, runtimeObj, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{Image: "runtime-fuse"} + runtimeObj.Annotations = map[string]string{common.AnnotationDisableRuntimeHelmValueConfig: "false"} + + engine := mockThinEngineForTests(dataset, runtimeObj, profile) + engine.Client = fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj, profile) + engine.runtime = runtimeObj + + valueFile, err := engine.generateThinValueFile(runtimeObj, profile) + Expect(err).NotTo(HaveOccurred()) + Expect(valueFile).To(BeAnExistingFile()) + + configMap := &corev1.ConfigMap{} + err = engine.Client.Get(context.TODO(), types.NamespacedName{ + Name: engine.getHelmValuesConfigMapName(), + Namespace: engine.namespace, + }, configMap) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + }) + + }) + + Describe("ThinEngine.setupMasterInternal", func() { + It("continues with a missing runtime profile when the release already exists", func() { + dataset, runtimeObj, _ := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{Image: "runtime-fuse"} + + engine := mockThinEngineForTests(dataset, runtimeObj, nil) + engine.Client = fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj) + engine.runtime = runtimeObj + + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(name string, namespace string) (bool, error) { + Expect(name).To(Equal(engine.name)) + Expect(namespace).To(Equal(engine.namespace)) + return true, nil + }) + installReleasePatch := ApplyFunc(helm.InstallRelease, func(string, string, string, string) error { + Fail("InstallRelease should not be called when the release already exists") + return nil + }) + defer checkReleasePatch.Reset() + defer installReleasePatch.Reset() + + Expect(engine.setupMasterInternal()).To(Succeed()) + }) + + It("installs the release when it is not already present", func() { + dataset, runtimeObj, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{Image: "runtime-fuse"} + + engine := mockThinEngineForTests(dataset, runtimeObj, profile) + engine.Client = fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj, profile) + engine.runtime = runtimeObj + + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(string, string) (bool, error) { + return false, nil + }) + installReleasePatch := ApplyFunc(helm.InstallRelease, func(name string, namespace string, valueFile string, chart string) error { + Expect(name).To(Equal(engine.name)) + Expect(namespace).To(Equal(engine.namespace)) + Expect(valueFile).To(BeAnExistingFile()) + Expect(chart).To(ContainSubstring("thin")) + return nil + }) + defer checkReleasePatch.Reset() + defer installReleasePatch.Reset() + + Expect(engine.setupMasterInternal()).To(Succeed()) + }) + + It("returns the install release error", func() { + dataset, runtimeObj, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{Image: "runtime-fuse"} + + engine := mockThinEngineForTests(dataset, runtimeObj, profile) + engine.Client = fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj, profile) + engine.runtime = runtimeObj + + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(string, string) (bool, error) { + return false, nil + }) + installReleasePatch := ApplyFunc(helm.InstallRelease, func(string, string, string, string) error { + return errors.New("install failed") + }) + defer checkReleasePatch.Reset() + defer installReleasePatch.Reset() + + Expect(engine.setupMasterInternal()).To(MatchError("install failed")) + }) + }) + + Describe("ThinEngine.getThinRuntimeProfile", func() { + It("returns nil when the engine runtime is nil", func() { + engine := &ThinEngine{} + + profile, err := engine.getThinRuntimeProfile() + + Expect(err).NotTo(HaveOccurred()) + Expect(profile).To(BeNil()) + }) + + It("loads the referenced runtime profile", func() { + dataset, runtimeObj, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + engine := mockThinEngineForTests(dataset, runtimeObj, profile) + engine.Client = fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj, profile) + + loadedProfile, err := engine.getThinRuntimeProfile() + + Expect(err).NotTo(HaveOccurred()) + Expect(loadedProfile).NotTo(BeNil()) + Expect(loadedProfile.Name).To(Equal(profile.Name)) + }) + }) + + Describe("ThinEngine.ifRuntimeHelmValueEnable", func() { + It("defaults to enabled when runtime is nil or the annotation is invalid", func() { + Expect((&ThinEngine{}).ifRuntimeHelmValueEnable()).To(BeTrue()) + + engine := &ThinEngine{runtime: &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{common.AnnotationDisableRuntimeHelmValueConfig: "not-a-bool"}, + }, + }} + + Expect(engine.ifRuntimeHelmValueEnable()).To(BeTrue()) + }) + + It("follows the parsed runtime annotation value", func() { + engine := &ThinEngine{runtime: &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{common.AnnotationDisableRuntimeHelmValueConfig: "false"}, + }, + }} + + Expect(engine.ifRuntimeHelmValueEnable()).To(BeFalse()) + }) }) }) diff --git a/pkg/ddc/thin/shutdown_test.go b/pkg/ddc/thin/shutdown_test.go index ede2bf3300a..427405a9e2b 100644 --- a/pkg/ddc/thin/shutdown_test.go +++ b/pkg/ddc/thin/shutdown_test.go @@ -112,6 +112,10 @@ func TestDestroyWorker(t *testing.T) { for _, nodeInput := range nodeInputs { testNodes = append(testNodes, nodeInput.DeepCopy()) } + testNodes = append(testNodes, + &datav1alpha1.ThinRuntime{ObjectMeta: metav1.ObjectMeta{Name: "spark", Namespace: "fluid"}}, + &datav1alpha1.ThinRuntime{ObjectMeta: metav1.ObjectMeta{Name: "hadoop", Namespace: "fluid"}}, + ) client := fake.NewFakeClientWithScheme(testScheme, testNodes...) diff --git a/pkg/ddc/thin/transform.go b/pkg/ddc/thin/transform.go index 79b0d48718a..d072e9e42c0 100644 --- a/pkg/ddc/thin/transform.go +++ b/pkg/ddc/thin/transform.go @@ -32,6 +32,9 @@ func (t *ThinEngine) transform(runtime *datav1alpha1.ThinRuntime, profile *datav err = fmt.Errorf("the thinRuntime is null") return } + if profile == nil { + profile = &datav1alpha1.ThinRuntimeProfile{} + } defer utils.TimeTrack(time.Now(), "ThinRuntime.Transform", "name", runtime.Name) dataset, err := utils.GetDataset(t.Client, t.name, t.namespace) @@ -44,8 +47,8 @@ func (t *ThinEngine) transform(runtime *datav1alpha1.ThinRuntime, profile *datav Namespace: runtime.Namespace, Name: runtime.Name, }, - ImagePullSecrets: profile.Spec.ImagePullSecrets, } + value.ImagePullSecrets = profile.Spec.ImagePullSecrets if len(runtime.Spec.ImagePullSecrets) != 0 { value.ImagePullSecrets = runtime.Spec.ImagePullSecrets } diff --git a/pkg/ddc/thin/transform_config_test.go b/pkg/ddc/thin/transform_config_test.go index 288c258f214..0200b1c2515 100644 --- a/pkg/ddc/thin/transform_config_test.go +++ b/pkg/ddc/thin/transform_config_test.go @@ -17,6 +17,8 @@ package thin import ( + "encoding/json" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -134,5 +136,103 @@ var _ = Describe("ThinEngine extractVolumeMountOptions", func() { []string{"ro", "noexec"}, false, ), + Entry("no mount options configured", + &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{}, + Spec: corev1.PersistentVolumeSpec{}, + }, + nil, + false, + ), ) }) + +var _ = Describe("ThinEngine transformFuseConfig", func() { + var ( + engine ThinEngine + runtime *datav1alpha1.ThinRuntime + dataset *datav1alpha1.Dataset + value *ThinValue + ) + + BeforeEach(func() { + engine = ThinEngine{ + name: "thin-test", + namespace: "fluid", + Client: fake.NewFakeClientWithScheme(testScheme), + Log: fake.NullLogger(), + } + runtime = &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "thin-test", Namespace: "fluid"}, + } + dataset = &datav1alpha1.Dataset{} + value = &ThinValue{} + }) + + It("returns an error when the configured fuse config storage is unsupported", func() { + By("using an unsupported storage backend") + GinkgoTB().Setenv(EnvFuseConfigStorage, "invalid") + dataset.Spec.Mounts = []datav1alpha1.Mount{{MountPoint: "s3://bucket/data"}} + + err := engine.transformFuseConfig(runtime, dataset, value) + + Expect(err).To(MatchError(ContainSubstring("FUSE config storage \"invalid\" is not supported"))) + }) + + It("returns an error when a pvc mount is not yet bound", func() { + By("providing a pvc mount whose claim is still pending") + pendingPVC := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: "pending-pvc", Namespace: "fluid"}, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimPending}, + } + engine.Client = fake.NewFakeClientWithScheme(testScheme, pendingPVC) + dataset.Spec.Mounts = []datav1alpha1.Mount{{MountPoint: "pvc://pending-pvc"}} + + err := engine.transformFuseConfig(runtime, dataset, value) + + Expect(err).To(MatchError(ContainSubstring("failed to extract volume info from PersistentVolumeClaim \"pending-pvc\""))) + Expect(err).To(MatchError(ContainSubstring("persistent volume claim pending-pvc not bounded yet"))) + }) + + It("serializes secret-backed mount options when fuse config storage is secret", func() { + GinkgoTB().Setenv(EnvFuseConfigStorage, "secret") + engine.Client = fake.NewFakeClientWithScheme(testScheme, &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "my-s3-secret", Namespace: "fluid"}, + Data: map[string][]byte{ + "access-key-id": []byte("test-ak"), + "access-key-secret": []byte("test-sk"), + }, + }) + dataset.Spec.Mounts = []datav1alpha1.Mount{{ + MountPoint: "s3://bucket/data", + Options: map[string]string{ + "endpoint": "https://minio.example.com", + }, + EncryptOptions: []datav1alpha1.EncryptOption{{ + Name: "access-key-id", + ValueFrom: datav1alpha1.EncryptOptionSource{ + SecretKeyRef: datav1alpha1.SecretKeySelector{Name: "my-s3-secret", Key: "access-key-id"}, + }, + }, { + Name: "access-key-secret", + ValueFrom: datav1alpha1.EncryptOptionSource{ + SecretKeyRef: datav1alpha1.SecretKeySelector{Name: "my-s3-secret", Key: "access-key-secret"}, + }, + }}, + }} + + err := engine.transformFuseConfig(runtime, dataset, value) + + Expect(err).NotTo(HaveOccurred()) + Expect(value.Fuse.ConfigStorage).To(Equal("secret")) + Expect(value.Fuse.Volumes).To(BeEmpty()) + Expect(value.Fuse.VolumeMounts).To(BeEmpty()) + + config := &Config{} + Expect(json.Unmarshal([]byte(value.Fuse.ConfigValue), config)).To(Succeed()) + Expect(config.Mounts).To(HaveLen(1)) + Expect(config.Mounts[0].Options).To(HaveKeyWithValue("endpoint", "https://minio.example.com")) + Expect(config.Mounts[0].Options).To(HaveKeyWithValue("access-key-id", "test-ak")) + Expect(config.Mounts[0].Options).To(HaveKeyWithValue("access-key-secret", "test-sk")) + }) +}) diff --git a/pkg/ddc/thin/transform_test.go b/pkg/ddc/thin/transform_test.go index b0c79c18d7d..0a8782f2496 100644 --- a/pkg/ddc/thin/transform_test.go +++ b/pkg/ddc/thin/transform_test.go @@ -17,73 +17,269 @@ package thin import ( - "reflect" - "testing" - - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + data1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) -func TestThinEngine_transformTolerations(t *testing.T) { - type fields struct { - name string - namespace string - } - type args struct { - dataset *datav1alpha1.Dataset - value *ThinValue - } - var tests = []struct { - name string - fields fields - args args - }{ - { - name: "test", - fields: fields{ - name: "", - namespace: "", - }, - args: args{ - dataset: &datav1alpha1.Dataset{Spec: datav1alpha1.DatasetSpec{ - Tolerations: []corev1.Toleration{{ - Key: "a", - Operator: corev1.TolerationOpEqual, - Value: "b", +var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_test.go"), func() { + var ( + dataset *data1alpha1.Dataset + thinruntime *data1alpha1.ThinRuntime + profile *data1alpha1.ThinRuntimeProfile + engine *ThinEngine + resources []runtime.Object + ) + + BeforeEach(func() { + dataset, thinruntime, profile = mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + engine = mockThinEngineForTests(dataset, thinruntime, profile) + resources = []runtime.Object{dataset, thinruntime, profile} + }) + + JustBeforeEach(func() { + engine.Client = fake.NewFakeClientWithScheme(data1alpha1.UnitTestScheme, resources...) + }) + + Describe("transform", func() { + It("returns an error when runtime is nil", func() { + value, err := engine.transform(nil, profile) + + Expect(err).To(MatchError("the thinRuntime is null")) + Expect(value).To(BeNil()) + }) + + It("returns a dataset lookup error when dataset is missing", func() { + engine.Client = fake.NewFakeClientWithScheme(data1alpha1.UnitTestScheme, thinruntime, profile) + + value, err := engine.transform(thinruntime, profile) + + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + Expect(value).To(BeNil()) + }) + + When("worker is disabled", func() { + BeforeEach(func() { + thinruntime.Spec.Replicas = 1 + dataset.Spec.PlacementMode = "" + dataset.Spec.Tolerations = []corev1.Toleration{{ + Key: "dedicated", + Operator: corev1.TolerationOpEqual, + Value: "thin", + }} + profile.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: "profile-secret"}} + }) + + It("uses profile defaults on the happy path", func() { + value, err := engine.transform(thinruntime, profile) + + Expect(err).NotTo(HaveOccurred()) + Expect(value.ImagePullSecrets).To(Equal(profile.Spec.ImagePullSecrets)) + Expect(value.Worker.Enabled).To(BeFalse()) + Expect(value.Tolerations).To(Equal(dataset.Spec.Tolerations)) + Expect(value.PlacementMode).To(Equal(string(data1alpha1.ExclusiveMode))) + Expect(value.OwnerDatasetId).To(Equal(dataset.Labels[common.LabelAnnotationDatasetId])) + Expect(value.RuntimeIdentity).To(Equal(common.RuntimeIdentity{ + Namespace: thinruntime.Namespace, + Name: thinruntime.Name, + })) + Expect(value.Owner).NotTo(BeNil()) + Expect(value.Owner.Name).To(Equal(thinruntime.Name)) + }) + }) + + When("worker is enabled", func() { + BeforeEach(func() { + thinruntime.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: "runtime-secret"}} + thinruntime.Spec.TieredStore.Levels = []data1alpha1.Level{{Path: "/runtime/cache"}} + thinruntime.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + Enabled: true, + Image: "runtime-worker", + ImageTag: "runtime-tag", + ImagePullPolicy: string(corev1.PullIfNotPresent), + ImagePullSecrets: []corev1.LocalObjectReference{{Name: "runtime-worker-secret"}}, + Env: []corev1.EnvVar{{Name: "RUNTIME_ENV", Value: "runtime"}}, + Ports: []corev1.ContainerPort{{Name: "http", ContainerPort: 8080}}, + NodeSelector: map[string]string{"node": "runtime"}, + NetworkMode: data1alpha1.HostNetworkMode, + VolumeMounts: []corev1.VolumeMount{{ + Name: "runtime-volume", + MountPath: "/runtime/mount", }}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")}, + Limits: corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("2Gi")}, + }, + } + thinruntime.Spec.Volumes = []corev1.Volume{{ + Name: "runtime-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }} + profile.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + Image: "profile-worker", + ImageTag: "profile-tag", + ImagePullPolicy: string(corev1.PullAlways), + ImagePullSecrets: []corev1.LocalObjectReference{{Name: "profile-worker-secret"}}, + Env: []corev1.EnvVar{{Name: "PROFILE_ENV", Value: "profile"}}, + Ports: []corev1.ContainerPort{{Name: "metrics", ContainerPort: 9090}}, + NodeSelector: map[string]string{"node": "profile"}, + VolumeMounts: []corev1.VolumeMount{{ + Name: "profile-volume", + MountPath: "/profile/mount", + }}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("1Gi")}, + Limits: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("500m")}, + }, + } + profile.Spec.Volumes = []corev1.Volume{{ + Name: "profile-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }} + }) + + It("uses runtime image pull secrets and transforms worker fields", func() { + value, err := engine.transform(thinruntime, profile) + + Expect(err).NotTo(HaveOccurred()) + Expect(value.ImagePullSecrets).To(Equal(thinruntime.Spec.ImagePullSecrets)) + Expect(value.Worker.Image).To(Equal(thinruntime.Spec.Worker.Image)) + Expect(value.Worker.ImageTag).To(Equal(thinruntime.Spec.Worker.ImageTag)) + Expect(value.Worker.ImagePullPolicy).To(Equal(thinruntime.Spec.Worker.ImagePullPolicy)) + Expect(value.Worker.ImagePullSecrets).To(Equal(thinruntime.Spec.Worker.ImagePullSecrets)) + Expect(value.Worker.Envs).To(ContainElements(profile.Spec.Worker.Env[0], thinruntime.Spec.Worker.Env[0])) + Expect(value.Worker.Ports).To(ContainElements(profile.Spec.Worker.Ports[0], thinruntime.Spec.Worker.Ports[0])) + Expect(value.Worker.NodeSelector).To(Equal(thinruntime.Spec.Worker.NodeSelector)) + Expect(value.Worker.CacheDir).To(Equal("/runtime/cache")) + Expect(value.Worker.HostNetwork).To(BeTrue()) + Expect(value.Worker.VolumeMounts).To(ContainElements(profile.Spec.Worker.VolumeMounts[0], thinruntime.Spec.Worker.VolumeMounts[0])) + Expect(value.Worker.Volumes).To(ContainElement(profile.Spec.Volumes[0])) + Expect(value.Worker.Volumes).To(ContainElement(thinruntime.Spec.Volumes[0])) + Expect(value.Worker.Resources.Requests).To(HaveKeyWithValue(corev1.ResourceCPU, "1")) + Expect(value.Worker.Resources.Requests).To(HaveKeyWithValue(corev1.ResourceMemory, "1Gi")) + Expect(value.Worker.Resources.Limits).To(HaveKeyWithValue(corev1.ResourceCPU, "500m")) + Expect(value.Worker.Resources.Limits).To(HaveKeyWithValue(corev1.ResourceMemory, "2Gi")) + }) + }) + + It("handles a missing profile when callers continue after profile lookup not found", func() { + thinruntime.Spec.Fuse.Image = "runtime-fuse" + thinruntime.Spec.Fuse.ImageTag = "runtime-tag" + + value, err := engine.transform(thinruntime, nil) + + Expect(err).NotTo(HaveOccurred()) + Expect(value).NotTo(BeNil()) + Expect(value.ImagePullSecrets).To(BeNil()) + }) + }) + + Describe("transformWorkers", func() { + var value *ThinValue + + BeforeEach(func() { + value = &ThinValue{} + profile.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + Image: "profile-worker", + ImageTag: "profile-tag", + ImagePullPolicy: string(corev1.PullAlways), + ImagePullSecrets: []corev1.LocalObjectReference{{Name: "profile-worker-secret"}}, + Env: []corev1.EnvVar{{Name: "PROFILE_ENV", Value: "profile"}}, + Ports: []corev1.ContainerPort{{Name: "profile-port", ContainerPort: 9090}}, + NodeSelector: map[string]string{"profile": "true"}, + NetworkMode: data1alpha1.ContainerNetworkMode, + VolumeMounts: []corev1.VolumeMount{{ + Name: "profile-volume", + MountPath: "/profile/mount", }}, - value: &ThinValue{}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - j := &ThinEngine{ - name: tt.fields.name, - namespace: tt.fields.namespace, } - j.transformTolerations(tt.args.dataset, tt.args.value) - if len(tt.args.value.Tolerations) != len(tt.args.dataset.Spec.Tolerations) { - t.Errorf("transformTolerations() tolerations = %v", tt.args.value.Tolerations) + profile.Spec.Volumes = []corev1.Volume{{ + Name: "profile-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }} + thinruntime.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + ImageTag: "runtime-tag", + ImagePullSecrets: []corev1.LocalObjectReference{{Name: "runtime-worker-secret"}}, + Env: []corev1.EnvVar{{Name: "RUNTIME_ENV", Value: "runtime"}}, + Ports: []corev1.ContainerPort{{Name: "runtime-port", ContainerPort: 8080}}, + NodeSelector: map[string]string{"runtime": "true"}, + NetworkMode: data1alpha1.HostNetworkMode, + VolumeMounts: []corev1.VolumeMount{{ + Name: "runtime-volume", + MountPath: "/runtime/mount", + }}, } + thinruntime.Spec.Volumes = []corev1.Volume{{ + Name: "runtime-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }} + thinruntime.Spec.TieredStore.Levels = []data1alpha1.Level{{Path: "/runtime/cache"}} + }) + + It("applies profile defaults and runtime overrides", func() { + err := engine.transformWorkers(thinruntime, profile, value) + + Expect(err).NotTo(HaveOccurred()) + Expect(value.Worker.Image).To(Equal(profile.Spec.Worker.Image)) + Expect(value.Worker.ImageTag).To(Equal(thinruntime.Spec.Worker.ImageTag)) + Expect(value.Worker.ImagePullPolicy).To(Equal(profile.Spec.Worker.ImagePullPolicy)) + Expect(value.Worker.ImagePullSecrets).To(Equal(thinruntime.Spec.Worker.ImagePullSecrets)) + Expect(value.Worker.Envs).To(ContainElements(profile.Spec.Worker.Env[0], thinruntime.Spec.Worker.Env[0])) + Expect(value.Worker.Ports).To(ContainElements(profile.Spec.Worker.Ports[0], thinruntime.Spec.Worker.Ports[0])) + Expect(value.Worker.NodeSelector).To(Equal(thinruntime.Spec.Worker.NodeSelector)) + Expect(value.Worker.VolumeMounts).To(ContainElements(profile.Spec.Worker.VolumeMounts[0], thinruntime.Spec.Worker.VolumeMounts[0])) + Expect(value.Worker.Volumes).To(ContainElements(profile.Spec.Volumes[0], thinruntime.Spec.Volumes[0])) + Expect(value.Worker.CacheDir).To(Equal("/runtime/cache")) + Expect(value.Worker.HostNetwork).To(BeTrue()) }) - } -} - -func TestThinEngine_parseFromProfile(t1 *testing.T) { - profile := datav1alpha1.ThinRuntimeProfile{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "fluid", - }, - Spec: datav1alpha1.ThinRuntimeProfileSpec{ - Worker: datav1alpha1.ThinCompTemplateSpec{ - Image: "test", - ImageTag: "v1", - ImagePullPolicy: "Always", + + It("swallows worker volume transformation errors", func() { + profile.Spec.Worker.VolumeMounts = []corev1.VolumeMount{{Name: "missing-volume", MountPath: "/profile/mount"}} + profile.Spec.Volumes = nil + + err := engine.transformWorkers(thinruntime, profile, value) + + Expect(err).NotTo(HaveOccurred()) + Expect(value.Worker.VolumeMounts).To(ContainElement(thinruntime.Spec.Worker.VolumeMounts[0])) + Expect(value.Worker.Volumes).To(ContainElement(thinruntime.Spec.Volumes[0])) + }) + }) + + Describe("parseFromProfile", func() { + var value *ThinValue + + BeforeEach(func() { + value = &ThinValue{} + }) + + It("is a no-op for a nil profile", func() { + engine.parseFromProfile(nil, value) + + Expect(value).To(Equal(&ThinValue{})) + }) + + It("copies worker settings from the profile", func() { + profile.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + Image: "test", + ImageTag: "v1", + ImagePullPolicy: string(corev1.PullAlways), + ImagePullSecrets: []corev1.LocalObjectReference{{Name: "pull-secret"}}, Env: []corev1.EnvVar{{ Name: "a", Value: "b", @@ -91,9 +287,7 @@ func TestThinEngine_parseFromProfile(t1 *testing.T) { Name: "b", ValueFrom: &corev1.EnvVarSource{ ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: "test-cm", - }, + LocalObjectReference: corev1.LocalObjectReference{Name: "test-cm"}, }, }, }}, @@ -103,11 +297,7 @@ func TestThinEngine_parseFromProfile(t1 *testing.T) { ContainerPort: 8080, }}, LivenessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - }, - }, + ProbeHandler: corev1.ProbeHandler{HTTPGet: &corev1.HTTPGetAction{Path: "/healthz"}}, InitialDelaySeconds: 1, TimeoutSeconds: 1, PeriodSeconds: 1, @@ -115,121 +305,87 @@ func TestThinEngine_parseFromProfile(t1 *testing.T) { FailureThreshold: 1, }, ReadinessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - }, - }, + ProbeHandler: corev1.ProbeHandler{HTTPGet: &corev1.HTTPGetAction{Path: "/healthz"}}, InitialDelaySeconds: 1, TimeoutSeconds: 1, PeriodSeconds: 1, SuccessThreshold: 1, FailureThreshold: 1, }, - NetworkMode: datav1alpha1.HostNetworkMode, - }, - }, - } - wantValue := &ThinValue{ - Worker: Worker{ - Image: "test", - ImageTag: "v1", - ImagePullPolicy: "Always", - Resources: common.Resources{ + NetworkMode: data1alpha1.HostNetworkMode, + } + + engine.parseFromProfile(profile, value) + + Expect(value.Worker.Image).To(Equal("test")) + Expect(value.Worker.ImageTag).To(Equal("v1")) + Expect(value.Worker.ImagePullPolicy).To(Equal(string(corev1.PullAlways))) + Expect(value.Worker.ImagePullSecrets).To(Equal([]corev1.LocalObjectReference{{Name: "pull-secret"}})) + Expect(value.Worker.Resources).To(Equal(common.Resources{ Requests: map[corev1.ResourceName]string{}, Limits: map[corev1.ResourceName]string{}, - }, - HostNetwork: true, - Envs: []corev1.EnvVar{{ - Name: "a", - Value: "b", - }, { - Name: "b", - ValueFrom: &corev1.EnvVarSource{ - ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: "test-cm", - }, - }, - }, - }}, - NodeSelector: map[string]string{"a": "b"}, - Ports: []corev1.ContainerPort{{ - Name: "port", - ContainerPort: 8080, - }}, - LivenessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - }, - }, - InitialDelaySeconds: 1, - TimeoutSeconds: 1, - PeriodSeconds: 1, - SuccessThreshold: 1, - FailureThreshold: 1, - }, - ReadinessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - }, - }, - InitialDelaySeconds: 1, - TimeoutSeconds: 1, - PeriodSeconds: 1, - SuccessThreshold: 1, - FailureThreshold: 1, - }, - }, - } - value := &ThinValue{} - t1.Run("test", func(t1 *testing.T) { - t := &ThinEngine{ - Log: fake.NullLogger(), - } - t.parseFromProfile(&profile, value) - if !reflect.DeepEqual(value, wantValue) { - t1.Errorf("parseFromProfile() got = %v, want = %v", value, wantValue) - } + })) + Expect(value.Worker.HostNetwork).To(BeTrue()) + Expect(value.Worker.Envs).To(Equal(profile.Spec.Worker.Env)) + Expect(value.Worker.NodeSelector).To(Equal(profile.Spec.Worker.NodeSelector)) + Expect(value.Worker.Ports).To(Equal(profile.Spec.Worker.Ports)) + Expect(value.Worker.LivenessProbe).To(Equal(profile.Spec.Worker.LivenessProbe)) + Expect(value.Worker.ReadinessProbe).To(Equal(profile.Spec.Worker.ReadinessProbe)) + }) }) -} - -func TestThinEngine_parseWorkerImage(t1 *testing.T) { - type args struct { - runtime *datav1alpha1.ThinRuntime - value *ThinValue - } - tests := []struct { - name string - args args - }{ - { - name: "test", - args: args{ - runtime: &datav1alpha1.ThinRuntime{ - Spec: datav1alpha1.ThinRuntimeSpec{ - Worker: datav1alpha1.ThinCompTemplateSpec{ - Image: "test", - ImageTag: "v1", - ImagePullPolicy: "Always", - }, - }, - }, - value: &ThinValue{}, - }, - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - t := &ThinEngine{} - t.parseWorkerImage(tt.args.runtime, tt.args.value) - if tt.args.value.Worker.Image != tt.args.runtime.Spec.Worker.Image || - tt.args.value.Worker.ImageTag != tt.args.runtime.Spec.Worker.ImageTag || - tt.args.value.Worker.ImagePullPolicy != tt.args.runtime.Spec.Worker.ImagePullPolicy { - t1.Errorf("got %v, want %v", tt.args.value.Worker, tt.args.runtime.Spec.Worker) + + Describe("parseWorkerImage", func() { + It("selectively overrides only populated runtime image fields", func() { + value := &ThinValue{Worker: Worker{ + Image: "profile-image", + ImageTag: "profile-tag", + ImagePullPolicy: string(corev1.PullAlways), + ImagePullSecrets: []corev1.LocalObjectReference{{Name: "profile-secret"}}, + }} + thinruntime.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + ImageTag: "runtime-tag", + ImagePullSecrets: []corev1.LocalObjectReference{{Name: "runtime-secret"}}, } + + engine.parseWorkerImage(thinruntime, value) + + Expect(value.Worker.Image).To(Equal("profile-image")) + Expect(value.Worker.ImageTag).To(Equal("runtime-tag")) + Expect(value.Worker.ImagePullPolicy).To(Equal(string(corev1.PullAlways))) + Expect(value.Worker.ImagePullSecrets).To(Equal([]corev1.LocalObjectReference{{Name: "runtime-secret"}})) + }) + }) + + Describe("transformPlacementMode", func() { + It("defaults to exclusive mode when dataset placement mode is empty", func() { + value := &ThinValue{} + + engine.transformPlacementMode(&data1alpha1.Dataset{}, value) + + Expect(value.PlacementMode).To(Equal(string(data1alpha1.ExclusiveMode))) }) - } -} + + It("keeps an existing dataset placement mode", func() { + value := &ThinValue{} + + engine.transformPlacementMode(&data1alpha1.Dataset{Spec: data1alpha1.DatasetSpec{PlacementMode: data1alpha1.ShareMode}}, value) + + Expect(value.PlacementMode).To(Equal(string(data1alpha1.ShareMode))) + }) + }) + + Describe("transformTolerations", func() { + It("copies dataset tolerations into values", func() { + value := &ThinValue{} + tolerations := []corev1.Toleration{{ + Key: "a", + Operator: corev1.TolerationOpEqual, + Value: "b", + }} + + engine.transformTolerations(&data1alpha1.Dataset{Spec: data1alpha1.DatasetSpec{Tolerations: tolerations}}, value) + + Expect(value.Tolerations).To(Equal(tolerations)) + }) + }) +}) From 8f0a37636afd0110cd03aee8ebc82f185e31609f Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 3 May 2026 14:16:04 +0530 Subject: [PATCH 2/6] test: remove unused thin profile fixture Signed-off-by: Harsh --- pkg/ddc/thin/master_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/ddc/thin/master_test.go b/pkg/ddc/thin/master_test.go index 538b2a6a29c..8d2e57405e9 100644 --- a/pkg/ddc/thin/master_test.go +++ b/pkg/ddc/thin/master_test.go @@ -136,14 +136,13 @@ var _ = Describe("Master Tests", func() { Describe("ThinEngine.generateThinValueFile", func() { It("generates values when the runtime profile lookup falls back to nil", func() { - dataset, runtimeObj, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + dataset, runtimeObj, _ := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{ Image: "runtime-fuse", } - profile = nil client := fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj) - engine := mockThinEngineForTests(dataset, runtimeObj, profile) + engine := mockThinEngineForTests(dataset, runtimeObj, nil) engine.Client = client engine.runtime = runtimeObj From 9132f371dcfeb2436425d8d1c1fdbe92c848ac7e Mon Sep 17 00:00:00 2001 From: Harsh Date: Mon, 11 May 2026 06:48:28 +0530 Subject: [PATCH 3/6] remove thin transform overlap Signed-off-by: Harsh --- pkg/ddc/thin/shutdown_test.go | 4 ---- pkg/ddc/thin/transform.go | 5 +---- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/pkg/ddc/thin/shutdown_test.go b/pkg/ddc/thin/shutdown_test.go index 427405a9e2b..ede2bf3300a 100644 --- a/pkg/ddc/thin/shutdown_test.go +++ b/pkg/ddc/thin/shutdown_test.go @@ -112,10 +112,6 @@ func TestDestroyWorker(t *testing.T) { for _, nodeInput := range nodeInputs { testNodes = append(testNodes, nodeInput.DeepCopy()) } - testNodes = append(testNodes, - &datav1alpha1.ThinRuntime{ObjectMeta: metav1.ObjectMeta{Name: "spark", Namespace: "fluid"}}, - &datav1alpha1.ThinRuntime{ObjectMeta: metav1.ObjectMeta{Name: "hadoop", Namespace: "fluid"}}, - ) client := fake.NewFakeClientWithScheme(testScheme, testNodes...) diff --git a/pkg/ddc/thin/transform.go b/pkg/ddc/thin/transform.go index d072e9e42c0..79b0d48718a 100644 --- a/pkg/ddc/thin/transform.go +++ b/pkg/ddc/thin/transform.go @@ -32,9 +32,6 @@ func (t *ThinEngine) transform(runtime *datav1alpha1.ThinRuntime, profile *datav err = fmt.Errorf("the thinRuntime is null") return } - if profile == nil { - profile = &datav1alpha1.ThinRuntimeProfile{} - } defer utils.TimeTrack(time.Now(), "ThinRuntime.Transform", "name", runtime.Name) dataset, err := utils.GetDataset(t.Client, t.name, t.namespace) @@ -47,8 +44,8 @@ func (t *ThinEngine) transform(runtime *datav1alpha1.ThinRuntime, profile *datav Namespace: runtime.Namespace, Name: runtime.Name, }, + ImagePullSecrets: profile.Spec.ImagePullSecrets, } - value.ImagePullSecrets = profile.Spec.ImagePullSecrets if len(runtime.Spec.ImagePullSecrets) != 0 { value.ImagePullSecrets = runtime.Spec.ImagePullSecrets } From 3ec80929e1db230f022e50cae7513faf7e3553cd Mon Sep 17 00:00:00 2001 From: Harsh Date: Mon, 11 May 2026 06:49:34 +0530 Subject: [PATCH 4/6] fix thin transform review issues Signed-off-by: Harsh --- pkg/ddc/thin/master_internal.go | 2 +- pkg/ddc/thin/master_test.go | 68 +++++++-------------------------- pkg/ddc/thin/transform_test.go | 66 ++++++++++++++------------------ 3 files changed, 43 insertions(+), 93 deletions(-) diff --git a/pkg/ddc/thin/master_internal.go b/pkg/ddc/thin/master_internal.go index ece3d79dec4..5ef74662cc5 100644 --- a/pkg/ddc/thin/master_internal.go +++ b/pkg/ddc/thin/master_internal.go @@ -132,7 +132,7 @@ func (t *ThinEngine) ifRuntimeHelmValueEnable() bool { if err != nil { return true } - return enableRuntimeHelmValueConfig + return !enableRuntimeHelmValueConfig } func (t *ThinEngine) getHelmValuesConfigMapName() string { diff --git a/pkg/ddc/thin/master_test.go b/pkg/ddc/thin/master_test.go index 8d2e57405e9..1ab023dec6b 100644 --- a/pkg/ddc/thin/master_test.go +++ b/pkg/ddc/thin/master_test.go @@ -135,39 +135,10 @@ var _ = Describe("Master Tests", func() { }) Describe("ThinEngine.generateThinValueFile", func() { - It("generates values when the runtime profile lookup falls back to nil", func() { - dataset, runtimeObj, _ := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) - runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{ - Image: "runtime-fuse", - } - - client := fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj) - engine := mockThinEngineForTests(dataset, runtimeObj, nil) - engine.Client = client - engine.runtime = runtimeObj - - generatedProfile, err := engine.getThinRuntimeProfile() - Expect(err).To(HaveOccurred()) - Expect(generatedProfile).To(BeNil()) - - valueFile, err := engine.generateThinValueFile(runtimeObj, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(valueFile).To(BeAnExistingFile()) - - configMap := &corev1.ConfigMap{} - err = engine.Client.Get(context.TODO(), types.NamespacedName{ - Name: engine.getHelmValuesConfigMapName(), - Namespace: engine.namespace, - }, configMap) - Expect(err).NotTo(HaveOccurred()) - Expect(configMap.Labels).To(HaveKeyWithValue(common.LabelAnnotationDatasetId, dataset.Labels[common.LabelAnnotationDatasetId])) - Expect(configMap.Data).To(HaveKey("data")) - }) - It("skips storing runtime helm values when runtime config map generation is disabled", func() { dataset, runtimeObj, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{Image: "runtime-fuse"} - runtimeObj.Annotations = map[string]string{common.AnnotationDisableRuntimeHelmValueConfig: "false"} + runtimeObj.Annotations = map[string]string{common.AnnotationDisableRuntimeHelmValueConfig: "true"} engine := mockThinEngineForTests(dataset, runtimeObj, profile) engine.Client = fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj, profile) @@ -188,29 +159,6 @@ var _ = Describe("Master Tests", func() { }) Describe("ThinEngine.setupMasterInternal", func() { - It("continues with a missing runtime profile when the release already exists", func() { - dataset, runtimeObj, _ := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) - runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{Image: "runtime-fuse"} - - engine := mockThinEngineForTests(dataset, runtimeObj, nil) - engine.Client = fake.NewFakeClientWithScheme(testScheme, dataset, runtimeObj) - engine.runtime = runtimeObj - - checkReleasePatch := ApplyFunc(helm.CheckRelease, func(name string, namespace string) (bool, error) { - Expect(name).To(Equal(engine.name)) - Expect(namespace).To(Equal(engine.namespace)) - return true, nil - }) - installReleasePatch := ApplyFunc(helm.InstallRelease, func(string, string, string, string) error { - Fail("InstallRelease should not be called when the release already exists") - return nil - }) - defer checkReleasePatch.Reset() - defer installReleasePatch.Reset() - - Expect(engine.setupMasterInternal()).To(Succeed()) - }) - It("installs the release when it is not already present", func() { dataset, runtimeObj, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) runtimeObj.Spec.Fuse = datav1alpha1.ThinFuseSpec{Image: "runtime-fuse"} @@ -292,14 +240,24 @@ var _ = Describe("Master Tests", func() { Expect(engine.ifRuntimeHelmValueEnable()).To(BeTrue()) }) - It("follows the parsed runtime annotation value", func() { + It("disables runtime helm values when the disable annotation is true", func() { engine := &ThinEngine{runtime: &datav1alpha1.ThinRuntime{ ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{common.AnnotationDisableRuntimeHelmValueConfig: "false"}, + Annotations: map[string]string{common.AnnotationDisableRuntimeHelmValueConfig: "true"}, }, }} Expect(engine.ifRuntimeHelmValueEnable()).To(BeFalse()) }) + + It("keeps runtime helm values enabled when the disable annotation is false", func() { + engine := &ThinEngine{runtime: &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{common.AnnotationDisableRuntimeHelmValueConfig: "false"}, + }, + }} + + Expect(engine.ifRuntimeHelmValueEnable()).To(BeTrue()) + }) }) }) diff --git a/pkg/ddc/thin/transform_test.go b/pkg/ddc/thin/transform_test.go index 0a8782f2496..e940309b2e8 100644 --- a/pkg/ddc/thin/transform_test.go +++ b/pkg/ddc/thin/transform_test.go @@ -17,7 +17,7 @@ package thin import ( - data1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" corev1 "k8s.io/api/core/v1" @@ -32,9 +32,9 @@ import ( var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_test.go"), func() { var ( - dataset *data1alpha1.Dataset - thinruntime *data1alpha1.ThinRuntime - profile *data1alpha1.ThinRuntimeProfile + dataset *datav1alpha1.Dataset + thinruntime *datav1alpha1.ThinRuntime + profile *datav1alpha1.ThinRuntimeProfile engine *ThinEngine resources []runtime.Object ) @@ -46,7 +46,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes }) JustBeforeEach(func() { - engine.Client = fake.NewFakeClientWithScheme(data1alpha1.UnitTestScheme, resources...) + engine.Client = fake.NewFakeClientWithScheme(datav1alpha1.UnitTestScheme, resources...) }) Describe("transform", func() { @@ -58,7 +58,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes }) It("returns a dataset lookup error when dataset is missing", func() { - engine.Client = fake.NewFakeClientWithScheme(data1alpha1.UnitTestScheme, thinruntime, profile) + engine.Client = fake.NewFakeClientWithScheme(datav1alpha1.UnitTestScheme, thinruntime, profile) value, err := engine.transform(thinruntime, profile) @@ -85,7 +85,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes Expect(value.ImagePullSecrets).To(Equal(profile.Spec.ImagePullSecrets)) Expect(value.Worker.Enabled).To(BeFalse()) Expect(value.Tolerations).To(Equal(dataset.Spec.Tolerations)) - Expect(value.PlacementMode).To(Equal(string(data1alpha1.ExclusiveMode))) + Expect(value.PlacementMode).To(Equal(string(datav1alpha1.ExclusiveMode))) Expect(value.OwnerDatasetId).To(Equal(dataset.Labels[common.LabelAnnotationDatasetId])) Expect(value.RuntimeIdentity).To(Equal(common.RuntimeIdentity{ Namespace: thinruntime.Namespace, @@ -99,8 +99,8 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes When("worker is enabled", func() { BeforeEach(func() { thinruntime.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: "runtime-secret"}} - thinruntime.Spec.TieredStore.Levels = []data1alpha1.Level{{Path: "/runtime/cache"}} - thinruntime.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + thinruntime.Spec.TieredStore.Levels = []datav1alpha1.Level{{Path: "/runtime/cache"}} + thinruntime.Spec.Worker = datav1alpha1.ThinCompTemplateSpec{ Enabled: true, Image: "runtime-worker", ImageTag: "runtime-tag", @@ -109,7 +109,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes Env: []corev1.EnvVar{{Name: "RUNTIME_ENV", Value: "runtime"}}, Ports: []corev1.ContainerPort{{Name: "http", ContainerPort: 8080}}, NodeSelector: map[string]string{"node": "runtime"}, - NetworkMode: data1alpha1.HostNetworkMode, + NetworkMode: datav1alpha1.HostNetworkMode, VolumeMounts: []corev1.VolumeMount{{ Name: "runtime-volume", MountPath: "/runtime/mount", @@ -125,7 +125,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }} - profile.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + profile.Spec.Worker = datav1alpha1.ThinCompTemplateSpec{ Image: "profile-worker", ImageTag: "profile-tag", ImagePullPolicy: string(corev1.PullAlways), @@ -174,16 +174,6 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes }) }) - It("handles a missing profile when callers continue after profile lookup not found", func() { - thinruntime.Spec.Fuse.Image = "runtime-fuse" - thinruntime.Spec.Fuse.ImageTag = "runtime-tag" - - value, err := engine.transform(thinruntime, nil) - - Expect(err).NotTo(HaveOccurred()) - Expect(value).NotTo(BeNil()) - Expect(value.ImagePullSecrets).To(BeNil()) - }) }) Describe("transformWorkers", func() { @@ -191,7 +181,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes BeforeEach(func() { value = &ThinValue{} - profile.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + profile.Spec.Worker = datav1alpha1.ThinCompTemplateSpec{ Image: "profile-worker", ImageTag: "profile-tag", ImagePullPolicy: string(corev1.PullAlways), @@ -199,7 +189,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes Env: []corev1.EnvVar{{Name: "PROFILE_ENV", Value: "profile"}}, Ports: []corev1.ContainerPort{{Name: "profile-port", ContainerPort: 9090}}, NodeSelector: map[string]string{"profile": "true"}, - NetworkMode: data1alpha1.ContainerNetworkMode, + NetworkMode: datav1alpha1.ContainerNetworkMode, VolumeMounts: []corev1.VolumeMount{{ Name: "profile-volume", MountPath: "/profile/mount", @@ -211,13 +201,13 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }} - thinruntime.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + thinruntime.Spec.Worker = datav1alpha1.ThinCompTemplateSpec{ ImageTag: "runtime-tag", ImagePullSecrets: []corev1.LocalObjectReference{{Name: "runtime-worker-secret"}}, Env: []corev1.EnvVar{{Name: "RUNTIME_ENV", Value: "runtime"}}, Ports: []corev1.ContainerPort{{Name: "runtime-port", ContainerPort: 8080}}, NodeSelector: map[string]string{"runtime": "true"}, - NetworkMode: data1alpha1.HostNetworkMode, + NetworkMode: datav1alpha1.HostNetworkMode, VolumeMounts: []corev1.VolumeMount{{ Name: "runtime-volume", MountPath: "/runtime/mount", @@ -229,7 +219,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }} - thinruntime.Spec.TieredStore.Levels = []data1alpha1.Level{{Path: "/runtime/cache"}} + thinruntime.Spec.TieredStore.Levels = []datav1alpha1.Level{{Path: "/runtime/cache"}} }) It("applies profile defaults and runtime overrides", func() { @@ -249,15 +239,17 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes Expect(value.Worker.HostNetwork).To(BeTrue()) }) - It("swallows worker volume transformation errors", func() { + It("logs but does not return worker volume transformation errors", func() { + thinruntime.Spec.Worker.VolumeMounts = nil + thinruntime.Spec.Volumes = nil profile.Spec.Worker.VolumeMounts = []corev1.VolumeMount{{Name: "missing-volume", MountPath: "/profile/mount"}} profile.Spec.Volumes = nil err := engine.transformWorkers(thinruntime, profile, value) Expect(err).NotTo(HaveOccurred()) - Expect(value.Worker.VolumeMounts).To(ContainElement(thinruntime.Spec.Worker.VolumeMounts[0])) - Expect(value.Worker.Volumes).To(ContainElement(thinruntime.Spec.Volumes[0])) + Expect(value.Worker.VolumeMounts).To(BeEmpty()) + Expect(value.Worker.Volumes).To(BeEmpty()) }) }) @@ -275,7 +267,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes }) It("copies worker settings from the profile", func() { - profile.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + profile.Spec.Worker = datav1alpha1.ThinCompTemplateSpec{ Image: "test", ImageTag: "v1", ImagePullPolicy: string(corev1.PullAlways), @@ -312,7 +304,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes SuccessThreshold: 1, FailureThreshold: 1, }, - NetworkMode: data1alpha1.HostNetworkMode, + NetworkMode: datav1alpha1.HostNetworkMode, } engine.parseFromProfile(profile, value) @@ -342,7 +334,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes ImagePullPolicy: string(corev1.PullAlways), ImagePullSecrets: []corev1.LocalObjectReference{{Name: "profile-secret"}}, }} - thinruntime.Spec.Worker = data1alpha1.ThinCompTemplateSpec{ + thinruntime.Spec.Worker = datav1alpha1.ThinCompTemplateSpec{ ImageTag: "runtime-tag", ImagePullSecrets: []corev1.LocalObjectReference{{Name: "runtime-secret"}}, } @@ -360,17 +352,17 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes It("defaults to exclusive mode when dataset placement mode is empty", func() { value := &ThinValue{} - engine.transformPlacementMode(&data1alpha1.Dataset{}, value) + engine.transformPlacementMode(&datav1alpha1.Dataset{}, value) - Expect(value.PlacementMode).To(Equal(string(data1alpha1.ExclusiveMode))) + Expect(value.PlacementMode).To(Equal(string(datav1alpha1.ExclusiveMode))) }) It("keeps an existing dataset placement mode", func() { value := &ThinValue{} - engine.transformPlacementMode(&data1alpha1.Dataset{Spec: data1alpha1.DatasetSpec{PlacementMode: data1alpha1.ShareMode}}, value) + engine.transformPlacementMode(&datav1alpha1.Dataset{Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ShareMode}}, value) - Expect(value.PlacementMode).To(Equal(string(data1alpha1.ShareMode))) + Expect(value.PlacementMode).To(Equal(string(datav1alpha1.ShareMode))) }) }) @@ -383,7 +375,7 @@ var _ = Describe("ThinEngine transform tests", Label("pkg.ddc.thin.transform_tes Value: "b", }} - engine.transformTolerations(&data1alpha1.Dataset{Spec: data1alpha1.DatasetSpec{Tolerations: tolerations}}, value) + engine.transformTolerations(&datav1alpha1.Dataset{Spec: datav1alpha1.DatasetSpec{Tolerations: tolerations}}, value) Expect(value.Tolerations).To(Equal(tolerations)) }) From be1cdabe2d3517177497ab80f1c4fc113d4082d8 Mon Sep 17 00:00:00 2001 From: Harsh Date: Mon, 11 May 2026 07:22:58 +0530 Subject: [PATCH 5/6] seed thin shutdown runtimes Signed-off-by: Harsh --- pkg/ddc/thin/shutdown_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/ddc/thin/shutdown_test.go b/pkg/ddc/thin/shutdown_test.go index ede2bf3300a..427405a9e2b 100644 --- a/pkg/ddc/thin/shutdown_test.go +++ b/pkg/ddc/thin/shutdown_test.go @@ -112,6 +112,10 @@ func TestDestroyWorker(t *testing.T) { for _, nodeInput := range nodeInputs { testNodes = append(testNodes, nodeInput.DeepCopy()) } + testNodes = append(testNodes, + &datav1alpha1.ThinRuntime{ObjectMeta: metav1.ObjectMeta{Name: "spark", Namespace: "fluid"}}, + &datav1alpha1.ThinRuntime{ObjectMeta: metav1.ObjectMeta{Name: "hadoop", Namespace: "fluid"}}, + ) client := fake.NewFakeClientWithScheme(testScheme, testNodes...) From bd997375059978f2d6fcd1f0251fb9643a8d5276 Mon Sep 17 00:00:00 2001 From: Harsh Date: Mon, 11 May 2026 07:30:14 +0530 Subject: [PATCH 6/6] Revert "seed thin shutdown runtimes" This reverts commit be1cdabe2d3517177497ab80f1c4fc113d4082d8. Signed-off-by: Harsh --- pkg/ddc/thin/shutdown_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/ddc/thin/shutdown_test.go b/pkg/ddc/thin/shutdown_test.go index 427405a9e2b..ede2bf3300a 100644 --- a/pkg/ddc/thin/shutdown_test.go +++ b/pkg/ddc/thin/shutdown_test.go @@ -112,10 +112,6 @@ func TestDestroyWorker(t *testing.T) { for _, nodeInput := range nodeInputs { testNodes = append(testNodes, nodeInput.DeepCopy()) } - testNodes = append(testNodes, - &datav1alpha1.ThinRuntime{ObjectMeta: metav1.ObjectMeta{Name: "spark", Namespace: "fluid"}}, - &datav1alpha1.ThinRuntime{ObjectMeta: metav1.ObjectMeta{Name: "hadoop", Namespace: "fluid"}}, - ) client := fake.NewFakeClientWithScheme(testScheme, testNodes...)