From b1d0490402b179d17add875a2ca848e325725449 Mon Sep 17 00:00:00 2001 From: Gergely Tamas Kurucz Date: Thu, 21 May 2026 18:26:30 +0200 Subject: [PATCH 1/6] feat: add basic governance handler --- api/v3/handlers/governance/handler.go | 38 ++++ api/v3/handlers/governance/mapping.go | 61 ++++++ api/v3/handlers/governance/query.go | 290 ++++++++++++++++++++++++++ api/v3/server/routes.go | 2 +- api/v3/server/server.go | 4 + 5 files changed, 394 insertions(+), 1 deletion(-) create mode 100644 api/v3/handlers/governance/handler.go create mode 100644 api/v3/handlers/governance/mapping.go create mode 100644 api/v3/handlers/governance/query.go diff --git a/api/v3/handlers/governance/handler.go b/api/v3/handlers/governance/handler.go new file mode 100644 index 0000000000..6aba8e6e1d --- /dev/null +++ b/api/v3/handlers/governance/handler.go @@ -0,0 +1,38 @@ +package governance + +import ( + "context" + + "github.com/openmeterio/openmeter/openmeter/customer" + "github.com/openmeterio/openmeter/openmeter/entitlement" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/pkg/framework/transport/httptransport" +) + +type Handler interface { + QueryGovernanceAccess() QueryGovernanceAccessHandler +} + +type handler struct { + resolveNamespace func(ctx context.Context) (string, error) + customerService customer.Service + entitlementService entitlement.Service + featureConnector feature.FeatureConnector + options []httptransport.HandlerOption +} + +func New( + resolveNamespace func(ctx context.Context) (string, error), + customerService customer.Service, + entitlementService entitlement.Service, + featureConnector feature.FeatureConnector, + options ...httptransport.HandlerOption, +) Handler { + return &handler{ + resolveNamespace: resolveNamespace, + customerService: customerService, + entitlementService: entitlementService, + featureConnector: featureConnector, + options: options, + } +} diff --git a/api/v3/handlers/governance/mapping.go b/api/v3/handlers/governance/mapping.go new file mode 100644 index 0000000000..dba9240d33 --- /dev/null +++ b/api/v3/handlers/governance/mapping.go @@ -0,0 +1,61 @@ +package governance + +import ( + api "github.com/openmeterio/openmeter/api/v3" + "github.com/openmeterio/openmeter/openmeter/entitlement" + booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" + meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" + staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" +) + +// mapEntitlementToAccess converts an entitlement value to a governance feature access result. +// When has_access is false, the reason code is derived from the entitlement type. +func mapEntitlementToAccess(v entitlement.EntitlementValue) api.GovernanceFeatureAccess { + switch ent := v.(type) { + case *meteredentitlement.MeteredEntitlementValue: + if ent.HasAccess() { + return api.GovernanceFeatureAccess{HasAccess: true} + } + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeUsageLimitReached, + Message: "usage limit for feature reached", + }, + } + + case *booleanentitlement.BooleanEntitlementValue: + if ent.HasAccess() { + return api.GovernanceFeatureAccess{HasAccess: true} + } + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: "feature is not available for customer", + }, + } + + case *staticentitlement.StaticEntitlementValue: + if ent.HasAccess() { + return api.GovernanceFeatureAccess{HasAccess: true} + } + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: "feature is not available for customer", + }, + } + + default: + // NoAccessValue or unknown type + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: "feature is not available for customer", + }, + } + } +} diff --git a/api/v3/handlers/governance/query.go b/api/v3/handlers/governance/query.go new file mode 100644 index 0000000000..d99e66072f --- /dev/null +++ b/api/v3/handlers/governance/query.go @@ -0,0 +1,290 @@ +package governance + +import ( + "context" + "fmt" + "net/http" + "sort" + + "github.com/oapi-codegen/nullable" + "github.com/samber/lo" + + api "github.com/openmeterio/openmeter/api/v3" + "github.com/openmeterio/openmeter/api/v3/apierrors" + customershandler "github.com/openmeterio/openmeter/api/v3/handlers/customers" + "github.com/openmeterio/openmeter/openmeter/customer" + "github.com/openmeterio/openmeter/openmeter/entitlement" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/pkg/clock" + "github.com/openmeterio/openmeter/pkg/framework/commonhttp" + "github.com/openmeterio/openmeter/pkg/framework/transport/httptransport" + "github.com/openmeterio/openmeter/pkg/models" + pagination "github.com/openmeterio/openmeter/pkg/pagination/v2" +) + +const ( + defaultPageSize = 100 + maxPageSize = 100 +) + +type ( + QueryGovernanceAccessParams = api.QueryGovernanceAccessParams + QueryGovernanceAccessResponse = api.GovernanceQueryResponse + QueryGovernanceAccessHandler = httptransport.HandlerWithArgs[queryGovernanceAccessRequest, QueryGovernanceAccessResponse, QueryGovernanceAccessParams] +) + +type queryGovernanceAccessRequest struct { + Namespace string + CustomerKeys []string + FeatureKeys []string // nil means all features + IncludeCredits bool + PageSize int + Cursor *pagination.Cursor +} + +func (h *handler) QueryGovernanceAccess() QueryGovernanceAccessHandler { + return httptransport.NewHandlerWithArgs( + func(ctx context.Context, r *http.Request, params QueryGovernanceAccessParams) (queryGovernanceAccessRequest, error) { + ns, err := h.resolveNamespace(ctx) + if err != nil { + return queryGovernanceAccessRequest{}, err + } + + var body api.GovernanceQueryRequest + if err := commonhttp.JSONRequestBodyDecoder(r, &body); err != nil { + return queryGovernanceAccessRequest{}, err + } + + pageSize := defaultPageSize + var cursor *pagination.Cursor + + if params.Page != nil { + if params.Page.Size != nil { + pageSize = *params.Page.Size + if pageSize < 1 || pageSize > maxPageSize { + return queryGovernanceAccessRequest{}, apierrors.NewBadRequestError(ctx, + fmt.Errorf("page[size] must be between 1 and %d", maxPageSize), + apierrors.InvalidParameters{{ + Field: "page[size]", + Reason: fmt.Sprintf("must be between 1 and %d", maxPageSize), + Source: apierrors.InvalidParamSourceQuery, + }}, + ) + } + } + + if params.Page.After != nil { + decoded, err := pagination.DecodeCursor(*params.Page.After) + if err != nil { + return queryGovernanceAccessRequest{}, apierrors.NewBadRequestError(ctx, err, apierrors.InvalidParameters{{ + Field: "page[after]", + Reason: err.Error(), + Source: apierrors.InvalidParamSourceQuery, + }}) + } + cursor = decoded + } + } + + req := queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: body.Customer.Keys, + IncludeCredits: lo.FromPtrOr(body.IncludeCredits, false), + PageSize: pageSize, + Cursor: cursor, + } + + if body.Feature != nil { + req.FeatureKeys = body.Feature.Keys + } + + return req, nil + }, + func(ctx context.Context, req queryGovernanceAccessRequest) (QueryGovernanceAccessResponse, error) { + return h.processGovernanceQuery(ctx, req) + }, + commonhttp.JSONResponseEncoderWithStatus[QueryGovernanceAccessResponse](http.StatusOK), + httptransport.AppendOptions( + h.options, + httptransport.WithOperationName("query-governance-access"), + httptransport.WithErrorEncoder(apierrors.GenericErrorEncoder()), + )..., + ) +} + +// resolvedCustomer groups matched input keys for a single customer. +type resolvedCustomer struct { + Customer customer.Customer + Matched []string +} + +func (h *handler) processGovernanceQuery(ctx context.Context, req queryGovernanceAccessRequest) (QueryGovernanceAccessResponse, error) { + // Resolve each input key to a customer; deduplicate by customer ID. + customerMap := make(map[string]*resolvedCustomer) + var queryErrors []api.GovernanceQueryError + + for _, key := range req.CustomerKeys { + cus, err := h.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{ + Namespace: req.Namespace, + Key: key, + }) + if err != nil { + if models.IsGenericNotFoundError(err) { + queryErrors = append(queryErrors, api.GovernanceQueryError{ + Customer: lo.ToPtr(key), + Code: api.GovernanceQueryErrorCodeCustomerNotFound, + Message: "customer not found", + }) + continue + } + return QueryGovernanceAccessResponse{}, fmt.Errorf("resolve customer key %q: %w", key, err) + } + + if rc, ok := customerMap[cus.ID]; ok { + rc.Matched = append(rc.Matched, key) + } else { + customerMap[cus.ID] = &resolvedCustomer{ + Customer: *cus, + Matched: []string{key}, + } + } + } + + // Sort by (CreatedAt, ID) for stable cursor pagination. + customers := lo.Values(customerMap) + sort.Slice(customers, func(i, j int) bool { + ti := customers[i].Customer.CreatedAt + tj := customers[j].Customer.CreatedAt + if !ti.Equal(tj) { + return ti.Before(tj) + } + return customers[i].Customer.ID < customers[j].Customer.ID + }) + + // Apply cursor: skip everything at or before the cursor position. + if req.Cursor != nil { + afterCursor := *req.Cursor + start := len(customers) // default: nothing left if cursor is beyond all items + for i, rc := range customers { + c := pagination.NewCursor(rc.Customer.CreatedAt, rc.Customer.ID) + if c.Time.After(afterCursor.Time) || (c.Time.Equal(afterCursor.Time) && c.ID > afterCursor.ID) { + start = i + break + } + } + customers = customers[start:] + } + + // Apply page size. + hasMore := len(customers) > req.PageSize + if hasMore { + customers = customers[:req.PageSize] + } + + // Compute feature access for each paged customer. + now := clock.Now() + results := make([]api.GovernanceQueryResult, 0, len(customers)) + + for _, rc := range customers { + access, err := h.entitlementService.GetAccess(ctx, req.Namespace, rc.Customer.ID) + if err != nil { + return QueryGovernanceAccessResponse{}, fmt.Errorf("get access for customer %s: %w", rc.Customer.ID, err) + } + + featureAccess, err := h.buildFeatureAccess(ctx, req.Namespace, req.FeatureKeys, access) + if err != nil { + return QueryGovernanceAccessResponse{}, fmt.Errorf("build feature access for customer %s: %w", rc.Customer.ID, err) + } + + results = append(results, api.GovernanceQueryResult{ + Matched: rc.Matched, + Customer: customershandler.ToAPIBillingCustomer(rc.Customer), + Features: featureAccess, + UpdatedAt: now, + }) + } + + return QueryGovernanceAccessResponse{ + Data: results, + Errors: queryErrors, + Meta: buildCursorMeta(customers, req.PageSize, hasMore), + }, nil +} + +// buildFeatureAccess returns the feature access map for a single customer. +// If featureKeys is non-empty, only those keys are evaluated; otherwise all entitlements are returned. +func (h *handler) buildFeatureAccess(ctx context.Context, ns string, featureKeys []string, access entitlement.Access) (map[string]api.GovernanceFeatureAccess, error) { + result := make(map[string]api.GovernanceFeatureAccess) + + if len(featureKeys) == 0 { + for key, ev := range access.Entitlements { + result[key] = mapEntitlementToAccess(ev.Value) + } + return result, nil + } + + for _, key := range featureKeys { + ev, ok := access.Entitlements[key] + if !ok { + access, err := h.resolveAbsentFeature(ctx, ns, key) + if err != nil { + return nil, err + } + result[key] = access + continue + } + result[key] = mapEntitlementToAccess(ev.Value) + } + + return result, nil +} + +// resolveAbsentFeature determines the reason a requested feature key is absent from GetAccess results: +// either the feature doesn't exist in the org (FeatureNotFound) or the customer has no entitlement for it (FeatureUnavailable). +func (h *handler) resolveAbsentFeature(ctx context.Context, ns, featureKey string) (api.GovernanceFeatureAccess, error) { + _, err := h.featureConnector.GetFeature(ctx, ns, featureKey, feature.IncludeArchivedFeatureFalse) + if err != nil { + if models.IsGenericNotFoundError(err) { + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureNotFound, + Message: fmt.Sprintf("feature %q not found", featureKey), + }, + }, nil + } + return api.GovernanceFeatureAccess{}, fmt.Errorf("get feature %q: %w", featureKey, err) + } + + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: fmt.Sprintf("feature %q is not available for this customer", featureKey), + }, + }, nil +} + +func buildCursorMeta(customers []*resolvedCustomer, pageSize int, hasMore bool) api.CursorMeta { + meta := api.CursorMeta{ + Page: api.CursorMetaPage{ + Next: nullable.NewNullNullable[string](), + Previous: nullable.NewNullNullable[string](), + Size: float32(pageSize), + }, + } + + if len(customers) > 0 { + first := customers[0] + last := customers[len(customers)-1] + firstCursor := pagination.NewCursor(first.Customer.CreatedAt, first.Customer.ID) + lastCursor := pagination.NewCursor(last.Customer.CreatedAt, last.Customer.ID) + meta.Page.First = lo.ToPtr(firstCursor.Encode()) + meta.Page.Last = lo.ToPtr(lastCursor.Encode()) + if hasMore { + meta.Page.Next = nullable.NewNullableWithValue(lastCursor.Encode()) + } + } + + return meta +} diff --git a/api/v3/server/routes.go b/api/v3/server/routes.go index e70399f98a..5c36ea179d 100644 --- a/api/v3/server/routes.go +++ b/api/v3/server/routes.go @@ -470,5 +470,5 @@ func (s *Server) UpdateOrganizationDefaultTaxCodes(w http.ResponseWriter, r *htt // Governance func (s *Server) QueryGovernanceAccess(w http.ResponseWriter, r *http.Request, params api.QueryGovernanceAccessParams) { - unimplemented.QueryGovernanceAccess(w, r, params) + s.governanceHandler.QueryGovernanceAccess().With(params).ServeHTTP(w, r) } diff --git a/api/v3/server/server.go b/api/v3/server/server.go index 00d7cbd459..ab28f100f7 100644 --- a/api/v3/server/server.go +++ b/api/v3/server/server.go @@ -27,6 +27,7 @@ import ( eventshandler "github.com/openmeterio/openmeter/api/v3/handlers/events" featurecosthandler "github.com/openmeterio/openmeter/api/v3/handlers/featurecost" featureshandler "github.com/openmeterio/openmeter/api/v3/handlers/features" + governancehandler "github.com/openmeterio/openmeter/api/v3/handlers/governance" llmcosthandler "github.com/openmeterio/openmeter/api/v3/handlers/llmcost" metershandler "github.com/openmeterio/openmeter/api/v3/handlers/meters" planshandler "github.com/openmeterio/openmeter/api/v3/handlers/plans" @@ -238,6 +239,7 @@ type Server struct { customersBillingHandler customersbillinghandler.Handler customersCreditsHandler customerscreditshandler.Handler customersEntitlementHandler customersentitlementhandler.Handler + governanceHandler governancehandler.Handler metersHandler metershandler.Handler subscriptionsHandler subscriptionshandler.Handler subscriptionAddonsHandler subscriptionaddonshandler.Handler @@ -318,6 +320,7 @@ func NewServer(config *Config) (*Server, error) { } featuresH := featureshandler.New(resolveNamespace, config.FeatureConnector, config.MeterService, config.LLMCostService, httptransport.WithErrorHandler(config.ErrorHandler)) + governanceH := governancehandler.New(resolveNamespace, config.CustomerService, config.EntitlementService, config.FeatureConnector, httptransport.WithErrorHandler(config.ErrorHandler)) var llmcostH llmcosthandler.Handler if config.LLMCostService != nil { @@ -351,6 +354,7 @@ func NewServer(config *Config) (*Server, error) { currenciesHandler: currenciesHandler, featuresHandler: featuresH, featureCostHandler: featureCostH, + governanceHandler: governanceH, }, nil } From ccebee5e3e34ddc7aecb75a511aba6122cc386b5 Mon Sep 17 00:00:00 2001 From: Gergely Tamas Kurucz Date: Thu, 21 May 2026 18:47:10 +0200 Subject: [PATCH 2/6] test: add governance query tests --- api/v3/handlers/governance/mapping_test.go | 70 ++++ api/v3/handlers/governance/query.go | 4 +- api/v3/handlers/governance/query_test.go | 367 +++++++++++++++++++++ 3 files changed, 440 insertions(+), 1 deletion(-) create mode 100644 api/v3/handlers/governance/mapping_test.go create mode 100644 api/v3/handlers/governance/query_test.go diff --git a/api/v3/handlers/governance/mapping_test.go b/api/v3/handlers/governance/mapping_test.go new file mode 100644 index 0000000000..24614e9975 --- /dev/null +++ b/api/v3/handlers/governance/mapping_test.go @@ -0,0 +1,70 @@ +package governance + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + api "github.com/openmeterio/openmeter/api/v3" + "github.com/openmeterio/openmeter/openmeter/entitlement" + booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" + meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" + staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" +) + +func TestMapEntitlementToAccess(t *testing.T) { + tests := []struct { + name string + value entitlement.EntitlementValue + wantHasAccess bool + wantCode *api.GovernanceFeatureAccessReasonCode + }{ + { + name: "metered with balance — has access", + value: &meteredentitlement.MeteredEntitlementValue{Balance: 10}, + wantHasAccess: true, + }, + { + name: "metered exhausted — usage limit reached", + value: &meteredentitlement.MeteredEntitlementValue{Balance: 0}, + wantHasAccess: false, + wantCode: ptr(api.GovernanceFeatureAccessReasonCodeUsageLimitReached), + }, + { + // BooleanEntitlementValue is always HasAccess=true; the gateway returns + // NoAccessValue when the entitlement is inactive/not in plan. + name: "boolean — has access", + value: &booleanentitlement.BooleanEntitlementValue{}, + wantHasAccess: true, + }, + { + // StaticEntitlementValue is always HasAccess=true. + name: "static — has access", + value: &staticentitlement.StaticEntitlementValue{Config: `{"limit":100}`}, + wantHasAccess: true, + }, + { + // NoAccessValue is returned when the entitlement is inactive (not in current period). + name: "no access value — feature unavailable", + value: &entitlement.NoAccessValue{}, + wantHasAccess: false, + wantCode: ptr(api.GovernanceFeatureAccessReasonCodeFeatureUnavailable), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := mapEntitlementToAccess(tc.value) + assert.Equal(t, tc.wantHasAccess, got.HasAccess) + if tc.wantCode != nil { + if assert.NotNil(t, got.Reason) { + assert.Equal(t, *tc.wantCode, got.Reason.Code) + } + } else { + assert.Nil(t, got.Reason) + } + }) + } +} + +func ptr[T any](v T) *T { return &v } diff --git a/api/v3/handlers/governance/query.go b/api/v3/handlers/governance/query.go index d99e66072f..2d610cbc3b 100644 --- a/api/v3/handlers/governance/query.go +++ b/api/v3/handlers/governance/query.go @@ -2,6 +2,7 @@ package governance import ( "context" + "errors" "fmt" "net/http" "sort" @@ -244,7 +245,8 @@ func (h *handler) buildFeatureAccess(ctx context.Context, ns string, featureKeys func (h *handler) resolveAbsentFeature(ctx context.Context, ns, featureKey string) (api.GovernanceFeatureAccess, error) { _, err := h.featureConnector.GetFeature(ctx, ns, featureKey, feature.IncludeArchivedFeatureFalse) if err != nil { - if models.IsGenericNotFoundError(err) { + var fne *feature.FeatureNotFoundError + if errors.As(err, &fne) || models.IsGenericNotFoundError(err) { return api.GovernanceFeatureAccess{ HasAccess: false, Reason: &api.GovernanceFeatureAccessReason{ diff --git a/api/v3/handlers/governance/query_test.go b/api/v3/handlers/governance/query_test.go new file mode 100644 index 0000000000..92f3a82ee8 --- /dev/null +++ b/api/v3/handlers/governance/query_test.go @@ -0,0 +1,367 @@ +package governance + +import ( + "context" + "sync" + "testing" + + "github.com/oklog/ulid/v2" + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" + + api "github.com/openmeterio/openmeter/api/v3" + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/customer" + customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" + customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" + "github.com/openmeterio/openmeter/openmeter/entitlement" + meteradapter "github.com/openmeterio/openmeter/openmeter/meter/mockadapter" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/registry" + registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder" + streamingtestutils "github.com/openmeterio/openmeter/openmeter/streaming/testutils" + "github.com/openmeterio/openmeter/openmeter/subject" + subjectadapter "github.com/openmeterio/openmeter/openmeter/subject/adapter" + subjectservice "github.com/openmeterio/openmeter/openmeter/subject/service" + "github.com/openmeterio/openmeter/openmeter/testutils" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/pkg/datetime" + "github.com/openmeterio/openmeter/pkg/framework/lockr" +) + +func newTestNamespace(t *testing.T) string { + t.Helper() + return ulid.Make().String() +} + +// migrateOnce serialises schema migrations to avoid concurrent-write errors from ent. +var migrateOnce sync.Mutex + +type testDeps struct { + dbClient *testutils.TestDB + subjectService subject.Service + customerService customer.Service + featureRepo feature.FeatureRepo + registry *registry.Entitlement +} + +func (d *testDeps) close(t *testing.T) { + t.Helper() + if err := d.dbClient.EntDriver.Close(); err != nil { + t.Errorf("close ent driver: %v", err) + } + if err := d.dbClient.PGDriver.Close(); err != nil { + t.Errorf("close pg driver: %v", err) + } +} + +func setupTestDeps(t *testing.T) *testDeps { + t.Helper() + + logger := testutils.NewDiscardLogger(t) + testdb := testutils.InitPostgresDB(t) + dbClient := testdb.EntDriver.Client() + + migrateOnce.Lock() + require.NoError(t, dbClient.Schema.Create(context.Background())) + migrateOnce.Unlock() + + meterService, err := meteradapter.NewManage(nil) + require.NoError(t, err) + + subjectAdapter, err := subjectadapter.New(dbClient) + require.NoError(t, err) + + subjectSvc, err := subjectservice.New(subjectAdapter) + require.NoError(t, err) + + customerAdapter, err := customeradapter.New(customeradapter.Config{ + Client: dbClient, + Logger: logger, + }) + require.NoError(t, err) + + customerSvc, err := customerservice.New(customerservice.Config{ + Adapter: customerAdapter, + Publisher: eventbus.NewMock(t), + }) + require.NoError(t, err) + + locker, err := lockr.NewLocker(&lockr.LockerConfig{Logger: logger}) + require.NoError(t, err) + + reg := registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{ + DatabaseClient: dbClient, + StreamingConnector: streamingtestutils.NewMockStreamingConnector(t), + Logger: logger, + Tracer: noop.NewTracerProvider().Tracer("test"), + MeterService: meterService, + CustomerService: customerSvc, + Publisher: eventbus.NewMock(t), + EntitlementsConfiguration: config.EntitlementsConfiguration{ + GracePeriod: datetime.ISODurationString("P1D"), + }, + Locker: locker, + }) + + return &testDeps{ + dbClient: testdb, + subjectService: subjectSvc, + customerService: customerSvc, + featureRepo: reg.FeatureRepo, + registry: reg, + } +} + +func newTestHandler(deps *testDeps) *handler { + return &handler{ + resolveNamespace: func(_ context.Context) (string, error) { panic("not used in direct calls") }, + customerService: deps.customerService, + entitlementService: deps.registry.Entitlement, + featureConnector: deps.registry.Feature, + } +} + +func createCustomer(t *testing.T, deps *testDeps, ns, key string, subjectKeys []string) *customer.Customer { + t.Helper() + + for _, sk := range subjectKeys { + _, err := deps.subjectService.Create(t.Context(), subject.CreateInput{ + Namespace: ns, + Key: sk, + }) + require.NoError(t, err) + } + + cust, err := deps.customerService.CreateCustomer(t.Context(), customer.CreateCustomerInput{ + Namespace: ns, + CustomerMutate: customer.CustomerMutate{ + Key: lo.ToPtr(key), + Name: key, + UsageAttribution: &customer.CustomerUsageAttribution{ + SubjectKeys: subjectKeys, + }, + }, + }) + require.NoError(t, err) + return cust +} + +func createBooleanFeatureAndEntitlement(t *testing.T, deps *testDeps, ns, featureKey string, cust *customer.Customer) { + t.Helper() + + feat, err := deps.featureRepo.CreateFeature(t.Context(), feature.CreateFeatureInputs{ + Key: featureKey, + Name: featureKey, + Namespace: ns, + }) + require.NoError(t, err) + + _, err = deps.registry.Entitlement.CreateEntitlement(t.Context(), entitlement.CreateEntitlementInputs{ + Namespace: ns, + UsageAttribution: cust.GetUsageAttribution(), + FeatureKey: lo.ToPtr(featureKey), + FeatureID: lo.ToPtr(feat.ID), + EntitlementType: entitlement.EntitlementTypeBoolean, + }, nil) + require.NoError(t, err) +} + +func createOrphanFeature(t *testing.T, deps *testDeps, ns, featureKey string) { + t.Helper() + _, err := deps.featureRepo.CreateFeature(t.Context(), feature.CreateFeatureInputs{ + Key: featureKey, + Name: featureKey, + Namespace: ns, + }) + require.NoError(t, err) +} + +// --- Tests --- + +func TestQueryGovernanceAccess_UnknownCustomerKey(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: []string{"ghost"}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + assert.Empty(t, resp.Data) + require.Len(t, resp.Errors, 1) + assert.Equal(t, api.GovernanceQueryErrorCodeCustomerNotFound, resp.Errors[0].Code) + assert.Equal(t, lo.ToPtr("ghost"), resp.Errors[0].Customer) +} + +func TestQueryGovernanceAccess_KnownCustomerNoEntitlements(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: []string{cust.GetUsageAttribution().SubjectKeys[0]}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Empty(t, resp.Data[0].Features) + assert.Empty(t, resp.Errors) +} + +func TestQueryGovernanceAccess_BooleanEntitlement_HasAccess(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + createBooleanFeatureAndEntitlement(t, deps, ns, "premium", cust) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: []string{"acme"}, + FeatureKeys: []string{"premium"}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Empty(t, resp.Errors) + + featureAccess := resp.Data[0].Features["premium"] + assert.True(t, featureAccess.HasAccess) + assert.Nil(t, featureAccess.Reason) +} + +func TestQueryGovernanceAccess_FeatureNotFound(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + _ = cust + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: []string{"acme"}, + FeatureKeys: []string{"does-not-exist"}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + featureAccess := resp.Data[0].Features["does-not-exist"] + assert.False(t, featureAccess.HasAccess) + require.NotNil(t, featureAccess.Reason) + assert.Equal(t, api.GovernanceFeatureAccessReasonCodeFeatureNotFound, featureAccess.Reason.Code) +} + +func TestQueryGovernanceAccess_FeatureUnavailable(t *testing.T) { + // Feature exists in org but customer has no entitlement for it. + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + _ = cust + createOrphanFeature(t, deps, ns, "enterprise") + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: []string{"acme"}, + FeatureKeys: []string{"enterprise"}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + featureAccess := resp.Data[0].Features["enterprise"] + assert.False(t, featureAccess.HasAccess) + require.NotNil(t, featureAccess.Reason) + assert.Equal(t, api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, featureAccess.Reason.Code) +} + +func TestQueryGovernanceAccess_MultipleKeysSameCustomer(t *testing.T) { + // Two input keys resolve to the same customer; response has one entry with both keys in matched[]. + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + // customer key = "acme", usage attribution subject key = "acme-sub" + createCustomer(t, deps, ns, "acme", []string{"acme-sub"}) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: []string{"acme", "acme-sub"}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + assert.Empty(t, resp.Errors) + require.Len(t, resp.Data, 1, "two keys resolving to same customer should collapse into one result") + assert.Len(t, resp.Data[0].Matched, 2) + assert.ElementsMatch(t, []string{"acme", "acme-sub"}, resp.Data[0].Matched) +} + +func TestQueryGovernanceAccess_MixedHitsAndMisses(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + createBooleanFeatureAndEntitlement(t, deps, ns, "feature-a", cust) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: []string{"acme", "unknown-key"}, + FeatureKeys: []string{"feature-a"}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + require.Len(t, resp.Errors, 1) + assert.Equal(t, api.GovernanceQueryErrorCodeCustomerNotFound, resp.Errors[0].Code) + assert.True(t, resp.Data[0].Features["feature-a"].HasAccess) +} + +func TestQueryGovernanceAccess_NoFeatureKeysReturnsAll(t *testing.T) { + // When feature.keys is omitted, all entitlements are returned. + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + createBooleanFeatureAndEntitlement(t, deps, ns, "feat-1", cust) + createBooleanFeatureAndEntitlement(t, deps, ns, "feat-2", cust) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + CustomerKeys: []string{"acme"}, + FeatureKeys: nil, // no filter — return all + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Len(t, resp.Data[0].Features, 2) + assert.True(t, resp.Data[0].Features["feat-1"].HasAccess) + assert.True(t, resp.Data[0].Features["feat-2"].HasAccess) +} From 6612a92b4fff211aae6460e541839ce3a298dbaa Mon Sep 17 00:00:00 2001 From: Gergely Tamas Kurucz Date: Tue, 26 May 2026 12:01:49 +0200 Subject: [PATCH 3/6] refactor: rely on api package types --- api/v3/handlers/governance/query.go | 32 ++++++------ api/v3/handlers/governance/query_test.go | 65 +++++++++++++----------- 2 files changed, 51 insertions(+), 46 deletions(-) diff --git a/api/v3/handlers/governance/query.go b/api/v3/handlers/governance/query.go index 2d610cbc3b..8897a74e28 100644 --- a/api/v3/handlers/governance/query.go +++ b/api/v3/handlers/governance/query.go @@ -35,12 +35,10 @@ type ( ) type queryGovernanceAccessRequest struct { - Namespace string - CustomerKeys []string - FeatureKeys []string // nil means all features - IncludeCredits bool - PageSize int - Cursor *pagination.Cursor + Namespace string + Body api.GovernanceQueryRequest + PageSize int + Cursor *pagination.Cursor } func (h *handler) QueryGovernanceAccess() QueryGovernanceAccessHandler { @@ -88,15 +86,10 @@ func (h *handler) QueryGovernanceAccess() QueryGovernanceAccessHandler { } req := queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: body.Customer.Keys, - IncludeCredits: lo.FromPtrOr(body.IncludeCredits, false), - PageSize: pageSize, - Cursor: cursor, - } - - if body.Feature != nil { - req.FeatureKeys = body.Feature.Keys + Namespace: ns, + Body: body, + PageSize: pageSize, + Cursor: cursor, } return req, nil @@ -120,11 +113,16 @@ type resolvedCustomer struct { } func (h *handler) processGovernanceQuery(ctx context.Context, req queryGovernanceAccessRequest) (QueryGovernanceAccessResponse, error) { + var featureKeys []string + if req.Body.Feature != nil { + featureKeys = req.Body.Feature.Keys + } + // Resolve each input key to a customer; deduplicate by customer ID. customerMap := make(map[string]*resolvedCustomer) var queryErrors []api.GovernanceQueryError - for _, key := range req.CustomerKeys { + for _, key := range req.Body.Customer.Keys { cus, err := h.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{ Namespace: req.Namespace, Key: key, @@ -192,7 +190,7 @@ func (h *handler) processGovernanceQuery(ctx context.Context, req queryGovernanc return QueryGovernanceAccessResponse{}, fmt.Errorf("get access for customer %s: %w", rc.Customer.ID, err) } - featureAccess, err := h.buildFeatureAccess(ctx, req.Namespace, req.FeatureKeys, access) + featureAccess, err := h.buildFeatureAccess(ctx, req.Namespace, featureKeys, access) if err != nil { return QueryGovernanceAccessResponse{}, fmt.Errorf("build feature access for customer %s: %w", rc.Customer.ID, err) } diff --git a/api/v3/handlers/governance/query_test.go b/api/v3/handlers/governance/query_test.go index 92f3a82ee8..7ae4c1a301 100644 --- a/api/v3/handlers/governance/query_test.go +++ b/api/v3/handlers/governance/query_test.go @@ -189,9 +189,9 @@ func TestQueryGovernanceAccess_UnknownCustomerKey(t *testing.T) { ns := newTestNamespace(t) resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: []string{"ghost"}, - PageSize: defaultPageSize, + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"ghost"}}}, + PageSize: defaultPageSize, }) require.NoError(t, err) assert.Empty(t, resp.Data) @@ -210,9 +210,9 @@ func TestQueryGovernanceAccess_KnownCustomerNoEntitlements(t *testing.T) { cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: []string{cust.GetUsageAttribution().SubjectKeys[0]}, - PageSize: defaultPageSize, + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: []string{cust.GetUsageAttribution().SubjectKeys[0]}}}, + PageSize: defaultPageSize, }) require.NoError(t, err) require.Len(t, resp.Data, 1) @@ -231,10 +231,12 @@ func TestQueryGovernanceAccess_BooleanEntitlement_HasAccess(t *testing.T) { createBooleanFeatureAndEntitlement(t, deps, ns, "premium", cust) resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: []string{"acme"}, - FeatureKeys: []string{"premium"}, - PageSize: defaultPageSize, + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"premium"}}, + }, + PageSize: defaultPageSize, }) require.NoError(t, err) require.Len(t, resp.Data, 1) @@ -256,10 +258,12 @@ func TestQueryGovernanceAccess_FeatureNotFound(t *testing.T) { _ = cust resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: []string{"acme"}, - FeatureKeys: []string{"does-not-exist"}, - PageSize: defaultPageSize, + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"does-not-exist"}}, + }, + PageSize: defaultPageSize, }) require.NoError(t, err) require.Len(t, resp.Data, 1) @@ -282,10 +286,12 @@ func TestQueryGovernanceAccess_FeatureUnavailable(t *testing.T) { createOrphanFeature(t, deps, ns, "enterprise") resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: []string{"acme"}, - FeatureKeys: []string{"enterprise"}, - PageSize: defaultPageSize, + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"enterprise"}}, + }, + PageSize: defaultPageSize, }) require.NoError(t, err) require.Len(t, resp.Data, 1) @@ -307,9 +313,9 @@ func TestQueryGovernanceAccess_MultipleKeysSameCustomer(t *testing.T) { createCustomer(t, deps, ns, "acme", []string{"acme-sub"}) resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: []string{"acme", "acme-sub"}, - PageSize: defaultPageSize, + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme", "acme-sub"}}}, + PageSize: defaultPageSize, }) require.NoError(t, err) assert.Empty(t, resp.Errors) @@ -329,10 +335,12 @@ func TestQueryGovernanceAccess_MixedHitsAndMisses(t *testing.T) { createBooleanFeatureAndEntitlement(t, deps, ns, "feature-a", cust) resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: []string{"acme", "unknown-key"}, - FeatureKeys: []string{"feature-a"}, - PageSize: defaultPageSize, + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme", "unknown-key"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"feature-a"}}, + }, + PageSize: defaultPageSize, }) require.NoError(t, err) require.Len(t, resp.Data, 1) @@ -354,10 +362,9 @@ func TestQueryGovernanceAccess_NoFeatureKeysReturnsAll(t *testing.T) { createBooleanFeatureAndEntitlement(t, deps, ns, "feat-2", cust) resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ - Namespace: ns, - CustomerKeys: []string{"acme"}, - FeatureKeys: nil, // no filter — return all - PageSize: defaultPageSize, + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}}, + PageSize: defaultPageSize, }) require.NoError(t, err) require.Len(t, resp.Data, 1) From 3d118d25592adf001ad8e6ee4922d1c18064f87b Mon Sep 17 00:00:00 2001 From: Gergely Tamas Kurucz Date: Tue, 26 May 2026 13:46:58 +0200 Subject: [PATCH 4/6] test: add metered entitlement test cases --- api/v3/handlers/governance/query_test.go | 165 +++++++++++++++++++++-- 1 file changed, 153 insertions(+), 12 deletions(-) diff --git a/api/v3/handlers/governance/query_test.go b/api/v3/handlers/governance/query_test.go index 7ae4c1a301..2311fe5384 100644 --- a/api/v3/handlers/governance/query_test.go +++ b/api/v3/handlers/governance/query_test.go @@ -4,6 +4,7 @@ import ( "context" "sync" "testing" + "time" "github.com/oklog/ulid/v2" "github.com/samber/lo" @@ -16,7 +17,9 @@ import ( "github.com/openmeterio/openmeter/openmeter/customer" customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/entitlement" + "github.com/openmeterio/openmeter/openmeter/meter" meteradapter "github.com/openmeterio/openmeter/openmeter/meter/mockadapter" "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" "github.com/openmeterio/openmeter/openmeter/registry" @@ -27,8 +30,10 @@ import ( subjectservice "github.com/openmeterio/openmeter/openmeter/subject/service" "github.com/openmeterio/openmeter/openmeter/testutils" "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/pkg/clock" "github.com/openmeterio/openmeter/pkg/datetime" "github.com/openmeterio/openmeter/pkg/framework/lockr" + "github.com/openmeterio/openmeter/pkg/timeutil" ) func newTestNamespace(t *testing.T) string { @@ -36,15 +41,17 @@ func newTestNamespace(t *testing.T) string { return ulid.Make().String() } -// migrateOnce serialises schema migrations to avoid concurrent-write errors from ent. +// migrateOnce serializes schema migrations to avoid concurrent-write errors from ent. var migrateOnce sync.Mutex type testDeps struct { - dbClient *testutils.TestDB - subjectService subject.Service - customerService customer.Service - featureRepo feature.FeatureRepo - registry *registry.Entitlement + dbClient *testutils.TestDB + subjectService subject.Service + customerService customer.Service + meterService meter.ManageService + featureRepo feature.FeatureRepo + registry *registry.Entitlement + streamingConnector *streamingtestutils.MockStreamingConnector } func (d *testDeps) close(t *testing.T) { @@ -92,9 +99,11 @@ func setupTestDeps(t *testing.T) *testDeps { locker, err := lockr.NewLocker(&lockr.LockerConfig{Logger: logger}) require.NoError(t, err) + streamingConnector := streamingtestutils.NewMockStreamingConnector(t) + reg := registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{ DatabaseClient: dbClient, - StreamingConnector: streamingtestutils.NewMockStreamingConnector(t), + StreamingConnector: streamingConnector, Logger: logger, Tracer: noop.NewTracerProvider().Tracer("test"), MeterService: meterService, @@ -107,11 +116,13 @@ func setupTestDeps(t *testing.T) *testDeps { }) return &testDeps{ - dbClient: testdb, - subjectService: subjectSvc, - customerService: customerSvc, - featureRepo: reg.FeatureRepo, - registry: reg, + dbClient: testdb, + subjectService: subjectSvc, + customerService: customerSvc, + meterService: meterService, + featureRepo: reg.FeatureRepo, + registry: reg, + streamingConnector: streamingConnector, } } @@ -372,3 +383,133 @@ func TestQueryGovernanceAccess_NoFeatureKeysReturnsAll(t *testing.T) { assert.True(t, resp.Data[0].Features["feat-1"].HasAccess) assert.True(t, resp.Data[0].Features["feat-2"].HasAccess) } + +// createMeterInPG writes a meter row to ent DB (FK constraint on features.meter_id). +// The mock meter adapter only stores in memory; this must be called after CreateMeter. +func createMeterInPG(t *testing.T, dbClient *entdb.Client, mtr meter.Meter) { + t.Helper() + _, err := dbClient.Meter.Create(). + SetID(mtr.ID). + SetNamespace(mtr.Namespace). + SetName(mtr.Name). + SetKey(mtr.Key). + SetAggregation(mtr.Aggregation). + SetEventType(mtr.EventType). + SetNillableValueProperty(mtr.ValueProperty). + Save(t.Context()) + require.NoError(t, err) +} + +func createMeter(t *testing.T, deps *testDeps, ns, key string) meter.Meter { + t.Helper() + mtr, err := deps.meterService.CreateMeter(t.Context(), meter.CreateMeterInput{ + Namespace: ns, + Name: key, + Key: key, + Aggregation: meter.MeterAggregationSum, + EventType: "test", + ValueProperty: lo.ToPtr("$.value"), + }) + require.NoError(t, err) + createMeterInPG(t, deps.dbClient.EntDriver.Client(), mtr) + return mtr +} + +func createMeteredFeatureAndEntitlement(t *testing.T, deps *testDeps, ns, featureKey string, mtr meter.Meter, cust *customer.Customer, issueAfterReset *float64) { + t.Helper() + feat, err := deps.featureRepo.CreateFeature(t.Context(), feature.CreateFeatureInputs{ + Key: featureKey, + Name: featureKey, + Namespace: ns, + MeterID: lo.ToPtr(mtr.ID), + }) + require.NoError(t, err) + + _, err = deps.registry.Entitlement.CreateEntitlement(t.Context(), entitlement.CreateEntitlementInputs{ + Namespace: ns, + UsageAttribution: cust.GetUsageAttribution(), + FeatureKey: lo.ToPtr(featureKey), + FeatureID: lo.ToPtr(feat.ID), + EntitlementType: entitlement.EntitlementTypeMetered, + UsagePeriod: lo.ToPtr(entitlement.NewUsagePeriodInputFromRecurrence(timeutil.Recurrence{ + Interval: timeutil.RecurrencePeriodDaily, + Anchor: clock.Now(), + })), + IssueAfterReset: issueAfterReset, + }, nil) + require.NoError(t, err) +} + +func TestQueryGovernanceAccess_MeteredEntitlement_HasAccess(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + clock.SetTime(now) + defer clock.ResetTime() + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + mtr := createMeter(t, deps, ns, "api-calls") + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + // IssueAfterReset=10.0 → balance starts at 10, HasAccess=true + createMeteredFeatureAndEntitlement(t, deps, ns, "premium", mtr, cust, lo.ToPtr(10.0)) + + // Add an event so the streaming mock has data for the meter. + deps.streamingConnector.AddSimpleEvent(mtr.Key, 1, now) + + clock.SetTime(now.Add(time.Hour)) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"premium"}}, + }, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Empty(t, resp.Errors) + featureAccess := resp.Data[0].Features["premium"] + assert.True(t, featureAccess.HasAccess) + assert.Nil(t, featureAccess.Reason) +} + +func TestQueryGovernanceAccess_MeteredEntitlement_Exhausted(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + clock.SetTime(now) + defer clock.ResetTime() + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + mtr := createMeter(t, deps, ns, "api-calls") + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + // No IssueAfterReset → balance=0, HasAccess=false → UsageLimitReached + createMeteredFeatureAndEntitlement(t, deps, ns, "premium", mtr, cust, nil) + + deps.streamingConnector.AddSimpleEvent(mtr.Key, 1, now) + + clock.SetTime(now.Add(time.Hour)) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"premium"}}, + }, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Empty(t, resp.Errors) + featureAccess := resp.Data[0].Features["premium"] + assert.False(t, featureAccess.HasAccess) + require.NotNil(t, featureAccess.Reason) + assert.Equal(t, api.GovernanceFeatureAccessReasonCodeUsageLimitReached, featureAccess.Reason.Code) +} From 23966d91c472a4ad9fb81c85b8852cb149ebfc61 Mon Sep 17 00:00:00 2001 From: Gergely Tamas Kurucz Date: Thu, 28 May 2026 16:10:46 +0200 Subject: [PATCH 5/6] feat: update feature access handling to include all org features if featureKeys is empty --- api/v3/handlers/governance/query.go | 37 ++++++++++++++++++++++-- api/v3/handlers/governance/query_test.go | 11 +++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/api/v3/handlers/governance/query.go b/api/v3/handlers/governance/query.go index 8897a74e28..adfe686401 100644 --- a/api/v3/handlers/governance/query.go +++ b/api/v3/handlers/governance/query.go @@ -211,13 +211,29 @@ func (h *handler) processGovernanceQuery(ctx context.Context, req queryGovernanc } // buildFeatureAccess returns the feature access map for a single customer. -// If featureKeys is non-empty, only those keys are evaluated; otherwise all entitlements are returned. +// If featureKeys is non-empty, only those keys are evaluated. +// If featureKeys is empty, all non-archived features in the org are returned; +// features the customer has no entitlement for are marked FEATURE_UNAVAILABLE. func (h *handler) buildFeatureAccess(ctx context.Context, ns string, featureKeys []string, access entitlement.Access) (map[string]api.GovernanceFeatureAccess, error) { result := make(map[string]api.GovernanceFeatureAccess) if len(featureKeys) == 0 { - for key, ev := range access.Entitlements { - result[key] = mapEntitlementToAccess(ev.Value) + orgFeatures, err := h.listAllOrgFeatures(ctx, ns) + if err != nil { + return nil, err + } + for _, f := range orgFeatures { + if ev, ok := access.Entitlements[f.Key]; ok { + result[f.Key] = mapEntitlementToAccess(ev.Value) + } else { + result[f.Key] = api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: fmt.Sprintf("feature %q is not available for this customer", f.Key), + }, + } + } } return result, nil } @@ -238,6 +254,21 @@ func (h *handler) buildFeatureAccess(ctx context.Context, ns string, featureKeys return result, nil } +// listAllOrgFeatures fetches all non-archived features in the namespace. +// Uses the deprecated Limit field to fetch in one shot; acceptable for prototype scale. +func (h *handler) listAllOrgFeatures(ctx context.Context, ns string) ([]feature.Feature, error) { + const featureFetchLimit = 10_000 + res, err := h.featureConnector.ListFeatures(ctx, feature.ListFeaturesParams{ + Namespace: ns, + IncludeArchived: false, + Limit: featureFetchLimit, + }) + if err != nil { + return nil, fmt.Errorf("list org features: %w", err) + } + return res.Items, nil +} + // resolveAbsentFeature determines the reason a requested feature key is absent from GetAccess results: // either the feature doesn't exist in the org (FeatureNotFound) or the customer has no entitlement for it (FeatureUnavailable). func (h *handler) resolveAbsentFeature(ctx context.Context, ns, featureKey string) (api.GovernanceFeatureAccess, error) { diff --git a/api/v3/handlers/governance/query_test.go b/api/v3/handlers/governance/query_test.go index 2311fe5384..fda4255789 100644 --- a/api/v3/handlers/governance/query_test.go +++ b/api/v3/handlers/governance/query_test.go @@ -361,7 +361,8 @@ func TestQueryGovernanceAccess_MixedHitsAndMisses(t *testing.T) { } func TestQueryGovernanceAccess_NoFeatureKeysReturnsAll(t *testing.T) { - // When feature.keys is omitted, all entitlements are returned. + // When feature.keys is omitted, all org features are returned — including ones + // the customer has no entitlement for (marked FEATURE_UNAVAILABLE). deps := setupTestDeps(t) t.Cleanup(func() { deps.close(t) }) @@ -371,6 +372,8 @@ func TestQueryGovernanceAccess_NoFeatureKeysReturnsAll(t *testing.T) { cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) createBooleanFeatureAndEntitlement(t, deps, ns, "feat-1", cust) createBooleanFeatureAndEntitlement(t, deps, ns, "feat-2", cust) + // feat-3 exists in the org but the customer has no entitlement for it. + createOrphanFeature(t, deps, ns, "feat-3") resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ Namespace: ns, @@ -379,9 +382,13 @@ func TestQueryGovernanceAccess_NoFeatureKeysReturnsAll(t *testing.T) { }) require.NoError(t, err) require.Len(t, resp.Data, 1) - assert.Len(t, resp.Data[0].Features, 2) + assert.Len(t, resp.Data[0].Features, 3) assert.True(t, resp.Data[0].Features["feat-1"].HasAccess) assert.True(t, resp.Data[0].Features["feat-2"].HasAccess) + feat3 := resp.Data[0].Features["feat-3"] + assert.False(t, feat3.HasAccess) + require.NotNil(t, feat3.Reason) + assert.Equal(t, api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, feat3.Reason.Code) } // createMeterInPG writes a meter row to ent DB (FK constraint on features.meter_id). From 91d87c6f6b1259d59c864bbcd0d287756940eae2 Mon Sep 17 00:00:00 2001 From: Gergely Tamas Kurucz Date: Thu, 28 May 2026 16:51:52 +0200 Subject: [PATCH 6/6] feat: add full pagination support --- api/v3/handlers/governance/query.go | 108 +++++++++++++++++------ api/v3/handlers/governance/query_test.go | 98 ++++++++++++++++++++ 2 files changed, 178 insertions(+), 28 deletions(-) diff --git a/api/v3/handlers/governance/query.go b/api/v3/handlers/governance/query.go index adfe686401..bbd6639854 100644 --- a/api/v3/handlers/governance/query.go +++ b/api/v3/handlers/governance/query.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "sort" + "time" "github.com/oapi-codegen/nullable" "github.com/samber/lo" @@ -35,10 +36,11 @@ type ( ) type queryGovernanceAccessRequest struct { - Namespace string - Body api.GovernanceQueryRequest - PageSize int - Cursor *pagination.Cursor + Namespace string + Body api.GovernanceQueryRequest + PageSize int + AfterCursor *pagination.Cursor + BeforeCursor *pagination.Cursor } func (h *handler) QueryGovernanceAccess() QueryGovernanceAccessHandler { @@ -55,7 +57,7 @@ func (h *handler) QueryGovernanceAccess() QueryGovernanceAccessHandler { } pageSize := defaultPageSize - var cursor *pagination.Cursor + var afterCursor, beforeCursor *pagination.Cursor if params.Page != nil { if params.Page.Size != nil { @@ -72,6 +74,17 @@ func (h *handler) QueryGovernanceAccess() QueryGovernanceAccessHandler { } } + if params.Page.After != nil && params.Page.Before != nil { + return queryGovernanceAccessRequest{}, apierrors.NewBadRequestError(ctx, + fmt.Errorf("page[after] and page[before] are mutually exclusive"), + apierrors.InvalidParameters{{ + Field: "page[after]", + Reason: "cannot be combined with page[before]", + Source: apierrors.InvalidParamSourceQuery, + }}, + ) + } + if params.Page.After != nil { decoded, err := pagination.DecodeCursor(*params.Page.After) if err != nil { @@ -81,15 +94,28 @@ func (h *handler) QueryGovernanceAccess() QueryGovernanceAccessHandler { Source: apierrors.InvalidParamSourceQuery, }}) } - cursor = decoded + afterCursor = decoded + } + + if params.Page.Before != nil { + decoded, err := pagination.DecodeCursor(*params.Page.Before) + if err != nil { + return queryGovernanceAccessRequest{}, apierrors.NewBadRequestError(ctx, err, apierrors.InvalidParameters{{ + Field: "page[before]", + Reason: err.Error(), + Source: apierrors.InvalidParamSourceQuery, + }}) + } + beforeCursor = decoded } } req := queryGovernanceAccessRequest{ - Namespace: ns, - Body: body, - PageSize: pageSize, - Cursor: cursor, + Namespace: ns, + Body: body, + PageSize: pageSize, + AfterCursor: afterCursor, + BeforeCursor: beforeCursor, } return req, nil @@ -160,24 +186,47 @@ func (h *handler) processGovernanceQuery(ctx context.Context, req queryGovernanc return customers[i].Customer.ID < customers[j].Customer.ID }) - // Apply cursor: skip everything at or before the cursor position. - if req.Cursor != nil { - afterCursor := *req.Cursor - start := len(customers) // default: nothing left if cursor is beyond all items + // Apply cursor pagination. + var hasPrev, hasNext bool + if req.BeforeCursor != nil { + // Backward: take the last pageSize items strictly before the cursor. + bc := *req.BeforeCursor + end := 0 for i, rc := range customers { - c := pagination.NewCursor(rc.Customer.CreatedAt, rc.Customer.ID) - if c.Time.After(afterCursor.Time) || (c.Time.Equal(afterCursor.Time) && c.ID > afterCursor.ID) { - start = i + c := pagination.NewCursor(rc.Customer.CreatedAt.Truncate(time.Second), rc.Customer.ID) + if c.Time.After(bc.Time) || (c.Time.Equal(bc.Time) && c.ID >= bc.ID) { break } + end = i + 1 + } + candidates := customers[:end] + hasPrev = len(candidates) > req.PageSize + if hasPrev { + candidates = candidates[len(candidates)-req.PageSize:] + } + // next is always set in backward mode: the before-cursor item itself is forward. + hasNext = true + customers = candidates + } else { + // Forward (after cursor or first page). + start := 0 + if req.AfterCursor != nil { + ac := *req.AfterCursor + start = len(customers) // beyond all items if cursor is past the end + for i, rc := range customers { + c := pagination.NewCursor(rc.Customer.CreatedAt.Truncate(time.Second), rc.Customer.ID) + if c.Time.After(ac.Time) || (c.Time.Equal(ac.Time) && c.ID > ac.ID) { + start = i + break + } + } } + hasPrev = start > 0 customers = customers[start:] - } - - // Apply page size. - hasMore := len(customers) > req.PageSize - if hasMore { - customers = customers[:req.PageSize] + hasNext = len(customers) > req.PageSize + if hasNext { + customers = customers[:req.PageSize] + } } // Compute feature access for each paged customer. @@ -206,7 +255,7 @@ func (h *handler) processGovernanceQuery(ctx context.Context, req queryGovernanc return QueryGovernanceAccessResponse{ Data: results, Errors: queryErrors, - Meta: buildCursorMeta(customers, req.PageSize, hasMore), + Meta: buildCursorMeta(customers, req.PageSize, hasPrev, hasNext), }, nil } @@ -296,7 +345,7 @@ func (h *handler) resolveAbsentFeature(ctx context.Context, ns, featureKey strin }, nil } -func buildCursorMeta(customers []*resolvedCustomer, pageSize int, hasMore bool) api.CursorMeta { +func buildCursorMeta(customers []*resolvedCustomer, pageSize int, hasPrev, hasNext bool) api.CursorMeta { meta := api.CursorMeta{ Page: api.CursorMetaPage{ Next: nullable.NewNullNullable[string](), @@ -308,13 +357,16 @@ func buildCursorMeta(customers []*resolvedCustomer, pageSize int, hasMore bool) if len(customers) > 0 { first := customers[0] last := customers[len(customers)-1] - firstCursor := pagination.NewCursor(first.Customer.CreatedAt, first.Customer.ID) - lastCursor := pagination.NewCursor(last.Customer.CreatedAt, last.Customer.ID) + firstCursor := pagination.NewCursor(first.Customer.CreatedAt.Truncate(time.Second), first.Customer.ID) + lastCursor := pagination.NewCursor(last.Customer.CreatedAt.Truncate(time.Second), last.Customer.ID) meta.Page.First = lo.ToPtr(firstCursor.Encode()) meta.Page.Last = lo.ToPtr(lastCursor.Encode()) - if hasMore { + if hasNext { meta.Page.Next = nullable.NewNullableWithValue(lastCursor.Encode()) } + if hasPrev { + meta.Page.Previous = nullable.NewNullableWithValue(firstCursor.Encode()) + } } return meta diff --git a/api/v3/handlers/governance/query_test.go b/api/v3/handlers/governance/query_test.go index fda4255789..76c000786f 100644 --- a/api/v3/handlers/governance/query_test.go +++ b/api/v3/handlers/governance/query_test.go @@ -33,6 +33,7 @@ import ( "github.com/openmeterio/openmeter/pkg/clock" "github.com/openmeterio/openmeter/pkg/datetime" "github.com/openmeterio/openmeter/pkg/framework/lockr" + pagination "github.com/openmeterio/openmeter/pkg/pagination/v2" "github.com/openmeterio/openmeter/pkg/timeutil" ) @@ -520,3 +521,100 @@ func TestQueryGovernanceAccess_MeteredEntitlement_Exhausted(t *testing.T) { require.NotNil(t, featureAccess.Reason) assert.Equal(t, api.GovernanceFeatureAccessReasonCodeUsageLimitReached, featureAccess.Reason.Code) } + +func TestQueryGovernanceAccess_Pagination(t *testing.T) { + // given: 3 customers (c1, c2, c3 in creation order); pageSize=1 + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + clock.SetTime(now) + defer clock.ResetTime() + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + createCustomer(t, deps, ns, "c1", []string{"c1"}) + clock.SetTime(now.Add(time.Second)) + createCustomer(t, deps, ns, "c2", []string{"c2"}) + clock.SetTime(now.Add(2 * time.Second)) + createCustomer(t, deps, ns, "c3", []string{"c3"}) + + allKeys := []string{"c1", "c2", "c3"} + + decodeCursor := func(encoded string) *pagination.Cursor { + c, err := pagination.DecodeCursor(encoded) + require.NoError(t, err) + return c + } + + // Page 1: [c1] — no previous, next points to c1 + page1, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + }) + require.NoError(t, err) + require.Len(t, page1.Data, 1) + assert.Equal(t, "c1", page1.Data[0].Matched[0]) + assert.True(t, page1.Meta.Page.Previous.IsNull(), "no previous on first page") + require.False(t, page1.Meta.Page.Next.IsNull(), "next must be set on page 1") + + next1, _ := page1.Meta.Page.Next.Get() + + // Page 2: [c2] — previous set, next set + page2, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + AfterCursor: decodeCursor(next1), + }) + require.NoError(t, err) + require.Len(t, page2.Data, 1) + assert.Equal(t, "c2", page2.Data[0].Matched[0]) + require.False(t, page2.Meta.Page.Previous.IsNull(), "previous must be set on page 2") + require.False(t, page2.Meta.Page.Next.IsNull(), "next must be set on page 2") + + next2, _ := page2.Meta.Page.Next.Get() + prev2, _ := page2.Meta.Page.Previous.Get() + + // Page 3 (last): [c3] — previous set, no next + page3, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + AfterCursor: decodeCursor(next2), + }) + require.NoError(t, err) + require.Len(t, page3.Data, 1) + assert.Equal(t, "c3", page3.Data[0].Matched[0]) + require.False(t, page3.Meta.Page.Previous.IsNull(), "previous must be set on last page") + assert.True(t, page3.Meta.Page.Next.IsNull(), "no next on last page") + + // Cursor past end → empty data + require.NotNil(t, page3.Meta.Page.Last, "last cursor must be set") + last3 := *page3.Meta.Page.Last + pastEnd, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + AfterCursor: decodeCursor(last3), + }) + require.NoError(t, err) + assert.Empty(t, pastEnd.Data) + assert.True(t, pastEnd.Meta.Page.Next.IsNull()) + assert.True(t, pastEnd.Meta.Page.Previous.IsNull()) + + // Backward from page 2's previous cursor → [c1], no previous, next set + pageBack, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + BeforeCursor: decodeCursor(prev2), + }) + require.NoError(t, err) + require.Len(t, pageBack.Data, 1) + assert.Equal(t, "c1", pageBack.Data[0].Matched[0]) + assert.True(t, pageBack.Meta.Page.Previous.IsNull(), "no previous before c1") + assert.False(t, pageBack.Meta.Page.Next.IsNull(), "next must be set in backward result") +}