From c15e0a9bc0e4650f9b149649e9c807f044fadb9e Mon Sep 17 00:00:00 2001 From: IAmKirbki Date: Mon, 16 Mar 2026 10:19:00 +0100 Subject: [PATCH 1/3] feat: add ability to enable or disable specific modules at runtime --- cmd/lunogram/main.go | 74 ++++++++++++++++++---------- internal/config/config.go | 23 ++++----- internal/pubsub/consumer/consumer.go | 50 ++++++++++++------- 3 files changed, 94 insertions(+), 53 deletions(-) diff --git a/cmd/lunogram/main.go b/cmd/lunogram/main.go index 518f352f..d3d56e7c 100644 --- a/cmd/lunogram/main.go +++ b/cmd/lunogram/main.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "os" + "slices" "github.com/caarlos0/env/v10" "github.com/cloudproud/graceful" @@ -24,6 +25,7 @@ import ( "github.com/lunogram/platform/internal/store/journey" "github.com/lunogram/platform/internal/store/management" "github.com/lunogram/platform/internal/store/subjects" + "github.com/nats-io/nats.go/jetstream" "go.uber.org/zap" ) @@ -103,9 +105,16 @@ func run() error { logger.Info("initializing pubsub...") - jet, err := pubsub.New(ctx, conf) - if err != nil { - return err + var jet jetstream.JetStream + if slices.Contains(conf.EnabledModules, "consumers") || slices.Contains(conf.EnabledModules, "wasm") || slices.Contains(conf.EnabledModules, "scheduler") { + logger.Info("pubsub module is enabled, initializing pubsub connection") + + jet, err = pubsub.New(ctx, conf) + if err != nil { + return err + } + } else { + logger.Info("pubsub module is disabled, skipping pubsub initialization") } err = consumer.Bootstrap(ctx, logger, jet, consumer.Namespace(conf.Nats.Namespace)) @@ -115,11 +124,18 @@ func run() error { logger.Info("initializing provider registry") - providersRegisrtry, err := providers.NewRegistry(ctx, conf.WASM, logger) - if err != nil { - return err + var providersRegisrtry *providers.Registry + if slices.Contains(conf.EnabledModules, "wasm") { + logger.Info("wasm module is enabled, initializing wasm provider registry") + + providersRegisrtry, err := providers.NewRegistry(ctx, conf.WASM, logger) + if err != nil { + return err + } + defer providersRegisrtry.Close(ctx) + } else { + logger.Info("wasm module is disabled, skipping wasm provider registry initialization") } - defer providersRegisrtry.Close(ctx) logger.Info("initializing action registry") @@ -132,20 +148,24 @@ func run() error { pub := pubsub.NewPublisher(jet, conf.Nats.Namespace) req := pubsub.NewCaller(jet, conf.Nats.Namespace) ns := consumer.Namespace(conf.Nats.Namespace) - consumer.Serve(ctx, jet, logger, ns, db, managementStore, usersStore, journeyStore, providersRegisrtry, actionRegistry, req, conf.PublicURL) + consumer.Serve(ctx, jet, logger, ns, db, managementStore, usersStore, journeyStore, providersRegisrtry, actionRegistry, req, conf.PublicURL, conf) - logger.Info("initializing cluster") + if slices.Contains(conf.EnabledModules, "scheduler") { + logger.Info("starting scheduler") - sched := scheduler.NewController(ctx, logger, conf, journeyStore, pub) - lead := leader.NewHandler(sched) - cons, err := consensus.NewCluster(ctx, logger, conf) - if err != nil { - return err - } + sched := scheduler.NewController(ctx, logger, conf, journeyStore, pub) + lead := leader.NewHandler(sched) + cons, err := consensus.NewCluster(ctx, logger, conf) + if err != nil { + return err + } - _, err = cluster.NewNode(ctx, logger, conf, cons, lead) - if err != nil { - return err + _, err = cluster.NewNode(ctx, logger, conf, cons, lead) + if err != nil { + return err + } + } else { + logger.Info("scheduler module is disabled, skipping cluster initialization") } logger.Info("initializing rbac engine") @@ -156,15 +176,19 @@ func run() error { } defer rbacEngine.Close() - logger.Info("starting http server") + if slices.Contains(conf.EnabledModules, "http") { + logger.Info("starting http server") - server, err := v1.NewServer(ctx, logger, conf, db, bucket, jet, pub, req, providersRegisrtry, actionRegistry, rbacEngine) - if err != nil { - return err - } + server, err := v1.NewServer(ctx, logger, conf, db, bucket, jet, pub, req, providersRegisrtry, actionRegistry, rbacEngine) + if err != nil { + return err + } - logger.Info("serving http server") - go server.Serve(ctx, conf.HTTPAddress) + logger.Info("serving http server") + go server.Serve(ctx, conf.HTTPAddress) + } else { + logger.Info("http module is disabled, skipping http server startup") + } logger.Info("service up and running!") ctx.AwaitKillSignal() diff --git a/internal/config/config.go b/internal/config/config.go index deed677d..36945693 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,17 +15,18 @@ type Node struct { HTTPAddress string `env:"HTTP_ADDRESS" envDefault:":8080"` DatabaseMigrate bool `env:"DATABASE_MIGRATE" envDefault:"true"` - PublicURL string `env:"PUBLIC_URL" envDefault:"http://localhost:8080"` - Redis Redis `envPrefix:"REDIS_"` - Cluster Cluster `envPrefix:"CLUSTER_"` - Auth Auth `envPrefix:"AUTH_"` - Nats Nats `envPrefix:"NATS_"` - WASM WASM `envPrefix:"WASM_"` - Webhook Webhook `envPrefix:"WEBHOOK_"` - RBAC rbac.Config `envPrefix:"RBAC_"` - HTTP http.Config - Store store.Config - Storage storage.Config + PublicURL string `env:"PUBLIC_URL" envDefault:"http://localhost:8080"` + Redis Redis `envPrefix:"REDIS_"` + Cluster Cluster `envPrefix:"CLUSTER_"` + Auth Auth `envPrefix:"AUTH_"` + Nats Nats `envPrefix:"NATS_"` + WASM WASM `envPrefix:"WASM_"` + Webhook Webhook `envPrefix:"WEBHOOK_"` + RBAC rbac.Config `envPrefix:"RBAC_"` + HTTP http.Config + Store store.Config + Storage storage.Config + EnabledModules []string `env:"MODULES" envSeparator:"," envDefault:"http,consumers,wasm,scheduler"` } type Auth struct { diff --git a/internal/pubsub/consumer/consumer.go b/internal/pubsub/consumer/consumer.go index 3a79ceb5..5c7c96b0 100644 --- a/internal/pubsub/consumer/consumer.go +++ b/internal/pubsub/consumer/consumer.go @@ -1,8 +1,11 @@ package consumer import ( + "slices" + "github.com/cloudproud/graceful" "github.com/lunogram/platform/internal/actions" + "github.com/lunogram/platform/internal/config" "github.com/lunogram/platform/internal/providers" "github.com/lunogram/platform/internal/pubsub" "github.com/lunogram/platform/internal/store" @@ -54,25 +57,38 @@ const ( ) // Serve starts all JetStream consumers and registers their handlers. -func Serve(ctx graceful.Context, jet jetstream.JetStream, logger *zap.Logger, ns Namespace, db *store.Connections, mgmt *management.State, usrs *subjects.State, jrny *journey.State, registry *providers.Registry, actionRegistry *actions.Registry, caller pubsub.Caller, publicURL string) { +func Serve(ctx graceful.Context, jet jetstream.JetStream, logger *zap.Logger, ns Namespace, db *store.Connections, mgmt *management.State, usrs *subjects.State, jrny *journey.State, registry *providers.Registry, actionRegistry *actions.Registry, caller pubsub.Caller, publicURL string, conf config.Node) { pub := pubsub.NewPublisher(jet, string(ns)) renderer := pubsub.NewEmailRenderer(caller) router := NewRouter(ctx, jet, logger) - router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersProcess), UsersHandler(logger, usrs, pub)) - router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersSchema), UserSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsProcess), UserEventsHandler(logger, usrs, jrny, pub)) - router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsSchema), UserEventSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamLists), ns.Consumer(ConsumerListsRecompute), RecomputeListHandler(logger, usrs, pub)) - router.HandleStream(ns.Stream(StreamJourneys), ns.Consumer(ConsumerJourneysAdvance), JourneyStepHandler(logger, db.Subjects, jrny, mgmt, pub, actionRegistry)) - router.HandleStream(ns.Stream(StreamCampaigns), ns.Consumer(ConsumerCampaignsSend), CampaignsSendHandler(logger, mgmt, usrs, registry, renderer, publicURL)) - router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsProcess), OrganizationsHandler(logger, usrs, pub)) - router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsSchema), OrganizationSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersProcess), OrganizationUsersHandler(logger, usrs, pub)) - router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersSchema), OrganizationUserSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsProcess), OrganizationEventsHandler(logger, usrs, jrny, pub)) - router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsSchema), OrganizationEventSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamActions), ns.Consumer(ConsumerActionsSchema), ActionSchemasHandler(logger, usrs)) - router.HandleCaller(ns.Subject(SubjectActionsExecute), ActionExecuteHandler(logger, actionRegistry, pub)) - router.HandleCaller(ns.Subject(SubjectActionsValidate), ActionValidateHandler(logger, actionRegistry)) + if slices.Contains(conf.EnabledModules, "consumers") { + logger.Info("starting pub/sub consumers") + + router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersProcess), UsersHandler(logger, usrs, pub)) + router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersSchema), UserSchemasHandler(logger, usrs)) + router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsProcess), UserEventsHandler(logger, usrs, jrny, pub)) + router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsSchema), UserEventSchemasHandler(logger, usrs)) + router.HandleStream(ns.Stream(StreamLists), ns.Consumer(ConsumerListsRecompute), RecomputeListHandler(logger, usrs, pub)) + router.HandleStream(ns.Stream(StreamJourneys), ns.Consumer(ConsumerJourneysAdvance), JourneyStepHandler(logger, db.Subjects, jrny, mgmt, pub, actionRegistry)) + router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsProcess), OrganizationsHandler(logger, usrs, pub)) + router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsSchema), OrganizationSchemasHandler(logger, usrs)) + router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersProcess), OrganizationUsersHandler(logger, usrs, pub)) + router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersSchema), OrganizationUserSchemasHandler(logger, usrs)) + router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsProcess), OrganizationEventsHandler(logger, usrs, jrny, pub)) + router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsSchema), OrganizationEventSchemasHandler(logger, usrs)) + } else { + logger.Info("skipping pub/sub consumers") + } + + if slices.Contains(conf.EnabledModules, "wasm") { + logger.Info("starting wasm consumers") + + router.HandleStream(ns.Stream(StreamCampaigns), ns.Consumer(ConsumerCampaignsSend), CampaignsSendHandler(logger, mgmt, usrs, registry, renderer, publicURL)) + router.HandleStream(ns.Stream(StreamActions), ns.Consumer(ConsumerActionsSchema), ActionSchemasHandler(logger, usrs)) + router.HandleCaller(ns.Subject(SubjectActionsExecute), ActionExecuteHandler(logger, actionRegistry, pub)) + router.HandleCaller(ns.Subject(SubjectActionsValidate), ActionValidateHandler(logger, actionRegistry)) + } else { + logger.Info("skipping wasm consumers") + } } From abe7050d1794d19c73e4046cfb5bc385f56d4850 Mon Sep 17 00:00:00 2001 From: IAmKirbki Date: Mon, 16 Mar 2026 11:03:24 +0100 Subject: [PATCH 2/3] feat: refactor module handling to support independent module execution --- cmd/lunogram/main.go | 20 ++++++-------------- docker-compose.yml | 2 ++ internal/config/config.go | 24 ++++++++++++------------ internal/pubsub/consumer/consumer.go | 4 ++-- 4 files changed, 22 insertions(+), 28 deletions(-) diff --git a/cmd/lunogram/main.go b/cmd/lunogram/main.go index d3d56e7c..a3934644 100644 --- a/cmd/lunogram/main.go +++ b/cmd/lunogram/main.go @@ -25,7 +25,6 @@ import ( "github.com/lunogram/platform/internal/store/journey" "github.com/lunogram/platform/internal/store/management" "github.com/lunogram/platform/internal/store/subjects" - "github.com/nats-io/nats.go/jetstream" "go.uber.org/zap" ) @@ -105,16 +104,9 @@ func run() error { logger.Info("initializing pubsub...") - var jet jetstream.JetStream - if slices.Contains(conf.EnabledModules, "consumers") || slices.Contains(conf.EnabledModules, "wasm") || slices.Contains(conf.EnabledModules, "scheduler") { - logger.Info("pubsub module is enabled, initializing pubsub connection") - - jet, err = pubsub.New(ctx, conf) - if err != nil { - return err - } - } else { - logger.Info("pubsub module is disabled, skipping pubsub initialization") + jet, err := pubsub.New(ctx, conf) + if err != nil { + return err } err = consumer.Bootstrap(ctx, logger, jet, consumer.Namespace(conf.Nats.Namespace)) @@ -125,7 +117,7 @@ func run() error { logger.Info("initializing provider registry") var providersRegisrtry *providers.Registry - if slices.Contains(conf.EnabledModules, "wasm") { + if slices.Contains(conf.Modules, "wasm") { logger.Info("wasm module is enabled, initializing wasm provider registry") providersRegisrtry, err := providers.NewRegistry(ctx, conf.WASM, logger) @@ -150,7 +142,7 @@ func run() error { ns := consumer.Namespace(conf.Nats.Namespace) consumer.Serve(ctx, jet, logger, ns, db, managementStore, usersStore, journeyStore, providersRegisrtry, actionRegistry, req, conf.PublicURL, conf) - if slices.Contains(conf.EnabledModules, "scheduler") { + if slices.Contains(conf.Modules, "scheduler") { logger.Info("starting scheduler") sched := scheduler.NewController(ctx, logger, conf, journeyStore, pub) @@ -176,7 +168,7 @@ func run() error { } defer rbacEngine.Close() - if slices.Contains(conf.EnabledModules, "http") { + if slices.Contains(conf.Modules, "http") { logger.Info("starting http server") server, err := v1.NewServer(ctx, logger, conf, db, bucket, jet, pub, req, providersRegisrtry, actionRegistry, rbacEngine) diff --git a/docker-compose.yml b/docker-compose.yml index c89c06b2..1349b578 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,8 @@ include: services: lunogram: + env_file: + - .env build: context: . dockerfile: Dockerfile diff --git a/internal/config/config.go b/internal/config/config.go index 36945693..2d41249a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,18 +15,18 @@ type Node struct { HTTPAddress string `env:"HTTP_ADDRESS" envDefault:":8080"` DatabaseMigrate bool `env:"DATABASE_MIGRATE" envDefault:"true"` - PublicURL string `env:"PUBLIC_URL" envDefault:"http://localhost:8080"` - Redis Redis `envPrefix:"REDIS_"` - Cluster Cluster `envPrefix:"CLUSTER_"` - Auth Auth `envPrefix:"AUTH_"` - Nats Nats `envPrefix:"NATS_"` - WASM WASM `envPrefix:"WASM_"` - Webhook Webhook `envPrefix:"WEBHOOK_"` - RBAC rbac.Config `envPrefix:"RBAC_"` - HTTP http.Config - Store store.Config - Storage storage.Config - EnabledModules []string `env:"MODULES" envSeparator:"," envDefault:"http,consumers,wasm,scheduler"` + PublicURL string `env:"PUBLIC_URL" envDefault:"http://localhost:8080"` + Redis Redis `envPrefix:"REDIS_"` + Cluster Cluster `envPrefix:"CLUSTER_"` + Auth Auth `envPrefix:"AUTH_"` + Nats Nats `envPrefix:"NATS_"` + WASM WASM `envPrefix:"WASM_"` + Webhook Webhook `envPrefix:"WEBHOOK_"` + RBAC rbac.Config `envPrefix:"RBAC_"` + HTTP http.Config + Store store.Config + Storage storage.Config + Modules []string `env:"MODULES" envSeparator:"," envDefault:"http,consumers,wasm,scheduler"` } type Auth struct { diff --git a/internal/pubsub/consumer/consumer.go b/internal/pubsub/consumer/consumer.go index 5c7c96b0..ab30d2f7 100644 --- a/internal/pubsub/consumer/consumer.go +++ b/internal/pubsub/consumer/consumer.go @@ -62,7 +62,7 @@ func Serve(ctx graceful.Context, jet jetstream.JetStream, logger *zap.Logger, ns renderer := pubsub.NewEmailRenderer(caller) router := NewRouter(ctx, jet, logger) - if slices.Contains(conf.EnabledModules, "consumers") { + if slices.Contains(conf.Modules, "consumers") { logger.Info("starting pub/sub consumers") router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersProcess), UsersHandler(logger, usrs, pub)) @@ -81,7 +81,7 @@ func Serve(ctx graceful.Context, jet jetstream.JetStream, logger *zap.Logger, ns logger.Info("skipping pub/sub consumers") } - if slices.Contains(conf.EnabledModules, "wasm") { + if slices.Contains(conf.Modules, "wasm") { logger.Info("starting wasm consumers") router.HandleStream(ns.Stream(StreamCampaigns), ns.Consumer(ConsumerCampaignsSend), CampaignsSendHandler(logger, mgmt, usrs, registry, renderer, publicURL)) From eb1fbd9b4a5700d41eaf2032c5d3a3ed9ac1348f Mon Sep 17 00:00:00 2001 From: IAmKirbki Date: Mon, 16 Mar 2026 12:26:48 +0100 Subject: [PATCH 3/3] feat: validate module names against a predefined list for independent execution --- cmd/lunogram/main.go | 43 +++++++++++++++++++++++++++++---------- internal/config/config.go | 2 ++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/cmd/lunogram/main.go b/cmd/lunogram/main.go index a3934644..3acb481a 100644 --- a/cmd/lunogram/main.go +++ b/cmd/lunogram/main.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "slices" + "strings" "github.com/caarlos0/env/v10" "github.com/cloudproud/graceful" @@ -57,6 +58,16 @@ func run() error { return err } + for _, m := range conf.Modules { + if !slices.Contains(config.ValidModules, m) { + valid := make([]string, len(config.ValidModules)) + for i, v := range config.ValidModules { + valid[i] = fmt.Sprintf("%q", v) + } + return fmt.Errorf("invalid module %q, valid modules are: %s", m, strings.Join(valid, ", ")) + } + } + if migrate || conf.DatabaseMigrate { logger.Info("running database migrations...") logger.Info("running management migration") @@ -120,7 +131,7 @@ func run() error { if slices.Contains(conf.Modules, "wasm") { logger.Info("wasm module is enabled, initializing wasm provider registry") - providersRegisrtry, err := providers.NewRegistry(ctx, conf.WASM, logger) + providersRegisrtry, err = providers.NewRegistry(ctx, conf.WASM, logger) if err != nil { return err } @@ -129,13 +140,18 @@ func run() error { logger.Info("wasm module is disabled, skipping wasm provider registry initialization") } - logger.Info("initializing action registry") + var actionRegistry *actions.Registry + if slices.Contains(conf.Modules, "wasm") { + logger.Info("initializing action registry") - actionRegistry, err := actions.NewRegistry(ctx, conf.WASM, logger) - if err != nil { - return err + actionRegistry, err = actions.NewRegistry(ctx, conf.WASM, logger) + if err != nil { + return err + } + defer actionRegistry.Close(ctx) + } else { + logger.Info("wasm module is disabled, skipping action registry initialization") } - defer actionRegistry.Close(ctx) pub := pubsub.NewPublisher(jet, conf.Nats.Namespace) req := pubsub.NewCaller(jet, conf.Nats.Namespace) @@ -160,13 +176,18 @@ func run() error { logger.Info("scheduler module is disabled, skipping cluster initialization") } - logger.Info("initializing rbac engine") + var rbacEngine *rbac.Engine + if slices.Contains(conf.Modules, "http") { + logger.Info("initializing rbac engine") - rbacEngine, err := rbac.NewEngine(ctx, conf.RBAC) - if err != nil { - return fmt.Errorf("failed to initialize rbac engine: %w", err) + rbacEngine, err = rbac.NewEngine(ctx, conf.RBAC) + if err != nil { + return fmt.Errorf("failed to initialize rbac engine: %w", err) + } + defer rbacEngine.Close() + } else { + logger.Info("http module is disabled, skipping rbac engine initialization") } - defer rbacEngine.Close() if slices.Contains(conf.Modules, "http") { logger.Info("starting http server") diff --git a/internal/config/config.go b/internal/config/config.go index 2d41249a..6ddca705 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -29,6 +29,8 @@ type Node struct { Modules []string `env:"MODULES" envSeparator:"," envDefault:"http,consumers,wasm,scheduler"` } +var ValidModules = []string{"http", "consumers", "wasm", "scheduler"} + type Auth struct { Driver string `env:"DRIVER"` JWTSecret string `env:"JWT_SECRET"`