diff --git a/pkg/controllers/v1alpha1/dataset/dataset_controller_ut_test.go b/pkg/controllers/v1alpha1/dataset/dataset_controller_ut_test.go
new file mode 100644
index 00000000000..679a595edd4
--- /dev/null
+++ b/pkg/controllers/v1alpha1/dataset/dataset_controller_ut_test.go
@@ -0,0 +1,505 @@
+/*
+Copyright 2026 The Fluid Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dataset
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/agiledragon/gomonkey/v2"
+	"github.com/go-logr/logr"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
+	"github.com/fluid-cloudnative/fluid/pkg/controllers/deploy"
+	"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/utils/ptr"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var _ = Describe("Dataset Controller Unit", func() {
+	var scheme *runtime.Scheme
+	var scaleoutPatch *gomonkey.Patches
+
+	BeforeEach(func() {
+		scheme = runtime.NewScheme()
+		Expect(datav1alpha1.AddToScheme(scheme)).NotTo(HaveOccurred())
+		Expect(corev1.AddToScheme(scheme)).To(Succeed())
+		scaleoutPatch = gomonkey.ApplyFunc(deploy.ScaleoutRuntimeControllerOnDemand,
+			func(client.Client, types.NamespacedName, logr.Logger) (string, bool, error) {
+				return "", false, nil
+			})
+	})
+
+	AfterEach(func() {
+		scaleoutPatch.Reset()
+	})
+
+	Describe("ControllerName", func() {
+		It("should return the dataset controller name", func() {
+			r := &DatasetReconciler{}
+
+			Expect(r.ControllerName()).To(Equal(controllerName))
+		})
+	})
+
+	Describe("Reconcile", func() {
+		It("should return empty result when dataset is not found", func() {
+			r := &DatasetReconciler{
+				Client:   fake.NewFakeClientWithScheme(scheme),
+				Scheme:   scheme,
+				Recorder: record.NewFakeRecorder(16),
+			}
+
+			result, err := r.Reconcile(context.Background(), ctrl.Request{
+				NamespacedName: types.NamespacedName{Name: "missing", Namespace: "default"},
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result).To(Equal(ctrl.Result{}))
+		})
+
+		It("should return an error when dataset name violates DNS-1035 validation", func() {
+			invalidName := "20-dataset"
+			dataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      invalidName,
+					Namespace: "default",
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:   fake.NewFakeClientWithScheme(scheme, dataset),
+				Scheme:   scheme,
+				Recorder: record.NewFakeRecorder(16),
+			}
+
+			_, err := r.Reconcile(context.Background(), ctrl.Request{
+				NamespacedName: types.NamespacedName{Name: invalidName, Namespace: "default"},
+			})
+
+			Expect(err).To(MatchError(ContainSubstring("metadata.name")))
+		})
+
+		It("should add the finalizer and requeue for a new dataset", func() {
+			dataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       "sample-dataset",
+					Namespace:  "default",
+					Generation: 1,
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:       fake.NewFakeClientWithScheme(scheme, dataset),
+				Scheme:       scheme,
+				Recorder:     record.NewFakeRecorder(16),
+				ResyncPeriod: 30 * time.Second,
+			}
+
+			result, err := r.Reconcile(context.Background(), ctrl.Request{
+				NamespacedName: types.NamespacedName{Name: dataset.Name, Namespace: dataset.Namespace},
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.Requeue).To(BeTrue())
+
+			updatedDataset := &datav1alpha1.Dataset{}
+			Expect(r.Get(context.Background(), types.NamespacedName{Name: dataset.Name, Namespace: dataset.Namespace}, updatedDataset)).To(Succeed())
+			Expect(updatedDataset.GetFinalizers()).To(ContainElement(finalizer))
+		})
+
+		It("should update an initialized dataset phase to not bound without requeue", func() {
+			dataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       "validated-dataset",
+					Namespace:  "default",
+					Generation: 1,
+					Finalizers: []string{finalizer},
+				},
+				Spec: datav1alpha1.DatasetSpec{
+					Mounts: []datav1alpha1.Mount{{
+						MountPoint: "oss://bucket/path",
+						Name:       "main",
+					}},
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:       fake.NewFakeClientWithScheme(scheme, dataset),
+				Scheme:       scheme,
+				Recorder:     record.NewFakeRecorder(16),
+				ResyncPeriod: 30 * time.Second,
+			}
+
+			result, err := r.Reconcile(context.Background(), ctrl.Request{
+				NamespacedName: types.NamespacedName{Name: dataset.Name, Namespace: dataset.Namespace},
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result).To(Equal(ctrl.Result{}))
+
+			updatedDataset := &datav1alpha1.Dataset{}
+			Expect(r.Get(context.Background(), types.NamespacedName{Name: dataset.Name, Namespace: dataset.Namespace}, updatedDataset)).To(Succeed())
+			Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.NotBoundDatasetPhase))
+			Expect(updatedDataset.Status.Conditions).To(BeEmpty())
+		})
+
+		It("should requeue after the resync period when runtime scaleout requested a retry", func() {
+			dataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       "requeue-dataset",
+					Namespace:  "default",
+					Generation: 1,
+					Finalizers: []string{finalizer},
+				},
+				Spec: datav1alpha1.DatasetSpec{
+					Mounts: []datav1alpha1.Mount{{
+						MountPoint: "oss://bucket/requeue",
+						Name:       "main",
+					}},
+				},
+				Status: datav1alpha1.DatasetStatus{
+					Phase: datav1alpha1.NotBoundDatasetPhase,
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:       fake.NewFakeClientWithScheme(scheme, dataset),
+				Scheme:       scheme,
+				Recorder:     record.NewFakeRecorder(16),
+				ResyncPeriod: 45 * time.Second,
+			}
+
+			ctx := reconcileRequestContext{
+				Context: context.Background(),
+				Dataset: *dataset,
+				NamespacedName: types.NamespacedName{
+					Name:      dataset.Name,
+					Namespace: dataset.Namespace,
+				},
+			}
+
+			result, err := r.reconcileDataset(ctx, true)
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.RequeueAfter).To(Equal(45 * time.Second))
+		})
+
+		It("should requeue after the resync period when runtime scaleout returns an error", func() {
+			scaleoutPatch.Reset()
+			scaleoutPatch = gomonkey.ApplyFunc(deploy.ScaleoutRuntimeControllerOnDemand,
+				func(client.Client, types.NamespacedName, logr.Logger) (string, bool, error) {
+					return "", false, fmt.Errorf("scaleout failed")
+				})
+
+			dataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       "scaleout-requeue-dataset",
+					Namespace:  "default",
+					Generation: 1,
+					Finalizers: []string{finalizer},
+				},
+				Spec: datav1alpha1.DatasetSpec{
+					Mounts: []datav1alpha1.Mount{{
+						MountPoint: "oss://bucket/requeue-after-scaleout-error",
+						Name:       "main",
+					}},
+				},
+				Status: datav1alpha1.DatasetStatus{
+					Phase: datav1alpha1.NotBoundDatasetPhase,
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:       fake.NewFakeClientWithScheme(scheme, dataset),
+				Scheme:       scheme,
+				Recorder:     record.NewFakeRecorder(16),
+				ResyncPeriod: 20 * time.Second,
+			}
+
+			result, err := r.Reconcile(context.Background(), ctrl.Request{
+				NamespacedName: types.NamespacedName{Name: dataset.Name, Namespace: dataset.Namespace},
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.RequeueAfter).To(Equal(20 * time.Second))
+		})
+	})
+
+	Describe("addFinalizerAndRequeue", func() {
+		It("should add the controller finalizer and requeue immediately", func() {
+			storedDataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       "finalizer-dataset",
+					Namespace:  "default",
+					Generation: 3,
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:   fake.NewFakeClientWithScheme(scheme, storedDataset),
+				Scheme:   scheme,
+				Recorder: record.NewFakeRecorder(16),
+			}
+
+			currentDataset := datav1alpha1.Dataset{}
+			Expect(r.Get(context.Background(), types.NamespacedName{Name: storedDataset.Name, Namespace: storedDataset.Namespace}, &currentDataset)).To(Succeed())
+
+			result, err := r.addFinalizerAndRequeue(reconcileRequestContext{
+				Context: context.Background(),
+				Dataset: currentDataset,
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.Requeue).To(BeTrue())
+
+			updatedDataset := &datav1alpha1.Dataset{}
+			Expect(r.Get(context.Background(), types.NamespacedName{Name: storedDataset.Name, Namespace: storedDataset.Namespace}, updatedDataset)).To(Succeed())
+			Expect(updatedDataset.GetFinalizers()).To(ContainElement(finalizer))
+		})
+	})
+
+	Describe("reconcileDatasetDeletion", func() {
+		It("should remove the controller finalizer when the dataset can be deleted", func() {
+			deletionTime := metav1.NewTime(time.Now())
+			storedDataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "deleting-dataset",
+					Namespace:         "default",
+					Finalizers:        []string{finalizer},
+					DeletionTimestamp: &deletionTime,
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:   fake.NewFakeClientWithScheme(scheme, storedDataset),
+				Scheme:   scheme,
+				Recorder: record.NewFakeRecorder(16),
+			}
+
+			currentDataset := datav1alpha1.Dataset{}
+			Expect(r.Get(context.Background(), types.NamespacedName{Name: storedDataset.Name, Namespace: storedDataset.Namespace}, &currentDataset)).To(Succeed())
+
+			result, err := r.reconcileDatasetDeletion(reconcileRequestContext{
+				Context: context.Background(),
+				Dataset: currentDataset,
+				NamespacedName: types.NamespacedName{
+					Name:      currentDataset.Name,
+					Namespace: currentDataset.Namespace,
+				},
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result).To(Equal(ctrl.Result{}))
+
+			updatedDataset := &datav1alpha1.Dataset{}
+			err = r.Get(context.Background(), types.NamespacedName{Name: storedDataset.Name, Namespace: storedDataset.Namespace}, updatedDataset)
+			Expect(err).To(HaveOccurred())
+			Expect(client.IgnoreNotFound(err)).NotTo(HaveOccurred())
+		})
+
+		It("should requeue when referenced datasets are still mounted", func() {
+			deletionTime := metav1.NewTime(time.Now())
+			referencingDataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "consumer-a",
+					Namespace: "default",
+				},
+			}
+			dataset := datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "referenced-dataset",
+					Namespace:         "default",
+					Finalizers:        []string{finalizer},
+					DeletionTimestamp: &deletionTime,
+				},
+				Status: datav1alpha1.DatasetStatus{
+					DatasetRef: []string{"default/consumer-a"},
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:   fake.NewFakeClientWithScheme(scheme, dataset.DeepCopy(), referencingDataset),
+				Scheme:   scheme,
+				Recorder: record.NewFakeRecorder(16),
+			}
+
+			result, err := r.reconcileDatasetDeletion(reconcileRequestContext{
+				Context: context.Background(),
+				Dataset: dataset,
+				NamespacedName: types.NamespacedName{
+					Name:      dataset.Name,
+					Namespace: dataset.Namespace,
+				},
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.RequeueAfter).To(Equal(10 * time.Second))
+		})
+
+		It("should prune missing dataset references and requeue immediately", func() {
+			deletionTime := metav1.NewTime(time.Now())
+			storedDataset := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "dataset-ref-prune",
+					Namespace:         "default",
+					Finalizers:        []string{finalizer},
+					DeletionTimestamp: &deletionTime,
+				},
+				Status: datav1alpha1.DatasetStatus{
+					DatasetRef: []string{"default/existing-ref", "default/missing-ref"},
+				},
+			}
+			existingRef := &datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "existing-ref",
+					Namespace: "default",
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:   fake.NewFakeClientWithScheme(scheme, storedDataset, existingRef),
+				Scheme:   scheme,
+				Recorder: record.NewFakeRecorder(16),
+			}
+
+			currentDataset := datav1alpha1.Dataset{}
+			Expect(r.Get(context.Background(), types.NamespacedName{Name: storedDataset.Name, Namespace: storedDataset.Namespace}, &currentDataset)).To(Succeed())
+
+			result, err := r.reconcileDatasetDeletion(reconcileRequestContext{
+				Context: context.Background(),
+				Dataset: currentDataset,
+				NamespacedName: types.NamespacedName{
+					Name:      currentDataset.Name,
+					Namespace: currentDataset.Namespace,
+				},
+			})
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.RequeueAfter).To(Equal(1 * time.Second))
+
+			updatedDataset := &datav1alpha1.Dataset{}
+			Expect(r.Get(context.Background(), types.NamespacedName{Name: storedDataset.Name, Namespace: storedDataset.Namespace}, updatedDataset)).To(Succeed())
+			Expect(updatedDataset.Status.DatasetRef).To(Equal([]string{"default/existing-ref"}))
+		})
+	})
+
+	Describe("reconcileDataset", func() {
+		It("should return an error when a reference dataset mixes dataset and non-dataset mounts", func() {
+			dataset := datav1alpha1.Dataset{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       "invalid-reference-dataset",
+					Namespace:  "default",
+					Generation: 1,
+					Finalizers: []string{finalizer},
+				},
+				Spec: datav1alpha1.DatasetSpec{
+					Mounts: []datav1alpha1.Mount{
+						{
+							MountPoint: "dataset://fluid/source-dataset",
+							Name:       "reference",
+						},
+						{
+							MountPoint: "oss://bucket/extra",
+							Name:       "extra",
+						},
+					},
+				},
+				Status: datav1alpha1.DatasetStatus{
+					Phase: datav1alpha1.NotBoundDatasetPhase,
+				},
+			}
+
+			r := &DatasetReconciler{
+				Client:   fake.NewFakeClientWithScheme(scheme, dataset.DeepCopy()),
+				Scheme:   scheme,
+				Recorder: record.NewFakeRecorder(16),
+			}
+
+			_, err := r.reconcileDataset(reconcileRequestContext{
+				Context: context.Background(),
+				Dataset: dataset,
+				NamespacedName: types.NamespacedName{
+					Name:      dataset.Name,
+					Namespace: dataset.Namespace,
+				},
+			}, false)
+
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("not validated"))
+		})
+
+		It("should create a thin runtime for a reference dataset and keep owner metadata", func() {
+			dataset := datav1alpha1.Dataset{
+				TypeMeta: metav1.TypeMeta{
+					Kind:       datav1alpha1.Datasetkind,
+					APIVersion: datav1alpha1.GroupVersion.String(),
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name:       "reference-dataset",
+					Namespace:  "default",
types.UID("dataset-uid"), + Generation: 1, + Finalizers: []string{finalizer}, + }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{{ + MountPoint: "dataset://fluid/source-dataset", + Name: "reference", + }}, + }, + Status: datav1alpha1.DatasetStatus{ + Phase: datav1alpha1.NotBoundDatasetPhase, + }, + } + + r := &DatasetReconciler{ + Client: fake.NewFakeClientWithScheme(scheme, dataset.DeepCopy()), + Scheme: scheme, + Recorder: record.NewFakeRecorder(16), + } + + result, err := r.reconcileDataset(reconcileRequestContext{ + Context: context.Background(), + Dataset: dataset, + NamespacedName: types.NamespacedName{ + Name: dataset.Name, + Namespace: dataset.Namespace, + }, + }, false) + + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + + thinRuntime := &datav1alpha1.ThinRuntime{} + Expect(r.Get(context.Background(), types.NamespacedName{Name: dataset.Name, Namespace: dataset.Namespace}, thinRuntime)).To(Succeed()) + Expect(thinRuntime.OwnerReferences).To(HaveLen(1)) + Expect(thinRuntime.OwnerReferences[0].Name).To(Equal(dataset.Name)) + Expect(thinRuntime.OwnerReferences[0].UID).To(Equal(dataset.UID)) + Expect(ptr.Deref(thinRuntime.OwnerReferences[0].Controller, false)).To(BeTrue()) + }) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataset/suite_test.go b/pkg/controllers/v1alpha1/dataset/suite_test.go index 9240bfd2126..41fa853e9ad 100644 --- a/pkg/controllers/v1alpha1/dataset/suite_test.go +++ b/pkg/controllers/v1alpha1/dataset/suite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2020 The Fluid Authors. +Copyright 2026 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,146 +17,13 @@ limitations under the License. package dataset import ( - "context" - "os" - "path/filepath" - "strconv" "testing" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - // +kubebuilder:scaffold:imports ) -// These tests use Ginkgo (BDD-style Go testing framework). Refer to -// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. 
-
-var cfg *rest.Config
-var k8sClient client.Client
-var testEnv *envtest.Environment
-var testCtx = context.Background()
-var useExistingCluster = false
-var testEnvStarted = false
-
-const skipEnvtestStartFailureEnvVar = "FLUID_DATASET_TEST_SKIP_ENVTEST_START_FAILURE"
-
-func allowSkippingEnvtestStartFailure() bool {
-	value, ok := os.LookupEnv(skipEnvtestStartFailureEnvVar)
-	if !ok {
-		return false
-	}
-
-	allowed, err := strconv.ParseBool(value)
-	return err == nil && allowed
-}
-
 func TestAPIs(t *testing.T) {
 	RegisterFailHandler(Fail)
-
-	RunSpecs(t,
-		"Controller Suite")
+	RunSpecs(t, "Controller Suite")
 }
-
-var _ = BeforeSuite(func(ctx context.Context) {
-	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
-	if env := os.Getenv("USE_EXISTING_CLUSTER"); env != "" {
-		useExistingCluster = true
-	}
-	By("bootstrapping test environment")
-	testEnv = &envtest.Environment{
-		UseExistingCluster: &useExistingCluster,
-		CRDDirectoryPaths:  []string{filepath.Join("..", "..", "..", "..", "config", "crd", "bases")},
-	}
-
-	var err error
-	cfg, err = testEnv.Start()
-	if err != nil {
-		if allowSkippingEnvtestStartFailure() {
-			GinkgoLogr.Info(
-				"envtest unavailable, skipping envtest-dependent specs due to explicit opt-in",
-				"envVar", skipEnvtestStartFailureEnvVar,
-				"error", err,
-			)
-			return
-		}
-
-		Expect(err).NotTo(HaveOccurred())
-	}
-	Expect(cfg).ToNot(BeNil())
-	testEnvStarted = true
-
-	err = datav1alpha1.AddToScheme(scheme.Scheme)
-	Expect(err).NotTo(HaveOccurred())
-
-	// +kubebuilder:scaffold:scheme
-
-	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
-	Expect(err).ToNot(HaveOccurred())
-	Expect(k8sClient).ToNot(BeNil())
-})
-
-var _ = AfterSuite(func() {
-	if !testEnvStarted {
-		return
-	}
-
-	By("tearing down the test environment")
-	err := testEnv.Stop()
-	Expect(err).ToNot(HaveOccurred())
-})
-
-var _ = Describe("dataset", func() {
-	var dataset datav1alpha1.Dataset
-
-	BeforeEach(func() {
-		dataset = datav1alpha1.Dataset{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      "test-name",
-				Namespace: "default",
-			},
-			Spec: datav1alpha1.DatasetSpec{
-				Mounts: []datav1alpha1.Mount{{
-					MountPoint: "test-MountPoint",
-					Name:       "test-MountName",
-				},
-				},
-			},
-		}
-	})
-
-	It("Should create dataset successfully", func() {
-		if !testEnvStarted {
-			Skip("envtest not available")
-		}
-		By("create dataset")
-		err := k8sClient.Create(testCtx, &dataset)
-		Expect(err).NotTo(HaveOccurred())
-
-		By("check dataset status")
-		var createdDataset datav1alpha1.Dataset
-		var name = types.NamespacedName{
-			Namespace: dataset.Namespace,
-			Name:      dataset.Name,
-		}
-		err = k8sClient.Get(testCtx, name, &createdDataset)
-		Expect(err).NotTo(HaveOccurred())
-		Expect(createdDataset.Status.Phase).Should(
-			Or(Equal(datav1alpha1.NoneDatasetPhase),
-				Equal(datav1alpha1.NotBoundDatasetPhase)))
-
-		By("delete dataset")
-		err = k8sClient.Delete(testCtx, &dataset)
-		Expect(err).NotTo(HaveOccurred())
-	})
-})
diff --git a/pkg/controllers/v1alpha1/dataset/suite_test_helpers_test.go b/pkg/controllers/v1alpha1/dataset/suite_test_helpers_test.go
deleted file mode 100644
index 58f8dbed0fd..00000000000
--- a/pkg/controllers/v1alpha1/dataset/suite_test_helpers_test.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package dataset
-
-import "testing"
-
-func TestAllowSkippingEnvtestStartFailure(t *testing.T) {
-	t.Run("empty value stays disabled", func(t *testing.T) {
-		t.Setenv(skipEnvtestStartFailureEnvVar, "")
-
-		if allowSkippingEnvtestStartFailure() {
skip disabled", skipEnvtestStartFailureEnvVar) - } - }) - - t.Run("accepts explicit opt in", func(t *testing.T) { - t.Setenv(skipEnvtestStartFailureEnvVar, "true") - - if !allowSkippingEnvtestStartFailure() { - t.Fatalf("expected %s=true to enable opt-in skip", skipEnvtestStartFailureEnvVar) - } - }) - - t.Run("rejects invalid values", func(t *testing.T) { - t.Setenv(skipEnvtestStartFailureEnvVar, "sometimes") - - if allowSkippingEnvtestStartFailure() { - t.Fatalf("expected invalid %s value to keep opt-in skip disabled", skipEnvtestStartFailureEnvVar) - } - }) -}