Skip to content

Commit 735abba

Browse files
authored
Merge pull request #3 from mateusmlo/feat/worker
feat/worker
2 parents cd1a4f2 + 49f4cc6 commit 735abba

6 files changed

Lines changed: 1285 additions & 126 deletions

File tree

.gitignore

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# macOS
2+
.DS_Store
3+
.AppleDouble
4+
.LSOverride
5+
6+
# Coverage files
7+
coverage.out
8+
*.coverprofile
9+
*.coverage
10+
coverage/
11+
*.lcov
12+
13+
# Go build artifacts
14+
*.exe
15+
*.exe~
16+
*.dll
17+
*.so
18+
*.dylib
19+
20+
# Test binaries
21+
*.test
22+
23+
# Go workspace file
24+
go.work
25+
26+
# Build directories
27+
bin/
28+
dist/
29+
build/
30+
31+
# IDE and editor files
32+
.vscode/
33+
.idea/
34+
*.swp
35+
*.swo
36+
*~
37+
.project
38+
.classpath
39+
.settings/
40+
41+
# Environment files
42+
.env
43+
.env.local
44+
.env.*.local
45+
46+
# Logs
47+
*.log
48+
49+
# Temporary files
50+
tmp/
51+
temp/
52+
*.tmp

internal/server/server.go

Lines changed: 173 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
package internal
1+
package server
22

