Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion console/src/oapi/management.generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,11 @@ export interface paths {
* @description Triggers a user into a journey at a specific entrance step, typically used for testing or manual overrides
*/
post: operations["triggerUser"];
delete?: never;
/**
* Cancel user journey
* @description Cancels all active (non-completed) journey states for a user, stopping further step processing including delayed steps
*/
delete: operations["cancelUserJourney"];
options?: never;
head?: never;
patch?: never;
Expand Down Expand Up @@ -5355,6 +5359,32 @@ export interface operations {
default: components["responses"]["Error"];
};
};
cancelUserJourney: {
parameters: {
query?: never;
header?: never;
path: {
/** @description The project ID */
projectID: string;
/** @description The journey ID */
journeyID: string;
/** @description The user ID whose journey execution should be cancelled */
userID: string;
};
cookie?: never;
};
requestBody?: never;
responses: {
/** @description User journey cancelled successfully */
204: {
headers: {
[name: string]: unknown;
};
content?: never;
};
default: components["responses"]["Error"];
};
};
getJourneySteps: {
parameters: {
query?: never;
Expand Down
13 changes: 13 additions & 0 deletions internal/pubsub/consumer/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,19 @@
return err
}

if !campaign.Transactional && campaign.SubscriptionID != nil {

Check failure on line 142 in internal/pubsub/consumer/campaigns.go

View workflow job for this annotation

GitHub Actions / tests

campaign.Transactional undefined (type *management.Campaign has no field or method Transactional)

Check failure on line 142 in internal/pubsub/consumer/campaigns.go

View workflow job for this annotation

GitHub Actions / lint

campaign.Transactional undefined (type *management.Campaign has no field or method Transactional) (typecheck)

Check failure on line 142 in internal/pubsub/consumer/campaigns.go

View workflow job for this annotation

GitHub Actions / lint

campaign.Transactional undefined (type *management.Campaign has no field or method Transactional)) (typecheck)

Check failure on line 142 in internal/pubsub/consumer/campaigns.go

View workflow job for this annotation

GitHub Actions / lint

campaign.Transactional undefined (type *management.Campaign has no field or method Transactional)) (typecheck)

Check failure on line 142 in internal/pubsub/consumer/campaigns.go

View workflow job for this annotation

GitHub Actions / lint

campaign.Transactional undefined (type *management.Campaign has no field or method Transactional)) (typecheck)
unsubscribed, err := mgmt.IsUserUnsubscribed(ctx, event.UserID, *campaign.SubscriptionID)
if err != nil {
logger.Error("failed to check subscription status", zap.Error(err))
return err
}
if unsubscribed {
logger.Info("skipping send, user has unsubscribed from subscription",
zap.String("subscription_id", campaign.SubscriptionID.String()))
return nil
}
}

