Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ type K8sSyncStageOptions struct {
AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"`
// Whether the resources that are no longer defined in Git should be removed or not.
Prune bool `json:"prune"`
// Limit this stage to a subset of deploy targets by name.
// Empty means the stage applies to all targets.
MultiTargets []string `json:"multiTargets,omitempty"`
}

// FindDeployTarget finds the deploy target configuration by the given name.
Expand Down Expand Up @@ -220,10 +223,17 @@ type K8sCanaryRolloutStageOptions struct {
CreateService bool `json:"createService"`
// List of patches used to customize manifests for CANARY variant.
Patches []K8sResourcePatch `json:"patches"`
// Limit this stage to a subset of deploy targets by name.
// Empty means the stage applies to all targets.
MultiTargets []string `json:"multiTargets,omitempty"`
}

// K8sCanaryCleanStageOptions contains all configurable values for a K8S_CANARY_CLEAN stage.
type K8sCanaryCleanStageOptions struct{}
type K8sCanaryCleanStageOptions struct {
// Limit this stage to a subset of deploy targets by name.
// Empty means the stage applies to all targets.
MultiTargets []string `json:"multiTargets,omitempty"`
}

// K8sPrimaryRolloutStageOptions contains all configurable values for a K8S_PRIMARY_ROLLOUT stage.
type K8sPrimaryRolloutStageOptions struct {
Expand All @@ -236,6 +246,9 @@ type K8sPrimaryRolloutStageOptions struct {
AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"`
// Whether the resources that are no longer defined in Git should be removed or not.
Prune bool `json:"prune"`
// Limit this stage to a subset of deploy targets by name.
// Empty means the stage applies to all targets.
MultiTargets []string `json:"multiTargets,omitempty"`
}

func (o *K8sPrimaryRolloutStageOptions) UnmarshalJSON(data []byte) error {
Expand Down Expand Up @@ -263,6 +276,9 @@ type K8sBaselineRolloutStageOptions struct {
Suffix string `json:"suffix" default:"baseline"`
// Whether the BASELINE service should be created.
CreateService bool `json:"createService"`
// Limit this stage to a subset of deploy targets by name.
// Empty means the stage applies to all targets.
MultiTargets []string `json:"multiTargets,omitempty"`
}

func (o *K8sBaselineRolloutStageOptions) UnmarshalJSON(data []byte) error {
Expand All @@ -279,7 +295,11 @@ func (o *K8sBaselineRolloutStageOptions) UnmarshalJSON(data []byte) error {
}

// K8sBaselineCleanStageOptions contains all configurable values for a K8S_BASELINE_CLEAN stage.
type K8sBaselineCleanStageOptions struct{}
type K8sBaselineCleanStageOptions struct {
// Limit this stage to a subset of deploy targets by name.
// Empty means the stage applies to all targets.
MultiTargets []string `json:"multiTargets,omitempty"`
}

// K8sResourcePatch represents a patch operation for a Kubernetes resource.
type K8sResourcePatch struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type K8sTrafficRoutingStageOptions struct {
Canary unit.Percentage `json:"canary"`
// The percentage of traffic should be routed to BASELINE variant.
Baseline unit.Percentage `json:"baseline"`
// Limit this stage to a subset of deploy targets by name.
// Empty means the stage applies to all targets.
MultiTargets []string `json:"multiTargets,omitempty"`
}

// Percentages returns the primary, canary, and baseline percentages from the K8sTrafficRoutingStageOptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,8 @@ func (p *Plugin) executeK8sMultiBaselineRolloutStage(ctx context.Context, input
}
}

type targetConfig struct {
deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]
multiTarget *kubeconfig.KubernetesMultiTarget
}

deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], 0)
targetConfigs := make([]targetConfig, 0, len(dts))
targetConfigs := make([]stageTargetConfig, 0, len(dts))

