From 866f51dcaf0a420c1ff3284176d0f5dfccdd9f8c Mon Sep 17 00:00:00 2001 From: RickjanHoornbeeck <51879@hoornbeeck.nl> Date: Mon, 23 Mar 2026 10:31:41 +0100 Subject: [PATCH 1/3] feat: add user subscription status check and corresponding tests --- internal/pubsub/consumer/campaigns.go | 13 +++++++ internal/store/management/subscriptions.go | 16 +++++++++ .../store/management/subscriptions_test.go | 34 +++++++++++++++++++ 3 files changed, 63 insertions(+) diff --git a/internal/pubsub/consumer/campaigns.go b/internal/pubsub/consumer/campaigns.go index ad4663c7..25018f0f 100644 --- a/internal/pubsub/consumer/campaigns.go +++ b/internal/pubsub/consumer/campaigns.go @@ -151,6 +151,19 @@ func CampaignsSendHandler(logger *zap.Logger, mgmt *management.State, usrs *subj return err } + if !campaign.Transactional && campaign.SubscriptionID != nil { + unsubscribed, err := mgmt.IsUserUnsubscribed(ctx, event.UserID, *campaign.SubscriptionID) + if err != nil { + logger.Error("failed to check subscription status", zap.Error(err)) + return err + } + if unsubscribed { + logger.Info("skipping send, user has unsubscribed from subscription", + zap.String("subscription_id", campaign.SubscriptionID.String())) + return nil + } + } + provider, exists := registry.Get(campaign.Provider.Module) if !exists { logger.Error("provider module not found", zap.String("module", campaign.Provider.Module)) diff --git a/internal/store/management/subscriptions.go b/internal/store/management/subscriptions.go index 79575345..7edb6fc9 100644 --- a/internal/store/management/subscriptions.go +++ b/internal/store/management/subscriptions.go @@ -2,6 +2,8 @@ package management import ( "context" + "database/sql" + "errors" "time" "github.com/google/uuid" @@ -209,6 +211,20 @@ func (s *SubscriptionsStore) SetSubscriptionState(ctx context.Context, userID, s return s.Unsubscribe(ctx, userID, subscriptionID) } +func (s *SubscriptionsStore) IsUserUnsubscribed(ctx context.Context, userID, subscriptionID uuid.UUID) (bool, error) { + var state *int + err := s.db.GetContext(ctx, &state, + `SELECT state FROM user_subscription WHERE user_id = $1 AND subscription_id = $2`, + userID, subscriptionID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return false, err + } + return state != nil && *state == 1, nil +} + func (s *SubscriptionsStore) ListSubscriptions(ctx context.Context, projectID uuid.UUID, pagination store.Pagination) (Subscriptions, int, error) { query := ` SELECT diff --git a/internal/store/management/subscriptions_test.go b/internal/store/management/subscriptions_test.go index dd5256c9..b3d35e02 100644 --- a/internal/store/management/subscriptions_test.go +++ b/internal/store/management/subscriptions_test.go @@ -57,4 +57,38 @@ func TestSubscriptionsStore(t *testing.T) { assert.GreaterOrEqual(t, total, 1) assert.GreaterOrEqual(t, len(subs), 1) }) + + t.Run("IsUserUnsubscribed", func(t *testing.T) { + subID, err := db.CreateSubscription(ctx, Subscription{ + ProjectID: projectID, + Name: "Promotions", + Channel: "email", + IsPublic: true, + }) + require.NoError(t, err) + + userID := uuid.New() + + t.Run("returns false when no record exists", func(t *testing.T) { + unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID) + require.NoError(t, err) + assert.False(t, unsubscribed) + }) + + t.Run("returns true after unsubscribe", func(t *testing.T) { + require.NoError(t, db.Unsubscribe(ctx, userID, subID)) + + unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID) + require.NoError(t, err) + assert.True(t, unsubscribed) + }) + + t.Run("returns false after re-subscribe", func(t *testing.T) { + require.NoError(t, db.Subscribe(ctx, userID, subID)) + + unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID) + require.NoError(t, err) + assert.False(t, unsubscribed) + }) + }) } From 12cc8ff24c2cde9249f1f898235aef44febbdfdc Mon Sep 17 00:00:00 2001 From: RickjanHoornbeeck <51879@hoornbeeck.nl> Date: Mon, 23 Mar 2026 10:50:41 +0100 Subject: [PATCH 2/3] fix: generate failing --- console/src/oapi/management.generated.ts | 32 +++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/console/src/oapi/management.generated.ts b/console/src/oapi/management.generated.ts index be9765f4..59ac7e39 100644 --- a/console/src/oapi/management.generated.ts +++ b/console/src/oapi/management.generated.ts @@ -486,7 +486,11 @@ export interface paths { * @description Triggers a user into a journey at a specific entrance step, typically used for testing or manual overrides */ post: operations["triggerUser"]; - delete?: never; + /** + * Cancel user journey + * @description Cancels all active (non-completed) journey states for a user, stopping further step processing including delayed steps + */ + delete: operations["cancelUserJourney"]; options?: never; head?: never; patch?: never; @@ -5355,6 +5359,32 @@ export interface operations { default: components["responses"]["Error"]; }; }; + cancelUserJourney: { + parameters: { + query?: never; + header?: never; + path: { + /** @description The project ID */ + projectID: string; + /** @description The journey ID */ + journeyID: string; + /** @description The user ID whose journey execution should be cancelled */ + userID: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description User journey cancelled successfully */ + 204: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + default: components["responses"]["Error"]; + }; + }; getJourneySteps: { parameters: { query?: never; From 4532e015b2209d2c6aec2c4f461f512d65dd0392 Mon Sep 17 00:00:00 2001 From: RickjanHoornbeeck <51879@hoornbeeck.nl> Date: Mon, 23 Mar 2026 11:02:46 +0100 Subject: [PATCH 3/3] fix: copilot suggestions added --- internal/pubsub/consumer/campaigns.go | 24 +++++++++++----------- internal/store/management/subscriptions.go | 21 +++++++++++-------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/internal/pubsub/consumer/campaigns.go b/internal/pubsub/consumer/campaigns.go index 25018f0f..25532b11 100644 --- a/internal/pubsub/consumer/campaigns.go +++ b/internal/pubsub/consumer/campaigns.go @@ -139,18 +139,6 @@ func CampaignsSendHandler(logger *zap.Logger, mgmt *management.State, usrs *subj return err } - project, err := mgmt.GetProject(ctx, event.ProjectID) - if err != nil { - logger.Error("failed to get project", zap.Error(err)) - return err - } - - user, err := usrs.GetUser(ctx, event.ProjectID, event.UserID) - if err != nil { - logger.Error("failed to get user", zap.Error(err)) - return err - } - if !campaign.Transactional && campaign.SubscriptionID != nil { unsubscribed, err := mgmt.IsUserUnsubscribed(ctx, event.UserID, *campaign.SubscriptionID) if err != nil { @@ -164,6 +152,18 @@ func CampaignsSendHandler(logger *zap.Logger, mgmt *management.State, usrs *subj } } + project, err := mgmt.GetProject(ctx, event.ProjectID) + if err != nil { + logger.Error("failed to get project", zap.Error(err)) + return err + } + + user, err := usrs.GetUser(ctx, event.ProjectID, event.UserID) + if err != nil { + logger.Error("failed to get user", zap.Error(err)) + return err + } + provider, exists := registry.Get(campaign.Provider.Module) if !exists { logger.Error("provider module not found", zap.String("module", campaign.Provider.Module)) diff --git a/internal/store/management/subscriptions.go b/internal/store/management/subscriptions.go index 7edb6fc9..b7fb946d 100644 --- a/internal/store/management/subscriptions.go +++ b/internal/store/management/subscriptions.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "fmt" "time" "github.com/google/uuid" @@ -11,6 +12,8 @@ import ( "github.com/lunogram/platform/internal/store" ) +const subscriptionStateUnsubscribed = 1 + type Subscriptions []Subscription func (s Subscriptions) OAPI() []oapi.Subscription { @@ -78,18 +81,18 @@ type SubscriptionsStore struct { } func (s *SubscriptionsStore) GetUserSubscriptions(ctx context.Context, projectID, userID uuid.UUID, pagination store.Pagination) (UserSubscriptions, int, error) { - query := ` + query := fmt.Sprintf(` SELECT s.id AS subscription_id, s.name, s.channel, - CASE WHEN us.state = 1 THEN 'unsubscribed' ELSE 'subscribed' END AS state, + CASE WHEN us.state = %d THEN 'unsubscribed' ELSE 'subscribed' END AS state, COUNT(*) OVER () AS total_count FROM subscriptions s LEFT JOIN user_subscription us ON us.subscription_id = s.id AND us.user_id = $2 WHERE s.project_id = $1 AND s.is_public = true ORDER BY s.name - LIMIT $3 OFFSET $4` + LIMIT $3 OFFSET $4`, subscriptionStateUnsubscribed) type result struct { SubscriptionID uuid.UUID `db:"subscription_id"` @@ -124,16 +127,16 @@ func (s *SubscriptionsStore) GetUserSubscriptions(ctx context.Context, projectID } func (s *SubscriptionsStore) GetAllUserSubscriptions(ctx context.Context, projectID, userID uuid.UUID) (UserSubscriptions, error) { - query := ` + query := fmt.Sprintf(` SELECT s.id AS subscription_id, s.name, s.channel, - CASE WHEN us.state = 1 THEN 'unsubscribed' ELSE 'subscribed' END AS state + CASE WHEN us.state = %d THEN 'unsubscribed' ELSE 'subscribed' END AS state FROM subscriptions s LEFT JOIN user_subscription us ON us.subscription_id = s.id AND us.user_id = $2 WHERE s.project_id = $1 AND s.is_public = true - ORDER BY s.name` + ORDER BY s.name`, subscriptionStateUnsubscribed) var subscriptions []UserSubscription err := s.db.SelectContext(ctx, &subscriptions, query, projectID, userID) @@ -198,9 +201,9 @@ func (s *SubscriptionsStore) Unsubscribe(ctx context.Context, userID, subscripti } // Insert unsubscribe record - _, err = s.db.ExecContext(ctx, ` + _, err = s.db.ExecContext(ctx, fmt.Sprintf(` INSERT INTO user_subscription (user_id, subscription_id, state, created_at, updated_at) - VALUES ($1, $2, 1, NOW(), NOW())`, userID, subscriptionID) + VALUES ($1, $2, %d, NOW(), NOW())`, subscriptionStateUnsubscribed), userID, subscriptionID) return err } @@ -222,7 +225,7 @@ func (s *SubscriptionsStore) IsUserUnsubscribed(ctx context.Context, userID, sub } return false, err } - return state != nil && *state == 1, nil + return state != nil && *state == subscriptionStateUnsubscribed, nil } func (s *SubscriptionsStore) ListSubscriptions(ctx context.Context, projectID uuid.UUID, pagination store.Pagination) (Subscriptions, int, error) {