Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ down: ## Stop the dependencies via docker compose
$(call print-target)
docker compose down --remove-orphans --volumes

# Brings up the services defined in docker-compose.replicated.yaml in detached
# mode (-d), so the command returns once the containers have been started.
.PHONY: up-replicated
up-replicated: ## Start a 2-node replicated ClickHouse cluster (docker-compose.replicated.yaml) for testing the ReplicatedMergeTree code path
$(call print-target)
docker compose -f docker-compose.replicated.yaml up -d

# Tears down the services from docker-compose.replicated.yaml.
# --volumes also deletes the compose volumes, so any data written to the test
# cluster is discarded; --remove-orphans removes containers for services that
# are no longer defined in the compose file.
.PHONY: down-replicated
down-replicated: ## Stop the replicated ClickHouse cluster
$(call print-target)
docker compose -f docker-compose.replicated.yaml down --remove-orphans --volumes

.PHONY: patch-oapi-templates
patch-oapi-templates: ## Patch oapi-codegen chi-middleware template with custom filter parsing
$(call print-target)
Expand Down
12 changes: 9 additions & 3 deletions app/common/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ func NewStreamingConnector(
var err error

connector, err = clickhouseconnector.New(ctx, clickhouseconnector.Config{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
EventsTableName: conf.EventsTableName,
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
EventsTableName: conf.EventsTableName,
EventsTableEngine: clickhouseconnector.EventsTableEngine{
Type: clickhouseconnector.EventsTableEngineType(conf.ClickHouse.EventsTableEngine.ResolvedType()),
ZooKeeperPath: conf.ClickHouse.EventsTableEngine.ZooKeeperPath,
ReplicaName: conf.ClickHouse.EventsTableEngine.ReplicaName,
Cluster: conf.ClickHouse.EventsTableEngine.Cluster,
},
Logger: logger,
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
Expand Down
94 changes: 94 additions & 0 deletions app/config/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -79,6 +80,89 @@ type ClickHouseAggregationConfiguration struct {
PoolMetrics ClickhousePoolMetricsConfig

Retry ClickhouseQueryRetryConfig

// EventsTableEngine controls the storage engine used when creating the
// events table on startup. Defaults to "MergeTree", which ClickHouse Cloud
// transparently rewrites to SharedMergeTree. Self-hosted clusters that
// require replication should set Type to "ReplicatedMergeTree" and supply
// ZooKeeperPath and ReplicaName.
EventsTableEngine ClickhouseEventsTableEngineConfig
}

// ClickhouseEventsTableEngineType names the ClickHouse storage engine that
// backs the events table.
type ClickhouseEventsTableEngineType string

// Engine types accepted by ClickhouseEventsTableEngineConfig.Validate.
const (
	// ClickhouseEventsTableEngineMergeTree is the default engine; an empty
	// Type resolves to it.
	ClickhouseEventsTableEngineMergeTree ClickhouseEventsTableEngineType = "MergeTree"
	// ClickhouseEventsTableEngineReplicatedMergeTree is the replicated engine;
	// it needs both ZooKeeperPath and ReplicaName to be configured.
	ClickhouseEventsTableEngineReplicatedMergeTree ClickhouseEventsTableEngineType = "ReplicatedMergeTree"
)

// ClickhouseEventsTableEngineConfig selects and parameterizes the storage
// engine used when the events table is created on startup.
type ClickhouseEventsTableEngineConfig struct {
	// Type is the engine name: "MergeTree" (also used when left empty) or
	// "ReplicatedMergeTree". Any other value fails validation.
	Type ClickhouseEventsTableEngineType

	// ZooKeeperPath is the ZooKeeper/Keeper path handed to ReplicatedMergeTree
	// as its first argument. It must be non-blank when Type is
	// "ReplicatedMergeTree". ClickHouse macros are supported, for example
	// "/clickhouse/tables/{shard}/{database}/{table}".
	ZooKeeperPath string

	// ReplicaName is the replica identifier handed to ReplicatedMergeTree as
	// its second argument. It must be non-blank when Type is
	// "ReplicatedMergeTree". Typically a macro like "{replica}".
	ReplicaName string

	// Cluster, when set, renders the CREATE TABLE statement with an
	// ON CLUSTER {cluster} clause. Required for multi-node self-hosted
	// deployments that route DDL through a cluster macro. Validation only
	// accepts it together with the "ReplicatedMergeTree" engine.
	Cluster string
}

// Validate checks the engine configuration and reports every problem found,
// combined into a single error. It returns nil when the configuration is valid.
func (c ClickhouseEventsTableEngineConfig) Validate() error {
	var problems []error

	clusterSet := c.Cluster != ""

	// The SQL builder backtick-quotes the cluster name, so anything ClickHouse
	// accepts as an identifier is allowed here. A value made up solely of
	// whitespace is almost certainly a configuration typo and is refused.
	if clusterSet && strings.TrimSpace(c.Cluster) == "" {
		problems = append(problems, errors.New("cluster name must not be whitespace-only"))
	}

	switch c.ResolvedType() {
	case ClickhouseEventsTableEngineReplicatedMergeTree:
		// Both engine arguments are mandatory; blank values count as missing.
		required := []struct {
			value   string
			message string
		}{
			{c.ZooKeeperPath, "zooKeeperPath is required for ReplicatedMergeTree"},
			{c.ReplicaName, "replicaName is required for ReplicatedMergeTree"},
		}
		for _, field := range required {
			if strings.TrimSpace(field.value) == "" {
				problems = append(problems, errors.New(field.message))
			}
		}
	case ClickhouseEventsTableEngineMergeTree:
		// Reached for both the explicit "MergeTree" value and an empty Type.
		if clusterSet {
			problems = append(problems, fmt.Errorf("cluster requires %s engine; MergeTree with ON CLUSTER produces independent (non-replicated) tables per node",
				ClickhouseEventsTableEngineReplicatedMergeTree,
			))
		}
	default:
		problems = append(problems, fmt.Errorf("unsupported events table engine type %q (allowed: %q, %q)",
			c.Type,
			ClickhouseEventsTableEngineMergeTree,
			ClickhouseEventsTableEngineReplicatedMergeTree,
		))
	}

	return errors.Join(problems...)
}

// ResolvedType returns the configured engine type, substituting MergeTree when
// Type has been left empty.
func (c ClickhouseEventsTableEngineConfig) ResolvedType() ClickhouseEventsTableEngineType {
	if engine := c.Type; engine != "" {
		return engine
	}
	return ClickhouseEventsTableEngineMergeTree
}

// Validate validates the configuration.
Expand Down Expand Up @@ -117,6 +201,10 @@ func (c ClickHouseAggregationConfiguration) Validate() error {
errs = append(errs, fmt.Errorf("pool metrics: %w", err))
}

if err := c.EventsTableEngine.Validate(); err != nil {
errs = append(errs, fmt.Errorf("events table engine: %w", err))
}

return errors.Join(errs...)
}

Expand Down Expand Up @@ -223,6 +311,12 @@ func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.clickhouse.poolMetrics.enabled", true)
v.SetDefault("aggregation.clickhouse.poolMetrics.pollInterval", "5s")

// Events table storage engine. Defaults to MergeTree (ClickHouse Cloud
// rewrites this to SharedMergeTree transparently). Self-hosted clusters
// that require replication should set type to ReplicatedMergeTree and
// supply zooKeeperPath / replicaName / (optionally) cluster.
v.SetDefault("aggregation.clickhouse.eventsTableEngine.type", string(ClickhouseEventsTableEngineMergeTree))

// Decimal precision
v.SetDefault("aggregation.enableDecimalPrecision", false)
}
126 changes: 126 additions & 0 deletions app/config/aggregation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package config

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestClickhouseEventsTableEngineConfigValidate runs Validate against a table
// of configurations. A case with an empty wantErr must validate cleanly; any
// other case must fail with an error whose message contains wantErr.
func TestClickhouseEventsTableEngineConfigValidate(t *testing.T) {
	tests := []struct {
		name    string
		cfg     ClickhouseEventsTableEngineConfig
		wantErr string
	}{
		{
			name: "empty type defaults to MergeTree and is valid",
			cfg:  ClickhouseEventsTableEngineConfig{},
		},
		{
			name: "explicit MergeTree is valid",
			cfg: ClickhouseEventsTableEngineConfig{
				Type: ClickhouseEventsTableEngineMergeTree,
			},
		},
		{
			name: "MergeTree with cluster is rejected (non-replicated ON CLUSTER produces independent tables)",
			cfg: ClickhouseEventsTableEngineConfig{
				Type:    ClickhouseEventsTableEngineMergeTree,
				Cluster: "c1",
			},
			wantErr: "cluster requires ReplicatedMergeTree",
		},
		{
			name: "ReplicatedMergeTree without zk path is invalid",
			cfg: ClickhouseEventsTableEngineConfig{
				Type:        ClickhouseEventsTableEngineReplicatedMergeTree,
				ReplicaName: "{replica}",
			},
			wantErr: "zooKeeperPath",
		},
		{
			name: "ReplicatedMergeTree without replica is invalid",
			cfg: ClickhouseEventsTableEngineConfig{
				Type:          ClickhouseEventsTableEngineReplicatedMergeTree,
				ZooKeeperPath: "/p",
			},
			wantErr: "replicaName",
		},
		{
			name: "ReplicatedMergeTree fully populated is valid",
			cfg: ClickhouseEventsTableEngineConfig{
				Type:          ClickhouseEventsTableEngineReplicatedMergeTree,
				ZooKeeperPath: "/clickhouse/tables/{shard}/{database}/{table}",
				ReplicaName:   "{replica}",
				Cluster:       "openmeter_cluster",
			},
		},
		{
			name: "unsupported engine type is rejected",
			cfg: ClickhouseEventsTableEngineConfig{
				Type: "SharedMergeTree",
			},
			wantErr: "unsupported events table engine type",
		},
		{
			name: "cluster name with hyphen is valid (backtick-quoted at render time)",
			cfg: ClickhouseEventsTableEngineConfig{
				Type:          ClickhouseEventsTableEngineReplicatedMergeTree,
				ZooKeeperPath: "/p",
				ReplicaName:   "{replica}",
				Cluster:       "prod-cluster-1",
			},
		},
		{
			name: "cluster name with whitespace-only is rejected",
			cfg: ClickhouseEventsTableEngineConfig{
				Type:          ClickhouseEventsTableEngineReplicatedMergeTree,
				ZooKeeperPath: "/p",
				ReplicaName:   "{replica}",
				Cluster:       " ",
			},
			wantErr: "must not be whitespace-only",
		},
		{
			name: "ReplicatedMergeTree with whitespace-only zk path is invalid",
			cfg: ClickhouseEventsTableEngineConfig{
				Type:          ClickhouseEventsTableEngineReplicatedMergeTree,
				ZooKeeperPath: " ",
				ReplicaName:   "{replica}",
			},
			wantErr: "zooKeeperPath",
		},
		{
			name: "ReplicatedMergeTree with whitespace-only replica is invalid",
			cfg: ClickhouseEventsTableEngineConfig{
				Type:          ClickhouseEventsTableEngineReplicatedMergeTree,
				ZooKeeperPath: "/p",
				ReplicaName:   "\t",
			},
			wantErr: "replicaName",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := tt.cfg.Validate()
			if tt.wantErr == "" {
				assert.NoError(t, err)
				return
			}
			// assert.Error only records the failure and returns false; it does
			// not stop the test. Bail out here so a missing error is reported
			// as a clean failure instead of a nil dereference in err.Error().
			if !assert.Error(t, err) {
				return
			}
			assert.Contains(t, err.Error(), tt.wantErr)
		})
	}
}

