diff --git a/api/v1alpha1/cacheruntime_status.go b/api/v1alpha1/cacheruntime_status.go index e0edc4c4ce1..e4674af5723 100644 --- a/api/v1alpha1/cacheruntime_status.go +++ b/api/v1alpha1/cacheruntime_status.go @@ -60,21 +60,12 @@ type CacheRuntimeStatus struct { // RuntimeComponentStatusCollection contains the status of runtime components (master, worker, client). RuntimeComponentStatusCollection `json:",inline"` - // MountPoints represents the status of mount points specified in the bound dataset. - // Each entry tracks the mount configuration and the time of the last successful mount. - // +optional - MountPoints []MountPointStatus `json:"mountPoints,omitempty"` -} - -// MountPointStatus describes the status of a single mount point in the dataset -type MountPointStatus struct { - // Mount contains the mount point configuration from the bound dataset. - // This includes the remote path, mount options, and other mount-specific settings. - Mount `json:"mount,omitempty"` + // Mounts contains the mount point configurations from the bound dataset. + // Currently not used, may be used when integrating thin runtime. + Mounts []Mount `json:"mounts,omitempty"` // MountTime is the timestamp of the last successful mount operation. // If MountTime is earlier than the master component's start time, a remount will be required. - // +optional MountTime *metav1.Time `json:"mountTime,omitempty"` } diff --git a/api/v1alpha1/cacheruntimeclass_types.go b/api/v1alpha1/cacheruntimeclass_types.go index c2e2b225cf8..5f5e8197a2b 100644 --- a/api/v1alpha1/cacheruntimeclass_types.go +++ b/api/v1alpha1/cacheruntimeclass_types.go @@ -67,7 +67,7 @@ type RuntimeComponentDefinition struct { } type ExecutionEntries struct { - // MountUFS defines the operations for mounting UFS + // MountUFS defines the operations for mounting UFS. The command's stdout must be JSON matching CacheRuntimeMountUfsOutput. 
MountUFS *ExecutionCommonEntry `json:"mountUFS,omitempty"` // ReportSummary it defines the operation how to get cache status like capacity, hit ratio etc. diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index fa7714a64a0..4a37aa205c1 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -318,3 +318,8 @@ type ClientMetrics struct { // Defaults to None when it is not explicitly set. ScrapeTarget string `json:"scrapeTarget,omitempty"` } + +type CacheRuntimeMountUfsOutput struct { + // Mounted are the ufs paths that have been mounted. + Mounted []string `json:"mounted,omitempty"` +} diff --git a/api/v1alpha1/openapi_generated.go b/api/v1alpha1/openapi_generated.go index f7138b9488d..849cdd510f5 100644 --- a/api/v1alpha1/openapi_generated.go +++ b/api/v1alpha1/openapi_generated.go @@ -41,6 +41,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/fluid-cloudnative/fluid/api/v1alpha1.CacheRuntimeClientSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeClientSpec(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.CacheRuntimeList": schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeList(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.CacheRuntimeMasterSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeMasterSpec(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.CacheRuntimeMountUfsOutput": schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeMountUfsOutput(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.CacheRuntimeSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeSpec(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.CacheRuntimeStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeStatus(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.CacheRuntimeWorkerSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeWorkerSpec(ref), @@ -111,7 +112,6 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) 
map[string]common.OpenA "github.com/fluid-cloudnative/fluid/api/v1alpha1.Metadata": schema_fluid_cloudnative_fluid_api_v1alpha1_Metadata(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.MetadataSyncPolicy": schema_fluid_cloudnative_fluid_api_v1alpha1_MetadataSyncPolicy(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.Mount": schema_fluid_cloudnative_fluid_api_v1alpha1_Mount(ref), - "github.com/fluid-cloudnative/fluid/api/v1alpha1.MountPointStatus": schema_fluid_cloudnative_fluid_api_v1alpha1_MountPointStatus(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.OSAdvise": schema_fluid_cloudnative_fluid_api_v1alpha1_OSAdvise(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.ObjectRef": schema_fluid_cloudnative_fluid_api_v1alpha1_ObjectRef(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.OperationRef": schema_fluid_cloudnative_fluid_api_v1alpha1_OperationRef(ref), @@ -1390,6 +1390,33 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeMasterSpec(ref comm } } +func schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeMountUfsOutput(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "mounted": { + SchemaProps: spec.SchemaProps{ + Description: "Mounted are the ufs paths that have been mounted.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + }, + }, + }, + } +} + func schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -1584,25 +1611,31 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_CacheRuntimeStatus(ref common.R Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeComponentStatus"), 
}, }, - "mountPoints": { + "mounts": { SchemaProps: spec.SchemaProps{ - Description: "MountPoints represents the status of mount points specified in the bound dataset. Each entry tracks the mount configuration and the time of the last successful mount.", + Description: "Mounts contains the mount point configurations from the bound dataset. Currently not used, may be used when integrating thin runtime.", Type: []string{"array"}, Items: &spec.SchemaOrArray{ Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ Default: map[string]interface{}{}, - Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.MountPointStatus"), + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.Mount"), }, }, }, }, }, + "mountTime": { + SchemaProps: spec.SchemaProps{ + Description: "MountTime is the timestamp of the last successful mount operation. If MountTime is earlier than the master component's start time, a remount will be required.", + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, }, }, }, Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.MountPointStatus", "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeComponentStatus", "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeCondition", "k8s.io/api/core/v1.NodeAffinity"}, + "github.com/fluid-cloudnative/fluid/api/v1alpha1.Mount", "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeComponentStatus", "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeCondition", "k8s.io/api/core/v1.NodeAffinity", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, } } @@ -3853,7 +3886,7 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_ExecutionEntries(ref common.Ref Properties: map[string]spec.Schema{ "mountUFS": { SchemaProps: spec.SchemaProps{ - Description: "MountUFS defines the operations for mounting UFS", + Description: "MountUFS defines the operations for mounting UFS. 
The command's stdout must be JSON matching CacheRuntimeMountUfsOutput.", Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.ExecutionCommonEntry"), }, }, @@ -6155,34 +6188,6 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_Mount(ref common.ReferenceCallb } } -func schema_fluid_cloudnative_fluid_api_v1alpha1_MountPointStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { - return common.OpenAPIDefinition{ - Schema: spec.Schema{ - SchemaProps: spec.SchemaProps{ - Description: "MountPointStatus describes the status of a single mount point in the dataset", - Type: []string{"object"}, - Properties: map[string]spec.Schema{ - "mount": { - SchemaProps: spec.SchemaProps{ - Description: "Mount contains the mount point configuration from the bound dataset. This includes the remote path, mount options, and other mount-specific settings.", - Default: map[string]interface{}{}, - Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.Mount"), - }, - }, - "mountTime": { - SchemaProps: spec.SchemaProps{ - Description: "MountTime is the timestamp of the last successful mount operation. If MountTime is earlier than the master component's start time, a remount will be required.", - Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), - }, - }, - }, - }, - }, - Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.Mount", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, - } -} - func schema_fluid_cloudnative_fluid_api_v1alpha1_OSAdvise(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -7300,7 +7305,7 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_RuntimeStatus(ref common.Refere }, "mounts": { SchemaProps: spec.SchemaProps{ - Description: "MountPoints represents the mount points specified in the bounded dataset", + Description: "Mounts represents the mount points specified in the bounded dataset. 
Currently only set and used in thin runtime, see pr #2257.", Type: []string{"array"}, Items: &spec.SchemaOrArray{ Schema: &spec.Schema{ diff --git a/api/v1alpha1/status.go b/api/v1alpha1/status.go index 57bf44e81fb..4f98bca2bea 100644 --- a/api/v1alpha1/status.go +++ b/api/v1alpha1/status.go @@ -126,7 +126,8 @@ type RuntimeStatus struct { // if Mounttime is earlier than master starting time, remount will be required MountTime *metav1.Time `json:"mountTime,omitempty"` - // MountPoints represents the mount points specified in the bounded dataset + // Mounts represents the mount points specified in the bounded dataset. + // Currently only set and used in thin runtime, see pr #2257. Mounts []Mount `json:"mounts,omitempty"` // CacheAffinity represents the runtime worker pods node affinity including node selector diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index e6f7214ad7b..7b3b73691df 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -470,6 +470,26 @@ func (in *CacheRuntimeMasterSpec) DeepCopy() *CacheRuntimeMasterSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CacheRuntimeMountUfsOutput) DeepCopyInto(out *CacheRuntimeMountUfsOutput) { + *out = *in + if in.Mounted != nil { + in, out := &in.Mounted, &out.Mounted + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheRuntimeMountUfsOutput. +func (in *CacheRuntimeMountUfsOutput) DeepCopy() *CacheRuntimeMountUfsOutput { + if in == nil { + return nil + } + out := new(CacheRuntimeMountUfsOutput) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *CacheRuntimeSpec) DeepCopyInto(out *CacheRuntimeSpec) { *out = *in @@ -524,13 +544,17 @@ func (in *CacheRuntimeStatus) DeepCopyInto(out *CacheRuntimeStatus) { (*in).DeepCopyInto(*out) } out.RuntimeComponentStatusCollection = in.RuntimeComponentStatusCollection - if in.MountPoints != nil { - in, out := &in.MountPoints, &out.MountPoints - *out = make([]MountPointStatus, len(*in)) + if in.Mounts != nil { + in, out := &in.Mounts, &out.Mounts + *out = make([]Mount, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.MountTime != nil { + in, out := &in.MountTime, &out.MountTime + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheRuntimeStatus. @@ -2588,26 +2612,6 @@ func (in *Mount) DeepCopy() *Mount { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *MountPointStatus) DeepCopyInto(out *MountPointStatus) { - *out = *in - in.Mount.DeepCopyInto(&out.Mount) - if in.MountTime != nil { - in, out := &in.MountTime, &out.MountTime - *out = (*in).DeepCopy() - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MountPointStatus. -func (in *MountPointStatus) DeepCopy() *MountPointStatus { - if in == nil { - return nil - } - out := new(MountPointStatus) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *OSAdvise) DeepCopyInto(out *OSAdvise) { *out = *in diff --git a/charts/fluid/fluid/crds/data.fluid.io_cacheruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_cacheruntimes.yaml index 912109dfa69..9f749c881b7 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_cacheruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_cacheruntimes.yaml @@ -1798,54 +1798,51 @@ spec: required: - phase type: object - mountPoints: + mountTime: + format: date-time + type: string + mounts: items: properties: - mount: - properties: - encryptOptions: - items: + encryptOptions: + items: + properties: + name: + type: string + valueFrom: properties: - name: - type: string - valueFrom: + secretKeyRef: properties: - secretKeyRef: - properties: - key: - type: string - name: - type: string - required: - - name - type: object + key: + type: string + name: + type: string + required: + - name type: object - required: - - name type: object - type: array - mountPoint: - minLength: 5 - type: string - name: - minLength: 0 - type: string - options: - additionalProperties: - type: string - type: object - path: - type: string - readOnly: - type: boolean - shared: - type: boolean - required: - - mountPoint + required: + - name + type: object + type: array + mountPoint: + minLength: 5 + type: string + name: + minLength: 0 + type: string + options: + additionalProperties: + type: string type: object - mountTime: - format: date-time + path: type: string + readOnly: + type: boolean + shared: + type: boolean + required: + - mountPoint type: object type: array selector: diff --git a/config/crd/bases/data.fluid.io_cacheruntimes.yaml b/config/crd/bases/data.fluid.io_cacheruntimes.yaml index 912109dfa69..9f749c881b7 100644 --- a/config/crd/bases/data.fluid.io_cacheruntimes.yaml +++ b/config/crd/bases/data.fluid.io_cacheruntimes.yaml @@ -1798,54 +1798,51 @@ spec: required: - phase type: object - mountPoints: + mountTime: + format: date-time + type: string + mounts: items: properties: - mount: - 
properties: - encryptOptions: - items: + encryptOptions: + items: + properties: + name: + type: string + valueFrom: properties: - name: - type: string - valueFrom: + secretKeyRef: properties: - secretKeyRef: - properties: - key: - type: string - name: - type: string - required: - - name - type: object + key: + type: string + name: + type: string + required: + - name type: object - required: - - name type: object - type: array - mountPoint: - minLength: 5 - type: string - name: - minLength: 0 - type: string - options: - additionalProperties: - type: string - type: object - path: - type: string - readOnly: - type: boolean - shared: - type: boolean - required: - - mountPoint + required: + - name + type: object + type: array + mountPoint: + minLength: 5 + type: string + name: + minLength: 0 + type: string + options: + additionalProperties: + type: string type: object - mountTime: - format: date-time + path: type: string + readOnly: + type: boolean + shared: + type: boolean + required: + - mountPoint type: object type: array selector: diff --git a/docs/en/dev/generic_cache_runtime_integration.md b/docs/en/dev/generic_cache_runtime_integration.md index 16765b0b6e1..49d64d45ab8 100644 --- a/docs/en/dev/generic_cache_runtime_integration.md +++ b/docs/en/dev/generic_cache_runtime_integration.md @@ -82,7 +82,7 @@ The component in Topology mainly contains the following content: | Template | PodTemplateSpec native field | | | Service | Currently only supports Headless | | | Dependencies | ExtraResources | Whether this component needs to mount additional ConfigMaps (the dependent ConfigMap information is defined in the ExtraResources field of CacheRuntimeClass). | -| ExecutionEntries| MountUFS | For Master-Worker architecture, when Master is Ready, the underlying file system mount operation needs to be executed. | +| ExecutionEntries| MountUFS | For Master-Worker architecture, when Master is Ready, the underlying file system mount operation needs to be executed. 
The MountUFS script must output JSON in `CacheRuntimeMountUfsOutput` struct format, containing the list of mounted UFS paths. See Step 2.7 for details. | | ExecutionEntries| ReportSummary | How the cache system defines operations to obtain cache information metrics [Not supported in current version]. | ### Step 2.1 Prepare K8s-adapted Native Images and Define Component workloadType and PodTemplate @@ -319,3 +319,55 @@ the `mounts`, `accessModes`, and `targetPath` fields in the JSON are all derived } } ``` + +### Step 2.7 MountUFS Script Output Format Requirements + +For cache systems with Master-Worker architecture, after the Master component is Ready, Fluid will execute the MountUFS script to mount the underlying file system (UFS). **The MountUFS script must output JSON in `CacheRuntimeMountUfsOutput` struct format**, so that Fluid can correctly parse the mounted paths and synchronize the Dataset status. + +#### Optional Configuration Note + +If the underlying cache system's image has the following capabilities, you can **omit the MountUFS configuration**: + +1. **Automatic RuntimeConfig Monitoring**: The process inside the container can monitor changes to the "$FLUID_RUNTIME_CONFIG_PATH" file +2. **Automatic Mount Execution**: When changes to the mounts configuration are detected, automatically execute the underlying file system mount operations +3. **Ready State Control**: Ensure that the Master component's Pod starting status only becomes Ready after all UFS mounts are completed + +In this case, Fluid will confirm whether the Master component is ready through Kubernetes' Ready probe, without needing to execute the MountUFS script additionally. + +#### Use Cases + +The output of the MountUFS script is mainly used for the following scenarios: + +1. **Mount Status Synchronization**: Fluid parses the script output to confirm which UFS paths have been successfully mounted +2. 
**Dataset Status Update**: Updates the Dataset's mount status and Phase based on the mount results +3. **Dynamic Mount Management**: When the Dataset's mounts configuration changes, Fluid re-executes the MountUFS script and verifies whether the mount operation completed successfully through the output +4. **Remount Detection**: In scenarios such as Master Pod restarts, Fluid determines whether remounting is necessary based on the output + +#### Output Format Specification + +The standard output of the MountUFS script must be in the following JSON format: + +```json +{ + "mounted": ["/path1", "/path2", "/path3"] +} +``` + +Where: +- `mounted`: A string array containing the list of all successfully mounted UFS paths +- If no paths are mounted, output: `{"mounted": []}` + +#### Go Struct Definition + +```go +type CacheRuntimeMountUfsOutput struct { + // Mounted are the ufs paths that have been mounted. + Mounted []string `json:"mounted,omitempty"` +} +``` + +#### Important Notes + +1. **Must output to standard output (stdout)**: Fluid reads JSON data from the script's standard output +2. **Error messages to standard error (stderr)**: Use `>&2` to output error messages to stderr to avoid polluting stdout +3. 
**JSON format must strictly comply with requirements**: Otherwise, Fluid cannot parse it, leading to mount failure diff --git a/docs/zh/dev/generic_cache_runtime_integration.md b/docs/zh/dev/generic_cache_runtime_integration.md index 5db17c4d6d9..20c84833375 100644 --- a/docs/zh/dev/generic_cache_runtime_integration.md +++ b/docs/zh/dev/generic_cache_runtime_integration.md @@ -82,7 +82,7 @@ Topology中comopent主要包含以下内容 | Template | PodTemplateSpec 原生字段 | | | Service | 目前仅支持Headless | | | Dependencies | ExtraResources | 该组件是否需要挂载额外的 ConfigMap (其依赖的ConfigMap 信息定义于 CacheRuntimeClass 的 ExtraResources 字段)。 | -| ExecutionEntries| MountUFS | 对于Master-Worker架构,当Master Ready时,需要执行底层文件系统的挂载操作。 | +| ExecutionEntries| MountUFS | 对于Master-Worker架构,当Master Ready时,需要执行底层文件系统的挂载操作。MountUFS 脚本必须输出符合 `CacheRuntimeMountUfsOutput` 结构体格式的 JSON,包含已挂载的 UFS 路径列表。详见步骤 2.7。 | | ExecutionEntries| ReportSummary | 缓存系统定义如何获取缓存信息指标的操作 【当前版本暂未支持】。 | ### 步骤2.1 准备K8s适配的原生镜像及明确组件workloadType和PodTemplate @@ -317,4 +317,56 @@ spec: } } } -``` \ No newline at end of file +``` + +### 步骤2.7 MountUFS 脚本输出格式要求 + +对于 Master-Worker 架构的缓存系统,当 Master 组件 Ready 后,Fluid 会执行 MountUFS 脚本来挂载底层文件系统(UFS)。**MountUFS 脚本必须输出符合 `CacheRuntimeMountUfsOutput` 结构体格式的 JSON**,以便 Fluid 能够正确解析已挂载的路径并同步 Dataset 状态。 + +#### 可选配置说明 + +如果底层缓存系统的镜像具备以下能力,可以**不设置 MountUFS**: + +1. **自动监控 RuntimeConfig 变化**:容器内的进程能够监听 "$FLUID_RUNTIME_CONFIG_PATH" 文件的变化 +2. **自动执行挂载操作**:当检测到 mounts 配置变化时,自动执行底层文件系统的挂载 +3. **Ready 状态控制**:确保只有在所有 UFS 挂载完成后,Master 组件的 Pod 启动时 Ready 状态才变为 true + +在这种情况下,Fluid 会通过 Kubernetes 的 Ready 探针来确认 Master 组件是否就绪,而无需额外执行 MountUFS 脚本。 + +#### 使用场景 + +MountUFS 脚本的输出主要用于以下场景: + +1. **挂载状态同步**:Fluid 通过解析脚本输出,确认哪些 UFS 路径已成功挂载 +2. **Dataset 状态更新**:根据挂载结果更新 Dataset 的挂载状态和 Phase +3. **动态挂载管理**:当 Dataset 的 mounts 配置发生变化时,Fluid 会重新执行 MountUFS 脚本,并通过输出来验证挂载操作是否成功完成 +4. 
**重挂载检测**:在 Master Pod 重启等场景下,Fluid 会根据输出判断是否需要重新挂载 + +#### 输出格式规范 + +MountUFS 脚本的标准输出必须是以下 JSON 格式: + +```json +{ + "mounted": ["/path1", "/path2", "/path3"] +} +``` + +其中: +- `mounted`:字符串数组,包含所有已成功挂载的 UFS 路径列表 +- 如果没有挂载任何路径,应输出:`{"mounted": []}` + +#### Go 结构体定义 + +```go +type CacheRuntimeMountUfsOutput struct { + // Mounted are the ufs paths that have been mounted. + Mounted []string `json:"mounted,omitempty"` +} +``` + +#### 注意事项 + +1. **必须输出到标准输出(stdout)**:Fluid 会从脚本的标准输出读取 JSON 数据 +2. **错误信息输出到标准错误(stderr)**:使用 `>&2` 将错误信息输出到 stderr,避免污染 stdout +3. **JSON 格式必须严格符合要求**:否则 Fluid 无法解析,会导致挂载失败 \ No newline at end of file diff --git a/pkg/ddc/cache/engine/dataset.go b/pkg/ddc/cache/engine/dataset.go index da707cdad7a..10bf79f7ea4 100644 --- a/pkg/ddc/cache/engine/dataset.go +++ b/pkg/ddc/cache/engine/dataset.go @@ -32,6 +32,12 @@ import ( func (e *CacheEngine) BindToDataset() (err error) { e.Log.V(1).Info("Start to BindToDataset") + // update runtime status mount time + err = e.updateMountTime() + if err != nil { + return + } + return e.UpdateDatasetStatus(datav1alpha1.BoundDatasetPhase) } diff --git a/pkg/ddc/cache/engine/fileutils.go b/pkg/ddc/cache/engine/fileutils.go index 758e600616a..7b807ef621d 100644 --- a/pkg/ddc/cache/engine/fileutils.go +++ b/pkg/ddc/cache/engine/fileutils.go @@ -17,23 +17,29 @@ package engine import ( + "time" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" securityutils "github.com/fluid-cloudnative/fluid/pkg/utils/security" "github.com/go-logr/logr" "github.com/pkg/errors" - "time" ) -type CacheFileUtils struct { +// CacheFileUtil defines the interface for cache system operations +type CacheFileUtil interface { + Mount(command []string, timeout time.Duration) (stdout string, err error) +} + +type CacheFileUtilImpl struct { podName string namespace string container string log logr.Logger } -func newCacheFileUtils(podName string, containerName string, namespace string, log logr.Logger) CacheFileUtils { +func 
NewCacheFileUtil(podName string, containerName string, namespace string, log logr.Logger) CacheFileUtil { - return CacheFileUtils{ + return &CacheFileUtilImpl{ podName: podName, namespace: namespace, container: containerName, @@ -42,7 +48,7 @@ func newCacheFileUtils(podName string, containerName string, namespace string, l } // exec with timeout -func (c CacheFileUtils) exec(command []string, timeout time.Duration) (stdout string, stderr string, err error) { +func (c *CacheFileUtilImpl) exec(command []string, timeout time.Duration) (stdout string, stderr string, err error) { // redact sensitive info in command for printing redactedCommand := securityutils.FilterCommand(command) @@ -57,7 +63,7 @@ func (c CacheFileUtils) exec(command []string, timeout time.Duration) (stdout st return } -func (c CacheFileUtils) Mount(command []string, timeout time.Duration) (err error) { +func (c *CacheFileUtilImpl) Mount(command []string, timeout time.Duration) (stdout string, err error) { stdout, stderr, err := c.exec(command, timeout) if err != nil { @@ -65,5 +71,5 @@ func (c CacheFileUtils) Mount(command []string, timeout time.Duration) (err erro return } - return nil + return stdout, nil } diff --git a/pkg/ddc/cache/engine/fileutils_test.go b/pkg/ddc/cache/engine/fileutils_test.go new file mode 100644 index 00000000000..cb4f9a74432 --- /dev/null +++ b/pkg/ddc/cache/engine/fileutils_test.go @@ -0,0 +1,194 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package engine + +import ( + "time" + + "github.com/agiledragon/gomonkey/v2" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/pkg/errors" +) + +var _ = Describe("CacheFileUtils Tests", Label("pkg.ddc.cache.engine.fileutils_test.go"), func() { + var ( + fileUtil CacheFileUtil + log logr.Logger + patches *gomonkey.Patches + ) + + BeforeEach(func() { + log = GinkgoLogr + fileUtil = NewCacheFileUtil("test-pod", "test-container", "default", log) + }) + + AfterEach(func() { + if patches != nil { + patches.Reset() + } + }) + + Describe("Mount Tests", func() { + Context("when command executes successfully", func() { + BeforeEach(func() { + patches = gomonkey.ApplyFunc(kubeclient.ExecCommandInContainerWithTimeout, + func(podName, containerName, namespace string, command []string, timeout time.Duration) (stdout string, stderr string, err error) { + return "mount successful", "", nil + }) + }) + + It("should return stdout without error", func() { + stdout, err := fileUtil.Mount([]string{"/mount.sh"}, 30*time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(Equal("mount successful")) + }) + }) + + Context("when command returns empty output", func() { + BeforeEach(func() { + patches = gomonkey.ApplyFunc(kubeclient.ExecCommandInContainerWithTimeout, + func(podName, containerName, namespace string, command []string, timeout time.Duration) (stdout string, stderr string, err error) { + return "", "", nil + }) + }) + + It("should return empty stdout without error", func() { + stdout, err := fileUtil.Mount([]string{"/mount.sh"}, 30*time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(BeEmpty()) + }) + }) + + Context("when command execution fails", func() { + BeforeEach(func() { + patches = gomonkey.ApplyFunc(kubeclient.ExecCommandInContainerWithTimeout, + func(podName, containerName, namespace string, command []string, timeout 
time.Duration) (stdout string, stderr string, err error) { + return "", "error output", errors.New("command failed") + }) + }) + + It("should return error", func() { + stdout, err := fileUtil.Mount([]string{"/mount.sh"}, 30*time.Second) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("command failed")) + Expect(stdout).To(BeEmpty()) + }) + }) + + Context("when command has sensitive information", func() { + BeforeEach(func() { + patches = gomonkey.ApplyFunc(kubeclient.ExecCommandInContainerWithTimeout, + func(podName, containerName, namespace string, command []string, timeout time.Duration) (stdout string, stderr string, err error) { + // Verify that the command was called (sensitive info filtering happens in exec) + return "success", "", nil + }) + }) + + It("should execute command successfully", func() { + command := []string{"/mount.sh", "--password=secret123"} + stdout, err := fileUtil.Mount(command, 30*time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(Equal("success")) + }) + }) + + Context("when timeout is very short", func() { + BeforeEach(func() { + patches = gomonkey.ApplyFunc(kubeclient.ExecCommandInContainerWithTimeout, + func(podName, containerName, namespace string, command []string, timeout time.Duration) (stdout string, stderr string, err error) { + return "completed", "", nil + }) + }) + + It("should still execute with short timeout", func() { + stdout, err := fileUtil.Mount([]string{"/quick-mount.sh"}, 1*time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(Equal("completed")) + }) + }) + + Context("when command contains multiple arguments", func() { + BeforeEach(func() { + patches = gomonkey.ApplyFunc(kubeclient.ExecCommandInContainerWithTimeout, + func(podName, containerName, namespace string, command []string, timeout time.Duration) (stdout string, stderr string, err error) { + // Verify all arguments are passed + Expect(command).To(HaveLen(4)) + return "multi-arg success", "", nil + }) 
+ }) + + It("should pass all arguments correctly", func() { + command := []string{"/mount.sh", "--source=/data", "--target=/mnt", "--options=rw"} + stdout, err := fileUtil.Mount(command, 30*time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(Equal("multi-arg success")) + }) + }) + + Context("when stderr contains warnings but command succeeds", func() { + BeforeEach(func() { + patches = gomonkey.ApplyFunc(kubeclient.ExecCommandInContainerWithTimeout, + func(podName, containerName, namespace string, command []string, timeout time.Duration) (stdout string, stderr string, err error) { + return "mount ok", "warning: deprecated option", nil + }) + }) + + It("should return success despite warnings in stderr", func() { + stdout, err := fileUtil.Mount([]string{"/mount.sh"}, 30*time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(Equal("mount ok")) + }) + }) + + Context("when kubeclient returns wrapped error", func() { + BeforeEach(func() { + patches = gomonkey.ApplyFunc(kubeclient.ExecCommandInContainerWithTimeout, + func(podName, containerName, namespace string, command []string, timeout time.Duration) (stdout string, stderr string, err error) { + return "", "", errors.New("connection refused") + }) + }) + + It("should wrap error with command context", func() { + stdout, err := fileUtil.Mount([]string{"/mount.sh"}, 30*time.Second) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("error when executing command")) + Expect(err.Error()).To(ContainSubstring("connection refused")) + Expect(stdout).To(BeEmpty()) + }) + }) + }) + + Describe("newCacheFileUtils Tests", func() { + Context("when creating new CacheFileUtils instance", func() { + It("should return non-nil interface", func() { + utils := NewCacheFileUtil("pod1", "container1", "ns1", log) + Expect(utils).NotTo(BeNil()) + }) + }) + + Context("when creating with different parameters", func() { + It("should create separate instances", func() { + utils1 := 
NewCacheFileUtil("pod1", "container1", "ns1", log) + utils2 := NewCacheFileUtil("pod2", "container2", "ns2", log) + Expect(utils1).NotTo(Equal(utils2)) + }) + }) + }) +}) diff --git a/pkg/ddc/cache/engine/master.go b/pkg/ddc/cache/engine/master.go index 06ebd754e86..39389bbcc33 100644 --- a/pkg/ddc/cache/engine/master.go +++ b/pkg/ddc/cache/engine/master.go @@ -107,14 +107,14 @@ func (e *CacheEngine) setupMasterInternal(masterValue *common.CacheRuntimeCompon return nil } -func (e *CacheEngine) getMasterPodInfo(value *common.CacheRuntimeValue) (podName string, containerName string, err error) { +func (e *CacheEngine) getMasterPodInfo(runtimeClass *datav1alpha1.CacheRuntimeClass) (podName string, containerName string, err error) { // pod name is auto generated podName = GetComponentName(e.name, common.ComponentTypeMaster) + "-0" // container name, use the first container name - if value.Master == nil || len(value.Master.PodTemplateSpec.Spec.Containers) == 0 { + if runtimeClass.Topology == nil || runtimeClass.Topology.Master == nil || + len(runtimeClass.Topology.Master.Template.Spec.Containers) == 0 { return "", "", errors.New("no container in master pod template") } - containerName = value.Master.PodTemplateSpec.Spec.Containers[0].Name - + containerName = runtimeClass.Topology.Master.Template.Spec.Containers[0].Name return } diff --git a/pkg/ddc/cache/engine/runtime.go b/pkg/ddc/cache/engine/runtime.go index 64dc55f1abd..0bd1c6a4710 100644 --- a/pkg/ddc/cache/engine/runtime.go +++ b/pkg/ddc/cache/engine/runtime.go @@ -18,11 +18,17 @@ package engine import ( "context" + "reflect" + "time" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/testutil" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" ) // getRuntime get the 
current runtime @@ -111,3 +117,30 @@ func (e *CacheEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { return e.runtimeInfo, nil } + +// updateMountTime updates the runtime status MountTime to the current time. +func (e *CacheEngine) updateMountTime() error { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + runtime, err := e.getRuntime() + if err != nil { + return err + } + + runtimeToUpdate := runtime.DeepCopy() + runtimeToUpdate.Status.MountTime = &metav1.Time{Time: time.Now()} + + if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { + err = e.Client.Status().Update(context.TODO(), runtimeToUpdate) + } else { + e.Log.Info("Do nothing because the runtime status is not changed.") + } + + return err + }) + + if err != nil { + return errors.Wrap(err, "update runtime status MountTime field failed") + } + + return nil +} diff --git a/pkg/ddc/cache/engine/setup.go b/pkg/ddc/cache/engine/setup.go index 3c7ff1ff45e..2f70093fa7a 100644 --- a/pkg/ddc/cache/engine/setup.go +++ b/pkg/ddc/cache/engine/setup.go @@ -50,7 +50,7 @@ func (e *CacheEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, e return false, err } - // Create Master/Worker/Client components + // Create Master/Worker/Client components, won't be nil. e.Log.Info("Setup runtime", "runtime", ctx.Runtime) if runtimeValue.Master.Enabled { e.Log.Info("Setup master", "runtime", ctx.Runtime) @@ -86,10 +86,9 @@ func (e *CacheEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, e } // dataset mount after runtime ready to ensure master pod is ready for executing commands. 
- if runtimeValue.Master.Enabled && runtimeClass.Topology != nil && - runtimeClass.Topology.Master != nil && runtimeClass.Topology.Master.ExecutionEntries != nil { - // currently only support mount ufs for master in master-worker architecture - err = e.PrepareUFS(runtimeClass.Topology.Master.ExecutionEntries.MountUFS, runtimeValue) + // currently only support mount ufs for master in master-worker architecture + if runtimeValue.Master.Enabled { + _, err = e.PrepareUFS(runtimeClass) if err != nil { return false, err } diff --git a/pkg/ddc/cache/engine/sync.go b/pkg/ddc/cache/engine/sync.go index dd2d16ca2e6..cf8978f3453 100644 --- a/pkg/ddc/cache/engine/sync.go +++ b/pkg/ddc/cache/engine/sync.go @@ -18,30 +18,36 @@ package engine import ( "context" + "os" + "reflect" + "time" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "os" - "reflect" - "time" ) func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) { - // sync the runtime value configmap runtime, err := e.getRuntime() if err != nil { return err } + // sync the runtime value config map err = e.syncRuntimeValueConfigMap(runtime) if err != nil { return err } - // TODO: implement other logic + // handle ufs change - support dynamic mount updates + err = e.UpdateOnUFSChange(runtime) + if err != nil { + e.Log.Error(err, "Failed to update UFS") + return err + } - // handle ufs change + // TODO: implement other logic // sync runtime status diff --git a/pkg/ddc/cache/engine/sync_test.go b/pkg/ddc/cache/engine/sync_test.go new file mode 100644 index 00000000000..6e38ca31f0e --- /dev/null +++ b/pkg/ddc/cache/engine/sync_test.go @@ -0,0 +1,774 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "time" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +var _ = Describe("CacheEngine Sync Tests", Label("pkg.ddc.cache.engine.sync_test.go"), func() { + var ( + fakeClient client.Client + engine *CacheEngine + ctx cruntime.ReconcileRequestContext + testScheme *runtime.Scheme + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(testScheme)).NotTo(HaveOccurred()) + Expect(corev1.AddToScheme(testScheme)).NotTo(HaveOccurred()) + + fakeClient = fake.NewClientBuilder().WithScheme(testScheme).Build() + + log := GinkgoLogr + recorder := record.NewFakeRecorder(100) + + engine = &CacheEngine{ + Client: fakeClient, + Log: log, + Recorder: recorder, + name: "test-runtime", + namespace: "default", + runtimeType: "cache", + engineImpl: "cache", + gracefulShutdownLimits: 5, + retryShutdown: 0, + syncRetryDuration: 5 * time.Second, + timeOfLastSync: time.Now(), + } + + ctx = cruntime.ReconcileRequestContext{ + Context: context.Background(), + NamespacedName: types.NamespacedName{ + Name: "test-runtime", + Namespace: "default", + }, + Runtime: &datav1alpha1.CacheRuntime{}, + 
RuntimeType: "cache", + EngineImpl: "cache", + Client: fakeClient, + Log: log, + Recorder: recorder, + } + }) + + Describe("Sync - Main Entry Point Tests", func() { + Context("when CacheRuntime does not exist", func() { + It("should return error from getRuntime", func() { + err := engine.Sync(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("not found")) + }) + }) + + Context("when CacheRuntime exists but CacheRuntimeClass is missing", func() { + BeforeEach(func() { + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-123"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "non-existent-class", + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + }) + + It("should fail when generating configmap data due to missing runtime class", func() { + err := engine.Sync(ctx) + Expect(err).To(HaveOccurred()) + // The error occurs during generateRuntimeConfigData when trying to get runtime class + }) + }) + + Context("when CacheRuntime and CacheRuntimeClass exist but Dataset is missing", func() { + BeforeEach(func() { + runtimeClass := &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + Worker: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + }, + Client: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-456"), + 
}, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + }) + + It("should fail when generating configmap data due to missing dataset", func() { + err := engine.Sync(ctx) + Expect(err).To(HaveOccurred()) + // Error occurs in generateRuntimeConfigData when GetDataset fails + }) + }) + + Context("when all dependencies exist and ConfigMap needs to be created", func() { + BeforeEach(func() { + // Create Dataset + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{ + { + Name: "test-mount", + MountPoint: "local:///mnt/test", + Path: "/data", + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), dataset)).NotTo(HaveOccurred()) + + // Create RuntimeClass + runtimeClass := &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{"master-key": "master-value"}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + Worker: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{"worker-key": "worker-value"}, + }, + Client: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{"client-key": "client-value"}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + // Create Runtime + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-789"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + Master: datav1alpha1.CacheRuntimeMasterSpec{ + Replicas: 1, + }, + Worker: 
datav1alpha1.CacheRuntimeWorkerSpec{ + Replicas: 2, + }, + Client: datav1alpha1.CacheRuntimeClientSpec{ + RuntimeComponentCommonSpec: datav1alpha1.RuntimeComponentCommonSpec{ + Disabled: false, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + }) + + It("should create ConfigMap with correct owner reference and data", func() { + err := engine.Sync(ctx) + Expect(err).NotTo(HaveOccurred()) + + // Verify ConfigMap was created + configMap := &corev1.ConfigMap{} + err = fakeClient.Get(context.Background(), types.NamespacedName{ + Name: "fluid-runtime-config-test-runtime", + Namespace: "default", + }, configMap) + Expect(err).NotTo(HaveOccurred()) + + // Verify owner reference + Expect(configMap.OwnerReferences).To(HaveLen(1)) + Expect(configMap.OwnerReferences[0].Name).To(Equal("test-runtime")) + Expect(configMap.OwnerReferences[0].UID).To(Equal(types.UID("test-uid-789"))) + Expect(*configMap.OwnerReferences[0].Controller).To(BeTrue()) + + // Verify ConfigMap data contains runtime.json key + Expect(configMap.Data).To(HaveKey("runtime.json")) + Expect(configMap.Data["runtime.json"]).NotTo(BeEmpty()) + }) + }) + + Context("when ConfigMap already exists with same data", func() { + BeforeEach(func() { + // Create Dataset + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{ + { + Name: "test-mount", + MountPoint: "local:///mnt/test", + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), dataset)).NotTo(HaveOccurred()) + + // Create RuntimeClass + runtimeClass := &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + Worker: 
&datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + }, + Client: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + // Create Runtime + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-sync"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + }) + + It("should not update ConfigMap when data is unchanged", func() { + // First call to Sync will create the ConfigMap + err := engine.Sync(ctx) + // May fail at UFS step, but ConfigMap should be created + Expect(err).NotTo(HaveOccurred()) + + // Get the created ConfigMap + originalCM := &corev1.ConfigMap{} + err = fakeClient.Get(context.Background(), types.NamespacedName{ + Name: "fluid-runtime-config-test-runtime", + Namespace: "default", + }, originalCM) + Expect(err).NotTo(HaveOccurred()) + originalData := originalCM.Data["runtime.json"] + Expect(originalData).NotTo(BeEmpty()) + + // Second call to Sync should not change the ConfigMap data + err = engine.Sync(ctx) + Expect(err).NotTo(HaveOccurred()) + + // Verify ConfigMap data was not changed + updatedCM := &corev1.ConfigMap{} + err = fakeClient.Get(context.Background(), types.NamespacedName{ + Name: "fluid-runtime-config-test-runtime", + Namespace: "default", + }, updatedCM) + Expect(err).NotTo(HaveOccurred()) + // The data content should remain exactly the same + Expect(updatedCM.Data["runtime.json"]).To(Equal(originalData)) + }) + }) + + Context("when ConfigMap exists with different data", func() { + BeforeEach(func() { + // Create Dataset + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", 
+ }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{ + { + Name: "new-mount", + MountPoint: "s3://new-bucket", + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), dataset)).NotTo(HaveOccurred()) + + // Create RuntimeClass + runtimeClass := &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + Worker: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + }, + Client: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + // Create Runtime + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-update"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + + // Pre-create ConfigMap with old data + oldCM := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fluid-runtime-config-test-runtime", + Namespace: "default", + }, + Data: map[string]string{ + "runtime.json": `{"old":"data"}`, + }, + } + Expect(fakeClient.Create(context.Background(), oldCM)).NotTo(HaveOccurred()) + }) + + It("should update ConfigMap with new data", func() { + err := engine.Sync(ctx) + // May fail at UFS step, but ConfigMap should be updated + Expect(err).NotTo(HaveOccurred()) + + // Verify ConfigMap was updated + updatedCM := &corev1.ConfigMap{} + err = fakeClient.Get(context.Background(), types.NamespacedName{ + Name: "fluid-runtime-config-test-runtime", + Namespace: "default", + }, updatedCM) + 
Expect(err).NotTo(HaveOccurred()) + + // Data should no longer contain old data + Expect(updatedCM.Data["runtime.json"]).NotTo(ContainSubstring(`"old":"data"`)) + // Should contain new mount information + Expect(updatedCM.Data["runtime.json"]).To(ContainSubstring("new-mount")) + Expect(updatedCM.Data["runtime.json"]).To(ContainSubstring("s3://new-bucket")) + }) + }) + + Context("when UpdateOnUFSChange encounters various scenarios", func() { + BeforeEach(func() { + // Setup basic dependencies for all UFS tests + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{ + { + Name: "test-mount", + MountPoint: "local:///mnt/test", + }, + }, + }, + Status: datav1alpha1.DatasetStatus{ + Phase: datav1alpha1.BoundDatasetPhase, + }, + } + Expect(fakeClient.Create(context.Background(), dataset)).NotTo(HaveOccurred()) + + runtimeClass := &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + Worker: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + }, + Client: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + }) + + Context("when master is disabled", func() { + BeforeEach(func() { + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-disabled"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + Master: datav1alpha1.CacheRuntimeMasterSpec{ + RuntimeComponentCommonSpec: 
datav1alpha1.RuntimeComponentCommonSpec{ + Disabled: true, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + }) + + It("should skip UFS update when master is disabled", func() { + err := engine.Sync(ctx) + // Should succeed because UFS update is skipped when master is disabled + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when runtime class has no MountUFS execution entry", func() { + BeforeEach(func() { + // Update runtime class without MountUFS + runtimeClass := &datav1alpha1.CacheRuntimeClass{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime-class"}, runtimeClass)).NotTo(HaveOccurred()) + runtimeClass.Topology.Master.ExecutionEntries = nil + Expect(fakeClient.Update(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-no-mount"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + }) + + It("should skip UFS update when no MountUFS command is defined", func() { + err := engine.Sync(ctx) + // Should succeed because UFS update is skipped when no MountUFS is defined + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("when no UFS changes are detected", func() { + BeforeEach(func() { + now := time.Now() + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-stable"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + }, + Status: datav1alpha1.CacheRuntimeStatus{ + MountTime: &metav1.Time{Time: now.Add(-5 * time.Minute)}, + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + + // Create master pod with old start time + masterPod := 
&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-master-0", + Namespace: "default", + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "master", + State: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.Time{Time: now.Add(-1 * time.Hour)}, + }, + }, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), masterPod)).NotTo(HaveOccurred()) + }) + + It("should skip UFS update when no changes detected", func() { + err := engine.Sync(ctx) + // Should succeed because no UFS changes are detected + Expect(err).NotTo(HaveOccurred()) + }) + }) + }) + + Context("when runtime spec has disabled components", func() { + BeforeEach(func() { + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{ + { + Name: "test-mount", + MountPoint: "local:///mnt/test", + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), dataset)).NotTo(HaveOccurred()) + + runtimeClass := &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + Worker: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + }, + Client: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-disabled-comp"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + 
Master: datav1alpha1.CacheRuntimeMasterSpec{ + RuntimeComponentCommonSpec: datav1alpha1.RuntimeComponentCommonSpec{ + Disabled: true, + }, + }, + Worker: datav1alpha1.CacheRuntimeWorkerSpec{ + RuntimeComponentCommonSpec: datav1alpha1.RuntimeComponentCommonSpec{ + Disabled: true, + }, + }, + Client: datav1alpha1.CacheRuntimeClientSpec{ + RuntimeComponentCommonSpec: datav1alpha1.RuntimeComponentCommonSpec{ + Disabled: true, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + }) + + It("should generate configmap without disabled components", func() { + err := engine.Sync(ctx) + // May fail at UFS step since master is disabled, but ConfigMap should be created + Expect(err).NotTo(HaveOccurred()) + + configMap := &corev1.ConfigMap{} + err = fakeClient.Get(context.Background(), types.NamespacedName{ + Name: "fluid-runtime-config-test-runtime", + Namespace: "default", + }, configMap) + Expect(err).NotTo(HaveOccurred()) + + // Verify configmap data does not contain master/worker/client sections + jsonData := configMap.Data["runtime.json"] + Expect(jsonData).NotTo(BeEmpty()) + // When all components are disabled, they should not appear in the config + Expect(jsonData).NotTo(ContainSubstring(`"master":`)) + Expect(jsonData).NotTo(ContainSubstring(`"worker":`)) + Expect(jsonData).NotTo(ContainSubstring(`"client":`)) + }) + }) + + Context("when dataset has shared options and encrypt options", func() { + BeforeEach(func() { + // Create secret for encrypt options + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "access-key": []byte("my-access-key"), + }, + } + Expect(fakeClient.Create(context.Background(), secret)).NotTo(HaveOccurred()) + + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + }, + Spec: datav1alpha1.DatasetSpec{ + SharedOptions: map[string]string{ + "shared-opt": 
"shared-value", + }, + SharedEncryptOptions: []datav1alpha1.EncryptOption{ + { + Name: "SHARED_SECRET", + ValueFrom: datav1alpha1.EncryptOptionSource{ + SecretKeyRef: datav1alpha1.SecretKeySelector{ + Name: "test-secret", + Key: "access-key", + }, + }, + }, + }, + Mounts: []datav1alpha1.Mount{ + { + Name: "s3-mount", + MountPoint: "s3://my-bucket", + Options: map[string]string{ + "mount-opt": "mount-value", + }, + EncryptOptions: []datav1alpha1.EncryptOption{ + { + Name: "MOUNT_SECRET", + ValueFrom: datav1alpha1.EncryptOptionSource{ + SecretKeyRef: datav1alpha1.SecretKeySelector{ + Name: "test-secret", + Key: "access-key", + }, + }, + }, + }, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), dataset)).NotTo(HaveOccurred()) + + runtimeClass := &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + Worker: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + }, + Client: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + rt := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid-encrypt"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + }, + } + Expect(fakeClient.Create(context.Background(), rt)).NotTo(HaveOccurred()) + }) + + It("should include shared and encrypt options in configmap", func() { + err := engine.Sync(ctx) + // May fail at UFS step, but ConfigMap should be created with correct options + Expect(err).NotTo(HaveOccurred()) + + configMap := &corev1.ConfigMap{} + err = 
fakeClient.Get(context.Background(), types.NamespacedName{ + Name: "fluid-runtime-config-test-runtime", + Namespace: "default", + }, configMap) + Expect(err).NotTo(HaveOccurred()) + + jsonData := configMap.Data["runtime.json"] + Expect(jsonData).To(ContainSubstring("shared-opt")) + Expect(jsonData).To(ContainSubstring("shared-value")) + Expect(jsonData).To(ContainSubstring("mount-opt")) + Expect(jsonData).To(ContainSubstring("mount-value")) + // Encrypt options should be converted to file paths + Expect(jsonData).To(ContainSubstring("/etc/fluid/secrets/test-secret/access-key")) + }) + }) + }) + +}) diff --git a/pkg/ddc/cache/engine/ufs.go b/pkg/ddc/cache/engine/ufs.go index 32730ba36a8..47a04c0e1cb 100644 --- a/pkg/ddc/cache/engine/ufs.go +++ b/pkg/ddc/cache/engine/ufs.go @@ -17,31 +17,228 @@ package engine import ( + "encoding/json" + "fmt" + "strings" "time" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func (e *CacheEngine) PrepareUFS(mountUfs *datav1alpha1.ExecutionCommonEntry, value *common.CacheRuntimeValue) error { +// Minimum timeout for mount operations to ensure reliability across different storage backends. +// Some operations (e.g., large directory listings, network latency) may require more time. 
+const minMountTimeoutSeconds = 20 + +func (e *CacheEngine) PrepareUFS(runtimeClass *datav1alpha1.CacheRuntimeClass) (string, error) { + if runtimeClass.Topology == nil || runtimeClass.Topology.Master == nil || + runtimeClass.Topology.Master.ExecutionEntries == nil || runtimeClass.Topology.Master.ExecutionEntries.MountUFS == nil { + e.Log.Info("No mount ufs command found in runtime class") + return "", nil + } + mountUfs := runtimeClass.Topology.Master.ExecutionEntries.MountUFS + // execute mount command in master pod - if mountUfs == nil { - return nil + podName, containerName, err := e.getMasterPodInfo(runtimeClass) + if err != nil { + return "", err + } + + fileUtil := NewCacheFileUtil(podName, containerName, e.namespace, e.Log) + + // at least 20 seconds + timeoutSeconds := max(mountUfs.TimeoutSeconds, minMountTimeoutSeconds) + + stdout, err := fileUtil.Mount(mountUfs.Command, time.Duration(timeoutSeconds)*time.Second) + if err != nil { + return "", err + } + + return stdout, nil +} + +// shouldUpdateUFS determines whether the UFS configuration needs to be updated. +// It analyzes path differences between the provided dataset spec and status to identify +// which UFS entries require updates. It also checks if the master pod has restarted +// since the last mount operation. +// Returns true if either mount paths have changed or remount is required due to pod restart. 
+func (e *CacheEngine) shouldUpdateUFS(dataset *datav1alpha1.Dataset, runtimeClass *datav1alpha1.CacheRuntimeClass, + runtime *datav1alpha1.CacheRuntime) bool { + // whether the ufs mount paths need to update + ufsToUpdate := utils.NewUFSToUpdate(dataset) + ufsToUpdate.AnalyzePathsDelta() + if ufsToUpdate != nil && ufsToUpdate.ShouldUpdate() { + e.Log.Info("Detected UFS changes, updating mount points", "toAdd", ufsToUpdate.ToAdd(), "toRemove", ufsToUpdate.ToRemove()) + return true + } + + // check if master pod restart after the latest mount time + restart := e.checkIfRemountRequired(runtimeClass, runtime) + if restart { + e.Log.Info("Master pod restart after the latest mount time, need to remount") + } + + return restart +} + +// UpdateOnUFSChange handles changes to the UFS configuration by updating mount points dynamically. +// When dataset mount information changes, this method ensures the changes take effect without requiring a restart. +func (e *CacheEngine) UpdateOnUFSChange(runtime *datav1alpha1.CacheRuntime) (err error) { + runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName) + if err != nil { + return } - podName, containerName, err := e.getMasterPodInfo(value) + + // update only for master-worker architecture and has mount ufs command + if runtimeClass.Topology == nil || runtimeClass.Topology.Master == nil || runtime.Spec.Master.Disabled || + runtimeClass.Topology.Master.ExecutionEntries == nil || + runtimeClass.Topology.Master.ExecutionEntries.MountUFS == nil { + return + } + + // 1. get the dataset + dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) if err != nil { + e.Log.Error(err, "Failed to get the dataset") + return + } + + shouldUpdate := e.shouldUpdateUFS(dataset, runtimeClass, runtime) + // 1. check if need to update ufs + if !shouldUpdate { + return + } + + // 2. 
set update status to updating + err = utils.UpdateMountStatus(e.Client, e.name, e.namespace, datav1alpha1.UpdatingDatasetPhase) + if err != nil { + e.Log.Error(err, "Failed to update dataset status to updating") + return + } + + // 3. use the same mount script to add or remove mount points + stdout, err := e.PrepareUFS(runtimeClass) + if err != nil { + e.Log.Error(err, "Failed to add or remove mount points") + return + } + + // 4. parse mount output, and sync dataset mounts + stdout = strings.TrimSpace(stdout) + if stdout == "" { + return errors.New("mount ufs command produced empty output") + } + + mountOutput := &datav1alpha1.CacheRuntimeMountUfsOutput{} + err = json.Unmarshal([]byte(stdout), mountOutput) + if err != nil { + return errors.Wrapf(err, "failed to parse mount ufs output as CacheRuntimeMountUfsOutput, output: %q", stdout) + } + + // 5. sync dataset mounts, to ensure the runtime config reflects the latest mounts + err = e.syncDatasetMounts(dataset, mountOutput) + if err != nil { + e.Log.Error(err, "Failed to sync dataset mounts") return err } - fileUtils := newCacheFileUtils(podName, containerName, e.namespace, e.Log) + // 6. update latest mount time + err = e.updateMountTime() + if err != nil { + return + } - // at least 20 seconds - timeoutSeconds := max(mountUfs.TimeoutSeconds, 20) + return +} + +// syncDatasetMounts synchronizes the dataset mount points with the current runtime state. +// This ensures that any changes to dataset.spec.mounts are reflected in the running system. 
+func (e *CacheEngine) syncDatasetMounts(dataset *datav1alpha1.Dataset, mountOutput *datav1alpha1.CacheRuntimeMountUfsOutput) (err error) { + + // build the set of ufs paths reported as mounted by the mount command + var mountedPaths = map[string]bool{} + for _, path := range mountOutput.Mounted { + mountedPaths[path] = true + } - err = fileUtils.Mount(mountUfs.Command, time.Duration(timeoutSeconds)*time.Second) + // check that the mounted paths match the dataset spec mounts + for _, mount := range dataset.Spec.Mounts { + if common.IsFluidNativeScheme(mount.MountPoint) { + continue + } + datasetMountPath := utils.UFSPathBuilder{}.GenUFSPathInUnifiedNamespace(mount) + if !mountedPaths[datasetMountPath] { + e.Log.Info("Waiting for mount point to be mounted", "Mount point", datasetMountPath) + return fmt.Errorf("mount point %s is not yet mounted", datasetMountPath) + } + delete(mountedPaths, datasetMountPath) + } + if len(mountedPaths) != 0 { + e.Log.Info("Waiting for mount point to be unmounted", "Mount points", mountedPaths) + return fmt.Errorf("unexpected mounted paths remain: %v", mountedPaths) + } + + // update dataset status mount and phase status with retry + err = utils.UpdateMountStatus(e.Client, e.name, e.namespace, datav1alpha1.BoundDatasetPhase) if err != nil { return err } return nil } + +func (e *CacheEngine) checkIfRemountRequired(runtimeClass *datav1alpha1.CacheRuntimeClass, runtime *datav1alpha1.CacheRuntime) bool { + masterPodName, masterContainerName, err := e.getMasterPodInfo(runtimeClass) + if err != nil { + e.Log.Error(err, "get runtime pod container name failed", "method", "checkIfRemountRequired", "runtimeClass name", e.name) + return false + } + + masterPod, err := kubeclient.GetPodByName(e.Client, masterPodName, e.namespace) + if err != nil { + e.Log.Error(err, "Got master pod failed, skip remount check", "pod name", masterPodName) + return false + } + if masterPod == nil { + e.Log.Info("Master pod not found, skip remount check", "pod name", masterPodName) + 
return false + } + + // Check pod phase to ensure it's actually running + if masterPod.Status.Phase != corev1.PodRunning { + e.Log.Info("Master pod is not in Running phase, skip remount check", + "pod", masterPodName, "phase", masterPod.Status.Phase) + return false + } + + var startedAt *v1.Time + for _, containerStatus := range masterPod.Status.ContainerStatuses { + if containerStatus.Name == masterContainerName { + if containerStatus.State.Running == nil { + e.Log.Info("Container not running, skip remount check", + "container", masterContainerName, "pod", masterPodName) + return false + } + + startedAt = &containerStatus.State.Running.StartedAt + break + } + } + if startedAt == nil { + e.Log.Info("Cannot get container start time, skip remount check", "pod name", masterPodName, + "container name", masterContainerName) + return false + } + + // If mount time is earlier than master container start time, remount is necessary + if runtime.Status.MountTime == nil || runtime.Status.MountTime.Before(startedAt) { + return true + } + + return false +} diff --git a/pkg/ddc/cache/engine/ufs_test.go b/pkg/ddc/cache/engine/ufs_test.go new file mode 100644 index 00000000000..fca0540c0fc --- /dev/null +++ b/pkg/ddc/cache/engine/ufs_test.go @@ -0,0 +1,624 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package engine + +import ( + "context" + "time" + + "github.com/agiledragon/gomonkey/v2" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// MockExecutions is a mock implementation of CacheFileUtil for testing +type MockExecutions struct { + MountFunc func(command []string, timeout time.Duration) (stdout string, err error) +} + +func (m *MockExecutions) Mount(command []string, timeout time.Duration) (stdout string, err error) { + if m.MountFunc != nil { + return m.MountFunc(command, timeout) + } + return "", nil +} + +var _ = Describe("CacheEngine UpdateOnUFSChange Tests", Label("pkg.ddc.cache.engine.ufs_test.go"), func() { + var ( + fakeClient client.Client + engine *CacheEngine + testScheme *runtime.Scheme + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(testScheme)).NotTo(HaveOccurred()) + Expect(corev1.AddToScheme(testScheme)).NotTo(HaveOccurred()) + + fakeClient = fake.NewClientBuilder().WithScheme(testScheme).WithStatusSubresource(&datav1alpha1.CacheRuntime{}, &datav1alpha1.Dataset{}).Build() + + log := GinkgoLogr + recorder := record.NewFakeRecorder(100) + + engine = &CacheEngine{ + Client: fakeClient, + Log: log, + Recorder: recorder, + name: "test-runtime", + namespace: "default", + runtimeType: "cache", + engineImpl: "cache", + gracefulShutdownLimits: 5, + retryShutdown: 0, + syncRetryDuration: 5 * time.Second, + timeOfLastSync: time.Now(), + } + }) + + Describe("PrepareUFS Tests (Public Method)", func() { + var ( + runtimeClass *datav1alpha1.CacheRuntimeClass + patches *gomonkey.Patches + ) + 
+ BeforeEach(func() { + // Reset patches before each test + if patches != nil { + patches.Reset() + } + }) + + AfterEach(func() { + if patches != nil { + patches.Reset() + } + }) + + Context("when no mount ufs command found", func() { + It("should return empty string and no error when topology is nil", func() { + runtimeClass = &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + } + stdout, err := engine.PrepareUFS(runtimeClass) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(BeEmpty()) + }) + + It("should return empty string and no error when master is nil", func() { + runtimeClass = &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{}, + } + stdout, err := engine.PrepareUFS(runtimeClass) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(BeEmpty()) + }) + + It("should return empty string and no error when execution entries is nil", func() { + runtimeClass = &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{}, + }, + } + stdout, err := engine.PrepareUFS(runtimeClass) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(BeEmpty()) + }) + + It("should return empty string and no error when MountUFS is nil", func() { + runtimeClass = &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + ExecutionEntries: &datav1alpha1.ExecutionEntries{}, + }, + }, + } + stdout, err := engine.PrepareUFS(runtimeClass) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(BeEmpty()) + }) + }) + + Context("when MountUFS exists but 
getMasterPodInfo fails", func() { + BeforeEach(func() { + runtimeClass = &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + ExecutionEntries: &datav1alpha1.ExecutionEntries{ + MountUFS: &datav1alpha1.ExecutionCommonEntry{ + Command: []string{"/mount.sh"}, + }, + }, + }, + }, + } + }) + + It("should return error from getMasterPodInfo", func() { + // getMasterPodInfo will fail because Template is not set + stdout, err := engine.PrepareUFS(runtimeClass) + Expect(err).To(HaveOccurred()) + Expect(stdout).To(BeEmpty()) + }) + }) + + Context("when MountUFS executes successfully", func() { + It("should return stdout from Mount command with valid JSON output", func() { + runtimeClass = &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "master", + }, + }, + }, + }, + ExecutionEntries: &datav1alpha1.ExecutionEntries{ + MountUFS: &datav1alpha1.ExecutionCommonEntry{ + Command: []string{"/mount.sh"}, + TimeoutSeconds: 30, + }, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + mockExecutions := &MockExecutions{MountFunc: func(command []string, timeout time.Duration) (stdout string, err error) { + return `{"mounted": ["/mount1", "/mount2"]}`, nil + }} + patches = gomonkey.ApplyFunc(NewCacheFileUtil, func(podName, containerName, namespace string, log logr.Logger) CacheFileUtil { + return mockExecutions + }) + + stdout, err := engine.PrepareUFS(runtimeClass) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(Equal(`{"mounted": ["/mount1", "/mount2"]}`)) + }) + + It("should 
return empty stdout when Mount returns empty output", func() { + runtimeClass = &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "master", + }, + }, + }, + }, + ExecutionEntries: &datav1alpha1.ExecutionEntries{ + MountUFS: &datav1alpha1.ExecutionCommonEntry{ + Command: []string{"/mount.sh"}, + TimeoutSeconds: 30, + }, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + mockExecutions := &MockExecutions{MountFunc: func(command []string, timeout time.Duration) (stdout string, err error) { + return "", nil + }} + patches = gomonkey.ApplyFunc(NewCacheFileUtil, func(podName, containerName, namespace string, log logr.Logger) CacheFileUtil { + return mockExecutions + }) + + stdout, err := engine.PrepareUFS(runtimeClass) + Expect(err).NotTo(HaveOccurred()) + Expect(stdout).To(BeEmpty()) + }) + + It("should return error when Mount command fails", func() { + runtimeClass = &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "master", + }, + }, + }, + }, + ExecutionEntries: &datav1alpha1.ExecutionEntries{ + MountUFS: &datav1alpha1.ExecutionCommonEntry{ + Command: []string{"/mount.sh"}, + TimeoutSeconds: 30, + }, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), runtimeClass)).NotTo(HaveOccurred()) + + mockExecutions := &MockExecutions{MountFunc: func(command []string, timeout time.Duration) (stdout string, err error) { + return "", errors.New("mount 
command failed") + }} + patches = gomonkey.ApplyFunc(NewCacheFileUtil, func(podName, containerName, namespace string, log logr.Logger) CacheFileUtil { + return mockExecutions + }) + + stdout, err := engine.PrepareUFS(runtimeClass) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("mount command failed")) + Expect(stdout).To(BeEmpty()) + }) + }) + }) + + Describe("UpdateOnUFSChange Tests (Public Method)", func() { + var patches *gomonkey.Patches + + AfterEach(func() { + if patches != nil { + patches.Reset() + } + }) + + // Helper function to update runtime mount time + updateRuntimeMountTime := func(offset time.Duration) { + rt := &datav1alpha1.CacheRuntime{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, rt)).NotTo(HaveOccurred()) + rt.Status.MountTime = &metav1.Time{Time: time.Now().Add(offset)} + Expect(fakeClient.Status().Update(context.Background(), rt)).NotTo(HaveOccurred()) + } + + // Helper function to create complete test environment for UpdateOnUFSChange + setupUpdateOnUFSTest := func(mountFunc func([]string, time.Duration) (string, error)) (*datav1alpha1.CacheRuntimeClass, *datav1alpha1.Dataset, *datav1alpha1.CacheRuntime) { + // Create RuntimeClass with MountUFS + rc := &datav1alpha1.CacheRuntimeClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-class", + }, + FileSystemType: "cache", + Topology: &datav1alpha1.RuntimeTopology{ + Master: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "master", + }, + }, + }, + }, + ExecutionEntries: &datav1alpha1.ExecutionEntries{ + MountUFS: &datav1alpha1.ExecutionCommonEntry{ + Command: []string{"/mount.sh"}, + TimeoutSeconds: 30, + }, + }, + }, + Worker: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + }, + 
Client: &datav1alpha1.RuntimeComponentDefinition{ + Options: map[string]string{}, + Service: datav1alpha1.RuntimeComponentService{}, + }, + }, + } + Expect(fakeClient.Create(context.Background(), rc)).NotTo(HaveOccurred()) + + // Create Dataset + ds := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + }, + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{ + { + Name: "mount1", + MountPoint: "s3://bucket1/path", + }, + }, + }, + Status: datav1alpha1.DatasetStatus{ + Phase: datav1alpha1.BoundDatasetPhase, + Mounts: []datav1alpha1.Mount{ + { + Name: "mount1", + MountPoint: "s3://bucket1/path", + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), ds)).NotTo(HaveOccurred()) + + // Create CacheRuntime + runtime := &datav1alpha1.CacheRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime", + Namespace: "default", + UID: types.UID("test-uid"), + }, + Spec: datav1alpha1.CacheRuntimeSpec{ + RuntimeClassName: "test-runtime-class", + Master: datav1alpha1.CacheRuntimeMasterSpec{ + Replicas: 1, + }, + }, + Status: datav1alpha1.CacheRuntimeStatus{ + MountTime: &metav1.Time{Time: time.Now().Add(-1 * time.Hour)}, + }, + } + Expect(fakeClient.Create(context.Background(), runtime)).NotTo(HaveOccurred()) + + // Create master pod + masterPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-runtime-master-0", + Namespace: "default", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "master", + State: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.Time{Time: time.Now().Add(-2 * time.Hour)}, + }, + }, + }, + }, + }, + } + Expect(fakeClient.Create(context.Background(), masterPod)).NotTo(HaveOccurred()) + + // Trigger UFS update by adding a mount + datasetToUpdate := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", 
Namespace: "default"}, datasetToUpdate)).NotTo(HaveOccurred()) + datasetToUpdate.Spec.Mounts = append(datasetToUpdate.Spec.Mounts, datav1alpha1.Mount{ + Name: "mount2", + MountPoint: "s3://bucket2/path", + }) + Expect(fakeClient.Update(context.Background(), datasetToUpdate)).NotTo(HaveOccurred()) + + // Mock newCacheFileUtils if mountFunc is provided + if mountFunc != nil { + mockExecutions := &MockExecutions{MountFunc: mountFunc} + patches = gomonkey.ApplyFunc(NewCacheFileUtil, func(podName, containerName, namespace string, log logr.Logger) CacheFileUtil { + return mockExecutions + }) + } + + return rc, datasetToUpdate, runtime + } + + Context("when no update is needed", func() { + It("should return early when no UFS changes and no remount required", func() { + _, _, rt := setupUpdateOnUFSTest(nil) + + // Remove the added mount to make spec match status (no changes) + dataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, dataset)).NotTo(HaveOccurred()) + dataset.Spec.Mounts = []datav1alpha1.Mount{ + { + Name: "mount1", + MountPoint: "s3://bucket1/path", + }, + } + Expect(fakeClient.Update(context.Background(), dataset)).NotTo(HaveOccurred()) + + // Set recent mount time (after pod start) to avoid remount + updateRuntimeMountTime(-10 * time.Minute) + + err := engine.UpdateOnUFSChange(rt) + Expect(err).NotTo(HaveOccurred()) + + // Verify dataset status remains Bound (not changed to Updating) + updatedDataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, updatedDataset)).NotTo(HaveOccurred()) + Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.BoundDatasetPhase)) + }) + }) + + Context("when PrepareUFS returns invalid JSON", func() { + It("should return JSON parse error", func() { + _, _, rt := setupUpdateOnUFSTest(func(command []string, timeout time.Duration) (stdout string, 
err error) { + return "invalid json output", nil + }) + + err := engine.UpdateOnUFSChange(rt) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to parse mount ufs output")) + + // Verify dataset status was set to Updating before the error + updatedDataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, updatedDataset)).NotTo(HaveOccurred()) + Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.UpdatingDatasetPhase)) + }) + }) + + Context("when PrepareUFS returns empty output", func() { + It("should return empty output error", func() { + _, _, rt := setupUpdateOnUFSTest(func(command []string, timeout time.Duration) (stdout string, err error) { + return " ", nil // Whitespace only + }) + + err := engine.UpdateOnUFSChange(rt) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("empty output")) + + // Verify dataset status was set to Updating + updatedDataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, updatedDataset)).NotTo(HaveOccurred()) + Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.UpdatingDatasetPhase)) + }) + }) + + Context("when syncDatasetMounts fails due to missing mount point", func() { + It("should return mount point not mounted error", func() { + _, _, rt := setupUpdateOnUFSTest(func(command []string, timeout time.Duration) (stdout string, err error) { + return `{"mounted": ["/mount1"]}`, nil + }) + + err := engine.UpdateOnUFSChange(rt) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("is not yet mounted")) + + // Verify dataset status remains in Updating state + updatedDataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, updatedDataset)).NotTo(HaveOccurred()) + 
Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.UpdatingDatasetPhase)) + }) + }) + + Context("when syncDatasetMounts fails due to unexpected mounted paths", func() { + It("should return unexpected mounted paths error", func() { + _, _, rt := setupUpdateOnUFSTest(func(command []string, timeout time.Duration) (stdout string, err error) { + return `{"mounted": ["/mount1", "/mount2", "/extra-path"]}`, nil + }) + + err := engine.UpdateOnUFSChange(rt) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("unexpected mounted paths remain")) + + // Verify dataset status remains in Updating state + updatedDataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, updatedDataset)).NotTo(HaveOccurred()) + Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.UpdatingDatasetPhase)) + }) + }) + + Context("when all mount points are correctly mounted", func() { + It("should successfully update mounts and set status to Bound", func() { + _, _, rt := setupUpdateOnUFSTest(func(command []string, timeout time.Duration) (stdout string, err error) { + return `{"mounted": ["/mount1", "/mount2"]}`, nil + }) + + err := engine.UpdateOnUFSChange(rt) + Expect(err).NotTo(HaveOccurred()) + + // Verify dataset status was updated to Bound + updatedDataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, updatedDataset)).NotTo(HaveOccurred()) + Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.BoundDatasetPhase)) + + // Verify runtime mount time was updated + updatedRuntime := &datav1alpha1.CacheRuntime{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, updatedRuntime)).NotTo(HaveOccurred()) + Expect(updatedRuntime.Status.MountTime).NotTo(BeNil()) + }) + }) + + Context("when dataset has native scheme mounts", func() { 
+ It("should skip native scheme mounts and succeed", func() { + _, dataset, rt := setupUpdateOnUFSTest(func(command []string, timeout time.Duration) (stdout string, err error) { + return `{"mounted": ["/mount1", "/mount2"]}`, nil + }) + + // Add a native scheme mount + dataset.Spec.Mounts = append(dataset.Spec.Mounts, datav1alpha1.Mount{ + Name: "native-mount", + MountPoint: "local:///mnt/local", + }) + Expect(fakeClient.Update(context.Background(), dataset)).NotTo(HaveOccurred()) + + err := engine.UpdateOnUFSChange(rt) + Expect(err).NotTo(HaveOccurred()) + + // Verify dataset status was updated to Bound + updatedDataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, updatedDataset)).NotTo(HaveOccurred()) + Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.BoundDatasetPhase)) + }) + }) + + Context("when triggered by master pod restart", func() { + It("should detect restart and perform remount", func() { + _, _, rt := setupUpdateOnUFSTest(func(command []string, timeout time.Duration) (stdout string, err error) { + return `{"mounted": ["/mount1", "/mount2"]}`, nil + }) + + // Remove the added mount to make spec match status (no UFS changes) + dataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-runtime", Namespace: "default"}, dataset)).NotTo(HaveOccurred()) + dataset.Spec.Mounts = []datav1alpha1.Mount{ + { + Name: "mount1", + MountPoint: "s3://bucket1/path", + }, + } + Expect(fakeClient.Update(context.Background(), dataset)).NotTo(HaveOccurred()) + + // Set old mount time (before pod start) to trigger remount + updateRuntimeMountTime(-2 * time.Hour) + + err := engine.UpdateOnUFSChange(rt) + Expect(err).NotTo(HaveOccurred()) + + // Verify dataset status was updated to Bound after remount + updatedDataset := &datav1alpha1.Dataset{} + Expect(fakeClient.Get(context.Background(), types.NamespacedName{Name: 
"test-runtime", Namespace: "default"}, updatedDataset)).NotTo(HaveOccurred()) + Expect(updatedDataset.Status.Phase).To(Equal(datav1alpha1.BoundDatasetPhase)) + }) + }) + }) +}) diff --git a/test/gha-e2e/curvine/mount.yaml b/test/gha-e2e/curvine/mount.yaml index 4cc1d7a903a..2a8a6049f9d 100644 --- a/test/gha-e2e/curvine/mount.yaml +++ b/test/gha-e2e/curvine/mount.yaml @@ -28,12 +28,12 @@ data: path=$(echo "$item" | sed -nE 's/.*"path":"([^"]+)".*/\1/p') mounted=$(/app/curvine/bin/cv mount) - if echo "$mounted" | grep "$mountPoint"; then + if echo "$mounted" | grep "$mountPoint" >/dev/null 2>&1; then continue; fi - [ -z "$mountPoint" ] && { echo "mountPoint is not set or empty"; exit 1; } - [ -z "$path" ] && { echo "path is not set or empty"; exit 1; } + [ -z "$mountPoint" ] && { echo "mountPoint is not set or empty" >&2; exit 1; } + [ -z "$path" ] && { echo "path is not set or empty" >&2; exit 1; } # ==================== 字段提取加p 仅输出匹配结果 ==================== # Extract encryptOptions paths from the JSON @@ -70,9 +70,9 @@ data: # 必填参数校验 CV_PARAMS="" - [ -z "$endpoint" ] && { echo "endpoint option is not set or empty"; exit 1; } - [ -z "$access" ] && { echo "access option is not set or empty"; exit 1; } - [ -z "$secret" ] && { echo "secret option is not set or empty"; exit 1; } + [ -z "$endpoint" ] && { echo "endpoint option is not set or empty" >&2; exit 1; } + [ -z "$access" ] && { echo "access option is not set or empty" >&2; exit 1; } + [ -z "$secret" ] && { echo "secret option is not set or empty" >&2; exit 1; } # 拼接必填参数 CV_PARAMS="$CV_PARAMS -c s3.endpoint_url=$endpoint" @@ -85,11 +85,62 @@ data: # 最终命令 CMD="/app/curvine/bin/cv mount $mountPoint $path --check-path-consist false $CV_PARAMS" - echo "执行命令:$CMD" + #echo "执行命令:$CMD" - eval "$CMD" + eval "$CMD" > /dev/null 2>&1 if [ $? 
-ne 0 ]; then - echo "mount $mountPoint failed" + echo "mount $mountPoint failed" >&2 exit 1 fi done + + # /app/curvine/bin/cv mount 的输出如下所示 + # Mount Table: + # +-------------+----------------+-------------+-------------+-------------------+ + # | ID | Curvine Path | UFS Path | Write Type | Read Verify UFS | + # +-------------+----------------+-------------+-------------+-------------------+ + # | 1664140379 | /minio | s3://test/ | fs_mode | no | + # +-------------+----------------+-------------+-------------+-------------------+ + # Total mount points: 1 + + # 获取所有已挂载的 Curvine Path 并组装成 CacheRuntimeMountUfsOutput 格式的 JSON + getCurvinePathsAsJson() { + local output + output=$(/app/curvine/bin/cv mount) + + # 提取表格中的数据行(跳过表头和分隔线) + local paths=() + while IFS= read -r line; do + # 跳过空行、表头行和分隔线 + if [[ -z "$line" ]] || [[ "$line" == *"|"*"ID"* ]] || [[ "$line" == *"+"*"-"*"+"* ]] || [[ ! "$line" =~ ^\| ]] ; then + continue + fi + + # 提取第二列(Curvine Path)的值 + # 格式: | ID | Curvine Path | UFS Path | Write Type | Read Verify UFS | + local path + path=$(echo "$line" | sed -E 's/^\|[[:space:]]*[^|]+\|[[:space:]]*([^|]+)\|.*/\1/' | sed 's/[[:space:]]*$//') + + if [[ -n "$path" ]]; then + paths+=("$path") + fi + done <<< "$output" + + # 组装成 CacheRuntimeMountUfsOutput 格式的 JSON: {"mounted": ["path1", "path2", ...]} + local json='{"mounted":[' + for i in "${!paths[@]}"; do + if [ $i -gt 0 ]; then + json+="," + fi + # 转义特殊字符并添加到 JSON 数组 + local escaped_path + escaped_path=$(echo "${paths[$i]}" | sed 's/\\/\\\\/g; s/"/\\"/g') + json+="\"${escaped_path}\"" + done + json+=']}' + echo "$json" + } + + # 调用函数获取 CacheRuntimeMountUfsOutput 格式的 JSON + CURVINE_PATHS_JSON=$(getCurvinePathsAsJson) + echo "$CURVINE_PATHS_JSON"