Skip to content

Commit c1894a2

Browse files
committed
Improve patch Deployment
1 parent 8c47597 commit c1894a2

4 files changed

Lines changed: 155 additions & 1 deletion

File tree

helm/dataflow-operator/templates/deployment.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ spec:
5252
value: {{ .Release.Namespace | quote }}
5353
- name: LOG_LEVEL
5454
value: {{ .Values.logLevel | default "info" | quote }}
55+
- name: PROCESSOR_LOG_LEVEL
56+
value: {{ .Values.processorLogLevel | default "info" | quote }}
5557
args:
5658
- --metrics-bind-address=:{{ .Values.metrics.port }}
5759
- --health-probe-bind-address=:{{ .Values.health.probePort }}

helm/dataflow-operator/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ metrics:
6565
# Log level for the operator (sets env LOG_LEVEL: debug, info, warn, error)
6666
logLevel: "info"
6767

68+
# Log level for processor pods (sets env PROCESSOR_LOG_LEVEL in the operator; operator passes it as LOG_LEVEL to each processor pod)
69+
processorLogLevel: "info"
70+
6871
# Health probe configuration
6972
health:
7073
# The address the probe endpoint binds to

internal/controller/dataflow_controller.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
appsv1 "k8s.io/api/apps/v1"
3131
corev1 "k8s.io/api/core/v1"
3232
apierrors "k8s.io/apimachinery/pkg/api/errors"
33+
"k8s.io/apimachinery/pkg/api/equality"
3334
"k8s.io/apimachinery/pkg/api/resource"
3435
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3536
"k8s.io/apimachinery/pkg/runtime"
@@ -475,7 +476,10 @@ func (r *DataFlowReconciler) createOrUpdateDeployment(ctx context.Context, req c
475476
} else if err != nil {
476477
return fmt.Errorf("failed to get Deployment: %w", err)
477478
} else {
478-
// Обновляем существующий Deployment, если spec изменился
479+
// Обновляем существующий Deployment только при реальном изменении spec
480+
if equality.Semantic.DeepEqual(existing.Spec, deployment.Spec) {
481+
return nil
482+
}
479483
existing.Spec = deployment.Spec
480484
if err := r.Update(ctx, existing); err != nil {
481485
return fmt.Errorf("failed to update Deployment: %w", err)

internal/controller/dataflow_controller_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,151 @@ func TestDataFlowReconciler_Reconcile_WithResourcesAndNodeSelector(t *testing.T)
408408
assert.Equal(t, ctrl.Result{}, result)
409409
}
410410

411+
// TestCreateOrUpdateDeployment_NoUpdateWhenSpecUnchanged проверяет, что при второй реконсиляции
412+
// с неизменным spec DataFlow не вызывается Update Deployment (нет лишних PATCH и rolling update).
413+
func TestCreateOrUpdateDeployment_NoUpdateWhenSpecUnchanged(t *testing.T) {
414+
scheme := runtime.NewScheme()
415+
require.NoError(t, dataflowv1.AddToScheme(scheme))
416+
require.NoError(t, clientgoscheme.AddToScheme(scheme))
417+
418+
fakeRecorder := record.NewFakeRecorder(10)
419+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
420+
reconciler := NewDataFlowReconciler(fakeClient, scheme, fakeRecorder)
421+
422+
ctx := context.Background()
423+
dataflow := &dataflowv1.DataFlow{
424+
TypeMeta: metav1.TypeMeta{
425+
APIVersion: "dataflow.dataflow.io/v1",
426+
Kind: "DataFlow",
427+
},
428+
ObjectMeta: metav1.ObjectMeta{
429+
Name: "test-dataflow",
430+
Namespace: "default",
431+
},
432+
Spec: dataflowv1.DataFlowSpec{
433+
Source: dataflowv1.SourceSpec{
434+
Type: "kafka",
435+
Kafka: &dataflowv1.KafkaSourceSpec{
436+
Brokers: []string{"localhost:9092"},
437+
Topic: "test-topic",
438+
ConsumerGroup: "test-group",
439+
},
440+
},
441+
Sink: dataflowv1.SinkSpec{
442+
Type: "kafka",
443+
Kafka: &dataflowv1.KafkaSinkSpec{
444+
Brokers: []string{"localhost:9092"},
445+
Topic: "output-topic",
446+
},
447+
},
448+
},
449+
}
450+
require.NoError(t, fakeClient.Create(ctx, dataflow))
451+
452+
req := ctrl.Request{
453+
NamespacedName: types.NamespacedName{Name: "test-dataflow", Namespace: "default"},
454+
}
455+
456+
// Первая реконсиляция — создаётся Deployment (DeploymentCreated)
457+
_, _ = reconciler.Reconcile(ctx, req)
458+
drainRecorderEvents(fakeRecorder) // сбрасываем события создания
459+
460+
// Вторая реконсиляция без изменения spec — Update не должен вызываться (DeploymentUpdated не должно быть)
461+
_, _ = reconciler.Reconcile(ctx, req)
462+
var deploymentUpdatedCount int
463+
for {
464+
select {
465+
case e := <-fakeRecorder.Events:
466+
if strings.Contains(e, "DeploymentUpdated") {
467+
deploymentUpdatedCount++
468+
}
469+
default:
470+
goto done
471+
}
472+
}
473+
done:
474+
assert.Equal(t, 0, deploymentUpdatedCount,
475+
"expected no DeploymentUpdated event on second reconcile when spec unchanged")
476+
}
477+
478+
// TestCreateOrUpdateDeployment_UpdateWhenSpecChanged проверяет, что при изменении spec DataFlow
479+
// вызывается Update Deployment и желаемое состояние применяется.
480+
func TestCreateOrUpdateDeployment_UpdateWhenSpecChanged(t *testing.T) {
481+
scheme := runtime.NewScheme()
482+
require.NoError(t, dataflowv1.AddToScheme(scheme))
483+
require.NoError(t, clientgoscheme.AddToScheme(scheme))
484+
485+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
486+
reconciler := NewDataFlowReconciler(fakeClient, scheme, nil)
487+
488+
ctx := context.Background()
489+
dataflow := &dataflowv1.DataFlow{
490+
TypeMeta: metav1.TypeMeta{
491+
APIVersion: "dataflow.dataflow.io/v1",
492+
Kind: "DataFlow",
493+
},
494+
ObjectMeta: metav1.ObjectMeta{
495+
Name: "test-dataflow",
496+
Namespace: "default",
497+
},
498+
Spec: dataflowv1.DataFlowSpec{
499+
Source: dataflowv1.SourceSpec{
500+
Type: "kafka",
501+
Kafka: &dataflowv1.KafkaSourceSpec{
502+
Brokers: []string{"localhost:9092"},
503+
Topic: "test-topic",
504+
ConsumerGroup: "test-group",
505+
},
506+
},
507+
Sink: dataflowv1.SinkSpec{
508+
Type: "kafka",
509+
Kafka: &dataflowv1.KafkaSinkSpec{
510+
Brokers: []string{"localhost:9092"},
511+
Topic: "output-topic",
512+
},
513+
},
514+
},
515+
}
516+
require.NoError(t, fakeClient.Create(ctx, dataflow))
517+
518+
req := ctrl.Request{
519+
NamespacedName: types.NamespacedName{Name: "test-dataflow", Namespace: "default"},
520+
}
521+
522+
// Первая реконсиляция — создаётся Deployment
523+
_, _ = reconciler.Reconcile(ctx, req)
524+
525+
deploymentName := types.NamespacedName{Name: "dataflow-test-dataflow", Namespace: "default"}
526+
var deployment appsv1.Deployment
527+
require.NoError(t, fakeClient.Get(ctx, deploymentName, &deployment))
528+
generationBefore := deployment.Generation
529+
530+
// Меняем spec DataFlow (NodeSelector попадает в spec Deployment)
531+
require.NoError(t, fakeClient.Get(ctx, req.NamespacedName, dataflow))
532+
dataflow.Spec.NodeSelector = map[string]string{"node-type": "compute"}
533+
require.NoError(t, fakeClient.Update(ctx, dataflow))
534+
535+
// Вторая реконсиляция — должен вызваться Update
536+
_, _ = reconciler.Reconcile(ctx, req)
537+
538+
require.NoError(t, fakeClient.Get(ctx, deploymentName, &deployment))
539+
assert.Greater(t, deployment.Generation, generationBefore,
540+
"Deployment Generation should increase after spec change")
541+
assert.Equal(t, "compute", deployment.Spec.Template.Spec.NodeSelector["node-type"],
542+
"Deployment NodeSelector should reflect updated DataFlow spec")
543+
}
544+
545+
func drainRecorderEvents(r *record.FakeRecorder) {
546+
for {
547+
select {
548+
case <-r.Events:
549+
// продолжаем вычитывать
550+
default:
551+
return
552+
}
553+
}
554+
}
555+
411556
func TestDataFlowReconciler_Reconcile_InvalidSpec(t *testing.T) {
412557
scheme := runtime.NewScheme()
413558
err := dataflowv1.AddToScheme(scheme)

0 commit comments

Comments
 (0)