func TestClickhouseEventsTableEngineConfigResolvedType(t *testing.T) {
assert.Equal(t,
ClickhouseEventsTableEngineMergeTree,
ClickhouseEventsTableEngineConfig{}.ResolvedType(),
)
assert.Equal(t,
ClickhouseEventsTableEngineReplicatedMergeTree,
ClickhouseEventsTableEngineConfig{Type: ClickhouseEventsTableEngineReplicatedMergeTree}.ResolvedType(),
)
}
6 changes: 6 additions & 0 deletions app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ func TestComplete(t *testing.T) {
Enabled: true,
PollInterval: 5 * time.Second,
},
EventsTableEngine: ClickhouseEventsTableEngineConfig{
Type: ClickhouseEventsTableEngineReplicatedMergeTree,
ZooKeeperPath: "/clickhouse/tables/{shard}/{database}/{table}",
ReplicaName: "{replica}",
Cluster: "openmeter_cluster",
},
},
EventsTableName: "om_events",
AsyncInsert: false,
Expand Down
6 changes: 6 additions & 0 deletions app/config/testdata/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ aggregation:
retry:
enabled: true

eventsTableEngine:
type: ReplicatedMergeTree
zooKeeperPath: "/clickhouse/tables/{shard}/{database}/{table}"
replicaName: "{replica}"
cluster: openmeter_cluster

sink:
groupId: openmeter-sink-worker
minCommitCount: 500
Expand Down
Loading