Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 125 additions & 39 deletions pkg/k8s/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,92 @@ func WithDeployerDecorator(decorator deployer.DeployDecorator) DeployerOpt {
}
}

// References holds the names of cluster resources referenced by a function.
// It is populated during deployment manifest generation and consumed by
// CheckResourcesArePresent to validate that every referenced resource exists.
type References struct {
Secrets sets.Set[string]
ConfigMaps sets.Set[string]
PVCs sets.Set[string]
}

// Tracker accumulates References across a chain of processing calls.
// Using a receiver ensures all processing steps write into the same sets,
// making it structurally impossible for any caller to silently bypass
// validation by threading a disconnected set through the call chain.
type Tracker struct {
References
}

// NewTracker returns a Tracker with all sets initialized and ready for use.
func NewTracker() *Tracker {
return &Tracker{
References: References{
Secrets: sets.New[string](),
ConfigMaps: sets.New[string](),
PVCs: sets.New[string](),
},
}
}
Comment thread
Ankitsinghsisodya marked this conversation as resolved.

// ensureInit initializes any nil sets in the embedded References so that
// methods on a zero-value or partially-constructed Tracker never panic on a
// nil-map Insert.
func (t *Tracker) ensureInit() {
if t.Secrets == nil {
t.Secrets = sets.New[string]()
}
if t.ConfigMaps == nil {
t.ConfigMaps = sets.New[string]()
}
if t.PVCs == nil {
t.PVCs = sets.New[string]()
}
}

// ProcessEnvs is a package-level wrapper kept for backwards compatibility.
//
// Deprecated: Create a Tracker via NewTracker() and call its ProcessEnvs
// method instead. The out-parameter sets are populated from the tracker's
// accumulated references after the call.
func ProcessEnvs(envs []fn.Env, referencedSecrets, referencedConfigMaps *sets.Set[string]) ([]corev1.EnvVar, []corev1.EnvFromSource, error) {
t := NewTracker()
vars, from, err := t.ProcessEnvs(envs)
if err != nil {
return nil, nil, err
}
if referencedSecrets != nil {
referencedSecrets.Insert(t.Secrets.UnsortedList()...)
}
if referencedConfigMaps != nil {
referencedConfigMaps.Insert(t.ConfigMaps.UnsortedList()...)
}
return vars, from, nil
}

// ProcessVolumes is a package-level wrapper kept for backwards compatibility.
//
// Deprecated: Create a Tracker via NewTracker() and call its ProcessVolumes
// method instead. The out-parameter sets are populated from the tracker's
// accumulated references after the call.
func ProcessVolumes(volumes []fn.Volume, referencedSecrets, referencedConfigMaps, referencedPVCs *sets.Set[string]) ([]corev1.Volume, []corev1.VolumeMount, error) {
t := NewTracker()
vols, mounts, err := t.ProcessVolumes(volumes)
if err != nil {
return nil, nil, err
}
if referencedSecrets != nil {
referencedSecrets.Insert(t.Secrets.UnsortedList()...)
}
if referencedConfigMaps != nil {
referencedConfigMaps.Insert(t.ConfigMaps.UnsortedList()...)
}
if referencedPVCs != nil {
referencedPVCs.Insert(t.PVCs.UnsortedList()...)
}
return vols, mounts, nil
}

