Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 62 additions & 28 deletions cmd/dispatchoor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,44 @@ func runServer(ctx context.Context, log *logrus.Logger, configPath string) error
m := metrics.New()
m.SetBuildInfo(Version, GitCommit, BuildDate)

// Create GitHub client.
ghClient := github.NewClient(log, cfg.GitHub.Token)

if err := ghClient.Start(ctx); err != nil {
return err
}

defer ghClient.Stop()

// Create and start runner poller.
poller := github.NewPoller(log, cfg, ghClient, st, m)

if err := poller.Start(ctx); err != nil {
return err
// Create GitHub client (may operate in disconnected mode if no token or invalid token).
var ghClient github.Client

var poller github.Poller

if cfg.HasGitHubToken() {
ghClient = github.NewClient(log, cfg.GitHub.Token)

if err := ghClient.Start(ctx); err != nil {
return err
}

defer func() {
if err := ghClient.Stop(); err != nil {
log.WithError(err).Warn("Failed to stop GitHub client")
}
}()

// Only start poller if GitHub client is connected.
if ghClient.IsConnected() {
poller = github.NewPoller(log, cfg, ghClient, st, m)

if err := poller.Start(ctx); err != nil {
return err
}

defer func() {
if err := poller.Stop(); err != nil {
log.WithError(err).Warn("Failed to stop poller")
}
}()
} else {
log.Warn("GitHub client not connected - runner polling disabled")
}
} else {
log.Warn("No GitHub token configured - GitHub integration disabled")
}

defer poller.Stop()

// Create queue service.
queueSvc := queue.NewService(log, cfg, st)

Expand All @@ -107,14 +127,24 @@ func runServer(ctx context.Context, log *logrus.Logger, configPath string) error

defer queueSvc.Stop()

// Create and start dispatcher.
disp := dispatcher.NewDispatcher(log, cfg, st, queueSvc, ghClient)
// Create and start dispatcher (only if GitHub client is connected).
var disp dispatcher.Dispatcher

if err := disp.Start(ctx); err != nil {
return err
}
if ghClient != nil && ghClient.IsConnected() {
disp = dispatcher.NewDispatcher(log, cfg, st, queueSvc, ghClient)

defer disp.Stop()
if err := disp.Start(ctx); err != nil {
return err
}

defer func() {
if err := disp.Stop(); err != nil {
log.WithError(err).Warn("Failed to stop dispatcher")
}
}()
} else {
log.Warn("Dispatcher disabled - GitHub client not connected")
}

// Create and start auth service.
authSvc := auth.NewService(log, cfg, st)
Expand All @@ -129,13 +159,17 @@ func runServer(ctx context.Context, log *logrus.Logger, configPath string) error
srv := api.NewServer(log, cfg, st, queueSvc, authSvc, ghClient, m)

// Set up runner change callbacks to broadcast via WebSocket.
poller.SetRunnerChangeCallback(func(runner *store.Runner) {
srv.BroadcastRunnerChange(runner)
})
if poller != nil {
poller.SetRunnerChangeCallback(func(runner *store.Runner) {
srv.BroadcastRunnerChange(runner)
})
}

disp.SetRunnerChangeCallback(func(runner *store.Runner) {
srv.BroadcastRunnerChange(runner)
})
if disp != nil {
disp.SetRunnerChangeCallback(func(runner *store.Runner) {
srv.BroadcastRunnerChange(runner)
})
}

if err := srv.Start(ctx); err != nil {
return err
Expand Down
74 changes: 53 additions & 21 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,33 +367,56 @@ func (s *server) handleStatus(w http.ResponseWriter, r *http.Request) {
}
}

// GitHub rate limit info.
remaining := s.ghClient.RateLimitRemaining()
resetTime := s.ghClient.RateLimitReset()

githubStatus := ComponentStatusHealthy
if remaining < 100 {
githubStatus = ComponentStatusDegraded
}
// GitHub connection and rate limit info.
if s.ghClient == nil {
resp.GitHub = GitHubStatus{
Status: ComponentStatusUnhealthy,
Connected: false,
Error: "GitHub token not configured",
}

if remaining < 10 {
githubStatus = ComponentStatusUnhealthy
if resp.Status == ComponentStatusHealthy {
resp.Status = ComponentStatusDegraded
}
} else if !s.ghClient.IsConnected() {
resp.GitHub = GitHubStatus{
Status: ComponentStatusUnhealthy,
Connected: false,
Error: s.ghClient.ConnectionError(),
}

if resp.Status == ComponentStatusHealthy {
resp.Status = ComponentStatusDegraded
}
}
} else {
remaining := s.ghClient.RateLimitRemaining()
resetTime := s.ghClient.RateLimitReset()

resetIn := time.Until(resetTime)
if resetIn < 0 {
resetIn = 0
}
githubStatus := ComponentStatusHealthy
if remaining < 100 {
githubStatus = ComponentStatusDegraded
}

resp.GitHub = GitHubStatus{
Status: githubStatus,
RateLimitRemaining: remaining,
RateLimitReset: resetTime.UTC().Format(time.RFC3339),
ResetIn: resetIn.Round(time.Second).String(),
if remaining < 10 {
githubStatus = ComponentStatusUnhealthy

if resp.Status == ComponentStatusHealthy {
resp.Status = ComponentStatusDegraded
}
}

resetIn := time.Until(resetTime)
if resetIn < 0 {
resetIn = 0
}

resp.GitHub = GitHubStatus{
Status: githubStatus,
Connected: true,
RateLimitRemaining: remaining,
RateLimitReset: resetTime.UTC().Format(time.RFC3339),
ResetIn: resetIn.Round(time.Second).String(),
}
}

// Queue statistics.
Expand Down Expand Up @@ -1104,6 +1127,13 @@ func (s *server) handleCancelJob(w http.ResponseWriter, r *http.Request) {
return
}

// Check if GitHub client is available.
if s.ghClient == nil || !s.ghClient.IsConnected() {
s.writeError(w, http.StatusServiceUnavailable, "GitHub integration is not available")

return
}

// Cancel the workflow run on GitHub.
if err := s.ghClient.CancelWorkflowRun(r.Context(), owner, repo, *job.RunID); err != nil {
s.log.WithError(err).Warn("Cancel request returned error, checking actual run status")
Expand Down Expand Up @@ -1293,8 +1323,10 @@ type DatabaseStatus struct {
// GitHubStatus contains GitHub API rate limit information.
type GitHubStatus struct {
Status ComponentStatus `json:"status"`
Connected bool `json:"connected"`
Error string `json:"error,omitempty"`
RateLimitRemaining int `json:"rate_limit_remaining"`
RateLimitReset string `json:"rate_limit_reset"`
RateLimitReset string `json:"rate_limit_reset,omitempty"`
ResetIn string `json:"reset_in,omitempty"`
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,6 @@ func (c *Config) Validate() error {
return fmt.Errorf("unsupported database driver: %s", c.Database.Driver)
}

// Validate GitHub config.
if c.GitHub.Token == "" {
return fmt.Errorf("github.token is required")
}

// Validate auth config.
if !c.Auth.Basic.Enabled && !c.Auth.GitHub.Enabled {
return fmt.Errorf("at least one auth method (basic or github) must be enabled")
Expand Down Expand Up @@ -475,6 +470,11 @@ func (c *Config) GetDSN() string {
}
}

// HasGitHubToken returns true if a GitHub token is configured.
func (c *Config) HasGitHubToken() bool {
return c.GitHub.Token != ""
}

// String returns a sanitized string representation of the config (no secrets).
func (c *Config) String() string {
var sb strings.Builder
Expand Down
47 changes: 40 additions & 7 deletions pkg/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type Client interface {
Start(ctx context.Context) error
Stop() error

// Connection status.
IsConnected() bool
ConnectionError() string

// Runners.
ListOrgRunners(ctx context.Context, org string) ([]*Runner, error)
ListRepoRunners(ctx context.Context, owner, repo string) ([]*Runner, error)
Expand Down Expand Up @@ -80,12 +84,14 @@ type WorkflowJob struct {

// client implements Client.
type client struct {
log logrus.FieldLogger
token string
gh *github.Client
mu sync.RWMutex
rateRemaining int
rateReset time.Time
log logrus.FieldLogger
token string
gh *github.Client
mu sync.RWMutex
rateRemaining int
rateReset time.Time
connected bool
connectionError string
}

// Ensure client implements Client.
Expand All @@ -100,6 +106,8 @@ func NewClient(log logrus.FieldLogger, token string) Client {
}

// Start initializes the GitHub client.
// If authentication fails, the client will be marked as disconnected but no error is returned.
// Use IsConnected() and ConnectionError() to check the connection status.
func (c *client) Start(ctx context.Context) error {
c.log.Info("Initializing GitHub client")

Expand All @@ -111,12 +119,21 @@ func (c *client) Start(ctx context.Context) error {
// Test authentication by getting rate limit.
rate, _, err := c.gh.RateLimit.Get(ctx)
if err != nil {
return fmt.Errorf("testing GitHub authentication: %w", err)
c.mu.Lock()
c.connected = false
c.connectionError = fmt.Sprintf("authentication failed: %v", err)
c.mu.Unlock()

c.log.WithError(err).Warn("GitHub authentication failed - client will operate in disconnected mode")

return nil
}

c.mu.Lock()
c.rateRemaining = rate.Core.Remaining
c.rateReset = rate.Core.Reset.Time
c.connected = true
c.connectionError = ""
c.mu.Unlock()

c.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -164,6 +181,22 @@ func (c *client) RateLimitReset() time.Time {
return c.rateReset
}

// IsConnected returns true if the GitHub client is connected and authenticated.
func (c *client) IsConnected() bool {
c.mu.RLock()
defer c.mu.RUnlock()

return c.connected
}

// ConnectionError returns the connection error message, if any.
func (c *client) ConnectionError() string {
c.mu.RLock()
defer c.mu.RUnlock()

return c.connectionError
}

// ListOrgRunners lists all self-hosted runners for an organization.
func (c *client) ListOrgRunners(ctx context.Context, org string) ([]*Runner, error) {
c.log.WithField("org", org).Debug("Listing organization runners")
Expand Down
Loading