Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions internal/controller/clickhouse/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "github.com/ClickHouse/clickhouse-operator/api/v1alpha1"
"github.com/ClickHouse/clickhouse-operator/internal/controller/testutil"
Expand Down Expand Up @@ -339,4 +341,84 @@ var _ = When("reconciling ClickHouseCluster", Ordered, func() {
Expect(suite.Client.List(ctx, &pdbs, listOpts)).To(Succeed())
Expect(pdbs.Items).To(BeEmpty())
})

It("should update all replica resources, but not proceed to the next if failed", func(ctx context.Context) {
By("creating a new cluster with DataVolumeClaimSpec")

pvcCR := &v1.ClickHouseCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-test",
Namespace: "default",
},
Spec: v1.ClickHouseClusterSpec{
Replicas: ptr.To[int32](2),
Shards: ptr.To[int32](1),
KeeperClusterRef: &corev1.LocalObjectReference{Name: keeperName},
DataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("10Gi"),
},
},
},
},
}
Expect(suite.Client.Create(ctx, pvcCR)).To(Succeed())

By("reconcile to create all resources including STS with VolumeClaimTemplates")

_, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: pvcCR.NamespacedName()})
Expect(err).NotTo(HaveOccurred())
Expect(suite.Client.Get(ctx, pvcCR.NamespacedName(), pvcCR)).To(Succeed())
testutil.AssertEvents(recorder.Events, map[string]int{
"ClusterNotReady": 1,
})

By("marking StatefulSets as ready")
testutil.ReconcileStatefulSets(ctx, pvcCR, suite)

By("recording STS state before PVC change")

replicaID := v1.ClickHouseReplicaID{ShardID: 0, Index: 1}
stsName := pvcCR.StatefulSetNameByReplicaID(replicaID)

var sts appsv1.StatefulSet
Expect(suite.Client.Get(ctx, types.NamespacedName{Namespace: pvcCR.Namespace, Name: stsName}, &sts)).To(Succeed())

By("make STS and PVC changes")

pvcCR.Spec.DataVolumeClaimSpec.StorageClassName = new("changed")
pvcCR.Spec.ContainerTemplate.Image = v1.ContainerImage{Tag: "changed"}
Expect(suite.Client.Update(ctx, pvcCR)).To(Succeed())

By("reconcile updated CR")

_, err = controller.Reconcile(ctx, ctrl.Request{NamespacedName: pvcCR.NamespacedName()})
Expect(err).NotTo(HaveOccurred())
testutil.AssertEvents(recorder.Events, map[string]int{
"FailedUpdate": 1,
})

By("ensuring STS updated")
Expect(suite.Client.Get(ctx, pvcCR.NamespacedName(), pvcCR)).To(Succeed())
Expect(suite.Client.Get(ctx, client.ObjectKeyFromObject(&sts), &sts)).To(Succeed())
Expect(sts.Annotations[controllerutil.AnnotationSpecHash]).To(Equal(pvcCR.Status.StatefulSetRevision))

By("retry reconcile")

_, err = controller.Reconcile(ctx, ctrl.Request{NamespacedName: pvcCR.NamespacedName()})
Expect(err).NotTo(HaveOccurred())
testutil.AssertEvents(recorder.Events, map[string]int{
"FailedUpdate": 1,
})

