Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions pkg/k8s/persistent_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,47 @@ func DeletePersistentVolumeClaims(ctx context.Context, namespaceOverride string,
return client.CoreV1().PersistentVolumeClaims(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions)
}

// DeletePersistentVolumeClaim deletes a single PVC by name
func DeletePersistentVolumeClaim(ctx context.Context, name, namespaceOverride string) error {
client, namespace, err := NewClientAndResolvedNamespace(namespaceOverride)
if err != nil {
return err
}

err = client.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, name, metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return fmt.Errorf("failed to delete PVC %s: %w", name, err)
}
return nil
}

// WaitForPVCDeletion waits for a PVC to be fully deleted (not just in Terminating state)
func WaitForPVCDeletion(ctx context.Context, name, namespaceOverride string) error {
client, namespace, err := NewClientAndResolvedNamespace(namespaceOverride)
if err != nil {
return err
}

// Poll until PVC is gone
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
_, err := client.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, name, metav1.GetOptions{})
if k8serrors.IsNotFound(err) {
// PVC is fully deleted
return nil
}
if err != nil {
return fmt.Errorf("error checking PVC deletion status: %w", err)
}
// PVC still exists (possibly Terminating), wait and retry
time.Sleep(time.Second)
}
}
}

var TarImage = "ghcr.io/knative/func-utils:v2"

