diff --git a/cmd/bricksllm/main.go b/cmd/bricksllm/main.go index 0bdd4b3..a95a273 100644 --- a/cmd/bricksllm/main.go +++ b/cmd/bricksllm/main.go @@ -162,6 +162,8 @@ func main() { log.Sugar().Fatalf("error creating user id for users table: %v", err) } + go store.PrepareEventsIndexes(log) + cpMemStore, err := memdb.NewCustomProvidersMemDb(store, log, cfg.InMemoryDbUpdateInterval) if err != nil { log.Sugar().Fatalf("cannot initialize custom providers memdb: %v", err) diff --git a/internal/manager/reporting.go b/internal/manager/reporting.go index 2560a37..ffe2578 100644 --- a/internal/manager/reporting.go +++ b/internal/manager/reporting.go @@ -171,11 +171,11 @@ func (rm *ReportingManager) GetSpentKeyReporting(r *event.SpentKeyReportingReque func (rm *ReportingManager) GetUsageReporting(r *event.UsageReportingRequest) (*event.UsageReportingResponse, error) { if r == nil { - return nil, internal_errors.NewValidationError("key reporting requst cannot be nil") + return nil, internal_errors.NewValidationError("key reporting request cannot be nil") } for _, tag := range r.Tags { if len(tag) == 0 { - return nil, internal_errors.NewValidationError("key reporting requst tag cannot be empty") + return nil, internal_errors.NewValidationError("key reporting request tag cannot be empty") } } usage, err := rm.es.GetUsageData(r.Tags) diff --git a/internal/storage/postgresql/event.go b/internal/storage/postgresql/event.go index 274d19b..c3877e6 100644 --- a/internal/storage/postgresql/event.go +++ b/internal/storage/postgresql/event.go @@ -10,8 +10,10 @@ import ( "strings" "time" + internal_errors "github.com/bricks-cloud/bricksllm/internal/errors" "github.com/bricks-cloud/bricksllm/internal/event" "github.com/lib/pq" + "go.uber.org/zap" ) var allowedTopBy = []string{"total_cost_in_usd", "total_requests"} @@ -97,6 +99,31 @@ func (s *Store) CreateKeyIdIndexForEventsTable() error { return nil } +func (s *Store) PrepareEventsIndexes(logger *zap.Logger) error { + queries := []string{ + `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_tags ON events USING GIN(tags);`, + `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_created_at_brin ON events USING BRIN(created_at);`, + `CREATE EXTENSION IF NOT EXISTS btree_gin;`, + `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_tags_created_at_gin ON events USING GIN (tags, created_at);`, + } + + indexTimeout := 15 * time.Minute + ctxTimeout, cancel := context.WithTimeout(context.Background(), indexTimeout) + defer cancel() + + for _, query := range queries { + start := time.Now() + _, err := s.db.ExecContext(ctxTimeout, query) + if err != nil { + logger.Sugar().Errorf("error preparing events indexes: %v, with query: %s", err, query) + } + execT := time.Since(start).Milliseconds() + logger.Sugar().Infof("Exec query: %s. time: %d ms", query, execT) + } + + return nil +} + func (s *Store) CreateEventsTable() error { createTableQuery := ` CREATE TABLE IF NOT EXISTS events ( @@ -217,7 +244,7 @@ func (s *Store) GetLatencyPercentiles(start, end int64, tags, keyIds []string) ( eventSelectionBlock := ` WITH events_table AS ( - SELECT * FROM events + SELECT * FROM events ` conditionBlock := fmt.Sprintf("WHERE created_at >= %d AND created_at <= %d ", start, end) @@ -402,9 +429,9 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord WITH keys_table AS ( SELECT key_id FROM keys WHERE created_at >= %d AND created_at < %d %s - ),top_keys_table AS + ),top_keys_table AS ( - SELECT + SELECT events.key_id, SUM(cost_in_usd) AS "CostInUsd" FROM events @@ -416,12 +443,12 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord SELECT CASE WHEN top_keys_table.key_id IS NOT NULL THEN top_keys_table.key_id ELSE keys_table.key_id - END + END AS key_id , COALESCE(top_keys_table."CostInUsd", 0) AS cost_in_usd FROM keys_table FULL JOIN top_keys_table - ON top_keys_table.key_id = keys_table.key_id + ON top_keys_table.key_id = keys_table.key_id `, start, end, condition, start, end, condition2) @@ -431,7 +458,7 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord } query += fmt.Sprintf(` - ORDER BY cost_in_usd %s + ORDER BY cost_in_usd %s `, qorder) if limit != 0 { @@ -507,9 +534,9 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s WITH keys_table AS ( SELECT key_id, key_ring FROM keys WHERE created_at >= %d AND created_at < %d %s - ),top_keys_table AS + ),top_keys_table AS ( - SELECT + SELECT key_ring, SUM(cost_in_usd) AS total_cost_in_usd, COUNT(*) AS total_requests @@ -531,7 +558,7 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s qtopBy = topBy } query += fmt.Sprintf(` - ORDER BY %s %s + ORDER BY %s %s `, qtopBy, qorder) if limit != 0 { @@ -576,35 +603,31 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s } func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) { + if len(tags) == 0 { + return nil, internal_errors.NewValidationError("key reporting request tag cannot be empty") + } + condition := "tags @> $1 AND created_at > $2" nowTime := time.Now() + sixMonthsAgo := nowTime.Add(-6 * 30 * 24 * time.Hour).Unix() dayAgo := nowTime.Add(-24 * time.Hour).Unix() weekAgo := nowTime.Add(-7 * 24 * time.Hour).Unix() monthAgo := nowTime.Add(-30 * 24 * time.Hour).Unix() - args := []any{} - condition := "" - - index := 1 - if len(tags) > 0 { - condition += fmt.Sprintf(" tags @> $%d", index) - - args = append(args, pq.Array(tags)) - index++ - } + args := []any{pq.Array(tags), sixMonthsAgo, dayAgo, weekAgo, monthAgo} query := fmt.Sprintf(` - SELECT - COALESCE(SUM(cost_in_usd), 0) AS total_cost_in_usd, - COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_day, - COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_week, - COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_month, - COALESCE(SUM(1), 0) AS total_requests, - COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_day, - COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_week, - COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_month - FROM events - WHERE %s - `, dayAgo, weekAgo, monthAgo, dayAgo, weekAgo, monthAgo, condition) + SELECT + COALESCE(SUM(cost_in_usd), 0) AS total_cost_in_usd, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $3), 0) AS total_cost_in_usd_last_day, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $4), 0) AS total_cost_in_usd_last_week, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $5), 0) AS total_cost_in_usd_last_month, + COALESCE(COUNT(*), 0) AS total_requests, + COALESCE(COUNT(*) FILTER (WHERE created_at > $3), 0) AS total_requests_last_day, + COALESCE(COUNT(*) FILTER (WHERE created_at > $4), 0) AS total_requests_last_week, + COALESCE(COUNT(*) FILTER (WHERE created_at > $5), 0) AS total_requests_last_month + FROM events + WHERE %s + `, condition) ctx, cancel := context.WithTimeout(context.Background(), s.rt) defer cancel() @@ -620,7 +643,7 @@ func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) { &data.LastWeekUsageRequests, &data.LastMonthUsageRequests, ); err != nil { - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return nil, nil } return nil, err @@ -722,7 +745,7 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds, cu %s FROM time_series_table LEFT JOIN events_table - ON events_table.created_at >= time_series_table.series + ON events_table.created_at >= time_series_table.series AND events_table.created_at < time_series_table.series + %d %s ORDER BY time_series_table.series; @@ -733,7 +756,7 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds, cu eventSelectionBlock := ` WITH events_table AS ( - SELECT * FROM events + SELECT * FROM events ` conditionBlock := fmt.Sprintf("WHERE created_at >= %d AND created_at < %d ", start, end)