diff --git a/cmd/lunogram/main.go b/cmd/lunogram/main.go index 518f352f..3acb481a 100644 --- a/cmd/lunogram/main.go +++ b/cmd/lunogram/main.go @@ -5,6 +5,8 @@ import ( "flag" "fmt" "os" + "slices" + "strings" "github.com/caarlos0/env/v10" "github.com/cloudproud/graceful" @@ -56,6 +58,16 @@ func run() error { return err } + for _, m := range conf.Modules { + if !slices.Contains(config.ValidModules, m) { + valid := make([]string, len(config.ValidModules)) + for i, v := range config.ValidModules { + valid[i] = fmt.Sprintf("%q", v) + } + return fmt.Errorf("invalid module %q, valid modules are: %s", m, strings.Join(valid, ", ")) + } + } + if migrate || conf.DatabaseMigrate { logger.Info("running database migrations...") logger.Info("running management migration") @@ -115,56 +127,81 @@ func run() error { logger.Info("initializing provider registry") - providersRegisrtry, err := providers.NewRegistry(ctx, conf.WASM, logger) - if err != nil { - return err + var providersRegisrtry *providers.Registry + if slices.Contains(conf.Modules, "wasm") { + logger.Info("wasm module is enabled, initializing wasm provider registry") + + providersRegisrtry, err = providers.NewRegistry(ctx, conf.WASM, logger) + if err != nil { + return err + } + defer providersRegisrtry.Close(ctx) + } else { + logger.Info("wasm module is disabled, skipping wasm provider registry initialization") } - defer providersRegisrtry.Close(ctx) - logger.Info("initializing action registry") + var actionRegistry *actions.Registry + if slices.Contains(conf.Modules, "wasm") { + logger.Info("initializing action registry") - actionRegistry, err := actions.NewRegistry(ctx, conf.WASM, logger) - if err != nil { - return err + actionRegistry, err = actions.NewRegistry(ctx, conf.WASM, logger) + if err != nil { + return err + } + defer actionRegistry.Close(ctx) + } else { + logger.Info("wasm module is disabled, skipping action registry initialization") } - defer actionRegistry.Close(ctx) pub := pubsub.NewPublisher(jet, conf.Nats.Namespace) req := pubsub.NewCaller(jet, conf.Nats.Namespace) ns := consumer.Namespace(conf.Nats.Namespace) - consumer.Serve(ctx, jet, logger, ns, db, managementStore, usersStore, journeyStore, providersRegisrtry, actionRegistry, req, conf.PublicURL) + consumer.Serve(ctx, jet, logger, ns, db, managementStore, usersStore, journeyStore, providersRegisrtry, actionRegistry, req, conf.PublicURL, conf) - logger.Info("initializing cluster") + if slices.Contains(conf.Modules, "scheduler") { + logger.Info("starting scheduler") - sched := scheduler.NewController(ctx, logger, conf, journeyStore, pub) - lead := leader.NewHandler(sched) - cons, err := consensus.NewCluster(ctx, logger, conf) - if err != nil { - return err - } + sched := scheduler.NewController(ctx, logger, conf, journeyStore, pub) + lead := leader.NewHandler(sched) + cons, err := consensus.NewCluster(ctx, logger, conf) + if err != nil { + return err + } - _, err = cluster.NewNode(ctx, logger, conf, cons, lead) - if err != nil { - return err + _, err = cluster.NewNode(ctx, logger, conf, cons, lead) + if err != nil { + return err + } + } else { + logger.Info("scheduler module is disabled, skipping cluster initialization") } - logger.Info("initializing rbac engine") + var rbacEngine *rbac.Engine + if slices.Contains(conf.Modules, "http") { + logger.Info("initializing rbac engine") - rbacEngine, err := rbac.NewEngine(ctx, conf.RBAC) - if err != nil { - return fmt.Errorf("failed to initialize rbac engine: %w", err) + rbacEngine, err = rbac.NewEngine(ctx, conf.RBAC) + if err != nil { + return fmt.Errorf("failed to initialize rbac engine: %w", err) + } + defer rbacEngine.Close() + } else { + logger.Info("http module is disabled, skipping rbac engine initialization") } - defer rbacEngine.Close() - logger.Info("starting http server") + if slices.Contains(conf.Modules, "http") { + logger.Info("starting http server") - server, err := v1.NewServer(ctx, logger, conf, db, bucket, jet, pub, req, providersRegisrtry, actionRegistry, rbacEngine) - if err != nil { - return err - } + server, err := v1.NewServer(ctx, logger, conf, db, bucket, jet, pub, req, providersRegisrtry, actionRegistry, rbacEngine) + if err != nil { + return err + } - logger.Info("serving http server") - go server.Serve(ctx, conf.HTTPAddress) + logger.Info("serving http server") + go server.Serve(ctx, conf.HTTPAddress) + } else { + logger.Info("http module is disabled, skipping http server startup") + } logger.Info("service up and running!") ctx.AwaitKillSignal() diff --git a/docker-compose.yml b/docker-compose.yml index c89c06b2..1349b578 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,8 @@ include: services: lunogram: + env_file: + - .env build: context: . dockerfile: Dockerfile diff --git a/internal/config/config.go b/internal/config/config.go index deed677d..6ddca705 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,8 +26,11 @@ type Node struct { HTTP http.Config Store store.Config Storage storage.Config + Modules []string `env:"MODULES" envSeparator:"," envDefault:"http,consumers,wasm,scheduler"` } +var ValidModules = []string{"http", "consumers", "wasm", "scheduler"} + type Auth struct { Driver string `env:"DRIVER"` JWTSecret string `env:"JWT_SECRET"` diff --git a/internal/pubsub/consumer/consumer.go b/internal/pubsub/consumer/consumer.go index 3a79ceb5..ab30d2f7 100644 --- a/internal/pubsub/consumer/consumer.go +++ b/internal/pubsub/consumer/consumer.go @@ -1,8 +1,11 @@ package consumer import ( + "slices" + "github.com/cloudproud/graceful" "github.com/lunogram/platform/internal/actions" + "github.com/lunogram/platform/internal/config" "github.com/lunogram/platform/internal/providers" "github.com/lunogram/platform/internal/pubsub" "github.com/lunogram/platform/internal/store" @@ -54,25 +57,38 @@ const ( ) // Serve starts all JetStream consumers and registers their handlers. -func Serve(ctx graceful.Context, jet jetstream.JetStream, logger *zap.Logger, ns Namespace, db *store.Connections, mgmt *management.State, usrs *subjects.State, jrny *journey.State, registry *providers.Registry, actionRegistry *actions.Registry, caller pubsub.Caller, publicURL string) { +func Serve(ctx graceful.Context, jet jetstream.JetStream, logger *zap.Logger, ns Namespace, db *store.Connections, mgmt *management.State, usrs *subjects.State, jrny *journey.State, registry *providers.Registry, actionRegistry *actions.Registry, caller pubsub.Caller, publicURL string, conf config.Node) { pub := pubsub.NewPublisher(jet, string(ns)) renderer := pubsub.NewEmailRenderer(caller) router := NewRouter(ctx, jet, logger) - router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersProcess), UsersHandler(logger, usrs, pub)) - router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersSchema), UserSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsProcess), UserEventsHandler(logger, usrs, jrny, pub)) - router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsSchema), UserEventSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamLists), ns.Consumer(ConsumerListsRecompute), RecomputeListHandler(logger, usrs, pub)) - router.HandleStream(ns.Stream(StreamJourneys), ns.Consumer(ConsumerJourneysAdvance), JourneyStepHandler(logger, db.Subjects, jrny, mgmt, pub, actionRegistry)) - router.HandleStream(ns.Stream(StreamCampaigns), ns.Consumer(ConsumerCampaignsSend), CampaignsSendHandler(logger, mgmt, usrs, registry, renderer, publicURL)) - router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsProcess), OrganizationsHandler(logger, usrs, pub)) - router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsSchema), OrganizationSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersProcess), OrganizationUsersHandler(logger, usrs, pub)) - router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersSchema), OrganizationUserSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsProcess), OrganizationEventsHandler(logger, usrs, jrny, pub)) - router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsSchema), OrganizationEventSchemasHandler(logger, usrs)) - router.HandleStream(ns.Stream(StreamActions), ns.Consumer(ConsumerActionsSchema), ActionSchemasHandler(logger, usrs)) - router.HandleCaller(ns.Subject(SubjectActionsExecute), ActionExecuteHandler(logger, actionRegistry, pub)) - router.HandleCaller(ns.Subject(SubjectActionsValidate), ActionValidateHandler(logger, actionRegistry)) + if slices.Contains(conf.Modules, "consumers") { + logger.Info("starting pub/sub consumers") + + router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersProcess), UsersHandler(logger, usrs, pub)) + router.HandleStream(ns.Stream(StreamUsers), ns.Consumer(ConsumerUsersSchema), UserSchemasHandler(logger, usrs)) + router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsProcess), UserEventsHandler(logger, usrs, jrny, pub)) + router.HandleStream(ns.Stream(StreamUserEvents), ns.Consumer(ConsumerUserEventsSchema), UserEventSchemasHandler(logger, usrs)) + router.HandleStream(ns.Stream(StreamLists), ns.Consumer(ConsumerListsRecompute), RecomputeListHandler(logger, usrs, pub)) + router.HandleStream(ns.Stream(StreamJourneys), ns.Consumer(ConsumerJourneysAdvance), JourneyStepHandler(logger, db.Subjects, jrny, mgmt, pub, actionRegistry)) + router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsProcess), OrganizationsHandler(logger, usrs, pub)) + router.HandleStream(ns.Stream(StreamOrganizations), ns.Consumer(ConsumerOrganizationsSchema), OrganizationSchemasHandler(logger, usrs)) + router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersProcess), OrganizationUsersHandler(logger, usrs, pub)) + router.HandleStream(ns.Stream(StreamOrganizationUsers), ns.Consumer(ConsumerOrganizationUsersSchema), OrganizationUserSchemasHandler(logger, usrs)) + router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsProcess), OrganizationEventsHandler(logger, usrs, jrny, pub)) + router.HandleStream(ns.Stream(StreamOrganizationEvents), ns.Consumer(ConsumerOrganizationEventsSchema), OrganizationEventSchemasHandler(logger, usrs)) + } else { + logger.Info("skipping pub/sub consumers") + } + + if slices.Contains(conf.Modules, "wasm") { + logger.Info("starting wasm consumers") + + router.HandleStream(ns.Stream(StreamCampaigns), ns.Consumer(ConsumerCampaignsSend), CampaignsSendHandler(logger, mgmt, usrs, registry, renderer, publicURL)) + router.HandleStream(ns.Stream(StreamActions), ns.Consumer(ConsumerActionsSchema), ActionSchemasHandler(logger, usrs)) + router.HandleCaller(ns.Subject(SubjectActionsExecute), ActionExecuteHandler(logger, actionRegistry, pub)) + router.HandleCaller(ns.Subject(SubjectActionsValidate), ActionValidateHandler(logger, actionRegistry)) + } else { + logger.Info("skipping wasm consumers") + } }