project, err := mgmt.GetProject(ctx, event.ProjectID)
if err != nil {
logger.Error("failed to get project", zap.Error(err))
Expand Down
35 changes: 27 additions & 8 deletions internal/store/management/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package management

import (
"context"
"database/sql"
"errors"
"fmt"
"time"

"github.com/google/uuid"
"github.com/lunogram/platform/internal/http/controllers/v1/management/oapi"
"github.com/lunogram/platform/internal/store"
)

const subscriptionStateUnsubscribed = 1

type Subscriptions []Subscription

func (s Subscriptions) OAPI() []oapi.Subscription {
Expand Down Expand Up @@ -76,18 +81,18 @@ type SubscriptionsStore struct {
}

func (s *SubscriptionsStore) GetUserSubscriptions(ctx context.Context, projectID, userID uuid.UUID, pagination store.Pagination) (UserSubscriptions, int, error) {
query := `
query := fmt.Sprintf(`
SELECT
s.id AS subscription_id,
s.name,
s.channel,
CASE WHEN us.state = 1 THEN 'unsubscribed' ELSE 'subscribed' END AS state,
CASE WHEN us.state = %d THEN 'unsubscribed' ELSE 'subscribed' END AS state,
COUNT(*) OVER () AS total_count
FROM subscriptions s
LEFT JOIN user_subscription us ON us.subscription_id = s.id AND us.user_id = $2
WHERE s.project_id = $1 AND s.is_public = true
ORDER BY s.name
LIMIT $3 OFFSET $4`
LIMIT $3 OFFSET $4`, subscriptionStateUnsubscribed)

type result struct {
SubscriptionID uuid.UUID `db:"subscription_id"`
Expand Down Expand Up @@ -122,16 +127,16 @@ func (s *SubscriptionsStore) GetUserSubscriptions(ctx context.Context, projectID
}

func (s *SubscriptionsStore) GetAllUserSubscriptions(ctx context.Context, projectID, userID uuid.UUID) (UserSubscriptions, error) {
query := `
query := fmt.Sprintf(`
SELECT
s.id AS subscription_id,
s.name,
s.channel,
CASE WHEN us.state = 1 THEN 'unsubscribed' ELSE 'subscribed' END AS state
CASE WHEN us.state = %d THEN 'unsubscribed' ELSE 'subscribed' END AS state
FROM subscriptions s
LEFT JOIN user_subscription us ON us.subscription_id = s.id AND us.user_id = $2
WHERE s.project_id = $1 AND s.is_public = true
ORDER BY s.name`
ORDER BY s.name`, subscriptionStateUnsubscribed)

var subscriptions []UserSubscription
err := s.db.SelectContext(ctx, &subscriptions, query, projectID, userID)
Expand Down Expand Up @@ -196,9 +201,9 @@ func (s *SubscriptionsStore) Unsubscribe(ctx context.Context, userID, subscripti
}

// Insert unsubscribe record
_, err = s.db.ExecContext(ctx, `
_, err = s.db.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO user_subscription (user_id, subscription_id, state, created_at, updated_at)
VALUES ($1, $2, 1, NOW(), NOW())`, userID, subscriptionID)
VALUES ($1, $2, %d, NOW(), NOW())`, subscriptionStateUnsubscribed), userID, subscriptionID)
return err
}

Expand All @@ -209,6 +214,20 @@ func (s *SubscriptionsStore) SetSubscriptionState(ctx context.Context, userID, s
return s.Unsubscribe(ctx, userID, subscriptionID)
}

func (s *SubscriptionsStore) IsUserUnsubscribed(ctx context.Context, userID, subscriptionID uuid.UUID) (bool, error) {
var state *int
err := s.db.GetContext(ctx, &state,
`SELECT state FROM user_subscription WHERE user_id = $1 AND subscription_id = $2`,
userID, subscriptionID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, err
}
return state != nil && *state == subscriptionStateUnsubscribed, nil
}

func (s *SubscriptionsStore) ListSubscriptions(ctx context.Context, projectID uuid.UUID, pagination store.Pagination) (Subscriptions, int, error) {
query := `
SELECT
Expand Down
34 changes: 34 additions & 0 deletions internal/store/management/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,38 @@ func TestSubscriptionsStore(t *testing.T) {
assert.GreaterOrEqual(t, total, 1)
assert.GreaterOrEqual(t, len(subs), 1)
})

t.Run("IsUserUnsubscribed", func(t *testing.T) {
subID, err := db.CreateSubscription(ctx, Subscription{
ProjectID: projectID,
Name: "Promotions",
Channel: "email",
IsPublic: true,
})
require.NoError(t, err)

userID := uuid.New()

t.Run("returns false when no record exists", func(t *testing.T) {
unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID)
require.NoError(t, err)
assert.False(t, unsubscribed)
})

t.Run("returns true after unsubscribe", func(t *testing.T) {
require.NoError(t, db.Unsubscribe(ctx, userID, subID))

unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID)
require.NoError(t, err)
assert.True(t, unsubscribed)
})

t.Run("returns false after re-subscribe", func(t *testing.T) {
require.NoError(t, db.Subscribe(ctx, userID, subID))

unsubscribed, err := db.IsUserUnsubscribed(ctx, userID, subID)
require.NoError(t, err)
assert.False(t, unsubscribed)
})
})
}
Loading