diff --git a/console/src/oapi/management.generated.ts b/console/src/oapi/management.generated.ts index be9765f4..59ac7e39 100644 --- a/console/src/oapi/management.generated.ts +++ b/console/src/oapi/management.generated.ts @@ -486,7 +486,11 @@ export interface paths { * @description Triggers a user into a journey at a specific entrance step, typically used for testing or manual overrides */ post: operations["triggerUser"]; - delete?: never; + /** + * Cancel user journey + * @description Cancels all active (non-completed) journey states for a user, stopping further step processing including delayed steps + */ + delete: operations["cancelUserJourney"]; options?: never; head?: never; patch?: never; @@ -5355,6 +5359,32 @@ export interface operations { default: components["responses"]["Error"]; }; }; + cancelUserJourney: { + parameters: { + query?: never; + header?: never; + path: { + /** @description The project ID */ + projectID: string; + /** @description The journey ID */ + journeyID: string; + /** @description The user ID whose journey execution should be cancelled */ + userID: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description User journey cancelled successfully */ + 204: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + default: components["responses"]["Error"]; + }; + }; getJourneySteps: { parameters: { query?: never; diff --git a/internal/pubsub/consumer/campaigns.go b/internal/pubsub/consumer/campaigns.go index ad4663c7..25532b11 100644 --- a/internal/pubsub/consumer/campaigns.go +++ b/internal/pubsub/consumer/campaigns.go @@ -139,6 +139,19 @@ func CampaignsSendHandler(logger *zap.Logger, mgmt *management.State, usrs *subj return err } + if !campaign.Transactional && campaign.SubscriptionID != nil { + unsubscribed, err := mgmt.IsUserUnsubscribed(ctx, event.UserID, *campaign.SubscriptionID) + if err != nil { + logger.Error("failed to check subscription status", zap.Error(err)) + return err + } + if unsubscribed { + logger.Info("skipping send, user has unsubscribed from subscription", + zap.String("subscription_id", campaign.SubscriptionID.String())) + return nil + } + } + project, err := mgmt.GetProject(ctx, event.ProjectID) if err != nil { logger.Error("failed to get project", zap.Error(err)) diff --git a/internal/store/management/subscriptions.go b/internal/store/management/subscriptions.go index 79575345..b7fb946d 100644 --- a/internal/store/management/subscriptions.go +++ b/internal/store/management/subscriptions.go @@ -2,6 +2,9 @@ package management import ( "context" + "database/sql" + "errors" + "fmt" "time" "github.com/google/uuid" @@ -9,6 +12,8 @@ import ( "github.com/lunogram/platform/internal/store" ) +const subscriptionStateUnsubscribed = 1 + type Subscriptions []Subscription func (s Subscriptions) OAPI() []oapi.Subscription { @@ -76,18 +81,18 @@ type SubscriptionsStore struct { } func (s *SubscriptionsStore) GetUserSubscriptions(ctx context.Context, projectID, userID uuid.UUID, pagination store.Pagination) (UserSubscriptions, int, error) { - query := ` + query := fmt.Sprintf(` SELECT s.id AS subscription_id, s.name, s.channel, - CASE WHEN us.state = 1 THEN 'unsubscribed' ELSE 'subscribed' END AS state, + CASE WHEN us.state = %d THEN 'unsubscribed' ELSE 'subscribed' END AS state, COUNT(*) OVER () AS total_count FROM subscriptions s LEFT JOIN user_subscription us ON us.subscription_id = s.id AND us.user_id = $2 WHERE s.project_id = $1 AND s.is_public = true ORDER BY s.name - LIMIT $3 OFFSET $4` + LIMIT $3 OFFSET $4`, subscriptionStateUnsubscribed) type result struct { SubscriptionID uuid.UUID `db:"subscription_id"` @@ -122,16 +127,16 @@ func (s *SubscriptionsStore) GetUserSubscriptions(ctx context.Context, projectID } func (s *SubscriptionsStore) GetAllUserSubscriptions(ctx context.Context, projectID, userID uuid.UUID) (UserSubscriptions, error) { - query := ` + query := fmt.Sprintf(` SELECT s.id AS subscription_id, s.name, s.channel, - CASE WHEN us.state = 1 THEN 'unsubscribed' ELSE 'subscribed' END AS state + CASE WHEN us.state = %d THEN 'unsubscribed' ELSE 'subscribed' END AS state FROM subscriptions s LEFT JOIN user_subscription us ON us.subscription_id = s.id AND us.user_id = $2 WHERE s.project_id = $1 AND s.is_public = true - ORDER BY s.name` + ORDER BY s.name`, subscriptionStateUnsubscribed) var subscriptions []UserSubscription err := s.db.SelectContext(ctx, &subscriptions, query, projectID, userID) @@ -196,9 +201,9 @@ func (s *SubscriptionsStore) Unsubscribe(ctx context.Context, userID, subscripti } // Insert unsubscribe record - _, err = s.db.ExecContext(ctx, ` + _, err = s.db.ExecContext(ctx, fmt.Sprintf(` INSERT INTO user_subscription (user_id, subscription_id, state, created_at, updated_at) - VALUES ($1, $2, 1, NOW(), NOW())`, userID, subscriptionID) + VALUES ($1, $2, %d, NOW(), NOW())`, subscriptionStateUnsubscribed), userID, subscriptionID) return err } @@ -209,6 +214,20 @@ func (s *SubscriptionsStore) SetSubscriptionState(ctx context.Context, userID, s return s.Unsubscribe(ctx, userID, subscriptionID) } +func (s *SubscriptionsStore) IsUserUnsubscribed(ctx context.Context, userID, subscriptionID uuid.UUID) (bool, error) { + var state *int + err := s.db.GetContext(ctx, &state, + `SELECT state FROM user_subscription WHERE user_id = $1 AND subscription_id = $2`, + userID, subscriptionID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return false, err + } + return state != nil && *state == subscriptionStateUnsubscribed, nil +} + func (s *SubscriptionsStore) ListSubscriptions(ctx context.Context, projectID uuid.UUID, pagination store.Pagination) (Subscriptions, int, error) { query := ` SELECT diff --git a/internal/store/management/subscriptions_test.go b/internal/store/management/subscriptions_test.go index dd5256c9..b3d35e02 100644 --- a/internal/store/management/subscriptions_test.go +++ b/internal/store/management/subscriptions_test.go @@ -57,4 +57,38 @@ func TestSubscriptionsStore(t *testing.T) { assert.GreaterOrEqual(t, total, 1) assert.GreaterOrEqual(t, len(subs), 1) }) + + t.Run("IsUserUnsubscribed", func(t *testing.T) { + subID, err := db.CreateSubscription(ctx, Subscription{ + ProjectID: projectID, + Name: "Promotions", + Channel: "email", + IsPublic: true, + }) + require.NoError(t, err) + + userID := uuid.New() + + t.Run("returns false when no record exists", func(t *testing.T) { + unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID) + require.NoError(t, err) + assert.False(t, unsubscribed) + }) + + t.Run("returns true after unsubscribe", func(t *testing.T) { + require.NoError(t, db.Unsubscribe(ctx, userID, subID)) + + unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID) + require.NoError(t, err) + assert.True(t, unsubscribed) + }) + + t.Run("returns false after re-subscribe", func(t *testing.T) { + require.NoError(t, db.Subscribe(ctx, userID, subID)) + + unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID) + require.NoError(t, err) + assert.False(t, unsubscribed) + }) + }) }