for _, target := range dts {
deployTargetMap[target.Name] = target
Expand All @@ -61,7 +56,7 @@ func (p *Plugin) executeK8sMultiBaselineRolloutStage(ctx context.Context, input
// If no multi-targets are specified, roll out baseline to all deploy targets.
if len(cfg.Spec.Input.MultiTargets) == 0 {
for _, dt := range dts {
targetConfigs = append(targetConfigs, targetConfig{
targetConfigs = append(targetConfigs, stageTargetConfig{
deployTarget: dt,
multiTarget: nil,
})
Expand All @@ -74,13 +69,15 @@ func (p *Plugin) executeK8sMultiBaselineRolloutStage(ctx context.Context, input
continue
}

targetConfigs = append(targetConfigs, targetConfig{
targetConfigs = append(targetConfigs, stageTargetConfig{
deployTarget: dt,
multiTarget: &multiTarget,
})
}
}

targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets)

eg, ctx := errgroup.WithContext(ctx)
for _, tc := range targetConfigs {
eg.Go(func() error {
Expand Down Expand Up @@ -245,20 +242,23 @@ func (p *Plugin) executeK8sMultiBaselineCleanStage(ctx context.Context, input *s
return sdk.StageStatusFailure
}

var stageCfg kubeconfig.K8sBaselineCleanStageOptions
if len(input.Request.StageConfig) > 0 {
if err := json.Unmarshal(input.Request.StageConfig, &stageCfg); err != nil {
lp.Errorf("Failed while unmarshalling stage config (%v)", err)
return sdk.StageStatusFailure
}
}

deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], len(dts))
for _, dt := range dts {
deployTargetMap[dt.Name] = dt
}

type targetConfig struct {
deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]
multiTarget *kubeconfig.KubernetesMultiTarget
}

targetConfigs := make([]targetConfig, 0, len(dts))
targetConfigs := make([]stageTargetConfig, 0, len(dts))
if len(cfg.Spec.Input.MultiTargets) == 0 {
for _, dt := range dts {
targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt})
targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt})
}
} else {
for _, mt := range cfg.Spec.Input.MultiTargets {
Expand All @@ -267,10 +267,12 @@ func (p *Plugin) executeK8sMultiBaselineCleanStage(ctx context.Context, input *s
lp.Infof("Ignore multi target '%s': not matched any deployTarget", mt.Target.Name)
continue
}
targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt, multiTarget: &mt})
targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt, multiTarget: &mt})
}
}

targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets)

eg, ctx := errgroup.WithContext(ctx)
for _, tc := range targetConfigs {
eg.Go(func() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,8 @@ func (p *Plugin) executeK8sMultiCanaryRolloutStage(ctx context.Context, input *s
}
}

type targetConfig struct {
deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]
multiTarget *kubeconfig.KubernetesMultiTarget
}

deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], 0)
targetConfigs := make([]targetConfig, 0, len(dts))
targetConfigs := make([]stageTargetConfig, 0, len(dts))

for _, target := range dts {
deployTargetMap[target.Name] = target
Expand All @@ -62,7 +57,7 @@ func (p *Plugin) executeK8sMultiCanaryRolloutStage(ctx context.Context, input *s
// If no multi-targets are specified, roll out canary to all deploy targets.
if len(cfg.Spec.Input.MultiTargets) == 0 {
for _, dt := range dts {
targetConfigs = append(targetConfigs, targetConfig{
targetConfigs = append(targetConfigs, stageTargetConfig{
deployTarget: dt,
multiTarget: nil,
})
Expand All @@ -75,13 +70,15 @@ func (p *Plugin) executeK8sMultiCanaryRolloutStage(ctx context.Context, input *s
continue
}

targetConfigs = append(targetConfigs, targetConfig{
targetConfigs = append(targetConfigs, stageTargetConfig{
deployTarget: dt,
multiTarget: &multiTarget,
})
}
}

targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets)

eg, ctx := errgroup.WithContext(ctx)
for _, tc := range targetConfigs {
eg.Go(func() error {
Expand Down Expand Up @@ -366,19 +363,23 @@ func (p *Plugin) executeK8sMultiCanaryCleanStage(ctx context.Context, input *sdk
return sdk.StageStatusFailure
}

var stageCfg kubeconfig.K8sCanaryCleanStageOptions
if len(input.Request.StageConfig) > 0 {
if err := json.Unmarshal(input.Request.StageConfig, &stageCfg); err != nil {
lp.Errorf("Failed while unmarshalling stage config (%v)", err)
return sdk.StageStatusFailure
}
}

deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], len(dts))
for _, dt := range dts {
deployTargetMap[dt.Name] = dt
}

type targetConfig struct {
deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]
}

targetConfigs := make([]targetConfig, 0, len(dts))
targetConfigs := make([]stageTargetConfig, 0, len(dts))
if len(cfg.Spec.Input.MultiTargets) == 0 {
for _, dt := range dts {
targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt})
targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt})
}
} else {
for _, mt := range cfg.Spec.Input.MultiTargets {
Expand All @@ -387,10 +388,12 @@ func (p *Plugin) executeK8sMultiCanaryCleanStage(ctx context.Context, input *sdk
lp.Infof("Ignore multi target '%s': not matched any deployTarget", mt.Target.Name)
continue
}
targetConfigs = append(targetConfigs, targetConfig{deployTarget: dt})
targetConfigs = append(targetConfigs, stageTargetConfig{deployTarget: dt})
}
}

targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets)