func onClusterFix(f fn.Function) fn.Function {
// This only exists because of a bootstrapping problem with On-Cluster
// builds: It appears that, when sending a function to be built on-cluster
Expand Down Expand Up @@ -147,16 +233,14 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu
var status fn.Status
if err == nil {
// Update the existing function
referencedSecrets := sets.New[string]()
referencedConfigMaps := sets.New[string]()
referencedPVCs := sets.New[string]()
tracker := NewTracker()

deployment, err := d.generateDeployment(f, namespace, daprInstalled, &referencedSecrets, &referencedConfigMaps, &referencedPVCs)
deployment, err := d.generateDeployment(f, namespace, daprInstalled, tracker)
if err != nil {
return fn.DeploymentResult{}, fmt.Errorf("failed to generate deployment resources: %w", err)
}

if err = CheckResourcesArePresent(ctx, namespace, &referencedSecrets, &referencedConfigMaps, &referencedPVCs, f.Deploy.ServiceAccountName, f.Deploy.ImagePullSecret); err != nil {
if err = CheckResourcesArePresent(ctx, namespace, tracker.References, f.Deploy.ServiceAccountName, f.Deploy.ImagePullSecret); err != nil {
return fn.DeploymentResult{}, fmt.Errorf("failed to validate referenced resources: %w", err)
}

Expand Down Expand Up @@ -196,16 +280,14 @@ func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResu
return fn.DeploymentResult{}, fmt.Errorf("failed to check for existing deployment: %w", err)
}

referencedSecrets := sets.New[string]()
referencedConfigMaps := sets.New[string]()
referencedPVCs := sets.New[string]()
tracker := NewTracker()

deployment, err := d.generateDeployment(f, namespace, daprInstalled, &referencedSecrets, &referencedConfigMaps, &referencedPVCs)
deployment, err := d.generateDeployment(f, namespace, daprInstalled, tracker)
if err != nil {
return fn.DeploymentResult{}, fmt.Errorf("failed to generate deployment resources: %w", err)
}

if err = CheckResourcesArePresent(ctx, namespace, &referencedSecrets, &referencedConfigMaps, &referencedPVCs, f.Deploy.ServiceAccountName, f.Deploy.ImagePullSecret); err != nil {
if err = CheckResourcesArePresent(ctx, namespace, tracker.References, f.Deploy.ServiceAccountName, f.Deploy.ImagePullSecret); err != nil {
return fn.DeploymentResult{}, fmt.Errorf("failed to validate referenced resources: %w", err)
}

Expand Down Expand Up @@ -379,7 +461,7 @@ func deleteStaleTriggers(ctx context.Context, eventingClient clienteventingv1.Kn
return nil
}

func (d *Deployer) generateDeployment(f fn.Function, namespace string, daprInstalled bool, referencedSecrets, referencedConfigMaps, referencedPVCs *sets.Set[string]) (*appsv1.Deployment, error) {
func (d *Deployer) generateDeployment(f fn.Function, namespace string, daprInstalled bool, tracker *Tracker) (*appsv1.Deployment, error) {
labels, err := deployer.GenerateCommonLabels(f, d.decorator)
if err != nil {
return nil, err
Expand All @@ -391,12 +473,12 @@ func (d *Deployer) generateDeployment(f fn.Function, namespace string, daprInsta
podAnnotations := make(map[string]string)
maps.Copy(podAnnotations, annotations)

envVars, envFrom, err := ProcessEnvs(f.Run.Envs, referencedSecrets, referencedConfigMaps)
envVars, envFrom, err := tracker.ProcessEnvs(f.Run.Envs)
if err != nil {
return nil, fmt.Errorf("failed to process environment variables: %w", err)
}

volumes, volumeMounts, err := ProcessVolumes(f.Run.Volumes, referencedSecrets, referencedConfigMaps, referencedPVCs)
volumes, volumeMounts, err := tracker.ProcessVolumes(f.Run.Volumes)
if err != nil {
return nil, fmt.Errorf("failed to process volumes: %w", err)
}
Expand Down Expand Up @@ -488,11 +570,13 @@ func (d *Deployer) generateService(f fn.Function, namespace string, daprInstalle
return service, nil
}

// CheckResourcesArePresent returns error if Secrets or ConfigMaps
// referenced in input sets are not deployed on the cluster in the specified namespace
func CheckResourcesArePresent(ctx context.Context, namespace string, referencedSecrets, referencedConfigMaps, referencedPVCs *sets.Set[string], referencedServiceAccount, imagePullSecret string) error {
// CheckResourcesArePresent returns an error if any of the cluster resources
// referenced by refs — Secrets, ConfigMaps, or PersistentVolumeClaims — are
// absent from the given namespace, or if the optional ServiceAccount or
// imagePullSecret do not exist there.
func CheckResourcesArePresent(ctx context.Context, namespace string, refs References, referencedServiceAccount, imagePullSecret string) error {
errMsg := ""
for s := range *referencedSecrets {
for s := range refs.Secrets {
_, err := GetSecret(ctx, s, namespace)
if err != nil {
if errors.IsForbidden(err) {
Expand All @@ -503,14 +587,14 @@ func CheckResourcesArePresent(ctx context.Context, namespace string, referencedS
}
}

for cm := range *referencedConfigMaps {
for cm := range refs.ConfigMaps {
_, err := GetConfigMap(ctx, cm, namespace)
if err != nil {
errMsg += fmt.Sprintf(" referenced ConfigMap \"%s\" is not present in namespace \"%s\"\n", cm, namespace)
}
}

for pvc := range *referencedPVCs {
for pvc := range refs.PVCs {
_, err := GetPersistentVolumeClaim(ctx, pvc, namespace)
if err != nil {
errMsg += fmt.Sprintf(" referenced PersistentVolumeClaim \"%s\" is not present in namespace \"%s\"\n", pvc, namespace)
Expand Down Expand Up @@ -609,7 +693,8 @@ func SetSecurityContext(container *corev1.Container) {
// - name: EXAMPLE4
// value: {{ configMap:configMapName:key }} # ENV from a key in ConfigMap
// - value: {{ configMap:configMapName }} # all key-pair values from ConfigMap are set as ENV
func ProcessEnvs(envs []fn.Env, referencedSecrets, referencedConfigMaps *sets.Set[string]) ([]corev1.EnvVar, []corev1.EnvFromSource, error) {
func (t *Tracker) ProcessEnvs(envs []fn.Env) ([]corev1.EnvVar, []corev1.EnvFromSource, error) {
t.ensureInit()

envs = withOpenAddress(envs) // prepends ADDRESS=0.0.0.0 if not extant

Expand All @@ -620,7 +705,7 @@ func ProcessEnvs(envs []fn.Env, referencedSecrets, referencedConfigMaps *sets.Se
if env.Name == nil && env.Value != nil {
// all key-pair values from secret/configMap are set as ENV, eg. {{ secret:secretName }} or {{ configMap:configMapName }}
if strings.HasPrefix(*env.Value, "{{") {
envFromSource, err := createEnvFromSource(*env.Value, referencedSecrets, referencedConfigMaps)
envFromSource, err := t.createEnvFromSource(*env.Value)
if err != nil {
return nil, nil, err
}
Expand All @@ -632,7 +717,7 @@ func ProcessEnvs(envs []fn.Env, referencedSecrets, referencedConfigMaps *sets.Se
slices := strings.Split(strings.Trim(*env.Value, "{} "), ":")
if len(slices) == 3 {
// ENV from a key in secret/configMap, eg. FOO={{ secret:secretName:key }} FOO={{ configMap:configMapName.key }}
valueFrom, err := createEnvVarSource(slices, referencedSecrets, referencedConfigMaps)
valueFrom, err := t.createEnvVarSource(slices)
envVars = append(envVars, corev1.EnvVar{Name: *env.Name, ValueFrom: valueFrom})
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -698,7 +783,7 @@ func withOpenAddress(ee []fn.Env) []fn.Env {
return ee
}

func createEnvFromSource(value string, referencedSecrets, referencedConfigMaps *sets.Set[string]) (*corev1.EnvFromSource, error) {
func (t *Tracker) createEnvFromSource(value string) (*corev1.EnvFromSource, error) {
slices := strings.Split(strings.Trim(value, "{} "), ":")
if len(slices) != 2 {
return nil, fmt.Errorf("env requires a value in form \"resourceType:name\" where \"resourceType\" can be one of \"configMap\" or \"secret\"; got %q", slices)
Expand All @@ -719,17 +804,17 @@ func createEnvFromSource(value string, referencedSecrets, referencedConfigMaps *
Name: sourceName,
}}

if !referencedConfigMaps.Has(sourceName) {
referencedConfigMaps.Insert(sourceName)
if !t.ConfigMaps.Has(sourceName) {
t.ConfigMaps.Insert(sourceName)
}
case "secret":
sourceType = "Secret"
envVarSource.SecretRef = &corev1.SecretEnvSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: sourceName,
}}
if !referencedSecrets.Has(sourceName) {
referencedSecrets.Insert(sourceName)
if !t.Secrets.Has(sourceName) {
t.Secrets.Insert(sourceName)
}
default:
return nil, fmt.Errorf("unsupported env source type %q; supported source types are \"configMap\" or \"secret\"", slices[0])
Expand All @@ -742,7 +827,7 @@ func createEnvFromSource(value string, referencedSecrets, referencedConfigMaps *
return &envVarSource, nil
}

func createEnvVarSource(slices []string, referencedSecrets, referencedConfigMaps *sets.Set[string]) (*corev1.EnvVarSource, error) {
func (t *Tracker) createEnvVarSource(slices []string) (*corev1.EnvVarSource, error) {
if len(slices) != 3 {
return nil, fmt.Errorf("env requires a value in form \"resourceType:name:key\" where \"resourceType\" can be one of \"configMap\" or \"secret\"; got %q", slices)
}
Expand All @@ -764,8 +849,8 @@ func createEnvVarSource(slices []string, referencedSecrets, referencedConfigMaps
},
Key: sourceKey}

if !referencedConfigMaps.Has(sourceName) {
referencedConfigMaps.Insert(sourceName)
if !t.ConfigMaps.Has(sourceName) {
t.ConfigMaps.Insert(sourceName)
}
case "secret":
sourceType = "Secret"
Expand All @@ -775,8 +860,8 @@ func createEnvVarSource(slices []string, referencedSecrets, referencedConfigMaps
},
Key: sourceKey}

if !referencedSecrets.Has(sourceName) {
referencedSecrets.Insert(sourceName)
if !t.Secrets.Has(sourceName) {
t.Secrets.Insert(sourceName)
}
default:
return nil, fmt.Errorf("unsupported env source type %q; supported source types are \"configMap\" or \"secret\"", slices[0])
Expand Down Expand Up @@ -826,7 +911,8 @@ func processLocalEnvValue(val string) (string, error) {
// path: /etc/secret-volume
// - emptyDir: {} # mount EmptyDir as Volume
// path: /etc/configMap-volume
func ProcessVolumes(volumes []fn.Volume, referencedSecrets, referencedConfigMaps, referencedPVCs *sets.Set[string]) ([]corev1.Volume, []corev1.VolumeMount, error) {
func (t *Tracker) ProcessVolumes(volumes []fn.Volume) ([]corev1.Volume, []corev1.VolumeMount, error) {
t.ensureInit()
createdVolumes := sets.NewString()
usedPaths := sets.NewString()

Expand All @@ -851,8 +937,8 @@ func ProcessVolumes(volumes []fn.Volume, referencedSecrets, referencedConfigMaps
})
createdVolumes.Insert(volumeName)

if !referencedSecrets.Has(*vol.Secret) {
referencedSecrets.Insert(*vol.Secret)
if !t.Secrets.Has(*vol.Secret) {
t.Secrets.Insert(*vol.Secret)
}
}
} else if vol.ConfigMap != nil {
Expand All @@ -871,8 +957,8 @@ func ProcessVolumes(volumes []fn.Volume, referencedSecrets, referencedConfigMaps
})
createdVolumes.Insert(volumeName)

if !referencedConfigMaps.Has(*vol.ConfigMap) {
referencedConfigMaps.Insert(*vol.ConfigMap)
if !t.ConfigMaps.Has(*vol.ConfigMap) {
t.ConfigMaps.Insert(*vol.ConfigMap)
}
}
} else if vol.PersistentVolumeClaim != nil {
Expand All @@ -890,8 +976,8 @@ func ProcessVolumes(volumes []fn.Volume, referencedSecrets, referencedConfigMaps
})
createdVolumes.Insert(volumeName)

if !referencedPVCs.Has(*vol.PersistentVolumeClaim.ClaimName) {
referencedPVCs.Insert(*vol.PersistentVolumeClaim.ClaimName)
if !t.PVCs.Has(*vol.PersistentVolumeClaim.ClaimName) {
t.PVCs.Insert(*vol.PersistentVolumeClaim.ClaimName)
}
}
} else if vol.EmptyDir != nil {
Expand Down
9 changes: 4 additions & 5 deletions pkg/k8s/deployer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
fn "knative.dev/func/pkg/functions"
)

Expand Down Expand Up @@ -121,8 +120,8 @@ func Test_generateDeployment_ImagePullSecret(t *testing.T) {
ImagePullSecret: "my-registry-secret",
},
}
rs, rcm, rpvc := sets.New[string](), sets.New[string](), sets.New[string]()
deployment, err := d.generateDeployment(f, "default", false, &rs, &rcm, &rpvc)
tracker := NewTracker()
deployment, err := d.generateDeployment(f, "default", false, tracker)
if err != nil {
t.Fatal(err)
}
Expand All @@ -139,8 +138,8 @@ func Test_generateDeployment_ImagePullSecret(t *testing.T) {
Image: "registry.example.com/test:latest",
},
}
rs, rcm, rpvc := sets.New[string](), sets.New[string](), sets.New[string]()
deployment, err := d.generateDeployment(f, "default", false, &rs, &rcm, &rpvc)
tracker := NewTracker()
deployment, err := d.generateDeployment(f, "default", false, tracker)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading
Loading