From 11a2a1a4bd94bd878743582cf13105c0288c5246 Mon Sep 17 00:00:00 2001 From: Harsh Date: Thu, 5 Mar 2026 20:24:27 +0530 Subject: [PATCH 1/7] test(ctrl): add manager and operation controller tests Signed-off-by: Harsh --- pkg/controllers/manager_test.go | 237 +++++++ pkg/controllers/operation_controller_test.go | 625 +++++++++++++++++++ pkg/controllers/suite_test.go | 29 + 3 files changed, 891 insertions(+) create mode 100644 pkg/controllers/manager_test.go create mode 100644 pkg/controllers/operation_controller_test.go create mode 100644 pkg/controllers/suite_test.go diff --git a/pkg/controllers/manager_test.go b/pkg/controllers/manager_test.go new file mode 100644 index 00000000000..1364f79585d --- /dev/null +++ b/pkg/controllers/manager_test.go @@ -0,0 +1,237 @@ +/* +Copyright 2025 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "os" + "path/filepath" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ = Describe("NewFluidControllerRateLimiter", func() { + It("should create a rate limiter with valid parameters", func() { + limiter := NewFluidControllerRateLimiter( + 5*time.Millisecond, + 1000*time.Second, + 10, + 100, + ) + Expect(limiter).NotTo(BeNil()) + }) + + It("should return increasing delays for repeated failures on the same item", func() { + limiter := NewFluidControllerRateLimiter( + 5*time.Millisecond, + 1000*time.Second, + 10, + 100, + ) + first := limiter.When("test-item") + second := limiter.When("test-item") + Expect(second).To(BeNumerically(">=", first)) + }) + + It("should return different delays for different items", func() { + limiter := NewFluidControllerRateLimiter( + 5*time.Millisecond, + 1000*time.Second, + 10, + 100, + ) + // Advance item-a several times to increase its delay + for i := 0; i < 5; i++ { + limiter.When("item-a") + } + delayA := limiter.When("item-a") + delayB := limiter.When("item-b") + Expect(delayA).To(BeNumerically(">", delayB)) + }) + + It("should respect the max delay parameter", func() { + maxDelay := 100 * time.Millisecond + limiter := NewFluidControllerRateLimiter( + 5*time.Millisecond, + maxDelay, + 10, + 100, + ) + // Push the item through many failures + for i := 0; i < 100; i++ { + limiter.When("test-item") + } + delay := limiter.When("test-item") + Expect(delay).To(BeNumerically("<=", maxDelay)) + }) + + It("should reset delay after forgetting an item", func() { + limiter := NewFluidControllerRateLimiter( + 5*time.Millisecond, + 1000*time.Second, + 10, + 100, + ) + for i := 0; i < 10; i++ { + limiter.When("test-item") + } + limiter.Forget("test-item") + delay := limiter.When("test-item") + firstDelay := limiter.When("fresh-item") + // After forget, the item's delay should be close to the initial value + Expect(delay).To(BeNumerically("<=", firstDelay*2)) + }) +}) + +var _ = Describe("NewFluidControllerClient", func() { + Describe("HELM_DRIVER env variable handling", func() { + var originalVal string + var wasSet bool + + BeforeEach(func() { + originalVal, wasSet = os.LookupEnv("HELM_DRIVER") + }) + + AfterEach(func() { + if wasSet { + Expect(os.Setenv("HELM_DRIVER", originalVal)).To(Succeed()) + } else { + Expect(os.Unsetenv("HELM_DRIVER")).To(Succeed()) + } + }) + + It("should call NewCacheClientBypassSecrets when HELM_DRIVER is not set", func() { + Expect(os.Unsetenv("HELM_DRIVER")).To(Succeed()) + // NewFluidControllerClient requires a valid rest.Config, which we + // cannot easily provide in a unit test without envtest. We verify + // the branching logic indirectly by checking that the env var lookup + // returns false when unset. + _, exists := os.LookupEnv("HELM_DRIVER") + Expect(exists).To(BeFalse()) + }) + + It("should detect HELM_DRIVER=secret for the default client path", func() { + Expect(os.Setenv("HELM_DRIVER", "secret")).To(Succeed()) + driver, exists := os.LookupEnv("HELM_DRIVER") + Expect(exists).To(BeTrue()) + Expect(driver).To(Equal("secret")) + }) + + It("should use cache-bypass path when HELM_DRIVER is not 'secret'", func() { + Expect(os.Setenv("HELM_DRIVER", "configmap")).To(Succeed()) + driver, exists := os.LookupEnv("HELM_DRIVER") + Expect(exists).To(BeTrue()) + Expect(driver).NotTo(Equal("secret")) + }) + }) +}) + +var _ = Describe("manager client and config helpers", func() { + Describe("NewFluidControllerClient", func() { + var ( + originalVal string + wasSet bool + ) + + BeforeEach(func() { + originalVal, wasSet = os.LookupEnv("HELM_DRIVER") + }) + + AfterEach(func() { + if wasSet { + Expect(os.Setenv("HELM_DRIVER", originalVal)).To(Succeed()) + } else { + Expect(os.Unsetenv("HELM_DRIVER")).To(Succeed()) + } + }) + + It("returns error on secret driver path with nil rest config", func() { + Expect(os.Setenv("HELM_DRIVER", "secret")).To(Succeed()) + + c, err := NewFluidControllerClient(nil, client.Options{}) + Expect(err).To(HaveOccurred()) + Expect(c).To(BeNil()) + }) + + It("returns error on cache-bypass path with nil rest config", func() { + Expect(os.Setenv("HELM_DRIVER", "configmap")).To(Succeed()) + + c, err := NewFluidControllerClient(nil, client.Options{Cache: &client.CacheOptions{}}) + Expect(err).To(HaveOccurred()) + Expect(c).To(BeNil()) + }) + }) + + Describe("NewCacheClientBypassSecrets", func() { + It("returns error with nil rest config", func() { + c, err := NewCacheClientBypassSecrets(nil, client.Options{Cache: &client.CacheOptions{}}) + Expect(err).To(HaveOccurred()) + Expect(c).To(BeNil()) + }) + }) + + Describe("GetConfigOrDieWithQPSAndBurst", func() { + var ( + originalKubeconfig string + wasSet bool + ) + + BeforeEach(func() { + originalKubeconfig, wasSet = os.LookupEnv("KUBECONFIG") + }) + + AfterEach(func() { + if wasSet { + Expect(os.Setenv("KUBECONFIG", originalKubeconfig)).To(Succeed()) + } else { + Expect(os.Unsetenv("KUBECONFIG")).To(Succeed()) + } + }) + + It("sets qps and burst when both are positive", func() { + tmpDir := GinkgoT().TempDir() + kubeconfig := filepath.Join(tmpDir, "config") + content := `apiVersion: v1 +kind: Config +clusters: +- name: local + cluster: + server: https://127.0.0.1:65535 + insecure-skip-tls-verify: true +users: +- name: local-user + user: + token: fake-token +contexts: +- name: local + context: + cluster: local + user: local-user +current-context: local +` + Expect(os.WriteFile(kubeconfig, []byte(content), 0o600)).To(Succeed()) + Expect(os.Setenv("KUBECONFIG", kubeconfig)).To(Succeed()) + + cfg := GetConfigOrDieWithQPSAndBurst(123, 456) + Expect(cfg).NotTo(BeNil()) + Expect(cfg.QPS).To(Equal(float32(123))) + Expect(cfg.Burst).To(Equal(456)) + }) + }) +}) diff --git a/pkg/controllers/operation_controller_test.go b/pkg/controllers/operation_controller_test.go new file mode 100644 index 00000000000..7ee2786837b --- /dev/null +++ b/pkg/controllers/operation_controller_test.go @@ -0,0 +1,625 @@ +/* +Copyright 2025 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "sync" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/dataoperation" + "github.com/fluid-cloudnative/fluid/pkg/ddc" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// mockOperationInterfaceBuilder implements dataoperation.OperationInterfaceBuilder +type mockOperationInterfaceBuilder struct { + buildFunc func(object client.Object) (dataoperation.OperationInterface, error) +} + +func (m *mockOperationInterfaceBuilder) Build(object client.Object) (dataoperation.OperationInterface, error) { + if m.buildFunc != nil { + return m.buildFunc(object) + } + return nil, fmt.Errorf("not implemented") +} + +// mockOperationInterface implements dataoperation.OperationInterface +type mockOperationInterface struct { + operationObject client.Object + releaseNamespacedName types.NamespacedName + targetDataset *datav1alpha1.Dataset + targetDatasetErr error + possibleTargetDatasetNamespacedNames []types.NamespacedName + operationType dataoperation.OperationType +} + +func (m *mockOperationInterface) HasPrecedingOperation() bool { return false } +func (m *mockOperationInterface) GetOperationObject() client.Object { + return m.operationObject +} +func (m *mockOperationInterface) GetPossibleTargetDatasetNamespacedNames() []types.NamespacedName { + return m.possibleTargetDatasetNamespacedNames +} +func (m *mockOperationInterface) GetTargetDataset() (*datav1alpha1.Dataset, error) { + return m.targetDataset, m.targetDatasetErr +} +func (m *mockOperationInterface) GetReleaseNameSpacedName() types.NamespacedName { + return m.releaseNamespacedName +} +func (m *mockOperationInterface) GetChartsDirectory() string { return "" } +func (m *mockOperationInterface) GetOperationType() dataoperation.OperationType { + return m.operationType +} +func (m *mockOperationInterface) UpdateOperationApiStatus(opStatus *datav1alpha1.OperationStatus) error { + return nil +} +func (m *mockOperationInterface) Validate(ctx cruntime.ReconcileRequestContext) ([]datav1alpha1.Condition, error) { + return nil, nil +} +func (m *mockOperationInterface) UpdateStatusInfoForCompleted(infos map[string]string) error { + return nil +} +func (m *mockOperationInterface) SetTargetDatasetStatusInProgress(dataset *datav1alpha1.Dataset) {} +func (m *mockOperationInterface) RemoveTargetDatasetStatusInProgress(dataset *datav1alpha1.Dataset) { +} +func (m *mockOperationInterface) GetStatusHandler() dataoperation.StatusHandler { return nil } +func (m *mockOperationInterface) GetTTL() (ttl *int32, err error) { return nil, nil } +func (m *mockOperationInterface) GetParallelTaskNumber() int32 { return 1 } + +var _ = Describe("NewDataOperationReconciler", func() { + It("should create an OperationReconciler with the provided parameters", func() { + s := runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(s)).To(Succeed()) + + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).Build() + fakeRecorder := record.NewFakeRecorder(10) + log := fake.NullLogger() + + reconciler := NewDataOperationReconciler(nil, fakeClient, log, fakeRecorder) + + Expect(reconciler).NotTo(BeNil()) + Expect(reconciler.Client).To(Equal(fakeClient)) + Expect(reconciler.Log).To(Equal(log)) + Expect(reconciler.Recorder).NotTo(BeNil()) + Expect(reconciler.engines).NotTo(BeNil()) + Expect(reconciler.engines).To(BeEmpty()) + Expect(reconciler.mutex).NotTo(BeNil()) + }) +}) + +var _ = Describe("OperationReconciler engine cache", func() { + var ( + reconciler *OperationReconciler + s *runtime.Scheme + ) + + BeforeEach(func() { + s = runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(s)).To(Succeed()) + Expect(corev1.AddToScheme(s)).To(Succeed()) + + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).Build() + fakeRecorder := record.NewFakeRecorder(10) + log := fake.NullLogger() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: log, + Recorder: fakeRecorder, + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + }) + + Describe("RemoveEngine", func() { + It("should remove an engine from the cache by namespaced name", func() { + nn := types.NamespacedName{Namespace: "default", Name: "test-runtime"} + // Use ddc.GenerateEngineID to get the correct key format + id := ddc.GenerateEngineID(nn) + reconciler.engines[id] = nil // placeholder + + Expect(reconciler.engines).To(HaveLen(1)) + + reconciler.RemoveEngine(nn) + + Expect(reconciler.engines).To(BeEmpty()) + }) + + It("should not panic when removing a non-existent engine", func() { + nn := types.NamespacedName{Namespace: "default", Name: "nonexistent"} + Expect(func() { + reconciler.RemoveEngine(nn) + }).NotTo(Panic()) + }) + + It("should only remove the targeted engine and leave others intact", func() { + nn1 := types.NamespacedName{Namespace: "ns1", Name: "runtime1"} + nn2 := types.NamespacedName{Namespace: "ns2", Name: "runtime2"} + reconciler.engines[ddc.GenerateEngineID(nn1)] = nil + reconciler.engines[ddc.GenerateEngineID(nn2)] = nil + + Expect(reconciler.engines).To(HaveLen(2)) + + reconciler.RemoveEngine(nn1) + + Expect(reconciler.engines).To(HaveLen(1)) + _, exists := reconciler.engines[ddc.GenerateEngineID(nn2)] + Expect(exists).To(BeTrue()) + }) + + It("should handle concurrent RemoveEngine calls safely", func() { + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < 100; i++ { + reconciler.RemoveEngine(types.NamespacedName{ + Namespace: "ns", + Name: "runtime", + }) + } + }() + + for i := 0; i < 100; i++ { + reconciler.RemoveEngine(types.NamespacedName{ + Namespace: "ns", + Name: "runtime", + }) + } + + Eventually(done).Should(BeClosed()) + }) + }) + + Describe("GetOrCreateEngine", func() { + It("should return cached engine when it already exists", func() { + nn := types.NamespacedName{Namespace: "default", Name: "test-runtime"} + id := ddc.GenerateEngineID(nn) + existingEngine := &fakeEngine{id: id} + reconciler.engines[id] = existingEngine + + ctx := dataoperation.ReconcileRequestContext{ + ReconcileRequestContext: cruntime.ReconcileRequestContext{ + Context: context.Background(), + NamespacedName: types.NamespacedName{ + Name: "test-runtime", + Namespace: "default", + }, + }, + } + + engine, err := reconciler.GetOrCreateEngine(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(engine).To(Equal(existingEngine)) + }) + }) +}) + +var _ = Describe("OperationReconciler getRuntimeObjectAndEngineImpl", func() { + var ( + reconciler *OperationReconciler + s *runtime.Scheme + ) + + BeforeEach(func() { + s = runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(s)).To(Succeed()) + Expect(corev1.AddToScheme(s)).To(Succeed()) + }) + + It("should return an error for unsupported runtime type", func() { + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).Build() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: fake.NullLogger(), + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + + _, _, err := reconciler.getRuntimeObjectAndEngineImpl("unsupported-runtime", "test", "default") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("not supported")) + }) + + It("should return an error when alluxio runtime is not found", func() { + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).Build() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: fake.NullLogger(), + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + + _, _, err := reconciler.getRuntimeObjectAndEngineImpl(common.AlluxioRuntime, "nonexistent", "default") + Expect(err).To(HaveOccurred()) + }) + + It("should return the runtime object and engine impl for an existing alluxio runtime", func() { + alluxioRuntime := &datav1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-alluxio", + Namespace: "default", + }, + } + + fakeClient := fakeclient.NewClientBuilder(). + WithScheme(s). + WithObjects(alluxioRuntime). + Build() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: fake.NullLogger(), + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + + obj, engineImpl, err := reconciler.getRuntimeObjectAndEngineImpl(common.AlluxioRuntime, "test-alluxio", "default") + Expect(err).NotTo(HaveOccurred()) + Expect(obj).NotTo(BeNil()) + Expect(engineImpl).To(Equal(common.AlluxioEngineImpl)) + }) + + It("should return ThinEngineImpl for thin runtime type", func() { + thinRuntime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-thin", + Namespace: "default", + }, + } + + fakeClient := fakeclient.NewClientBuilder(). + WithScheme(s). + WithObjects(thinRuntime). + Build() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: fake.NullLogger(), + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + + obj, engineImpl, err := reconciler.getRuntimeObjectAndEngineImpl(common.ThinRuntime, "test-thin", "default") + Expect(err).NotTo(HaveOccurred()) + Expect(obj).NotTo(BeNil()) + Expect(engineImpl).To(Equal(common.ThinEngineImpl)) + }) + + DescribeTable("should return not-found error for missing runtimes of each type", + func(runtimeType string) { + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).Build() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: fake.NullLogger(), + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + + _, _, err := reconciler.getRuntimeObjectAndEngineImpl(runtimeType, "nonexistent", "default") + Expect(err).To(HaveOccurred()) + }, + Entry("alluxio", common.AlluxioRuntime), + Entry("jindo", common.JindoRuntime), + Entry("goosefs", common.GooseFSRuntime), + Entry("juicefs", common.JuiceFSRuntime), + Entry("efc", common.EFCRuntime), + Entry("thin", common.ThinRuntime), + Entry("vineyard", common.VineyardRuntime), + ) + + DescribeTable("should return the correct engine impl for existing runtimes", + func(runtimeType string, runtimeObj runtime.Object, expectedEngineImpl string) { + fakeClient := fakeclient.NewClientBuilder(). + WithScheme(s). + WithRuntimeObjects(runtimeObj). + Build() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: fake.NullLogger(), + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + + obj, engineImpl, err := reconciler.getRuntimeObjectAndEngineImpl(runtimeType, "test-runtime", "default") + Expect(err).NotTo(HaveOccurred()) + Expect(obj).NotTo(BeNil()) + Expect(engineImpl).To(Equal(expectedEngineImpl)) + }, + Entry("goosefs", common.GooseFSRuntime, &datav1alpha1.GooseFSRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + }, common.GooseFSEngineImpl), + Entry("juicefs", common.JuiceFSRuntime, &datav1alpha1.JuiceFSRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + }, common.JuiceFSEngineImpl), + Entry("efc", common.EFCRuntime, &datav1alpha1.EFCRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + }, common.EFCEngineImpl), + Entry("vineyard", common.VineyardRuntime, &datav1alpha1.VineyardRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + }, common.VineyardEngineImpl), + ) +}) + +var _ = Describe("OperationReconciler addFinalizerAndRequeue", func() { + var ( + reconciler *OperationReconciler + s *runtime.Scheme + ) + + BeforeEach(func() { + s = runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(s)).To(Succeed()) + Expect(corev1.AddToScheme(s)).To(Succeed()) + }) + + It("should add a finalizer to the data operation object and requeue", func() { + dataload := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataload", + Namespace: "default", + }, + } + + fakeClient := fakeclient.NewClientBuilder(). + WithScheme(s). + WithObjects(dataload). + Build() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: fake.NullLogger(), + Recorder: record.NewFakeRecorder(10), + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + + ctx := dataoperation.ReconcileRequestContext{ + ReconcileRequestContext: cruntime.ReconcileRequestContext{ + Context: context.Background(), + Dataset: &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + }, + }, + }, + DataOpFinalizerName: "fluid.io/dataload-finalizer", + } + ctx.Log = fake.NullLogger() + + result, err := reconciler.addFinalizerAndRequeue(ctx, dataload) + Expect(err).NotTo(HaveOccurred()) + // Should requeue + Expect(result.Requeue || result.RequeueAfter > 0 || !result.IsZero()).To(BeTrue()) + + // Verify the finalizer was added + updatedDataload := &datav1alpha1.DataLoad{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{ + Name: "test-dataload", + Namespace: "default", + }, updatedDataload)).To(Succeed()) + Expect(updatedDataload.GetFinalizers()).To(ContainElement("fluid.io/dataload-finalizer")) + }) +}) + +var _ = Describe("OperationReconciler addOwnerAndRequeue", func() { + var ( + reconciler *OperationReconciler + s *runtime.Scheme + ) + + BeforeEach(func() { + s = runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(s)).To(Succeed()) + Expect(corev1.AddToScheme(s)).To(Succeed()) + }) + + It("should add an owner reference and requeue", func() { + dataload := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataload", + Namespace: "default", + }, + } + + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-dataset", + Namespace: "default", + UID: "dataset-uid-456", + }, + } + + fakeClient := fakeclient.NewClientBuilder(). + WithScheme(s). + WithObjects(dataload, dataset). + Build() + + reconciler = &OperationReconciler{ + Client: fakeClient, + Log: fake.NullLogger(), + Recorder: record.NewFakeRecorder(10), + engines: map[string]base.Engine{}, + mutex: &sync.Mutex{}, + } + + ctx := dataoperation.ReconcileRequestContext{ + ReconcileRequestContext: cruntime.ReconcileRequestContext{ + Context: context.Background(), + }, + } + ctx.Log = fake.NullLogger() + + result, err := reconciler.addOwnerAndRequeue(ctx, dataload, dataset) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + + // Verify the owner reference was added + updatedDataload := &datav1alpha1.DataLoad{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{ + Name: "test-dataload", + Namespace: "default", + }, updatedDataload)).To(Succeed()) + Expect(updatedDataload.GetOwnerReferences()).To(HaveLen(1)) + Expect(updatedDataload.GetOwnerReferences()[0].Name).To(Equal("test-dataset")) + Expect(updatedDataload.GetOwnerReferences()[0].UID).To(Equal(types.UID("dataset-uid-456"))) + }) +}) + +var _ = Describe("OperationReconciler ReconcileInternal", func() { + var ( + reconciler *OperationReconciler + s *runtime.Scheme + ) + + BeforeEach(func() { + s = runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(s)).To(Succeed()) + Expect(corev1.AddToScheme(s)).To(Succeed()) + }) + + It("requeues when building operation interface fails", func() { + obj := &datav1alpha1.DataLoad{ObjectMeta: metav1.ObjectMeta{Name: "load", Namespace: "default"}} + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).Build() + + reconciler = NewDataOperationReconciler(&mockOperationInterfaceBuilder{ + buildFunc: func(object client.Object) (dataoperation.OperationInterface, error) { + return nil, fmt.Errorf("build failed") + }, + }, fakeClient, fake.NullLogger(), record.NewFakeRecorder(10)) + + ctx := dataoperation.ReconcileRequestContext{ + ReconcileRequestContext: cruntime.ReconcileRequestContext{Context: context.Background()}, + DataObject: obj, + } + ctx.Log = fake.NullLogger() + + result, err := reconciler.ReconcileInternal(ctx) + Expect(err).To(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) + + It("requeues when target dataset is not found", func() { + obj := &datav1alpha1.DataLoad{ObjectMeta: metav1.ObjectMeta{Name: "load", Namespace: "default"}} + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).Build() + + reconciler = NewDataOperationReconciler(&mockOperationInterfaceBuilder{ + buildFunc: func(object client.Object) (dataoperation.OperationInterface, error) { + return &mockOperationInterface{ + operationObject: obj, + operationType: dataoperation.DataLoadType, + targetDatasetErr: apierrors.NewNotFound(schema.GroupResource{Group: "data.fluid.io", Resource: "datasets"}, "missing"), + }, nil + }, + }, fakeClient, fake.NullLogger(), record.NewFakeRecorder(10)) + + ctx := dataoperation.ReconcileRequestContext{ + ReconcileRequestContext: cruntime.ReconcileRequestContext{Context: context.Background()}, + DataObject: obj, + } + ctx.Log = fake.NullLogger() + + result, err := reconciler.ReconcileInternal(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) + }) + + It("requeues when no accelerate runtime is bound on target dataset", func() { + obj := &datav1alpha1.DataLoad{ObjectMeta: metav1.ObjectMeta{Name: "load", Namespace: "default"}} + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "dataset", Namespace: "default"}, + } + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).WithObjects(dataset).Build() + + reconciler = NewDataOperationReconciler(&mockOperationInterfaceBuilder{ + buildFunc: func(object client.Object) (dataoperation.OperationInterface, error) { + return &mockOperationInterface{ + operationObject: obj, + operationType: dataoperation.DataLoadType, + targetDataset: dataset, + }, nil + }, + }, fakeClient, fake.NullLogger(), record.NewFakeRecorder(10)) + + ctx := dataoperation.ReconcileRequestContext{ + ReconcileRequestContext: cruntime.ReconcileRequestContext{Context: context.Background()}, + DataObject: obj, + } + ctx.Log = fake.NullLogger() + + result, err := reconciler.ReconcileInternal(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) + }) + + It("returns without requeue when bound runtime is not found", func() { + obj := &datav1alpha1.DataLoad{ObjectMeta: metav1.ObjectMeta{Name: "load", Namespace: "default"}} + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "dataset", Namespace: "default"}, + Status: datav1alpha1.DatasetStatus{Runtimes: []datav1alpha1.Runtime{{ + Name: "missing-runtime", + Namespace: "default", + Type: common.AlluxioRuntime, + Category: common.AccelerateCategory, + }}}, + } + fakeClient := fakeclient.NewClientBuilder().WithScheme(s).WithObjects(dataset).Build() + + reconciler = NewDataOperationReconciler(&mockOperationInterfaceBuilder{ + buildFunc: func(object client.Object) (dataoperation.OperationInterface, error) { + return &mockOperationInterface{ + operationObject: obj, + operationType: dataoperation.DataLoadType, + targetDataset: dataset, + }, nil + }, + }, fakeClient, fake.NullLogger(), record.NewFakeRecorder(10)) + + ctx := dataoperation.ReconcileRequestContext{ + ReconcileRequestContext: cruntime.ReconcileRequestContext{Context: context.Background()}, + DataObject: obj, + } + ctx.Log = fake.NullLogger() + + result, err := reconciler.ReconcileInternal(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) +}) diff --git a/pkg/controllers/suite_test.go b/pkg/controllers/suite_test.go new file mode 100644 index 00000000000..5d4eec40ad0 --- /dev/null +++ b/pkg/controllers/suite_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2025 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestControllers(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Package Controllers Suite") +} From 9135adefe2f1f6a59378df87ec035ed117b59251 Mon Sep 17 00:00:00 2001 From: Harsh Date: Thu, 5 Mar 2026 20:26:23 +0530 Subject: [PATCH 2/7] test(ctrl): add core engine test helper Signed-off-by: Harsh --- pkg/controllers/fake_engine_core_test.go | 33 ++++++++++++++++++++ pkg/controllers/operation_controller_test.go | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 pkg/controllers/fake_engine_core_test.go diff --git a/pkg/controllers/fake_engine_core_test.go b/pkg/controllers/fake_engine_core_test.go new file mode 100644 index 00000000000..ff6d1a6bd24 --- /dev/null +++ b/pkg/controllers/fake_engine_core_test.go @@ -0,0 +1,33 @@ +package controllers + +import ( + ctrl "sigs.k8s.io/controller-runtime" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/dataoperation" + cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" +) + +type fakeEngineCore struct { + id string +} + +func (e *fakeEngineCore) ID() string { return e.id } + +func (e *fakeEngineCore) Shutdown() error { return nil } + +func (e *fakeEngineCore) Setup(ctx cruntime.ReconcileRequestContext) (bool, error) { + return true, nil +} + +func (e *fakeEngineCore) CreateVolume() error { return nil } + +func (e *fakeEngineCore) DeleteVolume() error { return nil } + +func (e *fakeEngineCore) Sync(ctx cruntime.ReconcileRequestContext) error { return nil } + +func (e *fakeEngineCore) Validate(ctx cruntime.ReconcileRequestContext) error { return nil } + +func (e *fakeEngineCore) Operate(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus, operation dataoperation.OperationInterface) (ctrl.Result, error) { + return ctrl.Result{}, nil +} diff --git a/pkg/controllers/operation_controller_test.go b/pkg/controllers/operation_controller_test.go index 7ee2786837b..363b3547894 100644 --- a/pkg/controllers/operation_controller_test.go +++ b/pkg/controllers/operation_controller_test.go @@ -207,7 +207,7 @@ var _ = Describe("OperationReconciler engine cache", func() { It("should return cached engine when it already exists", func() { nn := types.NamespacedName{Namespace: "default", Name: "test-runtime"} id := ddc.GenerateEngineID(nn) - existingEngine := &fakeEngine{id: id} + existingEngine := &fakeEngineCore{id: id} reconciler.engines[id] = existingEngine ctx := dataoperation.ReconcileRequestContext{ From 4b7cf67afe45634bcca26490f902139ea401ab17 Mon Sep 17 00:00:00 2001 From: Harsh Date: Thu, 5 Mar 2026 21:02:48 +0530 Subject: [PATCH 3/7] test(ctrl): tighten controller helper coverage from bot review Signed-off-by: Harsh --- pkg/controllers/manager_test.go | 43 -------------------- pkg/controllers/operation_controller_test.go | 10 +++++ 2 files changed, 10 insertions(+), 43 deletions(-) diff --git a/pkg/controllers/manager_test.go b/pkg/controllers/manager_test.go index 1364f79585d..adb5261751b 100644 --- a/pkg/controllers/manager_test.go +++ b/pkg/controllers/manager_test.go @@ -99,49 +99,6 @@ var _ = Describe("NewFluidControllerRateLimiter", func() { }) }) -var _ = Describe("NewFluidControllerClient", func() { - Describe("HELM_DRIVER env variable handling", func() { - var originalVal string - var wasSet bool - - BeforeEach(func() { - originalVal, wasSet = os.LookupEnv("HELM_DRIVER") - }) - - AfterEach(func() { - if wasSet { - Expect(os.Setenv("HELM_DRIVER", originalVal)).To(Succeed()) - } else { - Expect(os.Unsetenv("HELM_DRIVER")).To(Succeed()) - } - }) - - It("should call NewCacheClientBypassSecrets when HELM_DRIVER is not set", func() { - Expect(os.Unsetenv("HELM_DRIVER")).To(Succeed()) - // NewFluidControllerClient requires a valid rest.Config, which we - // cannot easily provide in a unit test without envtest. We verify - // the branching logic indirectly by checking that the env var lookup - // returns false when unset. - _, exists := os.LookupEnv("HELM_DRIVER") - Expect(exists).To(BeFalse()) - }) - - It("should detect HELM_DRIVER=secret for the default client path", func() { - Expect(os.Setenv("HELM_DRIVER", "secret")).To(Succeed()) - driver, exists := os.LookupEnv("HELM_DRIVER") - Expect(exists).To(BeTrue()) - Expect(driver).To(Equal("secret")) - }) - - It("should use cache-bypass path when HELM_DRIVER is not 'secret'", func() { - Expect(os.Setenv("HELM_DRIVER", "configmap")).To(Succeed()) - driver, exists := os.LookupEnv("HELM_DRIVER") - Expect(exists).To(BeTrue()) - Expect(driver).NotTo(Equal("secret")) - }) - }) -}) - var _ = Describe("manager client and config helpers", func() { Describe("NewFluidControllerClient", func() { var ( diff --git a/pkg/controllers/operation_controller_test.go b/pkg/controllers/operation_controller_test.go index 363b3547894..6d8d6234a44 100644 --- a/pkg/controllers/operation_controller_test.go +++ b/pkg/controllers/operation_controller_test.go @@ -41,6 +41,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/ddc/base" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -365,6 +366,9 @@ var _ = Describe("OperationReconciler getRuntimeObjectAndEngineImpl", func() { Entry("goosefs", common.GooseFSRuntime, &datav1alpha1.GooseFSRuntime{ ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, }, common.GooseFSEngineImpl), + Entry("jindo", common.JindoRuntime, &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + }, jindoutils.GetDefaultEngineImpl()), Entry("juicefs", common.JuiceFSRuntime, &datav1alpha1.JuiceFSRuntime{ ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, }, common.JuiceFSEngineImpl), @@ -460,6 +464,10 @@ var _ = Describe("OperationReconciler addOwnerAndRequeue", func() { } dataset := &datav1alpha1.Dataset{ + TypeMeta: metav1.TypeMeta{ + Kind: "Dataset", + APIVersion: "data.fluid.io/v1alpha1", + }, ObjectMeta: metav1.ObjectMeta{ Name: "test-dataset", Namespace: "default", @@ -500,6 +508,8 @@ var _ = Describe("OperationReconciler addOwnerAndRequeue", func() { Expect(updatedDataload.GetOwnerReferences()).To(HaveLen(1)) Expect(updatedDataload.GetOwnerReferences()[0].Name).To(Equal("test-dataset")) Expect(updatedDataload.GetOwnerReferences()[0].UID).To(Equal(types.UID("dataset-uid-456"))) + Expect(updatedDataload.GetOwnerReferences()[0].APIVersion).To(Equal("data.fluid.io/v1alpha1")) + Expect(updatedDataload.GetOwnerReferences()[0].Kind).To(Equal("Dataset")) }) }) From 69d2cd082cd2a8c989775e6138d1c1f092a98795 Mon Sep 17 00:00:00 2001 From: Harsh Date: Thu, 5 Mar 2026 22:16:06 +0530 Subject: [PATCH 4/7] test(controllers): isolate GetConfigOrDie test in subprocess Signed-off-by: Harsh --- pkg/controllers/manager_test.go | 17 +++++++++++++---- pkg/controllers/operation_controller_test.go | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/controllers/manager_test.go b/pkg/controllers/manager_test.go index adb5261751b..d86b580b5f8 100644 --- a/pkg/controllers/manager_test.go +++ b/pkg/controllers/manager_test.go @@ -18,6 +18,7 @@ package controllers import ( "os" + "os/exec" "path/filepath" "time" @@ -162,6 +163,14 @@ var _ = Describe("manager client and config helpers", func() { }) It("sets qps and burst when both are positive", func() { + if os.Getenv("FLUID_GET_CONFIG_SUBPROCESS") == "1" { + cfg := GetConfigOrDieWithQPSAndBurst(123, 456) + if cfg == nil || cfg.QPS != float32(123) || cfg.Burst != 456 { + os.Exit(2) + } + os.Exit(0) + } + tmpDir := GinkgoT().TempDir() kubeconfig := filepath.Join(tmpDir, "config") content := `apiVersion: v1 @@ -185,10 +194,10 @@ current-context: local Expect(os.WriteFile(kubeconfig, []byte(content), 0o600)).To(Succeed()) Expect(os.Setenv("KUBECONFIG", kubeconfig)).To(Succeed()) - cfg := GetConfigOrDieWithQPSAndBurst(123, 456) - Expect(cfg).NotTo(BeNil()) - Expect(cfg.QPS).To(Equal(float32(123))) - Expect(cfg.Burst).To(Equal(456)) + cmd := exec.Command(os.Args[0], "-test.run=TestControllers", "-ginkgo.focus=sets qps and burst when both are positive") + cmd.Env = append(os.Environ(), "FLUID_GET_CONFIG_SUBPROCESS=1", "KUBECONFIG="+kubeconfig) + out, err := cmd.CombinedOutput() + Expect(err).NotTo(HaveOccurred(), string(out)) }) }) }) diff --git a/pkg/controllers/operation_controller_test.go b/pkg/controllers/operation_controller_test.go index 6d8d6234a44..f66add78104 100644 --- a/pkg/controllers/operation_controller_test.go +++ b/pkg/controllers/operation_controller_test.go @@ -525,7 +525,7 @@ var _ = Describe("OperationReconciler ReconcileInternal", func() { Expect(corev1.AddToScheme(s)).To(Succeed()) }) - It("requeues when building operation interface fails", func() { + It("returns error when building operation interface fails", func() { obj := &datav1alpha1.DataLoad{ObjectMeta: metav1.ObjectMeta{Name: "load", Namespace: "default"}} fakeClient := fakeclient.NewClientBuilder().WithScheme(s).Build() From 13b970ef40f41e939c3b4eb7bcfc868d5d4edb84 Mon Sep 17 00:00:00 2001 From: Harsh Date: Fri, 6 Mar 2026 21:57:46 +0530 Subject: [PATCH 5/7] fix: update copyright year to 2026 in test files Signed-off-by: Harsh --- pkg/controllers/fake_engine_core_test.go | 16 ++++++++++++++++ pkg/controllers/manager_test.go | 2 +- pkg/controllers/operation_controller_test.go | 2 +- pkg/controllers/suite_test.go | 2 +- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pkg/controllers/fake_engine_core_test.go b/pkg/controllers/fake_engine_core_test.go index ff6d1a6bd24..4c8be88f1c9 100644 --- a/pkg/controllers/fake_engine_core_test.go +++ b/pkg/controllers/fake_engine_core_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package controllers import ( diff --git a/pkg/controllers/manager_test.go b/pkg/controllers/manager_test.go index d86b580b5f8..edd23ea0cba 100644 --- a/pkg/controllers/manager_test.go +++ b/pkg/controllers/manager_test.go @@ -1,5 +1,5 @@ /* -Copyright 2025 The Fluid Authors. +Copyright 2026 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/controllers/operation_controller_test.go b/pkg/controllers/operation_controller_test.go index f66add78104..2ec5ba0c556 100644 --- a/pkg/controllers/operation_controller_test.go +++ b/pkg/controllers/operation_controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2025 The Fluid Authors. +Copyright 2026 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/controllers/suite_test.go b/pkg/controllers/suite_test.go index 5d4eec40ad0..6cc71653867 100644 --- a/pkg/controllers/suite_test.go +++ b/pkg/controllers/suite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2025 The Fluid Authors. +Copyright 2026 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 43b15a15bb57c610e4005e85bb3a1a3d543e4db1 Mon Sep 17 00:00:00 2001 From: Harsh Date: Fri, 6 Mar 2026 22:13:05 +0530 Subject: [PATCH 6/7] test(ctrl): address sonarqube bot comments Signed-off-by: Harsh --- pkg/controllers/manager_test.go | 16 ++++---- pkg/controllers/operation_controller_test.go | 43 ++++++++++++-------- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/pkg/controllers/manager_test.go b/pkg/controllers/manager_test.go index edd23ea0cba..8e5c51bbf0a 100644 --- a/pkg/controllers/manager_test.go +++ b/pkg/controllers/manager_test.go @@ -27,6 +27,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +const testItem = "test-item" + var _ = Describe("NewFluidControllerRateLimiter", func() { It("should create a rate limiter with valid parameters", func() { limiter := NewFluidControllerRateLimiter( @@ -45,8 +47,8 @@ var _ = Describe("NewFluidControllerRateLimiter", func() { 10, 100, ) - first := limiter.When("test-item") - second := limiter.When("test-item") + first := limiter.When(testItem) + second := limiter.When(testItem) Expect(second).To(BeNumerically(">=", first)) }) @@ -76,9 +78,9 @@ var _ = Describe("NewFluidControllerRateLimiter", func() { ) // Push the item through many failures for i := 0; i < 100; i++ { - limiter.When("test-item") + limiter.When(testItem) } - delay := limiter.When("test-item") + delay := limiter.When(testItem) Expect(delay).To(BeNumerically("<=", maxDelay)) }) @@ -90,10 +92,10 @@ var _ = Describe("NewFluidControllerRateLimiter", func() { 100, ) for i := 0; i < 10; i++ { - limiter.When("test-item") + limiter.When(testItem) } - limiter.Forget("test-item") - delay := limiter.When("test-item") + limiter.Forget(testItem) + delay := limiter.When(testItem) firstDelay := limiter.When("fresh-item") // After forget, the item's delay should be close to the initial value Expect(delay).To(BeNumerically("<=", firstDelay*2)) diff --git a/pkg/controllers/operation_controller_test.go b/pkg/controllers/operation_controller_test.go index 2ec5ba0c556..93c0c614862 100644 --- a/pkg/controllers/operation_controller_test.go +++ b/pkg/controllers/operation_controller_test.go @@ -45,6 +45,12 @@ import ( fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" ) +const ( + testRuntimeName = "test-runtime" + testDataloadName = "test-dataload" + testDatasetName = "test-dataset" +) + // mockOperationInterfaceBuilder implements dataoperation.OperationInterfaceBuilder type mockOperationInterfaceBuilder struct { buildFunc func(object client.Object) (dataoperation.OperationInterface, error) @@ -93,8 +99,11 @@ func (m *mockOperationInterface) Validate(ctx cruntime.ReconcileRequestContext) func (m *mockOperationInterface) UpdateStatusInfoForCompleted(infos map[string]string) error { return nil } -func (m *mockOperationInterface) SetTargetDatasetStatusInProgress(dataset *datav1alpha1.Dataset) {} +func (m *mockOperationInterface) SetTargetDatasetStatusInProgress(dataset *datav1alpha1.Dataset) { + // No-op for test mock +} func (m *mockOperationInterface) RemoveTargetDatasetStatusInProgress(dataset *datav1alpha1.Dataset) { + // No-op for test mock } func (m *mockOperationInterface) GetStatusHandler() dataoperation.StatusHandler { return nil } func (m *mockOperationInterface) GetTTL() (ttl *int32, err error) { return nil, nil } @@ -147,7 +156,7 @@ var _ = Describe("OperationReconciler engine cache", func() { Describe("RemoveEngine", func() { It("should remove an engine from the cache by namespaced name", func() { - nn := types.NamespacedName{Namespace: "default", Name: "test-runtime"} + nn := types.NamespacedName{Namespace: "default", Name: testRuntimeName} // Use ddc.GenerateEngineID to get the correct key format id := ddc.GenerateEngineID(nn) reconciler.engines[id] = nil // placeholder @@ -206,7 +215,7 @@ var _ = Describe("OperationReconciler engine cache", func() { Describe("GetOrCreateEngine", func() { It("should return cached engine when it already exists", func() { - nn := types.NamespacedName{Namespace: "default", Name: "test-runtime"} + nn := types.NamespacedName{Namespace: "default", Name: testRuntimeName} id := ddc.GenerateEngineID(nn) existingEngine := &fakeEngineCore{id: id} reconciler.engines[id] = existingEngine @@ -215,7 +224,7 @@ var _ = Describe("OperationReconciler engine cache", func() { ReconcileRequestContext: cruntime.ReconcileRequestContext{ Context: context.Background(), NamespacedName: types.NamespacedName{ - Name: "test-runtime", + Name: testRuntimeName, Namespace: "default", }, }, @@ -358,25 +367,25 @@ var _ = Describe("OperationReconciler getRuntimeObjectAndEngineImpl", func() { mutex: &sync.Mutex{}, } - obj, engineImpl, err := reconciler.getRuntimeObjectAndEngineImpl(runtimeType, "test-runtime", "default") + obj, engineImpl, err := reconciler.getRuntimeObjectAndEngineImpl(runtimeType, testRuntimeName, "default") Expect(err).NotTo(HaveOccurred()) Expect(obj).NotTo(BeNil()) Expect(engineImpl).To(Equal(expectedEngineImpl)) }, Entry("goosefs", common.GooseFSRuntime, &datav1alpha1.GooseFSRuntime{ - ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: testRuntimeName, Namespace: "default"}, }, common.GooseFSEngineImpl), Entry("jindo", common.JindoRuntime, &datav1alpha1.JindoRuntime{ - ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: testRuntimeName, Namespace: "default"}, }, jindoutils.GetDefaultEngineImpl()), Entry("juicefs", common.JuiceFSRuntime, &datav1alpha1.JuiceFSRuntime{ - ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: testRuntimeName, Namespace: "default"}, }, common.JuiceFSEngineImpl), Entry("efc", common.EFCRuntime, &datav1alpha1.EFCRuntime{ - ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: testRuntimeName, Namespace: "default"}, }, common.EFCEngineImpl), Entry("vineyard", common.VineyardRuntime, &datav1alpha1.VineyardRuntime{ - ObjectMeta: metav1.ObjectMeta{Name: "test-runtime", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: testRuntimeName, Namespace: "default"}, }, common.VineyardEngineImpl), ) }) @@ -396,7 +405,7 @@ var _ = Describe("OperationReconciler addFinalizerAndRequeue", func() { It("should add a finalizer to the data operation object and requeue", func() { dataload := &datav1alpha1.DataLoad{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-dataload", + Name: testDataloadName, Namespace: "default", }, } @@ -419,7 +428,7 @@ var _ = Describe("OperationReconciler addFinalizerAndRequeue", func() { Context: context.Background(), Dataset: &datav1alpha1.Dataset{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-dataset", + Name: testDatasetName, Namespace: "default", }, }, @@ -436,7 +445,7 @@ var _ = Describe("OperationReconciler addFinalizerAndRequeue", func() { // Verify the finalizer was added updatedDataload := &datav1alpha1.DataLoad{} Expect(fakeClient.Get(context.Background(), types.NamespacedName{ - Name: "test-dataload", + Name: testDataloadName, Namespace: "default", }, updatedDataload)).To(Succeed()) Expect(updatedDataload.GetFinalizers()).To(ContainElement("fluid.io/dataload-finalizer")) @@ -458,7 +467,7 @@ var _ = Describe("OperationReconciler addOwnerAndRequeue", func() { It("should add an owner reference and requeue", func() { dataload := &datav1alpha1.DataLoad{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-dataload", + Name: testDataloadName, Namespace: "default", }, } @@ -469,7 +478,7 @@ var _ = Describe("OperationReconciler addOwnerAndRequeue", func() { APIVersion: "data.fluid.io/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Name: "test-dataset", + Name: testDatasetName, Namespace: "default", UID: "dataset-uid-456", }, @@ -502,11 +511,11 @@ var _ = Describe("OperationReconciler addOwnerAndRequeue", func() { // Verify the owner reference was added updatedDataload := &datav1alpha1.DataLoad{} Expect(fakeClient.Get(context.Background(), types.NamespacedName{ - Name: "test-dataload", + Name: testDataloadName, Namespace: "default", }, updatedDataload)).To(Succeed()) Expect(updatedDataload.GetOwnerReferences()).To(HaveLen(1)) - Expect(updatedDataload.GetOwnerReferences()[0].Name).To(Equal("test-dataset")) + Expect(updatedDataload.GetOwnerReferences()[0].Name).To(Equal(testDatasetName)) Expect(updatedDataload.GetOwnerReferences()[0].UID).To(Equal(types.UID("dataset-uid-456"))) Expect(updatedDataload.GetOwnerReferences()[0].APIVersion).To(Equal("data.fluid.io/v1alpha1")) Expect(updatedDataload.GetOwnerReferences()[0].Kind).To(Equal("Dataset")) From e8b20c085cf0ad9c46066329a935a642a672eb54 Mon Sep 17 00:00:00 2001 From: Harsh Date: Sat, 7 Mar 2026 02:48:29 +0530 Subject: [PATCH 7/7] test(jindo): improve minio setup and data seeding in e2e script Signed-off-by: Harsh --- test/gha-e2e/jindo/test.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/gha-e2e/jindo/test.sh b/test/gha-e2e/jindo/test.sh index 6d0b4adafe8..0206a63852d 100644 --- a/test/gha-e2e/jindo/test.sh +++ b/test/gha-e2e/jindo/test.sh @@ -17,10 +17,10 @@ function panic() { function setup_minio() { kubectl create -f test/gha-e2e/jindo/minio.yaml - minio_pod=$(kubectl get pod -oname | grep minio) - kubectl wait --for=condition=Ready $minio_pod + minio_pod=$(kubectl get pod -l app=minio -oname) + kubectl wait --for=condition=Ready $minio_pod --timeout=180s || panic "minio pod is not ready" - kubectl exec -it $minio_pod -- /bin/bash -c 'mc alias set myminio http://127.0.0.1:9000 minioadmin minioadmin && mc mb myminio/mybucket && echo "helloworld" > testfile && mc mv testfile myminio/mybucket/subpath/testfile && mc cat myminio/mybucket/subpath/testfile' + kubectl exec $minio_pod -- /bin/sh -c 'mc alias set myminio http://127.0.0.1:9000 minioadmin minioadmin && mc mb myminio/mybucket && echo "helloworld" > testfile && mc mv testfile myminio/mybucket/subpath/testfile && mc cat myminio/mybucket/subpath/testfile' || panic "failed to seed data into minio" } function create_dataset() {