Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 69 additions & 32 deletions cmd/lunogram/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"flag"
"fmt"
"os"
"slices"
"strings"

"github.com/caarlos0/env/v10"
"github.com/cloudproud/graceful"
Expand Down Expand Up @@ -56,6 +58,16 @@ func run() error {
return err
}

for _, m := range conf.Modules {
if !slices.Contains(config.ValidModules, m) {
valid := make([]string, len(config.ValidModules))
for i, v := range config.ValidModules {
valid[i] = fmt.Sprintf("%q", v)
}
return fmt.Errorf("invalid module %q, valid modules are: %s", m, strings.Join(valid, ", "))
}
}

if migrate || conf.DatabaseMigrate {
logger.Info("running database migrations...")
logger.Info("running management migration")
Expand Down Expand Up @@ -115,56 +127,81 @@ func run() error {

logger.Info("initializing provider registry")

providersRegisrtry, err := providers.NewRegistry(ctx, conf.WASM, logger)
if err != nil {
return err
var providersRegisrtry *providers.Registry
if slices.Contains(conf.Modules, "wasm") {
logger.Info("wasm module is enabled, initializing wasm provider registry")

providersRegisrtry, err = providers.NewRegistry(ctx, conf.WASM, logger)
if err != nil {
return err
}
defer providersRegisrtry.Close(ctx)
} else {
logger.Info("wasm module is disabled, skipping wasm provider registry initialization")
}
defer providersRegisrtry.Close(ctx)

logger.Info("initializing action registry")
var actionRegistry *actions.Registry
if slices.Contains(conf.Modules, "wasm") {
logger.Info("initializing action registry")

actionRegistry, err := actions.NewRegistry(ctx, conf.WASM, logger)
if err != nil {
return err
actionRegistry, err = actions.NewRegistry(ctx, conf.WASM, logger)
if err != nil {
return err
}
defer actionRegistry.Close(ctx)
} else {
logger.Info("wasm module is disabled, skipping action registry initialization")
}
defer actionRegistry.Close(ctx)

pub := pubsub.NewPublisher(jet, conf.Nats.Namespace)
req := pubsub.NewCaller(jet, conf.Nats.Namespace)
ns := consumer.Namespace(conf.Nats.Namespace)
consumer.Serve(ctx, jet, logger, ns, db, managementStore, usersStore, journeyStore, providersRegisrtry, actionRegistry, req, conf.PublicURL)
consumer.Serve(ctx, jet, logger, ns, db, managementStore, usersStore, journeyStore, providersRegisrtry, actionRegistry, req, conf.PublicURL, conf)

logger.Info("initializing cluster")
if slices.Contains(conf.Modules, "scheduler") {
logger.Info("starting scheduler")

sched := scheduler.NewController(ctx, logger, conf, journeyStore, pub)
lead := leader.NewHandler(sched)
cons, err := consensus.NewCluster(ctx, logger, conf)
if err != nil {
return err
}
sched := scheduler.NewController(ctx, logger, conf, journeyStore, pub)
lead := leader.NewHandler(sched)
cons, err := consensus.NewCluster(ctx, logger, conf)
if err != nil {
return err
}

_, err = cluster.NewNode(ctx, logger, conf, cons, lead)
if err != nil {
return err
_, err = cluster.NewNode(ctx, logger, conf, cons, lead)
if err != nil {
return err
}
} else {
logger.Info("scheduler module is disabled, skipping cluster initialization")
}

logger.Info("initializing rbac engine")
var rbacEngine *rbac.Engine
if slices.Contains(conf.Modules, "http") {
logger.Info("initializing rbac engine")

rbacEngine, err := rbac.NewEngine(ctx, conf.RBAC)
if err != nil {
return fmt.Errorf("failed to initialize rbac engine: %w", err)
rbacEngine, err = rbac.NewEngine(ctx, conf.RBAC)
if err != nil {
return fmt.Errorf("failed to initialize rbac engine: %w", err)
}
defer rbacEngine.Close()
} else {
logger.Info("http module is disabled, skipping rbac engine initialization")
}
defer rbacEngine.Close()

logger.Info("starting http server")
if slices.Contains(conf.Modules, "http") {
logger.Info("starting http server")

server, err := v1.NewServer(ctx, logger, conf, db, bucket, jet, pub, req, providersRegisrtry, actionRegistry, rbacEngine)
if err != nil {
return err
}
server, err := v1.NewServer(ctx, logger, conf, db, bucket, jet, pub, req, providersRegisrtry, actionRegistry, rbacEngine)
if err != nil {
return err
}

logger.Info("serving http server")
go server.Serve(ctx, conf.HTTPAddress)
logger.Info("serving http server")
go server.Serve(ctx, conf.HTTPAddress)
} else {
logger.Info("http module is disabled, skipping http server startup")
}

logger.Info("service up and running!")
ctx.AwaitKillSignal()
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ include:

services:
lunogram:
env_file:
- .env
build:
context: .
dockerfile: Dockerfile
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ type Node struct {
HTTP http.Config
Store store.Config
Storage storage.Config
Modules []string `env:"MODULES" envSeparator:"," envDefault:"http,consumers,wasm,scheduler"`
}

var ValidModules = []string{"http", "consumers", "wasm", "scheduler"}

type Auth struct {
Driver string `env:"DRIVER"`
JWTSecret string `env:"JWT_SECRET"`
Expand Down
50 changes: 33 additions & 17 deletions internal/pubsub/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package consumer

import (
"slices"

"github.com/cloudproud/graceful"
"github.com/lunogram/platform/internal/actions"
"github.com/lunogram/platform/internal/config"
"github.com/lunogram/platform/internal/providers"
"github.com/lunogram/platform/internal/pubsub"
"github.com/lunogram/platform/internal/store"
Expand Down Expand Up @@ -54,25 +57,38 @@ const (
)

// Serve starts all JetStream consumers and registers their handlers.
func Serve(ctx graceful.Context, jet jetstream.JetStream, logger *zap.Logger, ns Namespace, db *store.Connections, mgmt *management.State, usrs *subjects.State, jrny *journey.State, registry *providers.Registry, actionRegistry *actions.Registry, caller pubsub.Caller, publicURL string) {
func Serve(ctx graceful.Context, jet jetstream.JetStream, logger *zap.Logger, ns Namespace, db *store.Connections, mgmt *management.State, usrs *subjects.State, jrny *journey.State, registry *providers.Registry, actionRegistry *actions.Registry, caller pubsub.Caller, publicURL string, conf config.Node) {
pub := pubsub.NewPublisher(jet, string(ns))
renderer := pubsub.NewEmailRenderer(caller)
router := NewRouter(ctx, jet, logger)

router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersProcess), UsersHandler(logger, usrs, pub))
router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersSchema), UserSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsProcess), UserEventsHandler(logger, usrs, jrny, pub))
router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsSchema), UserEventSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamLists), ns.Consumer(ConsumerListsRecompute), RecomputeListHandler(logger, usrs, pub))
router.HandleStream(ns.Stream(StreamJourneys), ns.Consumer(ConsumerJourneysAdvance), JourneyStepHandler(logger, db.Subjects, jrny, mgmt, pub, actionRegistry))
router.HandleStream(ns.Stream(StreamCampaigns), ns.Consumer(ConsumerCampaignsSend), CampaignsSendHandler(logger, mgmt, usrs, registry, renderer, publicURL))
router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsProcess), OrganizationsHandler(logger, usrs, pub))
router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsSchema), OrganizationSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersProcess), OrganizationUsersHandler(logger, usrs, pub))
router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersSchema), OrganizationUserSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsProcess), OrganizationEventsHandler(logger, usrs, jrny, pub))
router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsSchema), OrganizationEventSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamActions), ns.Consumer(ConsumerActionsSchema), ActionSchemasHandler(logger, usrs))
router.HandleCaller(ns.Subject(SubjectActionsExecute), ActionExecuteHandler(logger, actionRegistry, pub))
router.HandleCaller(ns.Subject(SubjectActionsValidate), ActionValidateHandler(logger, actionRegistry))
if slices.Contains(conf.Modules, "consumers") {
logger.Info("starting pub/sub consumers")

router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersProcess), UsersHandler(logger, usrs, pub))
router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersSchema), UserSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsProcess), UserEventsHandler(logger, usrs, jrny, pub))
router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsSchema), UserEventSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamLists), ns.Consumer(ConsumerListsRecompute), RecomputeListHandler(logger, usrs, pub))
router.HandleStream(ns.Stream(StreamJourneys), ns.Consumer(ConsumerJourneysAdvance), JourneyStepHandler(logger, db.Subjects, jrny, mgmt, pub, actionRegistry))
router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsProcess), OrganizationsHandler(logger, usrs, pub))
router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsSchema), OrganizationSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersProcess), OrganizationUsersHandler(logger, usrs, pub))
router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersSchema), OrganizationUserSchemasHandler(logger, usrs))
router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsProcess), OrganizationEventsHandler(logger, usrs, jrny, pub))
router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsSchema), OrganizationEventSchemasHandler(logger, usrs))
} else {
logger.Info("skipping pub/sub consumers")
}

if slices.Contains(conf.Modules, "wasm") {
logger.Info("starting wasm consumers")

router.HandleStream(ns.Stream(StreamCampaigns), ns.Consumer(ConsumerCampaignsSend), CampaignsSendHandler(logger, mgmt, usrs, registry, renderer, publicURL))
router.HandleStream(ns.Stream(StreamActions), ns.Consumer(ConsumerActionsSchema), ActionSchemasHandler(logger, usrs))
router.HandleCaller(ns.Subject(SubjectActionsExecute), ActionExecuteHandler(logger, actionRegistry, pub))
router.HandleCaller(ns.Subject(SubjectActionsValidate), ActionValidateHandler(logger, actionRegistry))
} else {
logger.Info("skipping wasm consumers")
}
}
Loading