diff --git a/Makefile b/Makefile index a12c4a31..94a059e9 100644 --- a/Makefile +++ b/Makefile @@ -43,7 +43,7 @@ e2e_args=-tags=e2e_test -count=1 -timeout=45m \ $(if $(filter 1,$(E2E_DEBUG)),-debug) \ $(if $(E2E_DEBUG_DIR),-debug-dir $(E2E_DEBUG_DIR)) -cluster_test_args=-tags=cluster_test -count=1 -timeout=10m \ +cluster_test_args=-tags=cluster_test -count=1 -timeout=15m \ $(if $(CLUSTER_TEST_PARALLEL),-parallel $(CLUSTER_TEST_PARALLEL)) \ $(if $(CLUSTER_TEST_RUN),-run $(CLUSTER_TEST_RUN)) \ -args \ diff --git a/changes/unreleased/Added-20260504-150235.yaml b/changes/unreleased/Added-20260504-150235.yaml new file mode 100644 index 00000000..05fa0862 --- /dev/null +++ b/changes/unreleased/Added-20260504-150235.yaml @@ -0,0 +1,3 @@ +kind: Added +body: Added a feature to enable manual Postgres minor version updates in systemd clusters. The Control Plane will now update its copy of the database spec when it detects changes to an instance's Postgres or Spock version. +time: 2026-05-04T15:02:35.045407-04:00 diff --git a/changes/unreleased/Changed-20260504-152348.yaml b/changes/unreleased/Changed-20260504-152348.yaml new file mode 100644 index 00000000..fa9dc8bb --- /dev/null +++ b/changes/unreleased/Changed-20260504-152348.yaml @@ -0,0 +1,3 @@ +kind: Changed +body: Changed the instance monitoring system to query the Postgres and Spock versions for replica instances and report them in the databases API. 
+time: 2026-05-04T15:23:48.324604-04:00 diff --git a/clustertest/external_upgrade_test.go b/clustertest/external_upgrade_test.go new file mode 100644 index 00000000..d857aa69 --- /dev/null +++ b/clustertest/external_upgrade_test.go @@ -0,0 +1,292 @@ +//go:build cluster_test + +package clustertest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" + "github.com/pgEdge/control-plane/client" +) + +func TestExternalUpgrade(t *testing.T) { + // Tests that the control plane updates its records when the user upgrades + // a database outside of our API. + t.Parallel() + ctx := t.Context() + + const ( + startPostgresVersion string = "18.2" + upgradePostgresVersion string = "18.3" + upgradeImage string = "ghcr.io/pgedge/pgedge-postgres:18.3-spock5.0.6-standard-1" + spockVersion string = "5" + sleepDuration time.Duration = 5 * time.Second + ) + + // Helper functions + assertSpecVersions := func(t *testing.T, spec *controlplane.DatabaseSpec, expectedSpecVersion string, expectedNodeVersions map[string]string) { + t.Helper() + + actualNodeVersions := make(map[string]string, len(spec.Nodes)) + for _, node := range spec.Nodes { + var version string + if node.PostgresVersion != nil { + version = *node.PostgresVersion + } + actualNodeVersions[node.Name] = version + } + var actualSpecVersion string + if spec.PostgresVersion != nil { + actualSpecVersion = *spec.PostgresVersion + } + require.Equal(t, expectedSpecVersion, actualSpecVersion) + require.Equal(t, expectedNodeVersions, actualNodeVersions) + } + assertInstanceVersions := func(t *testing.T, instances []*controlplane.Instance, expectedNodeHostVersions map[string]map[string]string) { + t.Helper() + + actualNodeHostVersions := map[string]map[string]string{} + for _, instance := range instances { + require.Equal(t, client.InstanceStateAvailable, instance.State) + + if _, ok := 
actualNodeHostVersions[instance.NodeName]; !ok { + actualNodeHostVersions[instance.NodeName] = map[string]string{} + } + var version string + if instance.Postgres.Version != nil { + version = *instance.Postgres.Version + } + actualNodeHostVersions[instance.NodeName][instance.HostID] = version + } + require.Equal(t, expectedNodeHostVersions, actualNodeHostVersions) + } + upgradeService := func(t *testing.T, databaseID, nodeName, hostID string) { + t.Helper() + + tLogf(t, "upgrading %s %s instance", nodeName, hostID) + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + serviceName := dockerCmd(t, ctx, + "service", + "ls", + fmt.Sprintf("--filter=label=pgedge.database.id=%s", databaseID), + fmt.Sprintf("--filter=label=pgedge.node.name=%s", nodeName), + fmt.Sprintf("--filter=label=pgedge.host.id=%s", hostID), + "--format={{.Name}}", + ) + require.NotEmpty(t, serviceName) + dockerCmd(t, ctx, + "service", + "update", + fmt.Sprintf("--image=%s", upgradeImage), + // disabling healthchecks to speed up startup time + "--no-healthcheck", + serviceName, + ) + } + + env := map[string]string{ + "PGEDGE_DATABASES_MONITOR_INTERVAL_SECONDS": "3", + } + cluster := NewCluster(t, ClusterConfig{ + Hosts: []HostConfig{ + {ID: "host-1", ExtraEnv: env}, + {ID: "host-2", ExtraEnv: env}, + {ID: "host-3", ExtraEnv: env}, + }, + }) + cluster.Init(t) + + spec := &controlplane.DatabaseSpec{ + DatabaseName: "test_upgrade", + PostgresVersion: pointerTo(startPostgresVersion), + SpockVersion: pointerTo(spockVersion), + Nodes: []*controlplane.DatabaseNodeSpec{ + { + Name: "n1", + HostIds: []controlplane.Identifier{"host-1", "host-2"}, + }, + { + Name: "n2", + HostIds: []controlplane.Identifier{"host-3"}, + }, + }, + } + + tLog(t, "creating database") + + createResp, err := cluster.Client().CreateDatabase(ctx, &controlplane.CreateDatabaseRequest{ + Spec: spec, + }) + require.NoError(t, err) + + databaseID := createResp.Database.ID + + t.Cleanup(func() { + // Use a 
new context for cleanup operations since t.Context is canceled. + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + if testConfig.skipCleanup { + tLogf(t, "skipping cleanup for database '%s'", databaseID) + return + } + + tLogf(t, "cleaning up database '%s'", databaseID) + + resp, err := cluster.Client().DeleteDatabase(ctx, &controlplane.DeleteDatabasePayload{ + DatabaseID: databaseID, + }) + if err != nil { + tLogf(t, "failed to delete database '%s': %v", databaseID, err) + return + } + + tLog(t, "waiting for database deletion to complete") + + err = waitForTaskComplete(ctx, cluster.Client(), databaseID, resp.Task.TaskID, time.Minute) + if err != nil { + tLogf(t, "failed while waiting for database deletion '%s'", databaseID) + return + } + }) + + tLog(t, "waiting for database creation to complete") + + err = waitForTaskComplete(ctx, cluster.Client(), databaseID, createResp.Task.TaskID, 3*time.Minute) + require.NoError(t, err) + + tLog(t, "sleeping to allow instance monitor interval to complete") + + time.Sleep(sleepDuration) + + tLogf(t, "asserting that all instances and spec versions are %s", startPostgresVersion) + + db, err := cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: databaseID, + }) + require.NoError(t, err) + + assertSpecVersions(t, db.Spec, startPostgresVersion, map[string]string{ + "n1": "", + "n2": "", + }) + assertInstanceVersions(t, db.Instances, map[string]map[string]string{ + "n1": map[string]string{ + "host-1": startPostgresVersion, + "host-2": startPostgresVersion, + }, + "n2": map[string]string{ + "host-3": startPostgresVersion, + }, + }) + + tLog(t, "getting database docker service names") + + upgradeService(t, string(databaseID), "n1", "host-2") + upgradeService(t, string(databaseID), "n2", "host-3") + + tLog(t, "sleeping to allow instance monitor interval and version reconciliation to complete") + + time.Sleep(sleepDuration) + + tLogf(t, "asserting that n2 is %s 
in the spec and that the n1-host-2 and n2-host-3 instances are %s", upgradePostgresVersion, upgradePostgresVersion) + + db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: databaseID, + }) + require.NoError(t, err) + + assertSpecVersions(t, db.Spec, startPostgresVersion, map[string]string{ + "n1": "", + "n2": upgradePostgresVersion, + }) + assertInstanceVersions(t, db.Instances, map[string]map[string]string{ + "n1": map[string]string{ + "host-1": startPostgresVersion, + "host-2": upgradePostgresVersion, + }, + "n2": map[string]string{ + "host-3": upgradePostgresVersion, + }, + }) + + upgradeService(t, string(databaseID), "n1", "host-1") + + tLog(t, "sleeping to allow monitor interval and version reconciliation to complete") + + time.Sleep(sleepDuration) + + tLogf(t, "asserting the top-level version is %s and that all instances are %s", upgradePostgresVersion, upgradePostgresVersion) + + db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: databaseID, + }) + require.NoError(t, err) + + assertSpecVersions(t, db.Spec, upgradePostgresVersion, map[string]string{ + "n1": "", + "n2": "", + }) + assertInstanceVersions(t, db.Instances, map[string]map[string]string{ + "n1": map[string]string{ + "host-1": upgradePostgresVersion, + "host-2": upgradePostgresVersion, + }, + "n2": map[string]string{ + "host-3": upgradePostgresVersion, + }, + }) + + tLog(t, "performing a no-op update") + + // We still expect to see some resource updates in the logs because the + // version number shows up in a few resources states. This does trigger a + // patroni reload in Swarm databases, which eats up time, but no actual + // changes should occur. 
+ + updateResp, err := cluster.Client().UpdateDatabase(ctx, &controlplane.UpdateDatabasePayload{ + DatabaseID: databaseID, + Request: &controlplane.UpdateDatabaseRequest{ + Spec: db.Spec, + }, + }) + require.NoError(t, err) + + tLog(t, "waiting for database update to complete") + + err = waitForTaskComplete(ctx, cluster.Client(), databaseID, updateResp.Task.TaskID, 3*time.Minute) + require.NoError(t, err) + + tLog(t, "sleeping to allow instance monitor interval to complete") + + time.Sleep(sleepDuration) + + tLog(t, "asserting that top-level versions have not changed") + + db, err = cluster.Client().GetDatabase(ctx, &controlplane.GetDatabasePayload{ + DatabaseID: databaseID, + }) + require.NoError(t, err) + + assertSpecVersions(t, db.Spec, upgradePostgresVersion, map[string]string{ + "n1": "", + "n2": "", + }) + assertInstanceVersions(t, db.Instances, map[string]map[string]string{ + "n1": map[string]string{ + "host-1": upgradePostgresVersion, + "host-2": upgradePostgresVersion, + }, + "n2": map[string]string{ + "host-3": upgradePostgresVersion, + }, + }) +} diff --git a/clustertest/host_test.go b/clustertest/host_test.go index 375cfdca..75af6f05 100644 --- a/clustertest/host_test.go +++ b/clustertest/host_test.go @@ -294,7 +294,7 @@ func printContainerLogs(ctx context.Context, t testing.TB, hostID string, contai tLog(t, "container is nil") return } - logs, err := containerLogs(t.Context(), t, container) + logs, err := containerLogs(ctx, t, container) if err != nil { tLogf(t, "failed to extract container logs: %s", err) } else { diff --git a/clustertest/utils_test.go b/clustertest/utils_test.go index 14ae94cb..f9efeda5 100644 --- a/clustertest/utils_test.go +++ b/clustertest/utils_test.go @@ -12,6 +12,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" "sync" "testing" "time" @@ -159,42 +160,43 @@ func pointerTo[T any](v T) *T { return &v } +func dockerCmd(t testing.TB, ctx context.Context, args ...string) string { + t.Helper() + + tLogf(t, "executing 
command: docker %s", strings.Join(args, " ")) + + var w strings.Builder + cmd := exec.CommandContext(ctx, "docker", args...) + cmd.Stdout = &w + cmd.Stderr = &w + err := cmd.Run() + out := w.String() + require.NoError(t, err, "docker command failed: %s", out) + + return strings.TrimSpace(out) +} + // waitForTaskComplete polls a database task until it completes, fails, or times out. func waitForTaskComplete(ctx context.Context, c client.Client, dbID api.Identifier, taskID string, timeout time.Duration) error { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return fmt.Errorf("timeout waiting for task %s to complete", taskID) - case <-ticker.C: - task, err := c.GetDatabaseTask(ctx, &api.GetDatabaseTaskPayload{ - DatabaseID: dbID, - TaskID: taskID, - }) - if err != nil { - return fmt.Errorf("failed to get task: %w", err) - } - - switch task.Status { - case client.TaskStatusCompleted: - return nil - case client.TaskStatusFailed: - errMsg := "unknown error" - if task.Error != nil { - errMsg = *task.Error - } - return fmt.Errorf("task failed: %s", errMsg) - case client.TaskStatusCanceled: - return fmt.Errorf("task was canceled") - // "pending", "running", "canceling" - continue waiting - } + task, err := c.WaitForDatabaseTask(ctx, &api.GetDatabaseTaskPayload{ + DatabaseID: dbID, + TaskID: taskID, + }) + if err != nil { + return fmt.Errorf("failed to wait for task: %w", err) + } + if task.Status != client.TaskStatusCompleted { + var taskError string + if task.Error != nil { + taskError = *task.Error } + return fmt.Errorf("task status is '%s' instead of 'completed', error=%s", task.Status, taskError) } + + return nil } // waitForDatabaseAvailable polls a database until it reaches available state or times out. 
diff --git a/docs/installation/configuration.md b/docs/installation/configuration.md index 6471066b..8bd75cf5 100644 --- a/docs/installation/configuration.md +++ b/docs/installation/configuration.md @@ -16,11 +16,6 @@ Use a comma-separated string to specify string array properties, such as PGEDGE_CLIENT_ADDRESSES='192.168.1.2,my-host.internal' ``` -- [Configuration Reference](#configuration-reference) - - [Required Settings](#required-settings) - - [Optional Settings](#optional-settings) - - [Components](#components) - ## Required Settings | Property | Environment variable | Type | Description | Constraints | @@ -55,12 +50,14 @@ PGEDGE_CLIENT_ADDRESSES='192.168.1.2,my-host.internal' | `docker_swarm.database_networks_cidr` | `PGEDGE_DOCKER_SWARM__DATABASE_NETWORKS_CIDR` | string | `10.128.128.0/18` | The CIDR used to allocate per-database networks. | Must not be changed after creating databases. | | `docker_swarm.database_networks_subnet_bits` | `PGEDGE_DOCKER_SWARM__DATABASE_NETWORKS_SUBNET_BITS` | int | `26` | The subnet size for per-database networks. | Must not be changed after creating databases. | | `database_owner_uid` | `PGEDGE_DATABASE_OWNER_UID` | int | `26` | The UID to use for database configuration and data. | Must match the UID that owns the Postgres server processes. | +| `databases_monitor_interval_seconds` | `PGEDGE_DATABASES_MONITOR_INTERVAL_SECONDS` | uint | `30` | The refresh interval for the 'databases' monitor. This monitor watches for database version changes that happen outside of the Control Plane API, such as through a system package update. | Set to `0` to disable this monitor. 
| ### Components This is the current list of components that can be configured in the `logging.component_levels` setting: - `api_server` +- `database_service` - `election_candidate` - `embedded_etcd` - `migration` diff --git a/docs/installation/systemd.md b/docs/installation/systemd.md index 95dd1604..c6cfdbdb 100644 --- a/docs/installation/systemd.md +++ b/docs/installation/systemd.md @@ -25,8 +25,9 @@ manage Postgres instances rather than Docker containers. The systemd installation method has the following known limitations in the current release. -- The Postgres version of a database must not be changed after the database is - created; support for package upgrades is coming in a subsequent release. +- Database upgrades are not supported via the API. Minor version upgrades can be + performed manually by a system administrator. Further support for package + management and database upgrades will be added in subsequent releases. - Supporting Services are not yet supported on systemd clusters; support is coming in a subsequent release. - All hosts in a cluster must use the same orchestrator (either `swarm` or @@ -236,6 +237,37 @@ instructions. > systemd clusters. As with other port fields, you can specify `0` to > assign a random port. +## Performing Postgres Minor Version Upgrades + +Database upgrades are not yet supported via the Control Plane API, but system +administrators can perform minor Postgres version upgrades by updating the +packages on each machine. Follow these steps on each host in the cluster: + +1. Upgrade Postgres and/or other components using `dnf upgrade`. For example: + + ```sh + sudo dnf upgrade pgedge-postgresql18 + ``` + +2. Find the systemd unit names for your database instances by listing units that + have the `patroni-*` prefix: + + ```sh + sudo systemctl list-units 'patroni-*' + ``` + +3. 
Restart each service: + + ```sh + sudo systemctl try-restart <unit-name> + ``` + +To minimize the risk of downtime, we recommend upgrading one host at a time, +starting with hosts running replica instances. + +After completing the upgrade on all hosts, it may take up to 30 seconds for the +new versions to be reflected in the database spec in the Control Plane API. + ## Updating the Control Plane Updating the Control Plane requires stopping the service, installing the new diff --git a/e2e/cancel_task_test.go b/e2e/cancel_task_test.go index 895acd2b..b866fab1 100644 --- a/e2e/cancel_task_test.go +++ b/e2e/cancel_task_test.go @@ -51,6 +51,11 @@ func testCancelDB(t *testing.T) { database := create_resp.Database t.Logf("successfully created cancel task test db") + // TODO: even after many attempts at fixing this, rapidly cancelling a task + // still occasionally causes problems. This sleep is a workaround until + // we're able to track down the issue. + time.Sleep(500 * time.Millisecond) + cancelation_task, err := fixture.Client.CancelDatabaseTask(t.Context(), &controlplane.CancelDatabaseTaskPayload{ DatabaseID: database.ID, TaskID: controlplane.Identifier(creation_task.TaskID), diff --git a/lima/roles/install_prerequisites/tasks/main.yaml b/lima/roles/install_prerequisites/tasks/main.yaml index 5dfe7549..c6881ee2 100644 --- a/lima/roles/install_prerequisites/tasks/main.yaml +++ b/lima/roles/install_prerequisites/tasks/main.yaml @@ -15,6 +15,24 @@ name: https://dnf.pgedge.com/reporpm/pgedge-release-latest.noarch.rpm state: present disable_gpg_check: true +- name: Add pgEdge old repository + ansible.builtin.yum_repository: + name: pgedge-old + description: pgEdge Old RPM Repository + baseurl: "https://dnf.pgedge.com/release_old/{{ansible_facts['distribution_major_version']}}/RPMS/{{ansible_facts['architecture']}}/" + # We only host one version of our GPG keys and it's invalid for these old + # packages. 
+ gpgcheck: no + enabled: yes + sslverify: yes +- name: Add pgEdge noarch old repository + ansible.builtin.yum_repository: + name: pgedge-noarch-old + description: pgEdge Old RPM Repository (noarch) + baseurl: "https://dnf.pgedge.com/release_old/{{ansible_facts['distribution_major_version']}}/RPMS/noarch/" + gpgcheck: no + enabled: yes + sslverify: yes - name: Install prerequisites ansible.builtin.package: name: '{{ item }}' @@ -75,5 +93,8 @@ when: chronycfg.changed - name: Install delve debugger ansible.builtin.command: /usr/local/go/bin/go install github.com/go-delve/delve/cmd/dlv@latest + args: + creates: /root/go/bin/dlv - name: Fix clocks ansible.builtin.command: chronyc -a makestep + changed_when: false diff --git a/server/internal/config/config.go b/server/internal/config/config.go index 55bdf9c0..c4059ebe 100644 --- a/server/internal/config/config.go +++ b/server/internal/config/config.go @@ -245,29 +245,30 @@ var defaultRandomPorts = RandomPorts{ } type Config struct { - TenantID string `koanf:"tenant_id" json:"tenant_id,omitempty"` - HostID string `koanf:"host_id" json:"host_id,omitempty"` - Orchestrator Orchestrator `koanf:"orchestrator" json:"orchestrator,omitempty"` - DataDir string `koanf:"data_dir" json:"data_dir,omitempty"` - PeerAddresses []string `koanf:"peer_addresses" json:"peer_addresses,omitempty"` - ClientAddresses []string `koanf:"client_addresses" json:"client_addresses,omitempty"` - StopGracePeriodSeconds int64 `koanf:"stop_grace_period_seconds" json:"stop_grace_period_seconds,omitempty"` - MQTT MQTT `koanf:"mqtt" json:"mqtt,omitzero"` - HTTP HTTP `koanf:"http" json:"http,omitzero"` - Logging Logging `koanf:"logging" json:"logging,omitzero"` - EtcdMode EtcdMode `koanf:"etcd_mode" json:"etcd_mode,omitempty"` - EtcdUsername string `koanf:"etcd_username" json:"etcd_username,omitempty"` - EtcdPassword string `koanf:"etcd_password" json:"etcd_password,omitempty"` - EtcdKeyRoot string `koanf:"etcd_key_root" json:"etcd_key_root,omitempty"` - 
EtcdServer EtcdServer `koanf:"etcd_server" json:"etcd_server,omitzero"` - EtcdClient EtcdClient `koanf:"etcd_client" json:"etcd_client,omitzero"` - TraefikEnabled bool `koanf:"traefik_enabled" json:"traefik_enabled,omitempty"` - VectorEnabled bool `koanf:"vector_enabled" json:"vector_enabled,omitempty"` - DockerSwarm DockerSwarm `koanf:"docker_swarm" json:"docker_swarm,omitzero"` - SystemD SystemD `koanf:"systemd" json:"systemd,omitzero"` - DatabaseOwnerUID int `koanf:"database_owner_uid" json:"database_owner_uid,omitempty"` - ProfilingEnabled bool `koanf:"profiling_enabled" json:"profiling_enabled,omitempty"` - RandomPorts RandomPorts `koanf:"random_ports" json:"random_ports,omitzero"` + TenantID string `koanf:"tenant_id" json:"tenant_id,omitempty"` + HostID string `koanf:"host_id" json:"host_id,omitempty"` + Orchestrator Orchestrator `koanf:"orchestrator" json:"orchestrator,omitempty"` + DataDir string `koanf:"data_dir" json:"data_dir,omitempty"` + PeerAddresses []string `koanf:"peer_addresses" json:"peer_addresses,omitempty"` + ClientAddresses []string `koanf:"client_addresses" json:"client_addresses,omitempty"` + StopGracePeriodSeconds int64 `koanf:"stop_grace_period_seconds" json:"stop_grace_period_seconds,omitempty"` + MQTT MQTT `koanf:"mqtt" json:"mqtt,omitzero"` + HTTP HTTP `koanf:"http" json:"http,omitzero"` + Logging Logging `koanf:"logging" json:"logging,omitzero"` + EtcdMode EtcdMode `koanf:"etcd_mode" json:"etcd_mode,omitempty"` + EtcdUsername string `koanf:"etcd_username" json:"etcd_username,omitempty"` + EtcdPassword string `koanf:"etcd_password" json:"etcd_password,omitempty"` + EtcdKeyRoot string `koanf:"etcd_key_root" json:"etcd_key_root,omitempty"` + EtcdServer EtcdServer `koanf:"etcd_server" json:"etcd_server,omitzero"` + EtcdClient EtcdClient `koanf:"etcd_client" json:"etcd_client,omitzero"` + TraefikEnabled bool `koanf:"traefik_enabled" json:"traefik_enabled,omitempty"` + VectorEnabled bool `koanf:"vector_enabled" 
json:"vector_enabled,omitempty"` + DockerSwarm DockerSwarm `koanf:"docker_swarm" json:"docker_swarm,omitzero"` + SystemD SystemD `koanf:"systemd" json:"systemd,omitzero"` + DatabaseOwnerUID int `koanf:"database_owner_uid" json:"database_owner_uid,omitempty"` + ProfilingEnabled bool `koanf:"profiling_enabled" json:"profiling_enabled,omitempty"` + RandomPorts RandomPorts `koanf:"random_ports" json:"random_ports,omitzero"` + DatabasesMonitorIntervalSeconds uint64 `koanf:"databases_monitor_interval_seconds" json:"databases_monitor_interval_seconds,omitempty"` } // ClientAddress is a convenience function to return the first client address. @@ -405,20 +406,21 @@ func DefaultConfig() (Config, error) { } return Config{ - HostID: hostID, - Orchestrator: OrchestratorSwarm, - EtcdMode: EtcdModeServer, - PeerAddresses: addresses, - ClientAddresses: addresses, - Logging: loggingDefault, - HTTP: httpDefault, - StopGracePeriodSeconds: 30, - EtcdServer: etcdServerDefault, - EtcdClient: etcdClientDefault, - DockerSwarm: defaultDockerSwarm, - SystemD: defaultSystemD, - DatabaseOwnerUID: 26, - RandomPorts: defaultRandomPorts, + HostID: hostID, + Orchestrator: OrchestratorSwarm, + EtcdMode: EtcdModeServer, + PeerAddresses: addresses, + ClientAddresses: addresses, + Logging: loggingDefault, + HTTP: httpDefault, + StopGracePeriodSeconds: 30, + EtcdServer: etcdServerDefault, + EtcdClient: etcdClientDefault, + DockerSwarm: defaultDockerSwarm, + SystemD: defaultSystemD, + DatabaseOwnerUID: 26, + RandomPorts: defaultRandomPorts, + DatabasesMonitorIntervalSeconds: 30, }, nil } diff --git a/server/internal/database/instance.go b/server/internal/database/instance.go index b4343088..0c647ccf 100644 --- a/server/internal/database/instance.go +++ b/server/internal/database/instance.go @@ -111,6 +111,10 @@ func (s *InstanceStatus) IsPrimary() bool { return s.Role != nil && *s.Role == patroni.InstanceRolePrimary } +func (s *InstanceStatus) IsStale() bool { + return s.StatusUpdatedAt == nil || 
s.StatusUpdatedAt.Before(time.Now().Add(-2*InstanceMonitorRefreshInterval)) +} + func storedToInstance(instance *StoredInstance, status *StoredInstanceStatus) *Instance { if instance == nil { return nil @@ -135,8 +139,7 @@ func storedToInstance(instance *StoredInstance, status *StoredInstanceStatus) *I // We want to infer the instance state if the instance is supposed to be // available. if out.State == InstanceStateAvailable && status != nil { - breakpoint := time.Now().Add(-2 * InstanceMonitorRefreshInterval) - if out.Status.StatusUpdatedAt.Before(breakpoint) { + if out.Status.IsStale() { out.State = InstanceStateUnknown out.Status = nil return out diff --git a/server/internal/database/instance_resource.go b/server/internal/database/instance_resource.go index a51a395b..d624f8b1 100644 --- a/server/internal/database/instance_resource.go +++ b/server/internal/database/instance_resource.go @@ -31,12 +31,13 @@ func InstanceResourceIdentifier(instanceID string) resource.Identifier { } type InstanceResource struct { - Spec *InstanceSpec `json:"spec"` - InstanceHostname string `json:"instance_hostname"` - PrimaryInstanceID string `json:"primary_instance_id"` - OrchestratorDependencies []resource.Identifier `json:"dependencies"` - ConnectionInfo *ConnectionInfo `json:"connection_info"` - PostInit *Script `json:"post_init"` + Spec *InstanceSpec `json:"spec"` + InstanceHostname string `json:"instance_hostname"` + PrimaryInstanceID string `json:"primary_instance_id"` + PrimaryInstanceIDUpdatedAt time.Time `json:"primary_instance_id_updated_at"` + OrchestratorDependencies []resource.Identifier `json:"dependencies"` + ConnectionInfo *ConnectionInfo `json:"connection_info"` + PostInit *Script `json:"post_init"` } func (r *InstanceResource) ResourceVersion() string { @@ -46,6 +47,7 @@ func (r *InstanceResource) ResourceVersion() string { func (r *InstanceResource) DiffIgnore() []string { return []string{ "/primary_instance_id", + "/primary_instance_id_updated_at", 
"/connection_info", } } @@ -80,13 +82,9 @@ func (r *InstanceResource) Refresh(ctx context.Context, rc *resource.Context) er if err := r.updateConnectionInfo(ctx, rc); err != nil { return resource.ErrNotFound } - - primaryInstanceID, err := GetPrimaryInstanceID(ctx, r.patroniClient(), 30*time.Second) - if err != nil { + if err := r.updatePrimaryInstanceID(ctx, 30*time.Second); err != nil { return resource.ErrNotFound } - r.PrimaryInstanceID = primaryInstanceID - if err := SetScriptNeedsToRun(ctx, rc, r.PostInit); err != nil { return err } @@ -190,16 +188,24 @@ func (r *InstanceResource) Paths(orchestrator Orchestrator) (InstancePaths, erro return paths, nil } -func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.Context) error { - patroniClient := r.patroniClient() - primaryInstanceID, err := GetPrimaryInstanceID(ctx, patroniClient, time.Minute) +func (r *InstanceResource) updatePrimaryInstanceID(ctx context.Context, timeout time.Duration) error { + primaryInstanceID, err := GetPrimaryInstanceID(ctx, r.patroniClient(), timeout) if err != nil { return err } r.PrimaryInstanceID = primaryInstanceID + r.PrimaryInstanceIDUpdatedAt = time.Now() + + return nil +} + +func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.Context) error { + if err := r.updatePrimaryInstanceID(ctx, time.Minute); err != nil { + return err + } if r.Spec.InstanceID != r.PrimaryInstanceID { - err = r.updateInstanceRecord(ctx, rc, &InstanceUpdateOptions{State: InstanceStateAvailable}) + err := r.updateInstanceRecord(ctx, rc, &InstanceUpdateOptions{State: InstanceStateAvailable}) if err != nil { return r.recordError(ctx, rc, err) } @@ -209,7 +215,7 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource. // Enable failsafe mode if this instance is the only one in the node. // Otherwise, disable it. 
- _, err = patroniClient.PatchDynamicConfig(ctx, &patroni.DynamicConfig{ + _, err := r.patroniClient().PatchDynamicConfig(ctx, &patroni.DynamicConfig{ FailsafeMode: utils.PointerTo(r.Spec.NodeSize == 1), }) if err != nil { diff --git a/server/internal/database/instance_store.go b/server/internal/database/instance_store.go index 047af123..755c67c7 100644 --- a/server/internal/database/instance_store.go +++ b/server/internal/database/instance_store.go @@ -137,6 +137,11 @@ func (s *InstanceStore) Put(item *StoredInstance) storage.PutOp[*StoredInstance] return storage.NewPutOp(s.client, key, item) } +func (s *InstanceStore) Update(item *StoredInstance) storage.PutOp[*StoredInstance] { + key := s.Key(item.DatabaseID, item.InstanceID) + return storage.NewUpdateOp(s.client, key, item) +} + func (s *InstanceStore) DeleteByKey(databaseID, instanceID string) storage.DeleteOp { key := s.Key(databaseID, instanceID) return storage.NewDeleteKeyOp(s.client, key) diff --git a/server/internal/database/node_resource.go b/server/internal/database/node_resource.go index 333ea52a..d1a33f6f 100644 --- a/server/internal/database/node_resource.go +++ b/server/internal/database/node_resource.go @@ -6,6 +6,7 @@ import ( "fmt" "slices" "strings" + "time" "github.com/pgEdge/control-plane/server/internal/postgres" "github.com/pgEdge/control-plane/server/internal/resource" @@ -72,7 +73,8 @@ func (n *NodeResource) Create(ctx context.Context, rc *resource.Context) error { // Some instances may be down or in a bad state. We'll want to check all of // them to find one that knows the primary instance ID. 
- n.PrimaryInstanceID = "" + var primaryInstanceID string + var primaryInstanceUpdatedAt time.Time for _, id := range n.InstanceIDs { instance, err := resource.FromContext[*InstanceResource](rc, InstanceResourceIdentifier(id)) if errors.Is(err, resource.ErrNotFound) { @@ -81,11 +83,18 @@ func (n *NodeResource) Create(ctx context.Context, rc *resource.Context) error { return fmt.Errorf("failed to get instance %q: %w", id, err) } - if instance.PrimaryInstanceID != "" { - n.PrimaryInstanceID = instance.PrimaryInstanceID - break + if instance.PrimaryInstanceID == "" { + continue + } + // Instances are updated sequentially and a switchover can happen after + // an earlier instance update. We use the 'updated at' field to pick the + // most recently fetched primary instance ID. + if primaryInstanceID == "" || instance.PrimaryInstanceIDUpdatedAt.After(primaryInstanceUpdatedAt) { + primaryInstanceID = instance.PrimaryInstanceID + primaryInstanceUpdatedAt = instance.PrimaryInstanceIDUpdatedAt } } + n.PrimaryInstanceID = primaryInstanceID return nil } diff --git a/server/internal/database/provide.go b/server/internal/database/provide.go index ce7c6848..62fb6ec9 100644 --- a/server/internal/database/provide.go +++ b/server/internal/database/provide.go @@ -6,6 +6,7 @@ import ( "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/host" + "github.com/pgEdge/control-plane/server/internal/logging" "github.com/pgEdge/control-plane/server/internal/ports" ) @@ -36,7 +37,11 @@ func provideService(i *do.Injector) { if err != nil { return nil, err } - return NewService(cfg, orch, store, hostSvc, portsSvc), nil + loggerFactory, err := do.Invoke[*logging.Factory](i) + if err != nil { + return nil, err + } + return NewService(cfg, orch, store, hostSvc, portsSvc, loggerFactory), nil }) } diff --git a/server/internal/database/reconcile_versions.go b/server/internal/database/reconcile_versions.go new file mode 100644 index 
00000000..a07cf425 --- /dev/null +++ b/server/internal/database/reconcile_versions.go @@ -0,0 +1,214 @@ +package database + +import ( + "context" + "errors" + "fmt" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/storage" + "github.com/pgEdge/control-plane/server/internal/utils" +) + +func (s *Service) ReconcileAllDatabaseVersions(ctx context.Context) error { + databases, err := s.store.Database.GetAll().Exec(ctx) + if err != nil { + return fmt.Errorf("failed to get databases: %w", err) + } + for _, database := range databases { + if database.State.IsInProgress() { + continue + } + spec, err := s.store.Spec. + GetByKey(database.DatabaseID). + Exec(ctx) + if errors.Is(err, storage.ErrNotFound) { + continue + } else if err != nil { + return fmt.Errorf("failed to get spec for database '%s': %w", database.DatabaseID, err) + } + instances, err := s.store.Instance. + GetByDatabaseID(database.DatabaseID). + Exec(ctx) + if err != nil { + return fmt.Errorf("failed to get instances for database '%s': %w", database.DatabaseID, err) + } + instanceStatuses, err := s.store.InstanceStatus. + GetByDatabaseID(database.DatabaseID). + Exec(ctx) + if err != nil { + return fmt.Errorf("failed to get instance statuses for database '%s': %w", database.DatabaseID, err) + } + + logger := s.logger.With(). + Str("database_id", database.DatabaseID). + Logger() + + var ops []storage.TxnOperation + updatedSpec, updatedInstances := ReconcileVersions(spec, instances, instanceStatuses) + for _, instance := range updatedInstances { + logger.Info(). + Str("node_name", instance.NodeName). + Str("host_id", instance.HostID). + Str("instance_id", instance.InstanceID). + Stringer("postgres_version", instance.PgEdgeVersion.PostgresVersion). + Stringer("spock_version", instance.PgEdgeVersion.SpockVersion). + Msg("detected updated instance version") + + instanceSpec, err := s.store.InstanceSpec. 
+ GetByKey(instance.DatabaseID, instance.InstanceID). + Exec(ctx) + if err != nil && !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("failed to get instance spec for instance '%s': %w", instance.InstanceID, err) + } else if err == nil { + instanceSpec.Spec.PgEdgeVersion = instance.PgEdgeVersion + ops = append(ops, s.store.InstanceSpec.Update(instanceSpec)) + } + + ops = append(ops, s.store.Instance.Update(instance)) + } + if updatedSpec != nil { + logger.Info().Msg("detected updated node versions") + ops = append(ops, s.store.Spec.Update(updatedSpec)) + } + if len(ops) == 0 { + continue + } + + // We want to abandon this update if the database has been updated since + // we last fetched it. + databaseNotUpdated := clientv3.Compare(clientv3.Version(s.store.Database.Key(database.DatabaseID)), "=", database.Version()) + txn := s.store.Txn(ops...) + txn.AddConditions(databaseNotUpdated) + err = txn.Commit(ctx) + switch { + case errors.Is(err, storage.ErrOperationConstraintViolated): + logger.Warn().Msg("database modified while updating detected version. 
skipping update.") + case err != nil: + return fmt.Errorf("failed to update records for database '%s': %w", database.DatabaseID, err) + default: + logger.Info().Msg("successfully updated with detected versions") + } + } + + return nil +} + +func ReconcileVersions( + spec *StoredSpec, + instances []*StoredInstance, + statuses []*StoredInstanceStatus, +) (*StoredSpec, []*StoredInstance) { + instancesByNodeHost, updatedInstances := reconcileInstanceVersions(instances, statuses) + updatedSpec := reconcileNodeVersions(spec, instancesByNodeHost) + + return updatedSpec, updatedInstances +} + +type nodeHostKey struct { + nodeName string + hostID string +} + +func reconcileInstanceVersions( + instances []*StoredInstance, + statuses []*StoredInstanceStatus, +) (map[nodeHostKey]*StoredInstance, []*StoredInstance) { + var updatedInstances []*StoredInstance + statusesByID := make(map[string]*StoredInstanceStatus, len(statuses)) + for _, status := range statuses { + statusesByID[status.InstanceID] = status + } + instancesByNodeHost := make(map[nodeHostKey]*StoredInstance, len(instances)) + for _, instance := range instances { + if instance.PgEdgeVersion == nil { + continue + } + status, ok := statusesByID[instance.InstanceID] + if !ok || status.Status.IsStale() { + continue + } + postgresVersion := utils.FromPointer(status.Status.PostgresVersion) + spockVersion := utils.FromPointer(status.Status.SpockVersion) + pgEdgeVersion, err := ds.ParsePgEdgeVersion(postgresVersion, spockVersion) + if err != nil { + continue + } + pgEdgeVersion, err = pgEdgeVersion.Normalize() + if err != nil { + continue + } + if !instance.PgEdgeVersion.Equals(pgEdgeVersion) { + instance.PgEdgeVersion = pgEdgeVersion + updatedInstances = append(updatedInstances, instance) + } + instancesByNodeHost[nodeHostKey{instance.NodeName, instance.HostID}] = instance + } + + return instancesByNodeHost, updatedInstances +} + +func observedNodeVersion( + node *Node, + instancesByNodeHost 
map[nodeHostKey]*StoredInstance, +) *ds.PgEdgeVersion { + var version *ds.PgEdgeVersion + for _, hostID := range node.HostIDs { + instance, ok := instancesByNodeHost[nodeHostKey{nodeName: node.Name, hostID: hostID}] + switch { + case !ok: + return nil + case version == nil: + version = instance.PgEdgeVersion + case !version.Equals(instance.PgEdgeVersion): + return nil + } + } + return version +} + +func reconcileNodeVersions( + spec *StoredSpec, + instancesByNodeHost map[nodeHostKey]*StoredInstance, +) *StoredSpec { + var updatedSpec *StoredSpec + var commonSpockVersion string + spockMatches := true + for _, node := range spec.Nodes { + currentPostgresVersion := node.PostgresVersion + if currentPostgresVersion == "" { + currentPostgresVersion = spec.PostgresVersion + } + observed := observedNodeVersion(node, instancesByNodeHost) + if observed == nil { + // we only want to update our spock version when _all_ nodes are + // observed to have the same spock version + spockMatches = false + continue + } + observedPostgresVersion := observed.PostgresVersion.String() + observedSpockVersion := observed.SpockVersion.String() + + if observedPostgresVersion != currentPostgresVersion { + node.PostgresVersion = observedPostgresVersion + // signals that we've modified the spec + updatedSpec = spec + } + if commonSpockVersion == "" { + commonSpockVersion = observedSpockVersion + } else if commonSpockVersion != observedSpockVersion { + spockMatches = false + } + } + if updatedSpec != nil { + updatedSpec.NormalizePostgresVersions() + } + if spockMatches && commonSpockVersion != "" && commonSpockVersion != spec.SpockVersion { + spec.SpockVersion = commonSpockVersion + updatedSpec = spec + } + + return updatedSpec +} diff --git a/server/internal/database/reconcile_versions_test.go b/server/internal/database/reconcile_versions_test.go new file mode 100644 index 00000000..92ffb774 --- /dev/null +++ b/server/internal/database/reconcile_versions_test.go @@ -0,0 +1,540 @@ +package 
database_test + +import ( + "testing" + "time" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/utils" + "github.com/stretchr/testify/assert" +) + +func TestReconcileVersions(t *testing.T) { + for _, tc := range []struct { + name string + spec *database.StoredSpec + instances []*database.StoredInstance + statuses []*database.StoredInstanceStatus + expectedSpec *database.StoredSpec + expectedInstances []*database.StoredInstance + }{ + { + name: "observed matches spec", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + }, + }, + { + name: "all nodes updated", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + 
Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.5", + SpockVersion: "6", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "all nodes updated spock only", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + { + Name: "n1", + HostIDs: []string{"host-1"}, + PostgresVersion: "17.5", + }, + { + Name: "n2", + HostIDs: []string{"host-2"}, + PostgresVersion: "17.5", + }, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: 
utils.PointerTo("6.0.0"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "6", + Nodes: []*database.Node{ + { + Name: "n1", + HostIDs: []string{"host-1"}, + PostgresVersion: "17.5", // These overrides should remain unnormalized since only the spock version changed + }, + { + Name: "n2", + HostIDs: []string{"host-2"}, + PostgresVersion: "17.5", + }, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "all nodes updated with override", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}, PostgresVersion: "17.5"}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + { + InstanceID: "n2-host-2", + Status: 
&database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.5", + SpockVersion: "6", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "one node updated", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: 
&database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}, PostgresVersion: "17.5"}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "one node updated, one with stale status", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now().Add(-3 * database.InstanceMonitorRefreshInterval)), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRolePrimary), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedSpec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}, PostgresVersion: "17.5"}, + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n2-host-2", + NodeName: 
"n2", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "not all instances updated", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1", "host-2"}}, + }, + }, + }, + instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + { + InstanceID: "n1-host-2", + NodeName: "n1", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.4", "5"), + }, + }, + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRoleReplica), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + { + InstanceID: "n1-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRoleReplica), + PostgresVersion: utils.PointerTo("17.5"), + SpockVersion: utils.PointerTo("6.0.0"), + }, + }, + }, + expectedInstances: []*database.StoredInstance{ + { + InstanceID: "n1-host-2", + NodeName: "n1", + HostID: "host-2", + PgEdgeVersion: ds.MustParsePgEdgeVersion("17.5", "6"), + }, + }, + }, + { + name: "no instances", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1"}, + }, + }, + }, + }, + { + name: "malformed instance records", + spec: &database.StoredSpec{ + Spec: &database.Spec{ + PostgresVersion: "17.4", + SpockVersion: "5", + Nodes: []*database.Node{ + {Name: "n1", HostIDs: []string{"host-1"}}, + {Name: "n2", HostIDs: []string{"host-2"}}, + }, + }, + }, + // These instances are missing a PgEdgeVersion due to a failure + // somewhere else in the system. 
+ instances: []*database.StoredInstance{ + { + InstanceID: "n1-host-1", + NodeName: "n1", + HostID: "host-1", + }, + { + InstanceID: "n2-host-2", + NodeName: "n2", + HostID: "host-2", + }, + }, + // These instances are up and running even though the instance + // records are malformed. Otherwise, reconcileVersions will skip + // these instances. + statuses: []*database.StoredInstanceStatus{ + { + InstanceID: "n1-host-1", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRoleReplica), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + { + InstanceID: "n2-host-2", + Status: &database.InstanceStatus{ + StatusUpdatedAt: utils.PointerTo(time.Now()), + Role: utils.PointerTo(patroni.InstanceRoleReplica), + PostgresVersion: utils.PointerTo("17.4"), + SpockVersion: utils.PointerTo("5.0.6"), + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + updatedSpec, updatedInstances := database.ReconcileVersions( + tc.spec, + tc.instances, + tc.statuses, + ) + assert.Equal(t, tc.expectedSpec, updatedSpec) + assert.Equal(t, tc.expectedInstances, updatedInstances) + }) + } +} diff --git a/server/internal/database/service.go b/server/internal/database/service.go index 5fa90e2a..d1bc1590 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -9,10 +9,12 @@ import ( "time" "github.com/google/uuid" + "github.com/rs/zerolog" "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/ds" "github.com/pgEdge/control-plane/server/internal/host" + "github.com/pgEdge/control-plane/server/internal/logging" "github.com/pgEdge/control-plane/server/internal/pgbackrest" "github.com/pgEdge/control-plane/server/internal/ports" "github.com/pgEdge/control-plane/server/internal/storage" @@ -36,6 +38,7 @@ type Service struct { store *Store hostSvc *host.Service portsSvc *ports.Service + 
logger zerolog.Logger } func NewService( @@ -44,6 +47,7 @@ func NewService( store *Store, hostSvc *host.Service, portsSvc *ports.Service, + loggerFactory *logging.Factory, ) *Service { return &Service{ cfg: cfg, @@ -51,6 +55,7 @@ func NewService( store: store, hostSvc: hostSvc, portsSvc: portsSvc, + logger: loggerFactory.Logger(logging.ComponentDatabaseService), } } @@ -608,7 +613,7 @@ func (s *Service) PopulateSpecDefaults(ctx context.Context, spec *Spec) error { if spec.SpockVersion == "" { spec.SpockVersion = defaultVersion.SpockVersion.String() } - specVersion, err := ds.NewPgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) + specVersion, err := ds.ParsePgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) if err != nil { return fmt.Errorf("failed to parse versions from spec: %w", err) } @@ -628,7 +633,7 @@ func (s *Service) PopulateSpecDefaults(ctx context.Context, spec *Spec) error { return fmt.Errorf("host %s not found in host list", hostID) } if node.PostgresVersion != "" { - nodeVersion, err := ds.NewPgEdgeVersion(node.PostgresVersion, spec.SpockVersion) + nodeVersion, err := ds.ParsePgEdgeVersion(node.PostgresVersion, spec.SpockVersion) if err != nil { return fmt.Errorf("failed to parse versions from nodes[%d] spec: %w", idx, err) } diff --git a/server/internal/database/spec.go b/server/internal/database/spec.go index 4d3c024d..eb921b87 100644 --- a/server/internal/database/spec.go +++ b/server/internal/database/spec.go @@ -473,6 +473,30 @@ func (s *Spec) RemoveHost(hostId string) (ok bool) { return ok } +// NormalizePostgresVersions checks if all nodes have an equal postgres version +// and, if so, sets the top-level postgres version to that version and sets each +// node's postgres version to an empty string. 
+func (s *Spec) NormalizePostgresVersions() { + var common string + for _, node := range s.Nodes { + nodeVersion := node.PostgresVersion + if nodeVersion == "" { + nodeVersion = s.PostgresVersion + } + if common == "" { + common = nodeVersion + } else if nodeVersion != common { + return + } + } + if common != "" { + s.PostgresVersion = common + for _, node := range s.Nodes { + node.PostgresVersion = "" + } + } +} + func (s Spec) defaultOptionalFieldFromNodes(other []*Node) { otherNodesByName := make(map[string]*Node) for _, n := range other { @@ -622,7 +646,7 @@ func (n *NodeInstances) InstanceIDs() []string { } func (s *Spec) NodeInstances() ([]*NodeInstances, error) { - specVersion, err := ds.NewPgEdgeVersion(s.PostgresVersion, s.SpockVersion) + specVersion, err := ds.ParsePgEdgeVersion(s.PostgresVersion, s.SpockVersion) if err != nil { return nil, fmt.Errorf("failed to parse version from spec: %w", err) } @@ -659,7 +683,7 @@ func (s *Spec) NodeInstances() ([]*NodeInstances, error) { // Respect node-level overrides nodeVersion := specVersion if node.PostgresVersion != "" { - nodeVersion, err = ds.NewPgEdgeVersion(node.PostgresVersion, s.SpockVersion) + nodeVersion, err = ds.ParsePgEdgeVersion(node.PostgresVersion, s.SpockVersion) if err != nil { return nil, fmt.Errorf("failed to parse version from node spec: %w", err) } diff --git a/server/internal/database/status.go b/server/internal/database/status.go deleted file mode 100644 index 3ed55037..00000000 --- a/server/internal/database/status.go +++ /dev/null @@ -1,4 +0,0 @@ -package database - -// type InstanceStatus struct { -// } diff --git a/server/internal/ds/versions.go b/server/internal/ds/versions.go index 21ef942e..5128dd3f 100644 --- a/server/internal/ds/versions.go +++ b/server/internal/ds/versions.go @@ -73,6 +73,16 @@ func (v *Version) MajorVersion() *Version { } } +func (v *Version) MajorMinorVersion() *Version { + components := slices.Clone(v.Components) + if len(components) > 2 { + components = 
components[:2] + } + return &Version{ + Components: components, + } +} + func (v *Version) String() string { components := make([]string, len(v.Components)) for i, c := range v.Components { @@ -193,24 +203,42 @@ func (v *PgEdgeVersion) GreaterThan(other *PgEdgeVersion) bool { return v.Compare(other) > 0 } -func MustPgEdgeVersion(postgresVersion, spockVersion string) *PgEdgeVersion { - v, err := NewPgEdgeVersion(postgresVersion, spockVersion) +// Normalize returns the Postgres version in major.minor format and the Spock +// version in major format. This matches the way that versions are currently +// provided in our API. +func (v *PgEdgeVersion) Normalize() (*PgEdgeVersion, error) { + pv := v.PostgresVersion.MajorMinorVersion() + if len(pv.Components) != 2 { + return nil, fmt.Errorf("expected at least a major and minor version for postgres, got '%s'", pv) + } + sv := v.SpockVersion.MajorVersion() + if len(sv.Components) != 1 { + return nil, fmt.Errorf("expected at least a major version for spock, got '%s'", sv) + } + + return &PgEdgeVersion{ + PostgresVersion: pv, + SpockVersion: sv, + }, nil +} + +func MustParsePgEdgeVersion(postgresVersion, spockVersion string) *PgEdgeVersion { + v, err := ParsePgEdgeVersion(postgresVersion, spockVersion) if err != nil { panic(err) } return v } -func NewPgEdgeVersion(postgresVersion, spockVersion string) (*PgEdgeVersion, error) { +func ParsePgEdgeVersion(postgresVersion, spockVersion string) (*PgEdgeVersion, error) { pv, err := ParseVersion(postgresVersion) if err != nil { - return nil, fmt.Errorf("invalid postgres version: %q", postgresVersion) + return nil, fmt.Errorf("invalid postgres version: '%s'", postgresVersion) } sv, err := ParseVersion(spockVersion) if err != nil { - return nil, fmt.Errorf("invalid spock version: %q", spockVersion) + return nil, fmt.Errorf("invalid spock version: '%s'", spockVersion) } - return &PgEdgeVersion{ PostgresVersion: pv, SpockVersion: sv, diff --git a/server/internal/ds/versions_test.go 
b/server/internal/ds/versions_test.go index a72e0cdb..7b19dd90 100644 --- a/server/internal/ds/versions_test.go +++ b/server/internal/ds/versions_test.go @@ -230,7 +230,7 @@ func TestNewPgEdgeVersion(t *testing.T) { }, } { t.Run(tc.postgresVersion+"_"+tc.spockVersion, func(t *testing.T) { - result, err := ds.NewPgEdgeVersion(tc.postgresVersion, tc.spockVersion) + result, err := ds.ParsePgEdgeVersion(tc.postgresVersion, tc.spockVersion) if tc.expectedErr != "" { assert.Nil(t, result) assert.ErrorContains(t, err, tc.expectedErr) @@ -244,7 +244,7 @@ func TestNewPgEdgeVersion(t *testing.T) { func TestPgEdgeVersion(t *testing.T) { t.Run("String", func(t *testing.T) { - version := ds.MustPgEdgeVersion("17.6", "5.0.0") + version := ds.MustParsePgEdgeVersion("17.6", "5.0.0") assert.Equal(t, "17.6_5.0.0", version.String()) }) @@ -255,28 +255,28 @@ func TestPgEdgeVersion(t *testing.T) { expected int }{ { - a: ds.MustPgEdgeVersion("17.6", "5.0.0"), - b: ds.MustPgEdgeVersion("17.6", "5.0.0"), + a: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), + b: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), expected: 0, }, { - a: ds.MustPgEdgeVersion("18.0", "5.0.0"), - b: ds.MustPgEdgeVersion("17.6", "5.0.0"), + a: ds.MustParsePgEdgeVersion("18.0", "5.0.0"), + b: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), expected: 1, }, { - a: ds.MustPgEdgeVersion("17.6", "5.0.0"), - b: ds.MustPgEdgeVersion("18.0", "5.0.0"), + a: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), + b: ds.MustParsePgEdgeVersion("18.0", "5.0.0"), expected: -1, }, { - a: ds.MustPgEdgeVersion("17.6", "5.0.0"), - b: ds.MustPgEdgeVersion("17.6", "5.0.1"), + a: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), + b: ds.MustParsePgEdgeVersion("17.6", "5.0.1"), expected: -1, }, { - a: ds.MustPgEdgeVersion("17.6", "5.0.0"), - b: ds.MustPgEdgeVersion("17.6", "4.10.0"), + a: ds.MustParsePgEdgeVersion("17.6", "5.0.0"), + b: ds.MustParsePgEdgeVersion("17.6", "4.10.0"), expected: 1, }, } { diff --git a/server/internal/host/host_test.go 
b/server/internal/host/host_test.go index 97a76bdf..f8ab283a 100644 --- a/server/internal/host/host_test.go +++ b/server/internal/host/host_test.go @@ -20,78 +20,78 @@ func TestGreatestCommonDefaultVersion(t *testing.T) { { name: "same supported versions", defaultVersions: []*ds.PgEdgeVersion{ - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, supportedVersions: [][]*ds.PgEdgeVersion{ { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, }, - expected: ds.MustPgEdgeVersion("17.6", "5"), + expected: ds.MustParsePgEdgeVersion("17.6", "5"), }, { name: "one newer", defaultVersions: []*ds.PgEdgeVersion{ - ds.MustPgEdgeVersion("17.7", "5"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.7", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, supportedVersions: [][]*ds.PgEdgeVersion{ { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.7", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + 
ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.7", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, }, - expected: ds.MustPgEdgeVersion("17.6", "5"), + expected: ds.MustParsePgEdgeVersion("17.6", "5"), }, { name: "no overlaps", defaultVersions: []*ds.PgEdgeVersion{ - ds.MustPgEdgeVersion("18.0", "6"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("18.0", "6"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, supportedVersions: [][]*ds.PgEdgeVersion{ { - ds.MustPgEdgeVersion("16.11", "6"), - ds.MustPgEdgeVersion("17.7", "6"), - ds.MustPgEdgeVersion("18.0", "6"), + ds.MustParsePgEdgeVersion("16.11", "6"), + ds.MustParsePgEdgeVersion("17.7", "6"), + ds.MustParsePgEdgeVersion("18.0", "6"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, }, expectedErr: "no common default versions found between the given hosts", @@ -104,25 +104,25 @@ func TestGreatestCommonDefaultVersion(t *testing.T) { // version. 
name: "no overlapping defaults", defaultVersions: []*ds.PgEdgeVersion{ - ds.MustPgEdgeVersion("18.0", "6"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("18.0", "6"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), }, supportedVersions: [][]*ds.PgEdgeVersion{ { - ds.MustPgEdgeVersion("16.11", "5"), - ds.MustPgEdgeVersion("17.6", "5"), - ds.MustPgEdgeVersion("18.0", "5"), + ds.MustParsePgEdgeVersion("16.11", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("18.0", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, { - ds.MustPgEdgeVersion("16.10", "5"), - ds.MustPgEdgeVersion("18.1", "5"), - ds.MustPgEdgeVersion("17.6", "5"), + ds.MustParsePgEdgeVersion("16.10", "5"), + ds.MustParsePgEdgeVersion("18.1", "5"), + ds.MustParsePgEdgeVersion("17.6", "5"), }, }, expectedErr: "no common default versions found between the given hosts", diff --git a/server/internal/logging/factory.go b/server/internal/logging/factory.go index 30e4c5aa..dbe9ede5 100644 --- a/server/internal/logging/factory.go +++ b/server/internal/logging/factory.go @@ -16,6 +16,7 @@ func (c Component) String() string { const ( ComponentAPIServer Component = "api_server" + ComponentDatabaseService Component = "database_service" ComponentElectionCandidate Component = "election_candidate" ComponentEmbeddedEtcd Component = "embedded_etcd" ComponentMigration Component = "migration" diff --git a/server/internal/monitor/databases_monitor.go b/server/internal/monitor/databases_monitor.go new file mode 100644 index 00000000..c0a2babf --- /dev/null +++ b/server/internal/monitor/databases_monitor.go @@ -0,0 +1,66 @@ +package monitor + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog" + 
+ "github.com/pgEdge/control-plane/server/internal/config" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/election" +) + +type DatabasesMonitor struct { + monitor *Monitor + svc *database.Service + candidate *election.Candidate +} + +func NewDatabasesMonitor( + logger zerolog.Logger, + svc *database.Service, + candidate *election.Candidate, + cfg config.Config, +) *DatabasesMonitor { + m := &DatabasesMonitor{ + svc: svc, + candidate: candidate, + } + interval := time.Duration(cfg.DatabasesMonitorIntervalSeconds) * time.Second + m.monitor = NewMonitor(logger, interval, m.update) + return m +} + +func (m *DatabasesMonitor) Start(ctx context.Context) error { + if err := m.candidate.Start(ctx); err != nil { + return fmt.Errorf("failed to start candidate: %w", err) + } + m.monitor.Start(ctx) + + return nil +} + +func (m *DatabasesMonitor) Stop() error { + ctx, cancel := context.WithTimeout(context.Background(), electionTTL/3) + defer cancel() + + m.monitor.Stop() + if err := m.candidate.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop candidate: %w", err) + } + + return nil +} + +func (m *DatabasesMonitor) update(ctx context.Context) error { + if !m.candidate.IsLeader() { + return nil + } + if err := m.svc.ReconcileAllDatabaseVersions(ctx); err != nil { + return fmt.Errorf("failed to reconcile database versions: %w", err) + } + + return nil +} diff --git a/server/internal/monitor/host_monitor.go b/server/internal/monitor/host_monitor.go index 9575c116..f6e19cc9 100644 --- a/server/internal/monitor/host_monitor.go +++ b/server/internal/monitor/host_monitor.go @@ -2,6 +2,7 @@ package monitor import ( "context" + "fmt" "github.com/rs/zerolog" @@ -23,7 +24,7 @@ func NewHostMonitor( m.monitor = NewMonitor( logger, host.HostMonitorRefreshInterval, - m.checkStatus, + m.update, ) return m } @@ -36,7 +37,13 @@ func (m *HostMonitor) Stop() { m.monitor.Stop() } -func (m *HostMonitor) checkStatus(ctx 
context.Context) error { - return m.svc.UpdateHostStatus(ctx) +func (m *HostMonitor) update(ctx context.Context) error { + if err := m.svc.UpdateHost(ctx); err != nil { + return fmt.Errorf("failed to update host: %w", err) + } + if err := m.svc.UpdateHostStatus(ctx); err != nil { + return fmt.Errorf("failed to update host status: %w", err) + } + return nil } diff --git a/server/internal/monitor/instance_monitor.go b/server/internal/monitor/instance_monitor.go index 57ef2505..2385fa0f 100644 --- a/server/internal/monitor/instance_monitor.go +++ b/server/internal/monitor/instance_monitor.go @@ -89,12 +89,9 @@ func (m *InstanceMonitor) checkStatus(ctx context.Context) error { if err != nil { return m.updateInstanceErrStatus(ctx, status, err) } - - if status.IsPrimary() { - err = m.populateFromDbConn(ctx, dbState, info, tlsCfg, status) - if err != nil { - return m.updateInstanceErrStatus(ctx, status, err) - } + err = m.populateFromDbConn(ctx, dbState, info, tlsCfg, status) + if err != nil { + return m.updateInstanceErrStatus(ctx, status, err) } currentInstance, err := m.dbSvc.GetInstance(ctx, m.databaseID, m.instanceID) if err != nil { @@ -160,22 +157,24 @@ func (m *InstanceMonitor) populateFromDbConn( } status.SpockVersion = utils.PointerTo(spockVersion) - spockReadOnly, err := postgres.GetSpockReadOnly().Scalar(ctx, conn) - if err != nil { - return fmt.Errorf("failed to query spock read-only status: %w", err) - } - status.ReadOnly = utils.PointerTo(spockReadOnly) + if status.IsPrimary() { + spockReadOnly, err := postgres.GetSpockReadOnly().Scalar(ctx, conn) + if err != nil { + return fmt.Errorf("failed to query spock read-only status: %w", err) + } + status.ReadOnly = utils.PointerTo(spockReadOnly) - subStatuses, err := postgres.GetSubscriptionStatuses().Scalars(ctx, conn) - if err != nil { - return fmt.Errorf("failed to query subscription statuses: %w", err) - } - for _, sub := range subStatuses { - status.Subscriptions = append(status.Subscriptions, 
database.SubscriptionStatus{ - ProviderNode: sub.ProviderNode, - Name: sub.SubscriptionName, - Status: sub.Status, - }) + subStatuses, err := postgres.GetSubscriptionStatuses().Scalars(ctx, conn) + if err != nil { + return fmt.Errorf("failed to query subscription statuses: %w", err) + } + for _, sub := range subStatuses { + status.Subscriptions = append(status.Subscriptions, database.SubscriptionStatus{ + ProviderNode: sub.ProviderNode, + Name: sub.SubscriptionName, + Status: sub.Status, + }) + } } return nil diff --git a/server/internal/monitor/provide.go b/server/internal/monitor/provide.go index 858274ff..7e197a39 100644 --- a/server/internal/monitor/provide.go +++ b/server/internal/monitor/provide.go @@ -1,6 +1,8 @@ package monitor import ( + "time" + "github.com/rs/zerolog" "github.com/samber/do" clientv3 "go.etcd.io/etcd/client/v3" @@ -8,9 +10,13 @@ import ( "github.com/pgEdge/control-plane/server/internal/certificates" "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/election" "github.com/pgEdge/control-plane/server/internal/host" ) +const electionName election.Name = "databases-monitor" +const electionTTL time.Duration = 30 * time.Second + func Provide(i *do.Injector) { provideStore(i) provideService(i) @@ -46,7 +52,13 @@ func provideService(i *do.Injector) { if err != nil { return nil, err } - return NewService(cfg, logger, dbSvc, certSvc, dbOrch, store, hostSvc), nil + electionSvc, err := do.Invoke[*election.Service](i) + if err != nil { + return nil, err + } + + candidate := electionSvc.NewCandidate(electionName, cfg.HostID, electionTTL) + return NewService(cfg, logger, dbSvc, certSvc, dbOrch, store, hostSvc, candidate), nil }) } diff --git a/server/internal/monitor/service.go b/server/internal/monitor/service.go index cef1b0d1..b3d4d022 100644 --- a/server/internal/monitor/service.go +++ b/server/internal/monitor/service.go @@ -10,6 
+10,7 @@ import ( "github.com/pgEdge/control-plane/server/internal/certificates" "github.com/pgEdge/control-plane/server/internal/config" "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/election" "github.com/pgEdge/control-plane/server/internal/host" ) @@ -26,6 +27,7 @@ type Service struct { hostMonitor *HostMonitor instances map[string]*InstanceMonitor serviceInstances map[string]*ServiceInstanceMonitor + databasesMonitor *DatabasesMonitor } func NewService( @@ -36,7 +38,12 @@ func NewService( dbOrch database.Orchestrator, store *Store, hostSvc *host.Service, + candidate *election.Candidate, ) *Service { + var databasesMonitor *DatabasesMonitor + if cfg.DatabasesMonitorIntervalSeconds > 0 { + databasesMonitor = NewDatabasesMonitor(logger, dbSvc, candidate, cfg) + } return &Service{ cfg: cfg, logger: logger, @@ -47,6 +54,7 @@ func NewService( instances: map[string]*InstanceMonitor{}, serviceInstances: map[string]*ServiceInstanceMonitor{}, hostMonitor: NewHostMonitor(logger, hostSvc), + databasesMonitor: databasesMonitor, } } @@ -57,6 +65,11 @@ func (s *Service) Start(ctx context.Context) error { // the lifetime of a single operation. s.appCtx = ctx s.hostMonitor.Start(ctx) + if s.databasesMonitor != nil { + if err := s.databasesMonitor.Start(ctx); err != nil { + return fmt.Errorf("failed to start databases monitor: %w", err) + } + } stored, err := s.store.InstanceMonitor. GetAllByHostID(s.cfg.HostID). 
@@ -106,6 +119,11 @@ func (s *Service) Shutdown() error { s.serviceInstances = map[string]*ServiceInstanceMonitor{} s.hostMonitor.Stop() + if s.databasesMonitor != nil { + if err := s.databasesMonitor.Stop(); err != nil { + return fmt.Errorf("failed to stop databases monitor: %w", err) + } + } return nil } diff --git a/server/internal/orchestrator/common/patroni_config_generator_test.go b/server/internal/orchestrator/common/patroni_config_generator_test.go index aabfee36..b495f51b 100644 --- a/server/internal/orchestrator/common/patroni_config_generator_test.go +++ b/server/internal/orchestrator/common/patroni_config_generator_test.go @@ -52,7 +52,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, }, HostCPUs: 4, @@ -109,7 +109,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, BackupConfig: &database.BackupConfig{}, }, @@ -167,7 +167,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, RestoreConfig: &database.RestoreConfig{}, }, @@ -225,7 +225,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, RestoreConfig: &database.RestoreConfig{}, InPlaceRestore: true, @@ -284,7 +284,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", 
"5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, }, HostCPUs: 4, @@ -325,7 +325,7 @@ func TestPatroniConfigGenerator(t *testing.T) { DatabaseName: "app", NodeName: "n1", NodeOrdinal: 1, - PgEdgeVersion: ds.MustPgEdgeVersion("18.1", "5.0.4"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("18.1", "5.0.4"), ClusterSize: 3, }, HostCPUs: 4, diff --git a/server/internal/orchestrator/swarm/images.go b/server/internal/orchestrator/swarm/images.go index a474c252..fe59d251 100644 --- a/server/internal/orchestrator/swarm/images.go +++ b/server/internal/orchestrator/swarm/images.go @@ -25,49 +25,49 @@ func NewVersions(cfg config.Config) *Versions { } // pg16 - versions.addImage(ds.MustPgEdgeVersion("16.10", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("16.10", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.10-spock5.0.4-standard-3"), }) - versions.addImage(ds.MustPgEdgeVersion("16.11", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("16.11", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.11-spock5.0.4-standard-4"), }) - versions.addImage(ds.MustPgEdgeVersion("16.12", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("16.12", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.12-spock5.0.5-standard-1"), }) - versions.addImage(ds.MustPgEdgeVersion("16.13", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("16.13", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.13-spock5.0.7-standard-1"), }) // pg17 - versions.addImage(ds.MustPgEdgeVersion("17.6", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("17.6", "5"), &Images{ PgEdgeImage: imageTag(cfg, "17.6-spock5.0.4-standard-3"), }) - versions.addImage(ds.MustPgEdgeVersion("17.7", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("17.7", "5"), &Images{ PgEdgeImage: imageTag(cfg, "17.7-spock5.0.4-standard-4"), }) - versions.addImage(ds.MustPgEdgeVersion("17.8", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("17.8", "5"), 
&Images{ PgEdgeImage: imageTag(cfg, "17.8-spock5.0.5-standard-1"), }) - versions.addImage(ds.MustPgEdgeVersion("17.9", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("17.9", "5"), &Images{ PgEdgeImage: imageTag(cfg, "17.9-spock5.0.7-standard-1"), }) // pg18 - versions.addImage(ds.MustPgEdgeVersion("18.0", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("18.0", "5"), &Images{ PgEdgeImage: imageTag(cfg, "18.0-spock5.0.4-standard-3"), }) - versions.addImage(ds.MustPgEdgeVersion("18.1", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("18.1", "5"), &Images{ PgEdgeImage: imageTag(cfg, "18.1-spock5.0.4-standard-4"), }) - versions.addImage(ds.MustPgEdgeVersion("18.2", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("18.2", "5"), &Images{ PgEdgeImage: imageTag(cfg, "18.2-spock5.0.5-standard-1"), }) - versions.addImage(ds.MustPgEdgeVersion("18.3", "5"), &Images{ + versions.addImage(ds.MustParsePgEdgeVersion("18.3", "5"), &Images{ PgEdgeImage: imageTag(cfg, "18.3-spock5.0.7-standard-1"), }) - versions.defaultVersion = ds.MustPgEdgeVersion("18.3", "5") + versions.defaultVersion = ds.MustParsePgEdgeVersion("18.3", "5") return versions } diff --git a/server/internal/orchestrator/swarm/rag_instance_resources_test.go b/server/internal/orchestrator/swarm/rag_instance_resources_test.go index aa4d3355..383d07b2 100644 --- a/server/internal/orchestrator/swarm/rag_instance_resources_test.go +++ b/server/internal/orchestrator/swarm/rag_instance_resources_test.go @@ -227,7 +227,7 @@ func TestGenerateRAGInstanceResources_IncompatibleVersion(t *testing.T) { NodeName: "n1", ConnectAsUsername: "app_read_only", ConnectAsPassword: "secret", - PgEdgeVersion: ds.MustPgEdgeVersion("17", "5.0.0"), + PgEdgeVersion: ds.MustParsePgEdgeVersion("17", "5.0.0"), } _, err := o.generateRAGInstanceResources(spec) diff --git a/server/internal/storage/interface.go b/server/internal/storage/interface.go index 6c5d9589..7b8d23a0 100644 --- 
a/server/internal/storage/interface.go +++ b/server/internal/storage/interface.go @@ -46,6 +46,7 @@ type TxnOperation interface { // on a unique key. type Txn interface { AddOps(ops ...TxnOperation) + AddConditions(cmps ...clientv3.Cmp) Commit(ctx context.Context) error } diff --git a/server/internal/storage/txn.go b/server/internal/storage/txn.go index 93ae5b0d..3cc379f2 100644 --- a/server/internal/storage/txn.go +++ b/server/internal/storage/txn.go @@ -11,6 +11,7 @@ import ( type txn struct { ops []TxnOperation client *clientv3.Client + cmps []clientv3.Cmp } func NewTxn(client *clientv3.Client, ops ...TxnOperation) Txn { @@ -24,6 +25,10 @@ func (t *txn) AddOps(ops ...TxnOperation) { t.ops = append(t.ops, ops...) } +func (t *txn) AddConditions(cmps ...clientv3.Cmp) { + t.cmps = append(t.cmps, cmps...) +} + func (t *txn) Commit(ctx context.Context) error { var allOps []clientv3.Op var allCmps []clientv3.Cmp @@ -43,6 +48,7 @@ func (t *txn) Commit(ctx context.Context) error { cachedOps = append(cachedOps, c) } } + allCmps = append(allCmps, t.cmps...) // Etcd will reject the transaction if there are duplicate keys, and it // doesn't give a helpful error message. We can produce a better error by diff --git a/server/internal/workflows/plan_update.go b/server/internal/workflows/plan_update.go index f8947f97..162380ce 100644 --- a/server/internal/workflows/plan_update.go +++ b/server/internal/workflows/plan_update.go @@ -99,7 +99,7 @@ func (w *Workflows) getServiceResources( nodeInstances []*database.NodeInstances, ) (*operations.ServiceResources, error) { serviceInstanceID := database.GenerateServiceInstanceID(spec.DatabaseID, serviceSpec.ServiceID, hostID) - pgEdgeVersion, err := ds.NewPgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) + pgEdgeVersion, err := ds.ParsePgEdgeVersion(spec.PostgresVersion, spec.SpockVersion) if err != nil { return nil, fmt.Errorf("failed to parse pgedge version: %w", err) }