33
import (
44
"context"
5+
"slices"
56
"sync"
67
"time"
78

@@ -10,6 +11,7 @@ import (
1011
"google.golang.org/grpc/status"
1112
"google.golang.org/protobuf/types/known/timestamppb"
1213

14+
"github.com/mateusmlo/taskqueue/internal/worker"
1315
"github.com/mateusmlo/taskqueue/proto"
1416
)
1517

@@ -29,14 +31,15 @@ const (
2931
FAILED
3032
)
3133

34+
// Server struct implements the TaskQueue and WorkerService gRPC servers
3235
type Server struct {
3336
tasks map[string]*Task
3437
tasksMux sync.RWMutex
3538

3639
pendingQueues map[Priority][]*Task
3740
queuesMux sync.RWMutex
3841

39-
workers map[string]*Worker
42+
workers map[string]*worker.Worker
4043
workersMux sync.RWMutex
4144

4245
ctx context.Context
@@ -46,6 +49,7 @@ type Server struct {
4649
proto.UnimplementedWorkerServiceServer
4750
}
4851

52+
// Task represents a unit of work in the task queue system
4953
type Task struct {
5054
ID string
5155
Type string
@@ -62,23 +66,14 @@ type Task struct {
6266
WorkerID string
6367
}
6468

65-
type Worker struct {
66-
ID string
67-
Address string
68-
RegisteredAt time.Time
69-
LastHeartbeat time.Time
70-
TaskTypes []string
71-
Capacity int
72-
CurrentLoad int
73-
}
74-
69+
// NewServer initializes and returns a new Server instance
7570
func NewServer() *Server {
7671
ctx, cancel := context.WithCancel(context.Background())
7772

7873
return &Server{
7974
tasks: make(map[string]*Task),
8075
pendingQueues: make(map[Priority][]*Task),
81-
workers: make(map[string]*Worker),
76+
workers: make(map[string]*worker.Worker),
8277
ctx: ctx,
8378
cancel: cancel,
8479
}
@@ -108,6 +103,7 @@ func (t *Task) toProtoTask() *proto.Task {
108103
return protoTask
109104
}
110105

106+
// SubmitTask handles task submission requests
111107
func (s *Server) SubmitTask(ctx context.Context, req *proto.SubmitTaskRequest) (*proto.SubmitTaskResponse, error) {
112108
uuid, err := uuid.NewV7()
113109
if err != nil {
@@ -131,38 +127,187 @@ func (s *Server) SubmitTask(ctx context.Context, req *proto.SubmitTaskRequest) (
131127

132128
s.tasks[taskID] = newTask
133129

134-
s.queuesMux.Lock()
135-
defer s.queuesMux.Unlock()
136-
137-
s.pendingQueues[newTask.Priority] = append(s.pendingQueues[newTask.Priority], newTask)
130+
s.appendTaskToQueue(newTask)
138131

139132
return &proto.SubmitTaskResponse{TaskId: newTask.ID}, nil
140133
}
141134

135+
// GetTaskStatus retrieves the status of a task by its ID
142136
func (s *Server) GetTaskStatus(ctx context.Context, req *proto.GetTaskStatusRequest) (*proto.GetTaskStatusResponse, error) {
143-
s.tasksMux.RLock()
144-
defer s.tasksMux.RUnlock()
145-
146-
task, exists := s.tasks[req.TaskId]
147-
if !exists {
148-
return nil, status.Errorf(codes.NotFound, "task %s not found", req.TaskId)
137+
task, err := s.findTask(req.TaskId)
138+
if err != nil {
139+
return nil, err
149140
}
150141

151142
return &proto.GetTaskStatusResponse{Status: proto.TaskStatus(task.Status)}, nil
152143
}
153144

145+
// GetTaskResult retrieves the result of a completed task by its ID
154146
func (s *Server) GetTaskResult(ctx context.Context, req *proto.GetTaskResultRequest) (*proto.GetTaskResultResponse, error) {
147+
task, err := s.findTask(req.TaskId)
148+
if err != nil {
149+
return nil, err
150+
}
151+
152+
if task.Status != COMPLETED {
153+
return nil, status.Errorf(codes.FailedPrecondition, "task %s not completed yet", req.TaskId)
154+
}
155+
156+
return &proto.GetTaskResultResponse{Task: task.toProtoTask()}, nil
157+
}
158+
159+
// RegisterWorker handles worker registration requests
160+
func (s *Server) RegisterWorker(ctx context.Context, req *proto.RegisterWorkerRequest) (*proto.RegisterWorkerResponse, error) {
161+
var newWorker worker.Worker
162+
newWorker.FromProtoWorker(req.Worker)
163+
164+
s.workersMux.Lock()
165+
defer s.workersMux.Unlock()
166+
167+
s.workers[newWorker.ID] = &newWorker
168+
169+
return &proto.RegisterWorkerResponse{WorkerId: newWorker.ID, Success: true}, nil
170+
}
171+
172+
// Heartbeat processes heartbeat messages from workers
173+
func (s *Server) Heartbeat(ctx context.Context, req *proto.HeartbeatRequest) (*proto.HeartbeatResponse, error) {
174+
worker, err := s.findWorker(req.WorkerId)
175+
if err != nil {
176+
return nil, err
177+
}
178+
179+
worker.LastHeartbeat = time.Now()
180+
worker.CurrentLoad = int(req.CurrentLoad)
181+
182+
return &proto.HeartbeatResponse{Success: true, CurrentLoad: int32(worker.CurrentLoad)}, nil
183+
}
184+
185+
// SubmitResult processes the result submission from workers
186+
func (s *Server) SubmitResult(ctx context.Context, req *proto.SubmitResultRequest) (*proto.SubmitResultResponse, error) {
187+
task, err := s.findTask(req.TaskId)
188+
if err != nil {
189+
return nil, err
190+
}
191+
192+
now := time.Now()
193+
task.CompletedAt = &now
194+
195+
defer s.decrementCurrentLoad(task.WorkerID)
196+
197+
if req.Error != "" {
198+
task.Error = req.Error
199+
task.RetryCount++
200+
201+
if task.RetryCount < task.MaxRetries {
202+
task.Status = PENDING
203+
task.StartedAt = nil
204+
task.CompletedAt = nil
205+
206+
s.appendTaskToQueue(task)
207+
} else {
208+
task.Status = FAILED
209+
210+
return nil, status.Errorf(codes.DeadlineExceeded, "task %s failed after maximum retries: %s", req.TaskId, req.Error)
211+
}
212+
} else {
213+
task.Status = COMPLETED
214+
task.Result = req.Result
215+
}
216+
217+
return &proto.SubmitResultResponse{Success: true, Result: req.Result}, nil
218+
}
219+
220+
func (s *Server) FetchTask(ctx context.Context, req *proto.FetchTaskRequest) (*proto.FetchTaskResponse, error) {
221+
worker, err := s.findWorker(req.WorkerId)
222+
if err != nil {
223+
return nil, err
224+
}
225+
226+
if worker.CurrentLoad >= worker.Capacity {
227+
return &proto.FetchTaskResponse{HasTask: false}, nil
228+
}
229+
230+
s.queuesMux.Lock()
231+
defer s.queuesMux.Unlock()
232+
233+
for priority := HIGH; priority <= LOW; priority++ {
234+
queue := s.pendingQueues[priority]
235+
for i, task := range queue {
236+
if slices.Contains(req.TaskTypes, task.Type) {
237+
// Remove task from queue
238+
s.pendingQueues[priority] = append(queue[:i], queue[i+1:]...)
239+
240+
// Update task status
241+
now := time.Now()
242+
243+
s.tasksMux.Lock()
244+
task.Status = RUNNING
245+
task.StartedAt = &now
246+
task.WorkerID = worker.ID
247+
s.tasksMux.Unlock()
248+
249+
s.incrementCurrentLoad(worker.ID)
250+
251+
return &proto.FetchTaskResponse{Task: task.toProtoTask(), HasTask: true}, nil
252+
}
253+
}
254+
}
255+
256+
return &proto.FetchTaskResponse{HasTask: false}, nil
257+
}
258+
259+
// Util functions
260+
261+
// appendTaskToQueue appends a task back to the pending queue based on its priority
262+
func (s *Server) appendTaskToQueue(task *Task) {
263+
s.queuesMux.Lock()
264+
defer s.queuesMux.Unlock()
265+
266+
s.pendingQueues[task.Priority] = append(s.pendingQueues[task.Priority], task)
267+
}
268+
269+
// decrementCurrentLoad decreases the current load of the specified worker
270+
func (s *Server) decrementCurrentLoad(workerID string) {
271+
s.workersMux.Lock()
272+
defer s.workersMux.Unlock()
273+
274+
if worker, exists := s.workers[workerID]; exists {
275+
worker.CurrentLoad--
276+
}
277+
}
278+
279+
// incrementCurrentLoad increases the current load of the specified worker
280+
func (s *Server) incrementCurrentLoad(workerID string) {
281+
s.workersMux.Lock()
282+
defer s.workersMux.Unlock()
283+
284+
if worker, exists := s.workers[workerID]; exists {
285+
worker.CurrentLoad++
286+
}
287+
}
288+
289+
// findTask retrieves a task by its ID, returning an error if not found
290+
func (s *Server) findTask(taskID string) (*Task, error) {
155291
s.tasksMux.RLock()
156292
defer s.tasksMux.RUnlock()
157293

158-
task, exists := s.tasks[req.TaskId]
294+
task, exists := s.tasks[taskID]
159295
if !exists {
160-
return nil, status.Errorf(codes.NotFound, "task %s not found", req.TaskId)
296+
return nil, status.Errorf(codes.NotFound, "task %s not found", taskID)
161297
}
162298

163-
if task.Status != COMPLETED {
164-
return nil, status.Errorf(codes.FailedPrecondition, "task %s not completed yet", req.TaskId)
299+
return task, nil
300+
}
301+
302+
// findWorker retrieves a worker by its ID, returning an error if not found
303+
func (s *Server) findWorker(workerID string) (*worker.Worker, error) {
304+
s.workersMux.RLock()
305+
defer s.workersMux.RUnlock()
306+
307+
worker, exists := s.workers[workerID]
308+
if !exists {
309+
return nil, status.Errorf(codes.NotFound, "worker %s not registered", workerID)
165310
}
166311

167-
return &proto.GetTaskResultResponse{Task: task.toProtoTask()}, nil
312+
return worker, nil
168313
}

0 commit comments

Comments
 (0)