Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# macOS
.DS_Store
.AppleDouble
.LSOverride

# Coverage files
coverage.out
*.coverprofile
*.coverage
coverage/
*.lcov

# Go build artifacts
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binaries
*.test

# Go workspace file
go.work

# Build directories
bin/
dist/
build/

# IDE and editor files
.vscode/
.idea/
*.swp
*.swo
*~
.project
.classpath
.settings/

# Environment files
.env
.env.local
.env.*.local

# Logs
*.log

# Temporary files
tmp/
temp/
*.tmp
201 changes: 173 additions & 28 deletions internal/server/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package internal
package server

import (
"context"
"slices"
"sync"
"time"

Expand All @@ -10,6 +11,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/mateusmlo/taskqueue/internal/worker"
"github.com/mateusmlo/taskqueue/proto"
)

Expand All @@ -29,14 +31,15 @@ const (
FAILED
)

// Server struct implements the TaskQueue and WorkerService gRPC servers
type Server struct {
tasks map[string]*Task
tasksMux sync.RWMutex

pendingQueues map[Priority][]*Task
queuesMux sync.RWMutex

workers map[string]*Worker
workers map[string]*worker.Worker
workersMux sync.RWMutex

ctx context.Context
Expand All @@ -46,6 +49,7 @@ type Server struct {
proto.UnimplementedWorkerServiceServer
}

// Task represents a unit of work in the task queue system
type Task struct {
ID string
Type string
Expand All @@ -62,23 +66,14 @@ type Task struct {
WorkerID string
}

type Worker struct {
ID string
Address string
RegisteredAt time.Time
LastHeartbeat time.Time
TaskTypes []string
Capacity int
CurrentLoad int
}

// NewServer initializes and returns a new Server instance
func NewServer() *Server {
ctx, cancel := context.WithCancel(context.Background())

return &Server{
tasks: make(map[string]*Task),
pendingQueues: make(map[Priority][]*Task),
workers: make(map[string]*Worker),
workers: make(map[string]*worker.Worker),
ctx: ctx,
cancel: cancel,
}
Expand Down Expand Up @@ -108,6 +103,7 @@ func (t *Task) toProtoTask() *proto.Task {
return protoTask
}

// SubmitTask handles task submission requests
func (s *Server) SubmitTask(ctx context.Context, req *proto.SubmitTaskRequest) (*proto.SubmitTaskResponse, error) {
uuid, err := uuid.NewV7()
if err != nil {
Expand All @@ -131,38 +127,187 @@ func (s *Server) SubmitTask(ctx context.Context, req *proto.SubmitTaskRequest) (

s.tasks[taskID] = newTask

s.queuesMux.Lock()
defer s.queuesMux.Unlock()

s.pendingQueues[newTask.Priority] = append(s.pendingQueues[newTask.Priority], newTask)
s.appendTaskToQueue(newTask)

return &proto.SubmitTaskResponse{TaskId: newTask.ID}, nil
}

// GetTaskStatus retrieves the status of a task by its ID
func (s *Server) GetTaskStatus(ctx context.Context, req *proto.GetTaskStatusRequest) (*proto.GetTaskStatusResponse, error) {
s.tasksMux.RLock()
defer s.tasksMux.RUnlock()

task, exists := s.tasks[req.TaskId]
if !exists {
return nil, status.Errorf(codes.NotFound, "task %s not found", req.TaskId)
task, err := s.findTask(req.TaskId)
if err != nil {
return nil, err
}

return &proto.GetTaskStatusResponse{Status: proto.TaskStatus(task.Status)}, nil
}

// GetTaskResult retrieves the result of a completed task by its ID
func (s *Server) GetTaskResult(ctx context.Context, req *proto.GetTaskResultRequest) (*proto.GetTaskResultResponse, error) {
task, err := s.findTask(req.TaskId)
if err != nil {
return nil, err
}

if task.Status != COMPLETED {
return nil, status.Errorf(codes.FailedPrecondition, "task %s not completed yet", req.TaskId)
}

return &proto.GetTaskResultResponse{Task: task.toProtoTask()}, nil
}

// RegisterWorker handles worker registration requests
func (s *Server) RegisterWorker(ctx context.Context, req *proto.RegisterWorkerRequest) (*proto.RegisterWorkerResponse, error) {
var newWorker worker.Worker
newWorker.FromProtoWorker(req.Worker)

s.workersMux.Lock()
defer s.workersMux.Unlock()

s.workers[newWorker.ID] = &newWorker

return &proto.RegisterWorkerResponse{WorkerId: newWorker.ID, Success: true}, nil
}

// Heartbeat processes heartbeat messages from workers
func (s *Server) Heartbeat(ctx context.Context, req *proto.HeartbeatRequest) (*proto.HeartbeatResponse, error) {
worker, err := s.findWorker(req.WorkerId)
if err != nil {
return nil, err
}

worker.LastHeartbeat = time.Now()
worker.CurrentLoad = int(req.CurrentLoad)

return &proto.HeartbeatResponse{Success: true, CurrentLoad: int32(worker.CurrentLoad)}, nil
}

// SubmitResult processes the result submission from workers
func (s *Server) SubmitResult(ctx context.Context, req *proto.SubmitResultRequest) (*proto.SubmitResultResponse, error) {
task, err := s.findTask(req.TaskId)
if err != nil {
return nil, err
}

now := time.Now()
task.CompletedAt = &now

defer s.decrementCurrentLoad(task.WorkerID)

if req.Error != "" {
task.Error = req.Error
task.RetryCount++

if task.RetryCount < task.MaxRetries {
task.Status = PENDING
task.StartedAt = nil
task.CompletedAt = nil

s.appendTaskToQueue(task)
} else {
task.Status = FAILED

return nil, status.Errorf(codes.DeadlineExceeded, "task %s failed after maximum retries: %s", req.TaskId, req.Error)
}
} else {
task.Status = COMPLETED
task.Result = req.Result
}

return &proto.SubmitResultResponse{Success: true, Result: req.Result}, nil
}

func (s *Server) FetchTask(ctx context.Context, req *proto.FetchTaskRequest) (*proto.FetchTaskResponse, error) {
worker, err := s.findWorker(req.WorkerId)
if err != nil {
return nil, err
}

if worker.CurrentLoad >= worker.Capacity {
return &proto.FetchTaskResponse{HasTask: false}, nil
}

s.queuesMux.Lock()
defer s.queuesMux.Unlock()

for priority := HIGH; priority <= LOW; priority++ {
queue := s.pendingQueues[priority]
for i, task := range queue {
if slices.Contains(req.TaskTypes, task.Type) {
// Remove task from queue
s.pendingQueues[priority] = append(queue[:i], queue[i+1:]...)

// Update task status
now := time.Now()

s.tasksMux.Lock()
task.Status = RUNNING
task.StartedAt = &now
task.WorkerID = worker.ID
s.tasksMux.Unlock()

s.incrementCurrentLoad(worker.ID)

return &proto.FetchTaskResponse{Task: task.toProtoTask(), HasTask: true}, nil
}
}
}

return &proto.FetchTaskResponse{HasTask: false}, nil
}

// Util functions

// appendTaskToQueue appends a task back to the pending queue based on its priority
func (s *Server) appendTaskToQueue(task *Task) {
s.queuesMux.Lock()
defer s.queuesMux.Unlock()

s.pendingQueues[task.Priority] = append(s.pendingQueues[task.Priority], task)
}

// decrementCurrentLoad decreases the current load of the specified worker
func (s *Server) decrementCurrentLoad(workerID string) {
s.workersMux.Lock()
defer s.workersMux.Unlock()

if worker, exists := s.workers[workerID]; exists {
worker.CurrentLoad--
}
}

// incrementCurrentLoad increases the current load of the specified worker
func (s *Server) incrementCurrentLoad(workerID string) {
s.workersMux.Lock()
defer s.workersMux.Unlock()

if worker, exists := s.workers[workerID]; exists {
worker.CurrentLoad++
}
}

// findTask retrieves a task by its ID, returning an error if not found
func (s *Server) findTask(taskID string) (*Task, error) {
s.tasksMux.RLock()
defer s.tasksMux.RUnlock()

task, exists := s.tasks[req.TaskId]
task, exists := s.tasks[taskID]
if !exists {
return nil, status.Errorf(codes.NotFound, "task %s not found", req.TaskId)
return nil, status.Errorf(codes.NotFound, "task %s not found", taskID)
}

if task.Status != COMPLETED {
return nil, status.Errorf(codes.FailedPrecondition, "task %s not completed yet", req.TaskId)
return task, nil
}

// findWorker retrieves a worker by its ID, returning an error if not found
func (s *Server) findWorker(workerID string) (*worker.Worker, error) {
s.workersMux.RLock()
defer s.workersMux.RUnlock()

worker, exists := s.workers[workerID]
if !exists {
return nil, status.Errorf(codes.NotFound, "worker %s not registered", workerID)
}

return &proto.GetTaskResultResponse{Task: task.toProtoTask()}, nil
return worker, nil
}
Loading
Loading