diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/application.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/application.go index ca51727db0..40f023e1a8 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/application.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/application.go @@ -168,6 +168,9 @@ type K8sSyncStageOptions struct { AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"` // Whether the resources that are no longer defined in Git should be removed or not. Prune bool `json:"prune"` + // Limit this stage to a subset of deploy targets by name. + // Empty means the stage applies to all targets. + MultiTargets []string `json:"multiTargets,omitempty"` } // FindDeployTarget finds the deploy target configuration by the given name. @@ -220,10 +223,17 @@ type K8sCanaryRolloutStageOptions struct { CreateService bool `json:"createService"` // List of patches used to customize manifests for CANARY variant. Patches []K8sResourcePatch `json:"patches"` + // Limit this stage to a subset of deploy targets by name. + // Empty means the stage applies to all targets. + MultiTargets []string `json:"multiTargets,omitempty"` } // K8sCanaryCleanStageOptions contains all configurable values for a K8S_CANARY_CLEAN stage. -type K8sCanaryCleanStageOptions struct{} +type K8sCanaryCleanStageOptions struct { + // Limit this stage to a subset of deploy targets by name. + // Empty means the stage applies to all targets. + MultiTargets []string `json:"multiTargets,omitempty"` +} // K8sPrimaryRolloutStageOptions contains all configurable values for a K8S_PRIMARY_ROLLOUT stage. type K8sPrimaryRolloutStageOptions struct { @@ -236,6 +246,9 @@ type K8sPrimaryRolloutStageOptions struct { AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"` // Whether the resources that are no longer defined in Git should be removed or not. Prune bool `json:"prune"` + // Limit this stage to a subset of deploy targets by name. + // Empty means the stage applies to all targets. + MultiTargets []string `json:"multiTargets,omitempty"` } func (o *K8sPrimaryRolloutStageOptions) UnmarshalJSON(data []byte) error { @@ -263,6 +276,9 @@ type K8sBaselineRolloutStageOptions struct { Suffix string `json:"suffix" default:"baseline"` // Whether the BASELINE service should be created. CreateService bool `json:"createService"` + // Limit this stage to a subset of deploy targets by name. + // Empty means the stage applies to all targets. + MultiTargets []string `json:"multiTargets,omitempty"` } func (o *K8sBaselineRolloutStageOptions) UnmarshalJSON(data []byte) error { @@ -279,7 +295,11 @@ func (o *K8sBaselineRolloutStageOptions) UnmarshalJSON(data []byte) error { } // K8sBaselineCleanStageOptions contains all configurable values for a K8S_BASELINE_CLEAN stage. -type K8sBaselineCleanStageOptions struct{} +type K8sBaselineCleanStageOptions struct { + // Limit this stage to a subset of deploy targets by name. + // Empty means the stage applies to all targets. + MultiTargets []string `json:"multiTargets,omitempty"` +} // K8sResourcePatch represents a patch operation for a Kubernetes resource. type K8sResourcePatch struct { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/traffic.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/traffic.go index e0c64e2dac..a136ae86ef 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/traffic.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/traffic.go @@ -70,6 +70,9 @@ type K8sTrafficRoutingStageOptions struct { Canary unit.Percentage `json:"canary"` // The percentage of traffic should be routed to BASELINE variant. Baseline unit.Percentage `json:"baseline"` + // Limit this stage to a subset of deploy targets by name. + // Empty means the stage applies to all targets. + MultiTargets []string `json:"multiTargets,omitempty"` } // Percentages returns the primary, canary, and baseline percentages from the K8sTrafficRoutingStageOptions. diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/baseline.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/baseline.go index fb5c933ea4..8d9dfda46e 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/baseline.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/baseline.go @@ -46,13 +46,8 @@ func (p *Plugin) executeK8sMultiBaselineRolloutStage(ctx context.Context, input } } - type targetConfig struct { - deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] - multiTarget *kubeconfig.KubernetesMultiTarget - } - deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], 0) - targetConfigs := make([]targetConfig, 0, len(dts)) + targetConfigs := make([]stageTargetConfig, 0, len(dts)) for _, target := range dts { deployTargetMap[target.Name] = target @@ -61,7 +56,7 @@ func (p *Plugin) executeK8sMultiBaselineRolloutStage(ctx context.Context, input // If no multi-targets are specified, roll out baseline to all deploy targets. if len(cfg.Spec.Input.MultiTargets) == 0 { for _, dt := range dts { - targetConfigs = append(targetConfigs, targetConfig{ + targetConfigs = append(targetConfigs, stageTargetConfig{ deployTarget: dt, multiTarget: nil, }) @@ -74,13 +69,15 @@ func (p *Plugin) executeK8sMultiBaselineRolloutStage(ctx context.Context, input continue } - targetConfigs = append(targetConfigs, targetConfig{ + targetConfigs = append(targetConfigs, stageTargetConfig{ deployTarget: dt, multiTarget: &multiTarget, }) } } + targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets) + eg, ctx := errgroup.WithContext(ctx) for _, tc := range targetConfigs { eg.Go(func() error { @@ -245,20 +242,23 @@ func (p *Plugin) executeK8sMultiBaselineCleanStage(ctx context.Context, input *s return sdk.StageStatusFailure } + var stageCfg kubeconfig.K8sBaselineCleanStageOptions + if len(input.Request.StageConfig) > 0 { + if err := json.Unmarshal(input.Request.StageConfig, &stageCfg); err != nil { + lp.Errorf("Failed while unmarshalling stage config (%v)", err) + return sdk.StageStatusFailure + } + } + deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], len(dts)) for _, dt := range dts { deployTargetMap[dt.Name] = dt } - type targetConfig struct { - deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] - multiTarget *kubeconfig.KubernetesMultiTarget - } - - targetConfigs := make([]targetConfig, 0, len(dts)) + targetConfigs := make([]stageTargetConfig, 0, len(dts)) if len(cfg.Spec.Input.MultiTargets) == 0 { for _, dt := range dts { - targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt}) + targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt}) } } else { for _, mt := range cfg.Spec.Input.MultiTargets { @@ -267,10 +267,12 @@ func (p *Plugin) executeK8sMultiBaselineCleanStage(ctx context.Context, input *s lp.Infof("Ignore multi target '%s': not matched any deployTarget", mt.Target.Name) continue } - targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt, multiTarget: &mt}) + targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt, multiTarget: &mt}) } } + targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets) + eg, ctx := errgroup.WithContext(ctx) for _, tc := range targetConfigs { eg.Go(func() error { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go index 50aa244b2b..31a4b4ea01 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/canary.go @@ -47,13 +47,8 @@ func (p *Plugin) executeK8sMultiCanaryRolloutStage(ctx context.Context, input *s } } - type targetConfig struct { - deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] - multiTarget *kubeconfig.KubernetesMultiTarget - } - deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], 0) - targetConfigs := make([]targetConfig, 0, len(dts)) + targetConfigs := make([]stageTargetConfig, 0, len(dts)) for _, target := range dts { deployTargetMap[target.Name] = target @@ -62,7 +57,7 @@ func (p *Plugin) executeK8sMultiCanaryRolloutStage(ctx context.Context, input *s // If no multi-targets are specified, roll out canary to all deploy targets. if len(cfg.Spec.Input.MultiTargets) == 0 { for _, dt := range dts { - targetConfigs = append(targetConfigs, targetConfig{ + targetConfigs = append(targetConfigs, stageTargetConfig{ deployTarget: dt, multiTarget: nil, }) @@ -75,13 +70,15 @@ func (p *Plugin) executeK8sMultiCanaryRolloutStage(ctx context.Context, input *s continue } - targetConfigs = append(targetConfigs, targetConfig{ + targetConfigs = append(targetConfigs, stageTargetConfig{ deployTarget: dt, multiTarget: &multiTarget, }) } } + targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets) + eg, ctx := errgroup.WithContext(ctx) for _, tc := range targetConfigs { eg.Go(func() error { @@ -366,19 +363,23 @@ func (p *Plugin) executeK8sMultiCanaryCleanStage(ctx context.Context, input *sdk return sdk.StageStatusFailure } + var stageCfg kubeconfig.K8sCanaryCleanStageOptions + if len(input.Request.StageConfig) > 0 { + if err := json.Unmarshal(input.Request.StageConfig, &stageCfg); err != nil { + lp.Errorf("Failed while unmarshalling stage config (%v)", err) + return sdk.StageStatusFailure + } + } + deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], len(dts)) for _, dt := range dts { deployTargetMap[dt.Name] = dt } - type targetConfig struct { - deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] - } - - targetConfigs := make([]targetConfig, 0, len(dts)) + targetConfigs := make([]stageTargetConfig, 0, len(dts)) if len(cfg.Spec.Input.MultiTargets) == 0 { for _, dt := range dts { - targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt}) + targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt}) } } else { for _, mt := range cfg.Spec.Input.MultiTargets { @@ -387,10 +388,12 @@ func (p *Plugin) executeK8sMultiCanaryCleanStage(ctx context.Context, input *sdk lp.Infof("Ignore multi target '%s': not matched any deployTarget", mt.Target.Name) continue } - targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt}) + targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt}) } } + targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets) + eg, ctx := errgroup.WithContext(ctx) for _, tc := range targetConfigs { eg.Go(func() error { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go index d73b587eb0..1b1562d151 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc.go @@ -24,9 +24,36 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider" ) +// stageTargetConfig pairs a resolved deploy target with its per-spec multiTarget config. +// It is the unit of work passed to per-target goroutines inside a stage handler. +type stageTargetConfig struct { + deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] + multiTarget *kubeconfig.KubernetesMultiTarget +} + +// filterStageTargets returns only the elements of tcs whose deploy target name +// appears in stageTargets. If stageTargets is empty all elements are returned unchanged. +func filterStageTargets(tcs []stageTargetConfig, stageTargets []string) []stageTargetConfig { + if len(stageTargets) == 0 { + return tcs + } + allowed := make(map[string]struct{}, len(stageTargets)) + for _, name := range stageTargets { + allowed[name] = struct{}{} + } + out := make([]stageTargetConfig, 0, len(stageTargets)) + for _, tc := range tcs { + if _, ok := allowed[tc.deployTarget.Name]; ok { + out = append(out, tc) + } + } + return out +} + func ensureVariantSelectorInWorkload(m provider.Manifest, variantLabel, variant string) error { variantMap := map[string]string{ variantLabel: variant, diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc_test.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc_test.go index c175608730..5bd6414a86 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc_test.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/misc_test.go @@ -20,6 +20,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + + kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider" ) @@ -235,3 +238,59 @@ metadata: }) } } + +func TestFilterStageTargets(t *testing.T) { + t.Parallel() + + makeTC := func(name string) stageTargetConfig { + return stageTargetConfig{ + deployTarget: &sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{Name: name}, + } + } + + all := []stageTargetConfig{makeTC("us"), makeTC("eu"), makeTC("ap")} + + tests := []struct { + name string + stageTargets []string + wantNames []string + }{ + { + name: "empty filter returns all targets", + stageTargets: nil, + wantNames: []string{"us", "eu", "ap"}, + }, + { + name: "single target selected", + stageTargets: []string{"us"}, + wantNames: []string{"us"}, + }, + { + name: "multiple targets selected", + stageTargets: []string{"us", "ap"}, + wantNames: []string{"us", "ap"}, + }, + { + name: "unknown target name is silently ignored", + stageTargets: []string{"unknown"}, + wantNames: []string{}, + }, + { + name: "mix of known and unknown names", + stageTargets: []string{"eu", "unknown"}, + wantNames: []string{"eu"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got := filterStageTargets(all, tt.stageTargets) + names := make([]string, len(got)) + for i, tc := range got { + names[i] = tc.deployTarget.Name + } + assert.Equal(t, tt.wantNames, names) + }) + } +} diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary.go index d94e72a49c..b213e3233a 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary.go @@ -47,13 +47,8 @@ func (p *Plugin) executeK8sMultiPrimaryRolloutStage(ctx context.Context, input * } } - type targetConfig struct { - deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] - multiTarget *kubeconfig.KubernetesMultiTarget - } - deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]) - targetConfigs := make([]targetConfig, 0, len(dts)) + targetConfigs := make([]stageTargetConfig, 0, len(dts)) for _, target := range dts { deployTargetMap[target.Name] = target @@ -62,7 +57,7 @@ func (p *Plugin) executeK8sMultiPrimaryRolloutStage(ctx context.Context, input * // If no multi-targets are specified, roll out primary to all deploy targets. if len(cfg.Spec.Input.MultiTargets) == 0 { for _, dt := range dts { - targetConfigs = append(targetConfigs, targetConfig{ + targetConfigs = append(targetConfigs, stageTargetConfig{ deployTarget: dt, multiTarget: nil, }) @@ -74,13 +69,15 @@ func (p *Plugin) executeK8sMultiPrimaryRolloutStage(ctx context.Context, input * lp.Infof("Ignore multi target '%s': not matched any deployTarget", multiTarget.Target.Name) continue } - targetConfigs = append(targetConfigs, targetConfig{ + targetConfigs = append(targetConfigs, stageTargetConfig{ deployTarget: dt, multiTarget: &multiTarget, }) } } + targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets) + eg, ctx := errgroup.WithContext(ctx) for _, tc := range targetConfigs { eg.Go(func() error { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/sync.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/sync.go index b308f8b4ba..1bda10325f 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/sync.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/sync.go @@ -39,13 +39,16 @@ func (p *Plugin) executeK8sMultiSyncStage(ctx context.Context, input *sdk.Execut return sdk.StageStatusFailure } - type targetConfig struct { - deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] - multiTarget *kubeconfig.KubernetesMultiTarget + var stageCfg kubeconfig.K8sSyncStageOptions + if len(input.Request.StageConfig) > 0 { + if err := json.Unmarshal(input.Request.StageConfig, &stageCfg); err != nil { + lp.Errorf("Failed while unmarshalling stage config (%v)", err) + return sdk.StageStatusFailure + } } deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], 0) - targetConfigs := make([]targetConfig, 0, len(dts)) + targetConfigs := make([]stageTargetConfig, 0, len(dts)) // prevent the deployment when its deployTarget is not found in the piped config for _, target := range dts { @@ -55,7 +58,7 @@ func (p *Plugin) executeK8sMultiSyncStage(ctx context.Context, input *sdk.Execut // If no multi-targets are specified, sync to all deploy targets. if len(cfg.Spec.Input.MultiTargets) == 0 { for _, dt := range dts { - targetConfigs = append(targetConfigs, targetConfig{ + targetConfigs = append(targetConfigs, stageTargetConfig{ deployTarget: dt, multiTarget: nil, }) @@ -69,13 +72,15 @@ func (p *Plugin) executeK8sMultiSyncStage(ctx context.Context, input *sdk.Execut continue } - targetConfigs = append(targetConfigs, targetConfig{ + targetConfigs = append(targetConfigs, stageTargetConfig{ deployTarget: dt, multiTarget: &multiTarget, }) } } + targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets) + eg, ctx := errgroup.WithContext(ctx) for _, tc := range targetConfigs { // Start syncing the deployment to the target. diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/traffic.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/traffic.go index 0ee6fba71c..6c6234bc6b 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/traffic.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/traffic.go @@ -97,15 +97,10 @@ func (p *Plugin) executeK8sMultiTrafficRoutingStagePodSelector( deployTargetMap[dt.Name] = dt } - type targetConfig struct { - deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] - multiTarget *kubeconfig.KubernetesMultiTarget - } - - targetConfigs := make([]targetConfig, 0, len(dts)) + targetConfigs := make([]stageTargetConfig, 0, len(dts)) if len(cfg.Spec.Input.MultiTargets) == 0 { for _, dt := range dts { - targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt}) + targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt}) } } else { for _, mt := range cfg.Spec.Input.MultiTargets { @@ -114,10 +109,12 @@ func (p *Plugin) executeK8sMultiTrafficRoutingStagePodSelector( lp.Infof("Ignore multi target '%s': not matched any deployTarget", mt.Target.Name) continue } - targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt, multiTarget: &mt}) + targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt, multiTarget: &mt}) } } + targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets) + eg, ctx := errgroup.WithContext(ctx) for _, tc := range targetConfigs { eg.Go(func() error { @@ -228,15 +225,10 @@ func (p *Plugin) executeK8sMultiTrafficRoutingStageIstio( deployTargetMap[dt.Name] = dt } - type targetConfig struct { - deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] - multiTarget *kubeconfig.KubernetesMultiTarget - } - - targetConfigs := make([]targetConfig, 0, len(dts)) + targetConfigs := make([]stageTargetConfig, 0, len(dts)) if len(cfg.Spec.Input.MultiTargets) == 0 { for _, dt := range dts { - targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt}) + targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt}) } } else { for _, mt := range cfg.Spec.Input.MultiTargets { @@ -245,10 +237,12 @@ func (p *Plugin) executeK8sMultiTrafficRoutingStageIstio( lp.Infof("Ignore multi target '%s': not matched any deployTarget", mt.Target.Name) continue } - targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt, multiTarget: &mt}) + targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt, multiTarget: &mt}) } } + targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets) + eg, ctx := errgroup.WithContext(ctx) for _, tc := range targetConfigs { eg.Go(func() error {