// UploadToVolume uploads files (passed in form of tar stream) into volume.
Expand Down
40 changes: 34 additions & 6 deletions pkg/pipelines/tekton/pipelines_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,18 +578,46 @@ func findNewestPipelineRunWithRetry(ctx context.Context, f fn.Function, namespac

// allows simple mocking in unit tests, use with caution regarding concurrency
var createPersistentVolumeClaim = k8s.CreatePersistentVolumeClaim
var getPersistentVolumeClaim = k8s.GetPersistentVolumeClaim
var deletePersistentVolumeClaim = k8s.DeletePersistentVolumeClaim
var waitForPVCDeletion = k8s.WaitForPVCDeletion

func createPipelinePersistentVolumeClaim(ctx context.Context, f fn.Function, namespace string, labels map[string]string) error {
var err error
pvcs := DefaultPersistentVolumeClaimSize
pvcName := getPipelinePvcName(f)

// Check if PVC already exists
existingPVC, err := getPersistentVolumeClaim(ctx, pvcName, namespace)
if err != nil && !k8serrors.IsNotFound(err) {
return fmt.Errorf("failed to check existing PVC: %w", err)
}

// If PVC exists, delete it and wait for full deletion to ensure clean workspace
if existingPVC != nil {
err = deletePersistentVolumeClaim(ctx, pvcName, namespace)
if err != nil {
return fmt.Errorf("failed to delete existing PVC: %w", err)
}

// Wait for PVC to be fully deleted (not just Terminating)
err = waitForPVCDeletion(ctx, pvcName, namespace)
if err != nil {
return fmt.Errorf("failed waiting for PVC deletion: %w", err)
}
}

// Create fresh PVC
var pvcs resource.Quantity
pvcs = DefaultPersistentVolumeClaimSize
if f.Build.PVCSize != "" {
if pvcs, err = resource.ParseQuantity(f.Build.PVCSize); err != nil {
return fmt.Errorf("PVC size value could not be parsed. %w", err)
return fmt.Errorf("PVC size value could not be parsed: %w", err)
}
}
err = createPersistentVolumeClaim(ctx, getPipelinePvcName(f), namespace, labels, f.Deploy.Annotations, corev1.ReadWriteOnce, pvcs, f.Build.RemoteStorageClass)
if err != nil && !k8serrors.IsAlreadyExists(err) {
return fmt.Errorf("problem creating persistent volume claim: %v", err)

err = createPersistentVolumeClaim(ctx, pvcName, namespace, labels, f.Deploy.Annotations, corev1.ReadWriteOnce, pvcs, f.Build.RemoteStorageClass)
if err != nil {
return fmt.Errorf("problem creating persistent volume claim: %w", err)
}

return nil
}
96 changes: 79 additions & 17 deletions pkg/pipelines/tekton/pipelines_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ func TestSourcesAsTarStream(t *testing.T) {
}

func Test_createPipelinePersistentVolumeClaim(t *testing.T) {
type mockType func(ctx context.Context, name, namespaceOverride string, labels map[string]string, annotations map[string]string, accessMode corev1.PersistentVolumeAccessMode, resourceRequest resource.Quantity, storageClass string) (err error)
type mockCreateType func(ctx context.Context, name, namespaceOverride string, labels map[string]string, annotations map[string]string, accessMode corev1.PersistentVolumeAccessMode, resourceRequest resource.Quantity, storageClass string) (err error)
type mockGetType func(ctx context.Context, name, namespaceOverride string) (*corev1.PersistentVolumeClaim, error)
type mockDeleteType func(ctx context.Context, name, namespaceOverride string) error
type mockWaitType func(ctx context.Context, name, namespaceOverride string) error

type args struct {
ctx context.Context
Expand All @@ -92,10 +95,13 @@ func Test_createPipelinePersistentVolumeClaim(t *testing.T) {
size string
}
tests := []struct {
name string
args args
mock mockType
wantErr bool
name string
args args
mockCreate mockCreateType
mockGet mockGetType
mockDelete mockDeleteType
mockWait mockWaitType
wantErr bool
}{
{
name: "returns error if pvc creation failed",
Expand All @@ -106,46 +112,102 @@ func Test_createPipelinePersistentVolumeClaim(t *testing.T) {
labels: nil,
size: DefaultPersistentVolumeClaimSize.String(),
},
mock: func(ctx context.Context, name, namespaceOverride string, labels map[string]string, annotations map[string]string, accessMode corev1.PersistentVolumeAccessMode, resourceRequest resource.Quantity, storageClass string) (err error) {
mockGet: func(ctx context.Context, name, namespaceOverride string) (*corev1.PersistentVolumeClaim, error) {
return nil, &apiErrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonNotFound}}
},
mockCreate: func(ctx context.Context, name, namespaceOverride string, labels map[string]string, annotations map[string]string, accessMode corev1.PersistentVolumeAccessMode, resourceRequest resource.Quantity, storageClass string) (err error) {
return errors.New("creation of pvc failed")
},
wantErr: true,
},
{
name: "returns nil if pvc already exists",
name: "deletes and recreates if pvc already exists",
args: args{
ctx: t.Context(),
f: fn.Function{},
namespace: "test-ns",
labels: nil,
size: DefaultPersistentVolumeClaimSize.String(),
},
mock: func(ctx context.Context, name, namespaceOverride string, labels map[string]string, annotations map[string]string, accessMode corev1.PersistentVolumeAccessMode, resourceRequest resource.Quantity, storageClass string) (err error) {
return &apiErrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonAlreadyExists}}
mockGet: func(ctx context.Context, name, namespaceOverride string) (*corev1.PersistentVolumeClaim, error) {
return &corev1.PersistentVolumeClaim{}, nil
},
mockDelete: func(ctx context.Context, name, namespaceOverride string) error {
return nil
},
mockWait: func(ctx context.Context, name, namespaceOverride string) error {
return nil
},
mockCreate: func(ctx context.Context, name, namespaceOverride string, labels map[string]string, annotations map[string]string, accessMode corev1.PersistentVolumeAccessMode, resourceRequest resource.Quantity, storageClass string) (err error) {
return nil
},
wantErr: false,
},
{
name: "returns err if namespace not defined and default returns an err",
name: "returns error if deletion fails",
args: args{
ctx: t.Context(),
f: fn.Function{},
namespace: "test-ns",
labels: nil,
size: DefaultPersistentVolumeClaimSize.String(),
},
mockGet: func(ctx context.Context, name, namespaceOverride string) (*corev1.PersistentVolumeClaim, error) {
return &corev1.PersistentVolumeClaim{}, nil
},
mockDelete: func(ctx context.Context, name, namespaceOverride string) error {
return errors.New("deletion failed")
},
wantErr: true,
},
{
name: "returns error if waiting for deletion fails",
args: args{
ctx: t.Context(),
f: fn.Function{},
namespace: "",
namespace: "test-ns",
labels: nil,
size: DefaultPersistentVolumeClaimSize.String(),
},
mock: func(ctx context.Context, name, namespaceOverride string, labels map[string]string, annotations map[string]string, accessMode corev1.PersistentVolumeAccessMode, resourceRequest resource.Quantity, storageClass string) (err error) {
return errors.New("no namespace defined")
mockGet: func(ctx context.Context, name, namespaceOverride string) (*corev1.PersistentVolumeClaim, error) {
return &corev1.PersistentVolumeClaim{}, nil
},
mockDelete: func(ctx context.Context, name, namespaceOverride string) error {
return nil
},
mockWait: func(ctx context.Context, name, namespaceOverride string) error {
return errors.New("wait for deletion failed")
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { // save current function and restore it at the end
old := createPersistentVolumeClaim
defer func() { createPersistentVolumeClaim = old }()
t.Run(tt.name, func(t *testing.T) {
// save current functions and restore them at the end
oldCreate := createPersistentVolumeClaim
oldGet := getPersistentVolumeClaim
oldDelete := deletePersistentVolumeClaim
oldWait := waitForPVCDeletion
defer func() {
createPersistentVolumeClaim = oldCreate
getPersistentVolumeClaim = oldGet
deletePersistentVolumeClaim = oldDelete
waitForPVCDeletion = oldWait
}()

if tt.mockCreate != nil {
createPersistentVolumeClaim = tt.mockCreate
}
if tt.mockGet != nil {
getPersistentVolumeClaim = tt.mockGet
}
if tt.mockDelete != nil {
deletePersistentVolumeClaim = tt.mockDelete
}
if tt.mockWait != nil {
waitForPVCDeletion = tt.mockWait
}

createPersistentVolumeClaim = tt.mock
tt.args.f.Build.PVCSize = tt.args.size
if err := createPipelinePersistentVolumeClaim(tt.args.ctx, tt.args.f, tt.args.namespace, tt.args.labels); (err != nil) != tt.wantErr {
t.Errorf("createPipelinePersistentVolumeClaim() error = %v, wantErr %v", err, tt.wantErr)
Expand Down