eg, ctx := errgroup.WithContext(ctx)
for _, tc := range targetConfigs {
eg.Go(func() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,36 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider"
)

// stageTargetConfig pairs a resolved deploy target with its per-spec multiTarget config.
// It is the unit of work passed to per-target goroutines inside a stage handler.
type stageTargetConfig struct {
deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]
multiTarget *kubeconfig.KubernetesMultiTarget
}

// filterStageTargets returns only the elements of tcs whose deploy target name
// appears in stageTargets. If stageTargets is empty all elements are returned unchanged.
func filterStageTargets(tcs []stageTargetConfig, stageTargets []string) []stageTargetConfig {
if len(stageTargets) == 0 {
return tcs
}
allowed := make(map[string]struct{}, len(stageTargets))
for _, name := range stageTargets {
allowed[name] = struct{}{}
}
out := make([]stageTargetConfig, 0, len(stageTargets))
for _, tc := range tcs {
if _, ok := allowed[tc.deployTarget.Name]; ok {
out = append(out, tc)
}
}
return out
}

func ensureVariantSelectorInWorkload(m provider.Manifest, variantLabel, variant string) error {
variantMap := map[string]string{
variantLabel: variant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

sdk "github.com/pipe-cd/piped-plugin-sdk-go"

kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider"
)

Expand Down Expand Up @@ -235,3 +238,59 @@ metadata:
})
}
}

func TestFilterStageTargets(t *testing.T) {
t.Parallel()

makeTC := func(name string) stageTargetConfig {
return stageTargetConfig{
deployTarget: &sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{Name: name},
}
}

all := []stageTargetConfig{makeTC("us"), makeTC("eu"), makeTC("ap")}

tests := []struct {
name string
stageTargets []string
wantNames []string
}{
{
name: "empty filter returns all targets",
stageTargets: nil,
wantNames: []string{"us", "eu", "ap"},
},
{
name: "single target selected",
stageTargets: []string{"us"},
wantNames: []string{"us"},
},
{
name: "multiple targets selected",
stageTargets: []string{"us", "ap"},
wantNames: []string{"us", "ap"},
},
{
name: "unknown target name is silently ignored",
stageTargets: []string{"unknown"},
wantNames: []string{},
},
{
name: "mix of known and unknown names",
stageTargets: []string{"eu", "unknown"},
wantNames: []string{"eu"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got := filterStageTargets(all, tt.stageTargets)
names := make([]string, len(got))
for i, tc := range got {
names[i] = tc.deployTarget.Name
}
assert.Equal(t, tt.wantNames, names)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,8 @@ func (p *Plugin) executeK8sMultiPrimaryRolloutStage(ctx context.Context, input *
}
}

type targetConfig struct {
deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]
multiTarget *kubeconfig.KubernetesMultiTarget
}

deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig])
targetConfigs := make([]targetConfig, 0, len(dts))
targetConfigs := make([]stageTargetConfig, 0, len(dts))

for _, target := range dts {
deployTargetMap[target.Name] = target
Expand All @@ -62,7 +57,7 @@ func (p *Plugin) executeK8sMultiPrimaryRolloutStage(ctx context.Context, input *
// If no multi-targets are specified, roll out primary to all deploy targets.
if len(cfg.Spec.Input.MultiTargets) == 0 {
for _, dt := range dts {
targetConfigs = append(targetConfigs, targetConfig{
targetConfigs = append(targetConfigs, stageTargetConfig{
deployTarget: dt,
multiTarget: nil,
})
Expand All @@ -74,13 +69,15 @@ func (p *Plugin) executeK8sMultiPrimaryRolloutStage(ctx context.Context, input *
lp.Infof("Ignore multi target '%s': not matched any deployTarget", multiTarget.Target.Name)
continue
}
targetConfigs = append(targetConfigs, targetConfig{
targetConfigs = append(targetConfigs, stageTargetConfig{
deployTarget: dt,
multiTarget: &multiTarget,
})
}
}

targetConfigs = filterStageTargets(targetConfigs, stageCfg.MultiTargets)

eg, ctx := errgroup.WithContext(ctx)
for _, tc := range targetConfigs {
eg.Go(func() error {
Expand Down
Loading
Loading