Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,8 @@ const (
SkipPrecheckAnnotationKey = "sidecar.fluid.io/skip-precheck"
HostMountPathModeOnDefaultPlatformKey = "default.fuse-sidecar.fluid.io/host-mount-path-mode"
)

const (
	// DatasetPolicyAutoCreate indicates that a Dataset should be auto-created for the Runtime.
	// It is the expected value of the "fluid.io/dataset-policy" runtime annotation
	// (common.AnnotationDatasetPolicy) when Dataset auto-creation is desired.
	DatasetPolicyAutoCreate = "auto-create"
)
4 changes: 4 additions & 0 deletions pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ const (
// i.e. fluid.io/check-mount-script-sha256
AnnotationCheckMountScriptSHA256 = LabelAnnotationPrefix + "check-mount-script-sha256"

// AnnotationDatasetPolicy is a runtime annotation that controls how Dataset is handled.
// i.e. fluid.io/dataset-policy
AnnotationDatasetPolicy = LabelAnnotationPrefix + "dataset-policy"

// AnnotationDisableRuntimeHelmValueConfig is a runtime label indicates the configmap contains helm value will not be created in setup.
AnnotationDisableRuntimeHelmValueConfig = "runtime." + LabelAnnotationPrefix + "disable-helm-value-config"

Expand Down
76 changes: 69 additions & 7 deletions pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand All @@ -34,8 +35,7 @@ import (
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

// "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/fluid-cloudnative/fluid/pkg/dump"
fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors"
Expand Down Expand Up @@ -110,14 +110,27 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte
return utils.RequeueIfError(err)
}

var datasetPolicy string
if annotations := objectMeta.GetAnnotations(); annotations != nil {
datasetPolicy = annotations[common.AnnotationDatasetPolicy]
}

// 5.Get the dataset
dataset, err := r.GetDataset(ctx)
if err != nil {
// r.Recorder.Eventf(ctx.Dataset, corev1.EventTypeWarning, common.ErrorProcessRuntimeReason, "Process Runtime error %v", err)
if utils.IgnoreNotFound(err) == nil {
ctx.Log.Info("The dataset is not found", "dataset", ctx.NamespacedName)
dataset = nil
// return ctrl.Result{}, nil
if datasetPolicy == common.DatasetPolicyAutoCreate {
ctx.Log.Info("The dataset is not found, auto-creating according to policy", "dataset", ctx.NamespacedName)
dataset, err = r.ensureDatasetForRuntime(ctx, objectMeta)
if err != nil {
ctx.Log.Error(err, "Failed to auto-create the dataset")
r.Recorder.Eventf(runtime, corev1.EventTypeWarning, common.ErrorCreateDataset, "Failed to auto-create dataset: %v", err)
return utils.RequeueAfterInterval(5 * time.Second)
}
} else {
ctx.Log.Info("The dataset is not found", "dataset", ctx.NamespacedName)
dataset = nil
}
} else {
ctx.Log.Error(err, "Failed to get the ddc dataset")
return utils.RequeueIfError(errors.Wrap(err, "Unable to get dataset"))
Expand Down Expand Up @@ -172,7 +185,11 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte
}
} else {
// If dataset is nil, need to wait because the user may have not created dataset
ctx.Log.Info("No dataset can be bound to the runtime, waiting.")
if datasetPolicy == common.DatasetPolicyAutoCreate {
ctx.Log.Info("No dataset is available for the runtime after auto-create, waiting.", "dataset", ctx.NamespacedName)
} else {
ctx.Log.Info("No dataset can be bound to the runtime, waiting.")
}
r.Recorder.Event(runtime, corev1.EventTypeWarning, common.ErrorProcessRuntimeReason, "No dataset can be bound to the runtime, waiting.")
return utils.RequeueAfterInterval(time.Duration(5 * time.Second))
}
Expand Down Expand Up @@ -385,6 +402,51 @@ func (r *RuntimeReconciler) GetDataset(ctx cruntime.ReconcileRequestContext) (*d
return &dataset, nil
}

func (r *RuntimeReconciler) ensureDatasetForRuntime(ctx cruntime.ReconcileRequestContext, objectMeta metav1.Object) (*datav1alpha1.Dataset, error) {
runtime := ctx.Runtime
if runtime == nil {
return nil, fmt.Errorf("runtime is nil")
}

annotations := make(map[string]string)
for k, v := range objectMeta.GetAnnotations() {
annotations[k] = v
}
annotations[common.AnnotationDatasetPolicy] = common.DatasetPolicyAutoCreate

dataset := &datav1alpha1.Dataset{
TypeMeta: metav1.TypeMeta{
Kind: datav1alpha1.Datasetkind,
APIVersion: datav1alpha1.GroupVersion.Group + "/" + datav1alpha1.GroupVersion.Version,
},
ObjectMeta: metav1.ObjectMeta{
Name: objectMeta.GetName(),
Namespace: objectMeta.GetNamespace(),
Annotations: annotations,
},
}
Comment thread
jakharmonika364 marked this conversation as resolved.

// SetControllerReference looks up the GVK from the scheme, avoiding the
// empty-TypeMeta problem that arises when runtime.GetObjectKind() is used
// on objects retrieved via controller-runtime's client.Get().
if err := controllerutil.SetControllerReference(runtime, dataset, r.Client.Scheme()); err != nil {
return nil, fmt.Errorf("failed to set controller reference on auto-created dataset: %w", err)
}

err := r.Create(ctx, dataset)
if err != nil && !apierrors.IsAlreadyExists(err) {
return nil, err
}

if err := r.Get(ctx, ctx.NamespacedName, dataset); err != nil {
return nil, err
}

r.Recorder.Eventf(runtime, corev1.EventTypeNormal, common.Succeed, "Auto-created Dataset %s for Runtime", dataset.Name)

return dataset, nil
}

func (r *RuntimeReconciler) CheckIfReferenceDatasetIsSupported(ctx cruntime.ReconcileRequestContext) (bool, string) {
mounted := base.GetPhysicalDatasetFromMounts(ctx.Dataset.Spec.Mounts)

Expand Down
18 changes: 13 additions & 5 deletions test/gha-e2e/curvine/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ function create_dataset() {
}

function wait_dataset_bound() {
local deadline=180 # 3 minutes
local deadline=300 # 5 minutes
local last_state=""
local log_interval=0
local log_times=0
Expand Down Expand Up @@ -83,16 +83,24 @@ function create_job() {

# wait_job_completed polls a Kubernetes Job until it succeeds, fails, or a
# 5-minute deadline elapses.
#
# Arguments:
#   $1 - name of the Job to wait on.
#
# Failure is detected via the Job's terminal status.conditions (type=Failed),
# not via individual pod failures, which can be transient while the FUSE
# client is still initializing. Calls panic() on failure or timeout.
function wait_job_completed() {
    local job_name=$1
    local deadline=300 # 5 minutes
    local elapsed=0
    while true; do
        # Check for terminal Job failure via status.conditions (not individual pod failures,
        # which can be transient when the FUSE client is still initializing).
        job_failed=$(kubectl get job "$job_name" -ojsonpath='{@.status.conditions[?(@.type=="Failed")].status}' 2>/dev/null)
        succeed=$(kubectl get job "$job_name" -ojsonpath='{@.status.succeeded}' 2>/dev/null)
        if [[ "$job_failed" == "True" ]]; then
            panic "job $job_name failed when accessing data"
        fi
        if [[ "$succeed" -eq "1" ]]; then
            break
        fi
        if [[ $elapsed -ge $deadline ]]; then
            panic "timeout waiting ${deadline}s for job $job_name to complete"
        fi
        sleep 5
        elapsed=$((elapsed + 5))
    done
    syslog "Found succeeded job $job_name"
}
Expand Down
Loading