Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 75 additions & 26 deletions pkg/ddc/base/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -52,8 +54,6 @@ type Conventions interface {

GetDatasetNumLabelName() string

GetWorkerStatefulsetName() string

GetExclusiveLabelValue() string
}

Expand All @@ -63,6 +63,9 @@ type Conventions interface {
type RuntimeInfoInterface interface {
Conventions

// GetWorkerPods returns the worker object and selector for runtime workers.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment for GetWorkerPods is inaccurate as it mentions returning a "worker object and selector", but the method signature returns a slice of pods.

Suggested change
// GetWorkerPods returns the worker object and selector for runtime workers.
// GetWorkerPods returns the pods for runtime workers.

GetWorkerPods(client client.Client) ([]corev1.Pod, error)

GetTieredStoreInfo() TieredStoreInfo

GetName() string
Expand Down Expand Up @@ -375,6 +378,21 @@ func (info *RuntimeInfo) SetupFuseCleanPolicy(policy datav1alpha1.FuseCleanPolic
info.fuse.CleanPolicy = policy
}

func (info *RuntimeInfo) GetWorkerPods(client client.Client) ([]corev1.Pod, error) {
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
if err != nil {
return nil, err
}
Comment on lines +382 to +385
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the worker StatefulSet is not found, GetWorkerPods should return an empty list and no error to avoid failing the reconciliation loop during initial setup. Using utils.IgnoreNotFound is a clean way to handle this.

Suggested change
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
if err != nil {
return nil, err
}
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
if err != nil {
return nil, utils.IgnoreNotFound(err)
}

workerSelector, err := metav1.LabelSelectorAsSelector(workers.Spec.Selector)
if err != nil {
return nil, err
}

workerPods, err := kubeclient.GetPodsForStatefulSet(client, workers, workerSelector)

return workerPods, err
}

func (info *RuntimeInfo) GetFuseCleanPolicy() datav1alpha1.FuseCleanPolicy {
return info.fuse.CleanPolicy
}
Expand Down Expand Up @@ -598,66 +616,97 @@ func GetRuntimeInfo(reader client.Reader, name, namespace string) (runtimeInfo R
return runtimeInfo, err
}

// GetRuntimeStatus gets the runtime status according to the runtime type, name, and namespace.
// This function is primarily responsible for retrieving the current status of a specific runtime
// based on its type from the Kubernetes cluster.
//
// Parameters:
// - client (client.Client): The Kubernetes client used to interact with the API server.
// - runtimeType (string): The type of the runtime (e.g., AlluxioRuntime, JindoRuntime, GooseFSRuntime).
// - name (string): The name of the runtime.
// - namespace (string): The namespace where the runtime is located.
//
// Returns:
// - status (*datav1alpha1.RuntimeStatus): The status of the requested runtime.
// - err (error): Returns an error if the runtime status cannot be retrieved or the runtime type is unsupported, otherwise returns nil.
func GetRuntimeStatus(client client.Client, runtimeType, name, namespace string) (status *datav1alpha1.RuntimeStatus, err error) {
// RuntimeStatusAccessor provides a unified interface to access common status fields across different runtime types
type RuntimeStatusAccessor interface {
// GetCacheAffinity returns the cache affinity from the runtime status
GetCacheAffinity() (*corev1.NodeAffinity, error)
}

// GetRuntimeStatusAccessor returns a unified status accessor for the given runtime
func GetRuntimeStatusAccessor(client client.Client, runtimeType, name, namespace string) (RuntimeStatusAccessor, error) {
switch runtimeType {
case common.AlluxioRuntime, common.JindoRuntime, common.GooseFSRuntime,
common.JuiceFSRuntime, common.EFCRuntime, common.ThinRuntime, common.VineyardRuntime:
status, err := GetDDCRuntimeStatus(client, runtimeType, name, namespace)
if err != nil {
return nil, err
}
return &DDCRuntimeStatusAccessor{status: status}, nil
case common.CacheRuntime:
runtime, err := utils.GetCacheRuntime(client, name, namespace)
if err != nil {
return nil, err
}
return &CacheRuntimeStatusAccessor{status: &runtime.Status}, nil
default:
return nil, fmt.Errorf("fail to get runtime status accessor for runtime type: %s", runtimeType)
}
}

// GetDDCRuntimeStatus retrieves the runtime object based on runtime type for DDC-based runtimes
func GetDDCRuntimeStatus(client client.Client, runtimeType, name, namespace string) (*datav1alpha1.RuntimeStatus, error) {
switch runtimeType {
case common.AlluxioRuntime:
runtime, err := utils.GetAlluxioRuntime(client, name, namespace)
if err != nil {
return status, err
return nil, err
}
return &runtime.Status, nil
case common.JindoRuntime:
runtime, err := utils.GetJindoRuntime(client, name, namespace)
if err != nil {
return status, err
return nil, err
}
return &runtime.Status, nil
case common.GooseFSRuntime:
runtime, err := utils.GetGooseFSRuntime(client, name, namespace)
if err != nil {
return status, err
return nil, err
}
return &runtime.Status, nil
case common.JuiceFSRuntime:
runtime, err := utils.GetJuiceFSRuntime(client, name, namespace)
if err != nil {
return status, err
return nil, err
}
return &runtime.Status, nil
case common.EFCRuntime:
runtime, err := utils.GetEFCRuntime(client, name, namespace)
if err != nil {
return status, err
return nil, err
}
return &runtime.Status, nil
case common.ThinRuntime:
runtime, err := utils.GetThinRuntime(client, name, namespace)
if err != nil {
return status, err
return nil, err
}
return &runtime.Status, nil
case common.VineyardRuntime:
runtime, err := utils.GetVineyardRuntime(client, name, namespace)
if err != nil {
return status, err
return nil, err
}
return &runtime.Status, nil
// TODO: how to handle with cache runtime? (currently used in app pod affinity scene)
default:
err = fmt.Errorf("fail to get runtimeInfo for runtime type: %s", runtimeType)
return nil, err
return nil, fmt.Errorf("unsupported DDC runtime type: %s", runtimeType)
}
}

// DDCRuntimeStatusAccessor implements RuntimeStatusAccessor for DDC-based runtimes (Alluxio, Jindo, GooseFS, etc.)
type DDCRuntimeStatusAccessor struct {
status *datav1alpha1.RuntimeStatus
}

func (d *DDCRuntimeStatusAccessor) GetCacheAffinity() (*corev1.NodeAffinity, error) {
return d.status.CacheAffinity, nil
}

// CacheRuntimeStatusAccessor implements RuntimeStatusAccessor for CacheRuntime
type CacheRuntimeStatusAccessor struct {
status *datav1alpha1.CacheRuntimeStatus
}

func (c *CacheRuntimeStatusAccessor) GetCacheAffinity() (*corev1.NodeAffinity, error) {
return c.status.CacheAffinity, nil
}
3 changes: 2 additions & 1 deletion pkg/ddc/base/runtime_conventions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ var _ = Describe("RuntimeInfo.GetWorkerStatefulsetName", func() {
DescribeTable("returns correct statefulset name",
func(runtimeName, runtimeType, suffix string) {
info, err := BuildRuntimeInfo(runtimeName, testNamespace, runtimeType)
realInfo := info.(*RuntimeInfo)
Expect(err).NotTo(HaveOccurred())
Expect(info.GetWorkerStatefulsetName()).To(Equal(runtimeName + suffix))
Expect(realInfo.GetWorkerStatefulsetName()).To(Equal(runtimeName + suffix))
},
Entry("JindoRuntime uses jindofs suffix", "mydata", common.JindoRuntime, "-jindofs-worker"),
Entry("JindoCacheEngineImpl uses jindofs suffix", "cache", common.JindoCacheEngineImpl, "-jindofs-worker"),
Expand Down
Loading
Loading