From af4539d6314e32c99cf856e2e6db66f3c23f22f4 Mon Sep 17 00:00:00 2001 From: Pierre SOUCHAY Date: Thu, 11 Dec 2025 17:11:04 +0100 Subject: [PATCH 1/7] Ensure collection returns within a specified delay **Context**: As explained in https://github.com/prometheus-community/postgres_exporter/issues/1228, when database is very slow to answer, every call to prometheus exporter might consume a new connection and possibly consume all available connections **Solution**: Ensure a Context with a specified Timeout is specified, so the connection will end if duration of collection is too long Signed-off-by: Pierre Souchay --- cmd/postgres_exporter/main.go | 3 ++- collector/collector.go | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 6b93725d4..be582268f 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -50,6 +50,7 @@ var ( excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String() includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String() metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String() + collectionTimeout = kingpin.Flag("collection-timeout", "Maximum duration of collection").Default("1m").Envar("PG_EXPORTER_COLLECTION_TIMEOUT").String() logger = promslog.NewNopLogger() ) @@ -137,7 +138,7 @@ func main() { excludedDatabases, dsn, []string{}, - ) + collector.CollectionTimeout(*collectionTimeout)) if err != nil { logger.Warn("Failed to create PostgresCollector", "err", err.Error()) } else { diff --git a/collector/collector.go b/collector/collector.go index de7203486..0b64096b1 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -92,7 +92,8 @@ type PostgresCollector struct { Collectors map[string]Collector logger *slog.Logger - instance *instance + instance *instance + CollectionTimeout time.Duration } type Option func(*PostgresCollector) error @@ -158,6 +159,17 @@ func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, dsn st return p, nil } +func CollectionTimeout(s string) Option { + return func(e *PostgresCollector) error { + duration, err := time.ParseDuration(s) + if err != nil { + return err + } + e.CollectionTimeout = duration + return nil + } +} + // Describe implements the prometheus.Collector interface. func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) { ch <- scrapeDurationDesc @@ -166,8 +178,8 @@ func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) { // Collect implements the prometheus.Collector interface. 
func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { - ctx := context.TODO() - + ctx, cancel := context.WithTimeout(context.Background(), p.CollectionTimeout) + defer cancel() // copy the instance so that concurrent scrapes have independent instances inst := p.instance.copy() From 49e6cd3a41a1ef35efe3c0d09a6630f0af47793e Mon Sep 17 00:00:00 2001 From: Pierre SOUCHAY Date: Thu, 11 Dec 2025 23:12:19 +0100 Subject: [PATCH 2/7] test: added TestPGDatabaseTimeout to ensure our timing behaves as it should Signed-off-by: Pierre Souchay --- collector/collector.go | 8 +++-- collector/pg_database_test.go | 67 +++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/collector/collector.go b/collector/collector.go index 0b64096b1..ed77ef865 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -178,8 +178,6 @@ func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) { // Collect implements the prometheus.Collector interface. func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { - ctx, cancel := context.WithTimeout(context.Background(), p.CollectionTimeout) - defer cancel() // copy the instance so that concurrent scrapes have independent instances inst := p.instance.copy() @@ -190,6 +188,12 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { p.logger.Error("Error opening connection to database", "err", err) return } + p.collectFromConnection(inst, ch) +} + +func (p PostgresCollector) collectFromConnection(inst *instance, ch chan<- prometheus.Metric) { + ctx, cancel := context.WithTimeout(context.Background(), p.CollectionTimeout) + defer cancel() wg := sync.WaitGroup{} wg.Add(len(p.Collectors)) diff --git a/collector/pg_database_test.go b/collector/pg_database_test.go index fe94166e9..09c225bca 100644 --- a/collector/pg_database_test.go +++ b/collector/pg_database_test.go @@ -15,13 +15,80 @@ package collector import ( "context" "testing" + "time" "github.com/DATA-DOG/go-sqlmock" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/promslog" "github.com/smartystreets/goconvey/convey" ) +// We ensure that when the database respond after a long time +// The collection process still occurs in a predictable manner +// Will avoid accumulation of queries on a completely frozen DB +func TestPGDatabaseTimeout(t *testing.T) { + + timeoutForQuery := time.Duration(100 * time.Millisecond) + + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{"pg_roles.rolname", "pg_roles.rolconnlimit"} + rows := sqlmock.NewRows(columns).AddRow("role1", 2) + mock.ExpectQuery(pgRolesConnectionLimitsQuery). + WillDelayFor(30 * time.Second). 
+ WillReturnRows(rows) + + log_config := promslog.Config{} + + logger := promslog.New(&log_config) + + c, err := NewPostgresCollector(logger, []string{}, "postgresql://local", []string{}, CollectionTimeout(timeoutForQuery.String())) + if err != nil { + t.Fatalf("error creating NewPostgresCollector: %s", err) + } + collector_config := collectorConfig{ + logger: logger, + excludeDatabases: []string{}, + } + + collector, err := NewPGRolesCollector(collector_config) + if err != nil { + t.Fatalf("error creating collector: %s", err) + } + c.Collectors["test"] = collector + c.instance = inst + + ch := make(chan prometheus.Metric) + defer close(ch) + + go func() { + for { + <-ch + time.Sleep(1 * time.Millisecond) + } + }() + + startTime := time.Now() + c.collectFromConnection(inst, ch) + elapsed := time.Since(startTime) + + if elapsed <= timeoutForQuery { + t.Errorf("elapsed time was %v, should be bigger than timeout=%v", elapsed, timeoutForQuery) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + func TestPGDatabaseCollector(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { From 619e34f11e4edebe310a60af0e7290b1a33ac07e Mon Sep 17 00:00:00 2001 From: Pierre SOUCHAY Date: Fri, 12 Dec 2025 00:24:31 +0100 Subject: [PATCH 3/7] document the new variable `PG_EXPORTER_COLLECTION_TIMEOUT` in the README.md Signed-off-by: Pierre SOUCHAY --- README.md | 7 +++++++ cmd/postgres_exporter/main.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index beff2c793..c616709fe 100644 --- a/README.md +++ b/README.md @@ -255,6 +255,13 @@ The following environment variables configure the exporter: * `DATA_SOURCE_PASS_FILE` The same as above but reads the password from a file. +* `PG_EXPORTER_COLLECTION_TIMEOUT` + Timeout duration to use when collecting the statistics, default to `1m`. + When the timeout is reached, the database connection will be dropped. + It avoids connections stacking when the database answers too slowly + (for instance if the database creates/drop a huge table and locks the tables) + and will avoid exhausting the pool of connections of the database. + * `PG_EXPORTER_WEB_TELEMETRY_PATH` Path under which to expose metrics. Default is `/metrics`. 
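Illustration, not part of the patch series: the README entry above says that once the timeout is reached the in-flight work is dropped, so connections cannot stack up on a slow database. Below is a minimal, self-contained sketch of that mechanism, reusing the go-sqlmock library the new tests rely on; the query text, the 30s delay and the 100ms deadline are invented for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
)

func main() {
	db, mock, err := sqlmock.New()
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Simulate a frozen database: the (made-up) query only answers after 30s.
	rows := sqlmock.NewRows([]string{"datname"}).AddRow("postgres")
	mock.ExpectQuery("SELECT datname FROM pg_database").
		WillDelayFor(30 * time.Second).
		WillReturnRows(rows)

	// Bound the work with a deadline, as Collect now does for a whole scrape.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	start := time.Now()
	_, err = db.QueryContext(ctx, "SELECT datname FROM pg_database")
	// Returns after roughly the 100ms deadline with an error, instead of
	// holding the connection for the full 30s.
	fmt.Printf("query returned after %v (err=%v)\n", time.Since(start), err)
}
```

The same pattern is what the collector now applies around an entire scrape via context.WithTimeout in Collect (PATCH 1/7).
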
diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index be582268f..ab728fc28 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -50,7 +50,7 @@ var ( excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String() includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String() metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String() - collectionTimeout = kingpin.Flag("collection-timeout", "Maximum duration of collection").Default("1m").Envar("PG_EXPORTER_COLLECTION_TIMEOUT").String() + collectionTimeout = kingpin.Flag("collection-timeout", "Timeout for collecting the statistics when the database is slow").Default("1m").Envar("PG_EXPORTER_COLLECTION_TIMEOUT").String() logger = promslog.NewNopLogger() ) From d573c0e35b3d907ea126a9b530fbdf139a026551 Mon Sep 17 00:00:00 2001 From: Pierre SOUCHAY Date: Fri, 12 Dec 2025 08:36:54 +0100 Subject: [PATCH 4/7] CollectionTimeout -> WithCollectionTimeout Signed-off-by: Pierre SOUCHAY --- cmd/postgres_exporter/main.go | 2 +- collector/collector.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index ab728fc28..c901ec722 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -138,7 +138,7 @@ func main() { excludedDatabases, dsn, []string{}, - collector.CollectionTimeout(*collectionTimeout)) + collector.WithCollectionTimeout(*collectionTimeout)) if err != nil { logger.Warn("Failed to create PostgresCollector", "err", err.Error()) } else { diff --git a/collector/collector.go b/collector/collector.go index ed77ef865..8477994e2 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -159,7 +159,7 @@ func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, dsn st return p, nil } -func CollectionTimeout(s string) Option { +func WithCollectionTimeout(s string) Option { return func(e *PostgresCollector) error { duration, err := time.ParseDuration(s) if err != nil { From c2c23977426b1034e11dac710e91ec2c1f6e4d63 Mon Sep 17 00:00:00 2001 From: Pierre SOUCHAY Date: Fri, 12 Dec 2025 08:52:10 +0100 Subject: [PATCH 5/7] Document and report 0 value for PG_EXPORTER_COLLECTION_TIMEOUT Signed-off-by: Pierre SOUCHAY --- README.md | 1 + collector/collector.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/README.md b/README.md index c616709fe..8ac9f0953 100644 --- a/README.md +++ b/README.md @@ -261,6 +261,7 @@ The following environment variables configure the exporter: It avoids connections stacking when the database answers too slowly (for instance if the database creates/drop a huge table and locks the tables) and will avoid exhausting the pool of connections of the database. + Value of `0` or less than `1ms` is considered invalid and will report an error. * `PG_EXPORTER_WEB_TELEMETRY_PATH` Path under which to expose metrics. Default is `/metrics`. 
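Illustration, not part of the patch series: a sketch of how a caller of the collector package might exercise the renamed WithCollectionTimeout option and the sub-1ms validation documented above. It assumes NewPostgresCollector returns the error produced by a failing Option (which the "will report an error" note implies); the DSN is a placeholder, and no live database is needed to construct the collector, as the tests in this series show.

```go
package main

import (
	"fmt"

	"github.com/prometheus-community/postgres_exporter/collector"
	"github.com/prometheus/common/promslog"
)

func main() {
	logger := promslog.New(&promslog.Config{})

	// Below the 1ms floor: the option should make construction fail.
	_, err := collector.NewPostgresCollector(logger, []string{}, "postgresql://localhost/postgres", []string{},
		collector.WithCollectionTimeout("500us"))
	fmt.Println("500us ->", err)

	// A regular duration string parses and is accepted.
	_, err = collector.NewPostgresCollector(logger, []string{}, "postgresql://localhost/postgres", []string{},
		collector.WithCollectionTimeout("30s"))
	fmt.Println("30s   ->", err)
}
```
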
diff --git a/collector/collector.go b/collector/collector.go index 8477994e2..130f5313b 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -165,6 +165,9 @@ func WithCollectionTimeout(s string) Option { if err != nil { return err } + if duration < 1*time.Millisecond { + return errors.New("timeout must be greater than 1ms") + } e.CollectionTimeout = duration return nil } @@ -192,6 +195,7 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { } func (p PostgresCollector) collectFromConnection(inst *instance, ch chan<- prometheus.Metric) { + // Eventually, connect this to the http scraping context ctx, cancel := context.WithTimeout(context.Background(), p.CollectionTimeout) defer cancel() From 5955d1936a03aca147c800ee7e3d7f8ac78e9a00 Mon Sep 17 00:00:00 2001 From: Pierre SOUCHAY Date: Fri, 12 Dec 2025 08:57:08 +0100 Subject: [PATCH 6/7] move test to the right place Signed-off-by: Pierre SOUCHAY --- collector/collector_test.go | 68 +++++++++++++++++++++++++++++++++++ collector/pg_database_test.go | 67 ---------------------------------- 2 files changed, 68 insertions(+), 67 deletions(-) diff --git a/collector/collector_test.go b/collector/collector_test.go index d3b473b43..2b3d1300c 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -14,9 +14,13 @@ package collector import ( "strings" + "testing" + "time" + "github.com/DATA-DOG/go-sqlmock" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/promslog" ) type labelMap map[string]string @@ -60,3 +64,67 @@ func sanitizeQuery(q string) string { q = strings.ReplaceAll(q, "$", "\\$") return q } + +// We ensure that when the database respond after a long time +// The collection process still occurs in a predictable manner +// Will avoid accumulation of queries on a completely frozen DB +func TestWithConnectionTimeout(t *testing.T) { + + timeoutForQuery := time.Duration(100 * time.Millisecond) + + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{"pg_roles.rolname", "pg_roles.rolconnlimit"} + rows := sqlmock.NewRows(columns).AddRow("role1", 2) + mock.ExpectQuery(pgRolesConnectionLimitsQuery). + WillDelayFor(30 * time.Second). 
+ WillReturnRows(rows) + + log_config := promslog.Config{} + + logger := promslog.New(&log_config) + + c, err := NewPostgresCollector(logger, []string{}, "postgresql://local", []string{}, WithCollectionTimeout(timeoutForQuery.String())) + if err != nil { + t.Fatalf("error creating NewPostgresCollector: %s", err) + } + collector_config := collectorConfig{ + logger: logger, + excludeDatabases: []string{}, + } + + collector, err := NewPGRolesCollector(collector_config) + if err != nil { + t.Fatalf("error creating collector: %s", err) + } + c.Collectors["test"] = collector + c.instance = inst + + ch := make(chan prometheus.Metric) + defer close(ch) + + go func() { + for { + <-ch + time.Sleep(1 * time.Millisecond) + } + }() + + startTime := time.Now() + c.collectFromConnection(inst, ch) + elapsed := time.Since(startTime) + + if elapsed <= timeoutForQuery { + t.Errorf("elapsed time was %v, should be bigger than timeout=%v", elapsed, timeoutForQuery) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_database_test.go b/collector/pg_database_test.go index 09c225bca..fe94166e9 100644 --- a/collector/pg_database_test.go +++ b/collector/pg_database_test.go @@ -15,80 +15,13 @@ package collector import ( "context" "testing" - "time" "github.com/DATA-DOG/go-sqlmock" "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/promslog" "github.com/smartystreets/goconvey/convey" ) -// We ensure that when the database respond after a long time -// The collection process still occurs in a predictable manner -// Will avoid accumulation of queries on a completely frozen DB -func TestPGDatabaseTimeout(t *testing.T) { - - timeoutForQuery := time.Duration(100 * time.Millisecond) - - db, mock, err := sqlmock.New() - if err != nil { - t.Fatalf("Error opening a stub db connection: %s", err) - } - defer db.Close() - - inst := &instance{db: db} - - columns := []string{"pg_roles.rolname", "pg_roles.rolconnlimit"} - rows := sqlmock.NewRows(columns).AddRow("role1", 2) - mock.ExpectQuery(pgRolesConnectionLimitsQuery). - WillDelayFor(30 * time.Second). 
- WillReturnRows(rows) - - log_config := promslog.Config{} - - logger := promslog.New(&log_config) - - c, err := NewPostgresCollector(logger, []string{}, "postgresql://local", []string{}, CollectionTimeout(timeoutForQuery.String())) - if err != nil { - t.Fatalf("error creating NewPostgresCollector: %s", err) - } - collector_config := collectorConfig{ - logger: logger, - excludeDatabases: []string{}, - } - - collector, err := NewPGRolesCollector(collector_config) - if err != nil { - t.Fatalf("error creating collector: %s", err) - } - c.Collectors["test"] = collector - c.instance = inst - - ch := make(chan prometheus.Metric) - defer close(ch) - - go func() { - for { - <-ch - time.Sleep(1 * time.Millisecond) - } - }() - - startTime := time.Now() - c.collectFromConnection(inst, ch) - elapsed := time.Since(startTime) - - if elapsed <= timeoutForQuery { - t.Errorf("elapsed time was %v, should be bigger than timeout=%v", elapsed, timeoutForQuery) - } - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("there were unfulfilled exceptions: %s", err) - } -} - func TestPGDatabaseCollector(t *testing.T) { db, mock, err := sqlmock.New() if err != nil { From ac7050d23ff0e153066c469c3c5032a7185c94fa Mon Sep 17 00:00:00 2001 From: Pierre SOUCHAY Date: Sun, 14 Dec 2025 10:04:44 +0100 Subject: [PATCH 7/7] ensure that UTest do not take too much time Signed-off-by: Pierre SOUCHAY --- collector/collector_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/collector/collector_test.go b/collector/collector_test.go index 2b3d1300c..984b2c0ff 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -124,6 +124,11 @@ func TestWithConnectionTimeout(t *testing.T) { t.Errorf("elapsed time was %v, should be bigger than timeout=%v", elapsed, timeoutForQuery) } + // Ensure we took more than timeout, but not too much + if elapsed >= timeoutForQuery+500*time.Millisecond { + t.Errorf("elapsed time was %v, should not be much bigger than timeout=%v", elapsed, timeoutForQuery) + } + if err := mock.ExpectationsWereMet(); err != nil { t.Errorf("there were unfulfilled exceptions: %s", err) }
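Illustration, not part of the patch series: a sketch of the end-to-end wiring, showing where the new timeout takes effect. Every scrape of /metrics calls Collect, which is now bounded by the configured duration instead of queuing further connections on a frozen database. The DSN is a placeholder; 9187 is the exporter's usual listen port.

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus-community/postgres_exporter/collector"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/common/promslog"
)

func main() {
	logger := promslog.New(&promslog.Config{})

	pc, err := collector.NewPostgresCollector(logger, []string{}, "postgresql://localhost/postgres", []string{},
		collector.WithCollectionTimeout("1m"))
	if err != nil {
		log.Fatalf("creating PostgresCollector: %v", err)
	}

	reg := prometheus.NewRegistry()
	reg.MustRegister(pc)

	// Each scrape of /metrics runs Collect, which is now bounded by the
	// configured collection timeout.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9187", nil))
}
```

In the exporter binary itself this wiring lives in cmd/postgres_exporter, with the value coming from --collection-timeout / PG_EXPORTER_COLLECTION_TIMEOUT as added in PATCH 1/7 and 3/7.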