By("ensuring next replica not changed")
Expect(suite.Client.Get(ctx, pvcCR.NamespacedName(), pvcCR)).To(Succeed())
Expect(suite.Client.Get(ctx, types.NamespacedName{
Namespace: pvcCR.Namespace,
Name: pvcCR.StatefulSetNameByReplicaID(v1.ClickHouseReplicaID{ShardID: 0, Index: 0}),
}, &sts)).To(Succeed())
Expect(sts.Annotations[controllerutil.AnnotationSpecHash]).ToNot(Equal(pvcCR.Status.StatefulSetRevision))
})
})
64 changes: 48 additions & 16 deletions internal/controller/clickhouse/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func compareReplicaID(a, b v1.ClickHouseReplicaID) int {
type replicaState struct {
Error bool `json:"error"`
StatefulSet *appsv1.StatefulSet
PVC *corev1.PersistentVolumeClaim
Pinged bool
Version string
}
Expand All @@ -55,20 +56,30 @@ func (r replicaState) Ready() bool {
return r.Pinged && r.StatefulSet.Status.ReadyReplicas == 1 // Not reliable, but allows to wait until pod is `green`
}

func (r replicaState) HasStatefulSetDiff(rec *clickhouseReconciler) bool {
func (r replicaState) HasDiff(rec *clickhouseReconciler) bool {
if r.StatefulSet == nil {
return true
}

return ctrlutil.GetSpecHashFromObject(r.StatefulSet) != rec.Cluster.Status.StatefulSetRevision
}
if ctrlutil.GetSpecHashFromObject(r.StatefulSet) != rec.Cluster.Status.StatefulSetRevision {
return true
}

func (r replicaState) HasConfigMapDiff(rec *clickhouseReconciler) bool {
if r.StatefulSet == nil {
if ctrlutil.GetConfigHashFromObject(r.StatefulSet) != rec.Cluster.Status.ConfigurationRevision {
return true
}

return ctrlutil.GetConfigHashFromObject(r.StatefulSet) != rec.Cluster.Status.ConfigurationRevision
if rec.Cluster.Spec.DataVolumeClaimSpec != nil {
if r.PVC == nil {
return true
}

if ctrlutil.GetSpecHashFromObject(r.PVC) != rec.pvcRevision {
return true
}
}

return false
}

func (r replicaState) UpdateStage(rec *clickhouseReconciler) chctrl.ReplicaUpdateStage {
Expand All @@ -84,7 +95,7 @@ func (r replicaState) UpdateStage(rec *clickhouseReconciler) chctrl.ReplicaUpdat
return chctrl.StageUpdating
}

if r.HasConfigMapDiff(rec) || r.HasStatefulSetDiff(rec) {
if r.HasDiff(rec) {
return chctrl.StageHasDiff
}

Expand All @@ -109,6 +120,7 @@ type clickhouseReconciler struct {
versionProbe chctrl.VersionProbeResult
databasesInSync bool
staleReplicasCleanedUp bool
pvcRevision string
}

type reconcileFunc func(context.Context, ctrlutil.Logger) (*ctrl.Result, error)
Expand Down Expand Up @@ -316,6 +328,13 @@ func (r *clickhouseReconciler) reconcileClusterRevisions(ctx context.Context, lo
log.Debug(fmt.Sprintf("observed new StatefulSet revision %q", stsRevision))
}

if r.Cluster.Spec.DataVolumeClaimSpec != nil {
r.pvcRevision, err = ctrlutil.DeepHashObject(r.Cluster.Spec.DataVolumeClaimSpec)
if err != nil {
return nil, fmt.Errorf("get PVC revision: %w", err)
}
}

probeResult, err := r.VersionProbe(ctx, log, chctrl.VersionProbeConfig{
Binary: "clickhouse-server",
Labels: r.Cluster.Spec.Labels,
Expand Down Expand Up @@ -370,10 +389,19 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context,
}
}

var pvc *corev1.PersistentVolumeClaim
if r.Cluster.Spec.DataVolumeClaimSpec != nil {
pvc, err = r.GetPVCByStatefulSet(ctx, log.With("replica_id", id), &sts)
if err != nil {
log.Error(err, "failed to get PVC for replica", "replica_id", id)
}
}

log.Debug("load replica state done", "replica_id", id, "statefulset", sts.Name)

return id, replicaState{
StatefulSet: &sts,
PVC: pvc,
Error: hasError,
Pinged: pinged,
Version: version,
Expand Down Expand Up @@ -686,7 +714,7 @@ func (r *clickhouseReconciler) reconcileConditions(ctx context.Context, log ctrl
hasReady = true
}

if replica.HasConfigMapDiff(r) || replica.HasStatefulSetDiff(r) || !replica.Updated() {
if replica.HasDiff(r) || !replica.Updated() {
notUpdatedReplicas = append(notUpdatedReplicas, id)
}
}
Expand Down Expand Up @@ -812,15 +840,19 @@ func (r *clickhouseReconciler) updateReplica(ctx context.Context, log ctrlutil.L

replica := r.Replica(id)

result, err := r.ReconcileReplicaResources(ctx, log, id, chctrl.ReplicaUpdateInput{
ExistingSTS: replica.StatefulSet,
DesiredConfigMap: configMap,
DesiredSTS: statefulSet,
HasError: replica.Error,
result, err := r.ReconcileReplicaResources(ctx, log, chctrl.ReplicaUpdateInput{
ConfigurationRevision: r.Cluster.Status.ConfigurationRevision,
StatefulSetRevision: r.Cluster.Status.StatefulSetRevision,
BreakingSTSVersion: breakingStatefulSetVersion,
DataVolumeClaimSpec: r.Cluster.Spec.DataVolumeClaimSpec,
DesiredConfigMap: configMap,

StatefulSetRevision: r.Cluster.Status.StatefulSetRevision,
ExistingSTS: replica.StatefulSet,
DesiredSTS: statefulSet,
HasError: replica.Error,
BreakingSTSVersion: breakingStatefulSetVersion,

PVCRevision: r.pvcRevision,
ExistingPVC: replica.PVC,
DesiredPVCSpec: r.Cluster.Spec.DataVolumeClaimSpec,
})
if err != nil {
return nil, fmt.Errorf("reconcile replica %s resources: %w", id, err)
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/clickhouse/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func getStatefulSetRevision(r *clickhouseReconciler) (string, error) {
return "", fmt.Errorf("generate template StatefulSet: %w", err)
}

sts.Spec.VolumeClaimTemplates = nil

hash, err := controllerutil.DeepHashObject(sts)
if err != nil {
return "", fmt.Errorf("hash template StatefulSet: %w", err)
Expand Down
36 changes: 36 additions & 0 deletions internal/controller/clickhouse/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -300,6 +301,41 @@ var _ = Describe("PDB", func() {
})
})

var _ = Describe("getStatefulSetRevision", func() {
It("should not depend on data disk spec", func() {
r := clickhouseReconciler{
reconcilerBase: reconcilerBase{
Cluster: &v1.ClickHouseCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1.ClickHouseClusterSpec{
Replicas: ptr.To[int32](1),
DataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("10Gi"),
},
},
},
},
},
},
}

rev, err := getStatefulSetRevision(&r)
Expect(err).ToNot(HaveOccurred())
Expect(rev).ToNot(BeEmpty())

r.Cluster.Spec.DataVolumeClaimSpec.Resources.Requests[corev1.ResourceStorage] = resource.MustParse("20Gi")
rev2, err := getStatefulSetRevision(&r)
Expect(err).ToNot(HaveOccurred())

Expect(rev2).To(Equal(rev), "StatefulSet revision should not change when data disk spec changes")
})
})

func checkVolumeMounts(volumes []corev1.Volume, mounts []corev1.VolumeMount) {
volumeMap := map[string]struct{}{
internal.PersistentVolumeName: {},
Expand Down
Loading
Loading