diff --git a/api/v3/handlers/governance/handler.go b/api/v3/handlers/governance/handler.go new file mode 100644 index 0000000000..6aba8e6e1d --- /dev/null +++ b/api/v3/handlers/governance/handler.go @@ -0,0 +1,38 @@ +package governance + +import ( + "context" + + "github.com/openmeterio/openmeter/openmeter/customer" + "github.com/openmeterio/openmeter/openmeter/entitlement" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/pkg/framework/transport/httptransport" +) + +type Handler interface { + QueryGovernanceAccess() QueryGovernanceAccessHandler +} + +type handler struct { + resolveNamespace func(ctx context.Context) (string, error) + customerService customer.Service + entitlementService entitlement.Service + featureConnector feature.FeatureConnector + options []httptransport.HandlerOption +} + +func New( + resolveNamespace func(ctx context.Context) (string, error), + customerService customer.Service, + entitlementService entitlement.Service, + featureConnector feature.FeatureConnector, + options ...httptransport.HandlerOption, +) Handler { + return &handler{ + resolveNamespace: resolveNamespace, + customerService: customerService, + entitlementService: entitlementService, + featureConnector: featureConnector, + options: options, + } +} diff --git a/api/v3/handlers/governance/mapping.go b/api/v3/handlers/governance/mapping.go new file mode 100644 index 0000000000..dba9240d33 --- /dev/null +++ b/api/v3/handlers/governance/mapping.go @@ -0,0 +1,61 @@ +package governance + +import ( + api "github.com/openmeterio/openmeter/api/v3" + "github.com/openmeterio/openmeter/openmeter/entitlement" + booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" + meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" + staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" +) + +// mapEntitlementToAccess converts an entitlement value to a governance feature access result. +// When has_access is false, the reason code is derived from the entitlement type. +func mapEntitlementToAccess(v entitlement.EntitlementValue) api.GovernanceFeatureAccess { + switch ent := v.(type) { + case *meteredentitlement.MeteredEntitlementValue: + if ent.HasAccess() { + return api.GovernanceFeatureAccess{HasAccess: true} + } + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeUsageLimitReached, + Message: "usage limit for feature reached", + }, + } + + case *booleanentitlement.BooleanEntitlementValue: + if ent.HasAccess() { + return api.GovernanceFeatureAccess{HasAccess: true} + } + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: "feature is not available for customer", + }, + } + + case *staticentitlement.StaticEntitlementValue: + if ent.HasAccess() { + return api.GovernanceFeatureAccess{HasAccess: true} + } + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: "feature is not available for customer", + }, + } + + default: + // NoAccessValue or unknown type + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: "feature is not available for customer", + }, + } + } +} diff --git a/api/v3/handlers/governance/mapping_test.go b/api/v3/handlers/governance/mapping_test.go new file mode 100644 index 0000000000..24614e9975 --- /dev/null +++ b/api/v3/handlers/governance/mapping_test.go @@ -0,0 +1,70 @@ +package governance + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + api "github.com/openmeterio/openmeter/api/v3" + "github.com/openmeterio/openmeter/openmeter/entitlement" + booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" + meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" + staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" +) + +func TestMapEntitlementToAccess(t *testing.T) { + tests := []struct { + name string + value entitlement.EntitlementValue + wantHasAccess bool + wantCode *api.GovernanceFeatureAccessReasonCode + }{ + { + name: "metered with balance — has access", + value: &meteredentitlement.MeteredEntitlementValue{Balance: 10}, + wantHasAccess: true, + }, + { + name: "metered exhausted — usage limit reached", + value: &meteredentitlement.MeteredEntitlementValue{Balance: 0}, + wantHasAccess: false, + wantCode: ptr(api.GovernanceFeatureAccessReasonCodeUsageLimitReached), + }, + { + // BooleanEntitlementValue is always HasAccess=true; the gateway returns + // NoAccessValue when the entitlement is inactive/not in plan. + name: "boolean — has access", + value: &booleanentitlement.BooleanEntitlementValue{}, + wantHasAccess: true, + }, + { + // StaticEntitlementValue is always HasAccess=true. + name: "static — has access", + value: &staticentitlement.StaticEntitlementValue{Config: `{"limit":100}`}, + wantHasAccess: true, + }, + { + // NoAccessValue is returned when the entitlement is inactive (not in current period). + name: "no access value — feature unavailable", + value: &entitlement.NoAccessValue{}, + wantHasAccess: false, + wantCode: ptr(api.GovernanceFeatureAccessReasonCodeFeatureUnavailable), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := mapEntitlementToAccess(tc.value) + assert.Equal(t, tc.wantHasAccess, got.HasAccess) + if tc.wantCode != nil { + if assert.NotNil(t, got.Reason) { + assert.Equal(t, *tc.wantCode, got.Reason.Code) + } + } else { + assert.Nil(t, got.Reason) + } + }) + } +} + +func ptr[T any](v T) *T { return &v } diff --git a/api/v3/handlers/governance/query.go b/api/v3/handlers/governance/query.go new file mode 100644 index 0000000000..bbd6639854 --- /dev/null +++ b/api/v3/handlers/governance/query.go @@ -0,0 +1,373 @@ +package governance + +import ( + "context" + "errors" + "fmt" + "net/http" + "sort" + "time" + + "github.com/oapi-codegen/nullable" + "github.com/samber/lo" + + api "github.com/openmeterio/openmeter/api/v3" + "github.com/openmeterio/openmeter/api/v3/apierrors" + customershandler "github.com/openmeterio/openmeter/api/v3/handlers/customers" + "github.com/openmeterio/openmeter/openmeter/customer" + "github.com/openmeterio/openmeter/openmeter/entitlement" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/pkg/clock" + "github.com/openmeterio/openmeter/pkg/framework/commonhttp" + "github.com/openmeterio/openmeter/pkg/framework/transport/httptransport" + "github.com/openmeterio/openmeter/pkg/models" + pagination "github.com/openmeterio/openmeter/pkg/pagination/v2" +) + +const ( + defaultPageSize = 100 + maxPageSize = 100 +) + +type ( + QueryGovernanceAccessParams = api.QueryGovernanceAccessParams + QueryGovernanceAccessResponse = api.GovernanceQueryResponse + QueryGovernanceAccessHandler = httptransport.HandlerWithArgs[queryGovernanceAccessRequest, QueryGovernanceAccessResponse, QueryGovernanceAccessParams] +) + +type queryGovernanceAccessRequest struct { + Namespace string + Body api.GovernanceQueryRequest + PageSize int + AfterCursor *pagination.Cursor + BeforeCursor *pagination.Cursor +} + +func (h *handler) QueryGovernanceAccess() QueryGovernanceAccessHandler { + return httptransport.NewHandlerWithArgs( + func(ctx context.Context, r *http.Request, params QueryGovernanceAccessParams) (queryGovernanceAccessRequest, error) { + ns, err := h.resolveNamespace(ctx) + if err != nil { + return queryGovernanceAccessRequest{}, err + } + + var body api.GovernanceQueryRequest + if err := commonhttp.JSONRequestBodyDecoder(r, &body); err != nil { + return queryGovernanceAccessRequest{}, err + } + + pageSize := defaultPageSize + var afterCursor, beforeCursor *pagination.Cursor + + if params.Page != nil { + if params.Page.Size != nil { + pageSize = *params.Page.Size + if pageSize < 1 || pageSize > maxPageSize { + return queryGovernanceAccessRequest{}, apierrors.NewBadRequestError(ctx, + fmt.Errorf("page[size] must be between 1 and %d", maxPageSize), + apierrors.InvalidParameters{{ + Field: "page[size]", + Reason: fmt.Sprintf("must be between 1 and %d", maxPageSize), + Source: apierrors.InvalidParamSourceQuery, + }}, + ) + } + } + + if params.Page.After != nil && params.Page.Before != nil { + return queryGovernanceAccessRequest{}, apierrors.NewBadRequestError(ctx, + fmt.Errorf("page[after] and page[before] are mutually exclusive"), + apierrors.InvalidParameters{{ + Field: "page[after]", + Reason: "cannot be combined with page[before]", + Source: apierrors.InvalidParamSourceQuery, + }}, + ) + } + + if params.Page.After != nil { + decoded, err := pagination.DecodeCursor(*params.Page.After) + if err != nil { + return queryGovernanceAccessRequest{}, apierrors.NewBadRequestError(ctx, err, apierrors.InvalidParameters{{ + Field: "page[after]", + Reason: err.Error(), + Source: apierrors.InvalidParamSourceQuery, + }}) + } + afterCursor = decoded + } + + if params.Page.Before != nil { + decoded, err := pagination.DecodeCursor(*params.Page.Before) + if err != nil { + return queryGovernanceAccessRequest{}, apierrors.NewBadRequestError(ctx, err, apierrors.InvalidParameters{{ + Field: "page[before]", + Reason: err.Error(), + Source: apierrors.InvalidParamSourceQuery, + }}) + } + beforeCursor = decoded + } + } + + req := queryGovernanceAccessRequest{ + Namespace: ns, + Body: body, + PageSize: pageSize, + AfterCursor: afterCursor, + BeforeCursor: beforeCursor, + } + + return req, nil + }, + func(ctx context.Context, req queryGovernanceAccessRequest) (QueryGovernanceAccessResponse, error) { + return h.processGovernanceQuery(ctx, req) + }, + commonhttp.JSONResponseEncoderWithStatus[QueryGovernanceAccessResponse](http.StatusOK), + httptransport.AppendOptions( + h.options, + httptransport.WithOperationName("query-governance-access"), + httptransport.WithErrorEncoder(apierrors.GenericErrorEncoder()), + )..., + ) +} + +// resolvedCustomer groups matched input keys for a single customer. +type resolvedCustomer struct { + Customer customer.Customer + Matched []string +} + +func (h *handler) processGovernanceQuery(ctx context.Context, req queryGovernanceAccessRequest) (QueryGovernanceAccessResponse, error) { + var featureKeys []string + if req.Body.Feature != nil { + featureKeys = req.Body.Feature.Keys + } + + // Resolve each input key to a customer; deduplicate by customer ID. + customerMap := make(map[string]*resolvedCustomer) + var queryErrors []api.GovernanceQueryError + + for _, key := range req.Body.Customer.Keys { + cus, err := h.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{ + Namespace: req.Namespace, + Key: key, + }) + if err != nil { + if models.IsGenericNotFoundError(err) { + queryErrors = append(queryErrors, api.GovernanceQueryError{ + Customer: lo.ToPtr(key), + Code: api.GovernanceQueryErrorCodeCustomerNotFound, + Message: "customer not found", + }) + continue + } + return QueryGovernanceAccessResponse{}, fmt.Errorf("resolve customer key %q: %w", key, err) + } + + if rc, ok := customerMap[cus.ID]; ok { + rc.Matched = append(rc.Matched, key) + } else { + customerMap[cus.ID] = &resolvedCustomer{ + Customer: *cus, + Matched: []string{key}, + } + } + } + + // Sort by (CreatedAt, ID) for stable cursor pagination. + customers := lo.Values(customerMap) + sort.Slice(customers, func(i, j int) bool { + ti := customers[i].Customer.CreatedAt + tj := customers[j].Customer.CreatedAt + if !ti.Equal(tj) { + return ti.Before(tj) + } + return customers[i].Customer.ID < customers[j].Customer.ID + }) + + // Apply cursor pagination. + var hasPrev, hasNext bool + if req.BeforeCursor != nil { + // Backward: take the last pageSize items strictly before the cursor. + bc := *req.BeforeCursor + end := 0 + for i, rc := range customers { + c := pagination.NewCursor(rc.Customer.CreatedAt.Truncate(time.Second), rc.Customer.ID) + if c.Time.After(bc.Time) || (c.Time.Equal(bc.Time) && c.ID >= bc.ID) { + break + } + end = i + 1 + } + candidates := customers[:end] + hasPrev = len(candidates) > req.PageSize + if hasPrev { + candidates = candidates[len(candidates)-req.PageSize:] + } + // next is always set in backward mode: the before-cursor item itself is forward. + hasNext = true + customers = candidates + } else { + // Forward (after cursor or first page). + start := 0 + if req.AfterCursor != nil { + ac := *req.AfterCursor + start = len(customers) // beyond all items if cursor is past the end + for i, rc := range customers { + c := pagination.NewCursor(rc.Customer.CreatedAt.Truncate(time.Second), rc.Customer.ID) + if c.Time.After(ac.Time) || (c.Time.Equal(ac.Time) && c.ID > ac.ID) { + start = i + break + } + } + } + hasPrev = start > 0 + customers = customers[start:] + hasNext = len(customers) > req.PageSize + if hasNext { + customers = customers[:req.PageSize] + } + } + + // Compute feature access for each paged customer. + now := clock.Now() + results := make([]api.GovernanceQueryResult, 0, len(customers)) + + for _, rc := range customers { + access, err := h.entitlementService.GetAccess(ctx, req.Namespace, rc.Customer.ID) + if err != nil { + return QueryGovernanceAccessResponse{}, fmt.Errorf("get access for customer %s: %w", rc.Customer.ID, err) + } + + featureAccess, err := h.buildFeatureAccess(ctx, req.Namespace, featureKeys, access) + if err != nil { + return QueryGovernanceAccessResponse{}, fmt.Errorf("build feature access for customer %s: %w", rc.Customer.ID, err) + } + + results = append(results, api.GovernanceQueryResult{ + Matched: rc.Matched, + Customer: customershandler.ToAPIBillingCustomer(rc.Customer), + Features: featureAccess, + UpdatedAt: now, + }) + } + + return QueryGovernanceAccessResponse{ + Data: results, + Errors: queryErrors, + Meta: buildCursorMeta(customers, req.PageSize, hasPrev, hasNext), + }, nil +} + +// buildFeatureAccess returns the feature access map for a single customer. +// If featureKeys is non-empty, only those keys are evaluated. +// If featureKeys is empty, all non-archived features in the org are returned; +// features the customer has no entitlement for are marked FEATURE_UNAVAILABLE. +func (h *handler) buildFeatureAccess(ctx context.Context, ns string, featureKeys []string, access entitlement.Access) (map[string]api.GovernanceFeatureAccess, error) { + result := make(map[string]api.GovernanceFeatureAccess) + + if len(featureKeys) == 0 { + orgFeatures, err := h.listAllOrgFeatures(ctx, ns) + if err != nil { + return nil, err + } + for _, f := range orgFeatures { + if ev, ok := access.Entitlements[f.Key]; ok { + result[f.Key] = mapEntitlementToAccess(ev.Value) + } else { + result[f.Key] = api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: fmt.Sprintf("feature %q is not available for this customer", f.Key), + }, + } + } + } + return result, nil + } + + for _, key := range featureKeys { + ev, ok := access.Entitlements[key] + if !ok { + access, err := h.resolveAbsentFeature(ctx, ns, key) + if err != nil { + return nil, err + } + result[key] = access + continue + } + result[key] = mapEntitlementToAccess(ev.Value) + } + + return result, nil +} + +// listAllOrgFeatures fetches all non-archived features in the namespace. +// Uses the deprecated Limit field to fetch in one shot; acceptable for prototype scale. +func (h *handler) listAllOrgFeatures(ctx context.Context, ns string) ([]feature.Feature, error) { + const featureFetchLimit = 10_000 + res, err := h.featureConnector.ListFeatures(ctx, feature.ListFeaturesParams{ + Namespace: ns, + IncludeArchived: false, + Limit: featureFetchLimit, + }) + if err != nil { + return nil, fmt.Errorf("list org features: %w", err) + } + return res.Items, nil +} + +// resolveAbsentFeature determines the reason a requested feature key is absent from GetAccess results: +// either the feature doesn't exist in the org (FeatureNotFound) or the customer has no entitlement for it (FeatureUnavailable). +func (h *handler) resolveAbsentFeature(ctx context.Context, ns, featureKey string) (api.GovernanceFeatureAccess, error) { + _, err := h.featureConnector.GetFeature(ctx, ns, featureKey, feature.IncludeArchivedFeatureFalse) + if err != nil { + var fne *feature.FeatureNotFoundError + if errors.As(err, &fne) || models.IsGenericNotFoundError(err) { + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureNotFound, + Message: fmt.Sprintf("feature %q not found", featureKey), + }, + }, nil + } + return api.GovernanceFeatureAccess{}, fmt.Errorf("get feature %q: %w", featureKey, err) + } + + return api.GovernanceFeatureAccess{ + HasAccess: false, + Reason: &api.GovernanceFeatureAccessReason{ + Code: api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, + Message: fmt.Sprintf("feature %q is not available for this customer", featureKey), + }, + }, nil +} + +func buildCursorMeta(customers []*resolvedCustomer, pageSize int, hasPrev, hasNext bool) api.CursorMeta { + meta := api.CursorMeta{ + Page: api.CursorMetaPage{ + Next: nullable.NewNullNullable[string](), + Previous: nullable.NewNullNullable[string](), + Size: float32(pageSize), + }, + } + + if len(customers) > 0 { + first := customers[0] + last := customers[len(customers)-1] + firstCursor := pagination.NewCursor(first.Customer.CreatedAt.Truncate(time.Second), first.Customer.ID) + lastCursor := pagination.NewCursor(last.Customer.CreatedAt.Truncate(time.Second), last.Customer.ID) + meta.Page.First = lo.ToPtr(firstCursor.Encode()) + meta.Page.Last = lo.ToPtr(lastCursor.Encode()) + if hasNext { + meta.Page.Next = nullable.NewNullableWithValue(lastCursor.Encode()) + } + if hasPrev { + meta.Page.Previous = nullable.NewNullableWithValue(firstCursor.Encode()) + } + } + + return meta +} diff --git a/api/v3/handlers/governance/query_test.go b/api/v3/handlers/governance/query_test.go new file mode 100644 index 0000000000..76c000786f --- /dev/null +++ b/api/v3/handlers/governance/query_test.go @@ -0,0 +1,620 @@ +package governance + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/oklog/ulid/v2" + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" + + api "github.com/openmeterio/openmeter/api/v3" + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/customer" + customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" + customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/entitlement" + "github.com/openmeterio/openmeter/openmeter/meter" + meteradapter "github.com/openmeterio/openmeter/openmeter/meter/mockadapter" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/registry" + registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder" + streamingtestutils "github.com/openmeterio/openmeter/openmeter/streaming/testutils" + "github.com/openmeterio/openmeter/openmeter/subject" + subjectadapter "github.com/openmeterio/openmeter/openmeter/subject/adapter" + subjectservice "github.com/openmeterio/openmeter/openmeter/subject/service" + "github.com/openmeterio/openmeter/openmeter/testutils" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/pkg/clock" + "github.com/openmeterio/openmeter/pkg/datetime" + "github.com/openmeterio/openmeter/pkg/framework/lockr" + pagination "github.com/openmeterio/openmeter/pkg/pagination/v2" + "github.com/openmeterio/openmeter/pkg/timeutil" +) + +func newTestNamespace(t *testing.T) string { + t.Helper() + return ulid.Make().String() +} + +// migrateOnce serializes schema migrations to avoid concurrent-write errors from ent. +var migrateOnce sync.Mutex + +type testDeps struct { + dbClient *testutils.TestDB + subjectService subject.Service + customerService customer.Service + meterService meter.ManageService + featureRepo feature.FeatureRepo + registry *registry.Entitlement + streamingConnector *streamingtestutils.MockStreamingConnector +} + +func (d *testDeps) close(t *testing.T) { + t.Helper() + if err := d.dbClient.EntDriver.Close(); err != nil { + t.Errorf("close ent driver: %v", err) + } + if err := d.dbClient.PGDriver.Close(); err != nil { + t.Errorf("close pg driver: %v", err) + } +} + +func setupTestDeps(t *testing.T) *testDeps { + t.Helper() + + logger := testutils.NewDiscardLogger(t) + testdb := testutils.InitPostgresDB(t) + dbClient := testdb.EntDriver.Client() + + migrateOnce.Lock() + require.NoError(t, dbClient.Schema.Create(context.Background())) + migrateOnce.Unlock() + + meterService, err := meteradapter.NewManage(nil) + require.NoError(t, err) + + subjectAdapter, err := subjectadapter.New(dbClient) + require.NoError(t, err) + + subjectSvc, err := subjectservice.New(subjectAdapter) + require.NoError(t, err) + + customerAdapter, err := customeradapter.New(customeradapter.Config{ + Client: dbClient, + Logger: logger, + }) + require.NoError(t, err) + + customerSvc, err := customerservice.New(customerservice.Config{ + Adapter: customerAdapter, + Publisher: eventbus.NewMock(t), + }) + require.NoError(t, err) + + locker, err := lockr.NewLocker(&lockr.LockerConfig{Logger: logger}) + require.NoError(t, err) + + streamingConnector := streamingtestutils.NewMockStreamingConnector(t) + + reg := registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{ + DatabaseClient: dbClient, + StreamingConnector: streamingConnector, + Logger: logger, + Tracer: noop.NewTracerProvider().Tracer("test"), + MeterService: meterService, + CustomerService: customerSvc, + Publisher: eventbus.NewMock(t), + EntitlementsConfiguration: config.EntitlementsConfiguration{ + GracePeriod: datetime.ISODurationString("P1D"), + }, + Locker: locker, + }) + + return &testDeps{ + dbClient: testdb, + subjectService: subjectSvc, + customerService: customerSvc, + meterService: meterService, + featureRepo: reg.FeatureRepo, + registry: reg, + streamingConnector: streamingConnector, + } +} + +func newTestHandler(deps *testDeps) *handler { + return &handler{ + resolveNamespace: func(_ context.Context) (string, error) { panic("not used in direct calls") }, + customerService: deps.customerService, + entitlementService: deps.registry.Entitlement, + featureConnector: deps.registry.Feature, + } +} + +func createCustomer(t *testing.T, deps *testDeps, ns, key string, subjectKeys []string) *customer.Customer { + t.Helper() + + for _, sk := range subjectKeys { + _, err := deps.subjectService.Create(t.Context(), subject.CreateInput{ + Namespace: ns, + Key: sk, + }) + require.NoError(t, err) + } + + cust, err := deps.customerService.CreateCustomer(t.Context(), customer.CreateCustomerInput{ + Namespace: ns, + CustomerMutate: customer.CustomerMutate{ + Key: lo.ToPtr(key), + Name: key, + UsageAttribution: &customer.CustomerUsageAttribution{ + SubjectKeys: subjectKeys, + }, + }, + }) + require.NoError(t, err) + return cust +} + +func createBooleanFeatureAndEntitlement(t *testing.T, deps *testDeps, ns, featureKey string, cust *customer.Customer) { + t.Helper() + + feat, err := deps.featureRepo.CreateFeature(t.Context(), feature.CreateFeatureInputs{ + Key: featureKey, + Name: featureKey, + Namespace: ns, + }) + require.NoError(t, err) + + _, err = deps.registry.Entitlement.CreateEntitlement(t.Context(), entitlement.CreateEntitlementInputs{ + Namespace: ns, + UsageAttribution: cust.GetUsageAttribution(), + FeatureKey: lo.ToPtr(featureKey), + FeatureID: lo.ToPtr(feat.ID), + EntitlementType: entitlement.EntitlementTypeBoolean, + }, nil) + require.NoError(t, err) +} + +func createOrphanFeature(t *testing.T, deps *testDeps, ns, featureKey string) { + t.Helper() + _, err := deps.featureRepo.CreateFeature(t.Context(), feature.CreateFeatureInputs{ + Key: featureKey, + Name: featureKey, + Namespace: ns, + }) + require.NoError(t, err) +} + +// --- Tests --- + +func TestQueryGovernanceAccess_UnknownCustomerKey(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"ghost"}}}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + assert.Empty(t, resp.Data) + require.Len(t, resp.Errors, 1) + assert.Equal(t, api.GovernanceQueryErrorCodeCustomerNotFound, resp.Errors[0].Code) + assert.Equal(t, lo.ToPtr("ghost"), resp.Errors[0].Customer) +} + +func TestQueryGovernanceAccess_KnownCustomerNoEntitlements(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: []string{cust.GetUsageAttribution().SubjectKeys[0]}}}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Empty(t, resp.Data[0].Features) + assert.Empty(t, resp.Errors) +} + +func TestQueryGovernanceAccess_BooleanEntitlement_HasAccess(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + createBooleanFeatureAndEntitlement(t, deps, ns, "premium", cust) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"premium"}}, + }, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Empty(t, resp.Errors) + + featureAccess := resp.Data[0].Features["premium"] + assert.True(t, featureAccess.HasAccess) + assert.Nil(t, featureAccess.Reason) +} + +func TestQueryGovernanceAccess_FeatureNotFound(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + _ = cust + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"does-not-exist"}}, + }, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + featureAccess := resp.Data[0].Features["does-not-exist"] + assert.False(t, featureAccess.HasAccess) + require.NotNil(t, featureAccess.Reason) + assert.Equal(t, api.GovernanceFeatureAccessReasonCodeFeatureNotFound, featureAccess.Reason.Code) +} + +func TestQueryGovernanceAccess_FeatureUnavailable(t *testing.T) { + // Feature exists in org but customer has no entitlement for it. + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + _ = cust + createOrphanFeature(t, deps, ns, "enterprise") + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"enterprise"}}, + }, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + featureAccess := resp.Data[0].Features["enterprise"] + assert.False(t, featureAccess.HasAccess) + require.NotNil(t, featureAccess.Reason) + assert.Equal(t, api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, featureAccess.Reason.Code) +} + +func TestQueryGovernanceAccess_MultipleKeysSameCustomer(t *testing.T) { + // Two input keys resolve to the same customer; response has one entry with both keys in matched[]. + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + // customer key = "acme", usage attribution subject key = "acme-sub" + createCustomer(t, deps, ns, "acme", []string{"acme-sub"}) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme", "acme-sub"}}}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + assert.Empty(t, resp.Errors) + require.Len(t, resp.Data, 1, "two keys resolving to same customer should collapse into one result") + assert.Len(t, resp.Data[0].Matched, 2) + assert.ElementsMatch(t, []string{"acme", "acme-sub"}, resp.Data[0].Matched) +} + +func TestQueryGovernanceAccess_MixedHitsAndMisses(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + createBooleanFeatureAndEntitlement(t, deps, ns, "feature-a", cust) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme", "unknown-key"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"feature-a"}}, + }, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + require.Len(t, resp.Errors, 1) + assert.Equal(t, api.GovernanceQueryErrorCodeCustomerNotFound, resp.Errors[0].Code) + assert.True(t, resp.Data[0].Features["feature-a"].HasAccess) +} + +func TestQueryGovernanceAccess_NoFeatureKeysReturnsAll(t *testing.T) { + // When feature.keys is omitted, all org features are returned — including ones + // the customer has no entitlement for (marked FEATURE_UNAVAILABLE). + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + createBooleanFeatureAndEntitlement(t, deps, ns, "feat-1", cust) + createBooleanFeatureAndEntitlement(t, deps, ns, "feat-2", cust) + // feat-3 exists in the org but the customer has no entitlement for it. + createOrphanFeature(t, deps, ns, "feat-3") + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}}, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Len(t, resp.Data[0].Features, 3) + assert.True(t, resp.Data[0].Features["feat-1"].HasAccess) + assert.True(t, resp.Data[0].Features["feat-2"].HasAccess) + feat3 := resp.Data[0].Features["feat-3"] + assert.False(t, feat3.HasAccess) + require.NotNil(t, feat3.Reason) + assert.Equal(t, api.GovernanceFeatureAccessReasonCodeFeatureUnavailable, feat3.Reason.Code) +} + +// createMeterInPG writes a meter row to ent DB (FK constraint on features.meter_id). +// The mock meter adapter only stores in memory; this must be called after CreateMeter. +func createMeterInPG(t *testing.T, dbClient *entdb.Client, mtr meter.Meter) { + t.Helper() + _, err := dbClient.Meter.Create(). + SetID(mtr.ID). + SetNamespace(mtr.Namespace). + SetName(mtr.Name). + SetKey(mtr.Key). + SetAggregation(mtr.Aggregation). + SetEventType(mtr.EventType). + SetNillableValueProperty(mtr.ValueProperty). + Save(t.Context()) + require.NoError(t, err) +} + +func createMeter(t *testing.T, deps *testDeps, ns, key string) meter.Meter { + t.Helper() + mtr, err := deps.meterService.CreateMeter(t.Context(), meter.CreateMeterInput{ + Namespace: ns, + Name: key, + Key: key, + Aggregation: meter.MeterAggregationSum, + EventType: "test", + ValueProperty: lo.ToPtr("$.value"), + }) + require.NoError(t, err) + createMeterInPG(t, deps.dbClient.EntDriver.Client(), mtr) + return mtr +} + +func createMeteredFeatureAndEntitlement(t *testing.T, deps *testDeps, ns, featureKey string, mtr meter.Meter, cust *customer.Customer, issueAfterReset *float64) { + t.Helper() + feat, err := deps.featureRepo.CreateFeature(t.Context(), feature.CreateFeatureInputs{ + Key: featureKey, + Name: featureKey, + Namespace: ns, + MeterID: lo.ToPtr(mtr.ID), + }) + require.NoError(t, err) + + _, err = deps.registry.Entitlement.CreateEntitlement(t.Context(), entitlement.CreateEntitlementInputs{ + Namespace: ns, + UsageAttribution: cust.GetUsageAttribution(), + FeatureKey: lo.ToPtr(featureKey), + FeatureID: lo.ToPtr(feat.ID), + EntitlementType: entitlement.EntitlementTypeMetered, + UsagePeriod: lo.ToPtr(entitlement.NewUsagePeriodInputFromRecurrence(timeutil.Recurrence{ + Interval: timeutil.RecurrencePeriodDaily, + Anchor: clock.Now(), + })), + IssueAfterReset: issueAfterReset, + }, nil) + require.NoError(t, err) +} + +func TestQueryGovernanceAccess_MeteredEntitlement_HasAccess(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + clock.SetTime(now) + defer clock.ResetTime() + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + mtr := createMeter(t, deps, ns, "api-calls") + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + // IssueAfterReset=10.0 → balance starts at 10, HasAccess=true + createMeteredFeatureAndEntitlement(t, deps, ns, "premium", mtr, cust, lo.ToPtr(10.0)) + + // Add an event so the streaming mock has data for the meter. + deps.streamingConnector.AddSimpleEvent(mtr.Key, 1, now) + + clock.SetTime(now.Add(time.Hour)) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"premium"}}, + }, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Empty(t, resp.Errors) + featureAccess := resp.Data[0].Features["premium"] + assert.True(t, featureAccess.HasAccess) + assert.Nil(t, featureAccess.Reason) +} + +func TestQueryGovernanceAccess_MeteredEntitlement_Exhausted(t *testing.T) { + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + clock.SetTime(now) + defer clock.ResetTime() + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + mtr := createMeter(t, deps, ns, "api-calls") + cust := createCustomer(t, deps, ns, "acme", []string{"acme"}) + // No IssueAfterReset → balance=0, HasAccess=false → UsageLimitReached + createMeteredFeatureAndEntitlement(t, deps, ns, "premium", mtr, cust, nil) + + deps.streamingConnector.AddSimpleEvent(mtr.Key, 1, now) + + clock.SetTime(now.Add(time.Hour)) + + resp, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{ + Customer: api.GovernanceQueryRequestCustomers{Keys: []string{"acme"}}, + Feature: &api.GovernanceQueryRequestFeatures{Keys: []string{"premium"}}, + }, + PageSize: defaultPageSize, + }) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + assert.Empty(t, resp.Errors) + featureAccess := resp.Data[0].Features["premium"] + assert.False(t, featureAccess.HasAccess) + require.NotNil(t, featureAccess.Reason) + assert.Equal(t, api.GovernanceFeatureAccessReasonCodeUsageLimitReached, featureAccess.Reason.Code) +} + +func TestQueryGovernanceAccess_Pagination(t *testing.T) { + // given: 3 customers (c1, c2, c3 in creation order); pageSize=1 + deps := setupTestDeps(t) + t.Cleanup(func() { deps.close(t) }) + + now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + clock.SetTime(now) + defer clock.ResetTime() + + h := newTestHandler(deps) + ns := newTestNamespace(t) + + createCustomer(t, deps, ns, "c1", []string{"c1"}) + clock.SetTime(now.Add(time.Second)) + createCustomer(t, deps, ns, "c2", []string{"c2"}) + clock.SetTime(now.Add(2 * time.Second)) + createCustomer(t, deps, ns, "c3", []string{"c3"}) + + allKeys := []string{"c1", "c2", "c3"} + + decodeCursor := func(encoded string) *pagination.Cursor { + c, err := pagination.DecodeCursor(encoded) + require.NoError(t, err) + return c + } + + // Page 1: [c1] — no previous, next points to c1 + page1, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + }) + require.NoError(t, err) + require.Len(t, page1.Data, 1) + assert.Equal(t, "c1", page1.Data[0].Matched[0]) + assert.True(t, page1.Meta.Page.Previous.IsNull(), "no previous on first page") + require.False(t, page1.Meta.Page.Next.IsNull(), "next must be set on page 1") + + next1, _ := page1.Meta.Page.Next.Get() + + // Page 2: [c2] — previous set, next set + page2, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + AfterCursor: decodeCursor(next1), + }) + require.NoError(t, err) + require.Len(t, page2.Data, 1) + assert.Equal(t, "c2", page2.Data[0].Matched[0]) + require.False(t, page2.Meta.Page.Previous.IsNull(), "previous must be set on page 2") + require.False(t, page2.Meta.Page.Next.IsNull(), "next must be set on page 2") + + next2, _ := page2.Meta.Page.Next.Get() + prev2, _ := page2.Meta.Page.Previous.Get() + + // Page 3 (last): [c3] — previous set, no next + page3, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + AfterCursor: decodeCursor(next2), + }) + require.NoError(t, err) + require.Len(t, page3.Data, 1) + assert.Equal(t, "c3", page3.Data[0].Matched[0]) + require.False(t, page3.Meta.Page.Previous.IsNull(), "previous must be set on last page") + assert.True(t, page3.Meta.Page.Next.IsNull(), "no next on last page") + + // Cursor past end → empty data + require.NotNil(t, page3.Meta.Page.Last, "last cursor must be set") + last3 := *page3.Meta.Page.Last + pastEnd, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + AfterCursor: decodeCursor(last3), + }) + require.NoError(t, err) + assert.Empty(t, pastEnd.Data) + assert.True(t, pastEnd.Meta.Page.Next.IsNull()) + assert.True(t, pastEnd.Meta.Page.Previous.IsNull()) + + // Backward from page 2's previous cursor → [c1], no previous, next set + pageBack, err := h.processGovernanceQuery(t.Context(), queryGovernanceAccessRequest{ + Namespace: ns, + Body: api.GovernanceQueryRequest{Customer: api.GovernanceQueryRequestCustomers{Keys: allKeys}}, + PageSize: 1, + BeforeCursor: decodeCursor(prev2), + }) + require.NoError(t, err) + require.Len(t, pageBack.Data, 1) + assert.Equal(t, "c1", pageBack.Data[0].Matched[0]) + assert.True(t, pageBack.Meta.Page.Previous.IsNull(), "no previous before c1") + assert.False(t, pageBack.Meta.Page.Next.IsNull(), "next must be set in backward result") +} diff --git a/api/v3/server/routes.go b/api/v3/server/routes.go index e70399f98a..5c36ea179d 100644 --- a/api/v3/server/routes.go +++ b/api/v3/server/routes.go @@ -470,5 +470,5 @@ func (s *Server) UpdateOrganizationDefaultTaxCodes(w http.ResponseWriter, r *htt // Governance func (s *Server) QueryGovernanceAccess(w http.ResponseWriter, r *http.Request, params api.QueryGovernanceAccessParams) { - unimplemented.QueryGovernanceAccess(w, r, params) + s.governanceHandler.QueryGovernanceAccess().With(params).ServeHTTP(w, r) } diff --git a/api/v3/server/server.go b/api/v3/server/server.go index 00d7cbd459..ab28f100f7 100644 --- a/api/v3/server/server.go +++ b/api/v3/server/server.go @@ -27,6 +27,7 @@ import ( eventshandler "github.com/openmeterio/openmeter/api/v3/handlers/events" featurecosthandler "github.com/openmeterio/openmeter/api/v3/handlers/featurecost" featureshandler "github.com/openmeterio/openmeter/api/v3/handlers/features" + governancehandler "github.com/openmeterio/openmeter/api/v3/handlers/governance" llmcosthandler "github.com/openmeterio/openmeter/api/v3/handlers/llmcost" metershandler "github.com/openmeterio/openmeter/api/v3/handlers/meters" planshandler "github.com/openmeterio/openmeter/api/v3/handlers/plans" @@ -238,6 +239,7 @@ type Server struct { customersBillingHandler customersbillinghandler.Handler customersCreditsHandler customerscreditshandler.Handler customersEntitlementHandler customersentitlementhandler.Handler + governanceHandler governancehandler.Handler metersHandler metershandler.Handler subscriptionsHandler subscriptionshandler.Handler subscriptionAddonsHandler subscriptionaddonshandler.Handler @@ -318,6 +320,7 @@ func NewServer(config *Config) (*Server, error) { } featuresH := featureshandler.New(resolveNamespace, config.FeatureConnector, config.MeterService, config.LLMCostService, httptransport.WithErrorHandler(config.ErrorHandler)) + governanceH := governancehandler.New(resolveNamespace, config.CustomerService, config.EntitlementService, config.FeatureConnector, httptransport.WithErrorHandler(config.ErrorHandler)) var llmcostH llmcosthandler.Handler if config.LLMCostService != nil { @@ -351,6 +354,7 @@ func NewServer(config *Config) (*Server, error) { currenciesHandler: currenciesHandler, featuresHandler: featuresH, featureCostHandler: featureCostH, + governanceHandler: governanceH, }, nil }