Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/mateusmlo/taskqueue
go 1.24.1

require (
github.com/google/uuid v1.6.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.27.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
Expand Down
168 changes: 168 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package internal

import (
"context"
"sync"
"time"

"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/mateusmlo/taskqueue/proto"
)

type Priority int
type TaskStatus int

const (
HIGH Priority = iota
MEDIUM
LOW
)

const (
PENDING TaskStatus = iota
RUNNING
COMPLETED
FAILED
)

type Server struct {
tasks map[string]*Task
tasksMux sync.RWMutex

pendingQueues map[Priority][]*Task
queuesMux sync.RWMutex

workers map[string]*Worker
workersMux sync.RWMutex

ctx context.Context
cancel context.CancelFunc

proto.UnimplementedTaskQueueServer
proto.UnimplementedWorkerServiceServer
}

type Task struct {
ID string
Type string
Payload []byte
Priority Priority
Status TaskStatus
RetryCount int
MaxRetries int
CreatedAt time.Time
StartedAt *time.Time
CompletedAt *time.Time
Result []byte
Error string
WorkerID string
}

type Worker struct {
ID string
Address string
RegisteredAt time.Time
LastHeartbeat time.Time
TaskTypes []string
Capacity int
CurrentLoad int
}

func NewServer() *Server {
ctx, cancel := context.WithCancel(context.Background())

return &Server{
tasks: make(map[string]*Task),
pendingQueues: make(map[Priority][]*Task),
workers: make(map[string]*Worker),
ctx: ctx,
cancel: cancel,
}
}

// toProtoTask converts internal Task to proto.Task
func (t *Task) toProtoTask() *proto.Task {
protoTask := &proto.Task{
Id: t.ID,
Type: t.Type,
Payload: t.Payload,
Priority: proto.Priority(t.Priority),
MaxRetries: int32(t.MaxRetries),
RetryCount: int32(t.RetryCount),
CreatedAt: timestamppb.New(t.CreatedAt),
Status: proto.TaskStatus(t.Status),
}

// Handle optional timestamp fields
if t.StartedAt != nil {
protoTask.StartedAt = timestamppb.New(*t.StartedAt)
}
if t.CompletedAt != nil {
protoTask.CompletedAt = timestamppb.New(*t.CompletedAt)
}

return protoTask
}

func (s *Server) SubmitTask(ctx context.Context, req *proto.SubmitTaskRequest) (*proto.SubmitTaskResponse, error) {
uuid, err := uuid.NewV7()
if err != nil {
return nil, err
}
taskID := uuid.String()

newTask := &Task{
ID: taskID,
Type: req.Type,
Payload: req.Payload,
Priority: Priority(req.Priority),
Status: PENDING,
RetryCount: 0,
MaxRetries: int(req.MaxRetries),
CreatedAt: time.Now(),
}

s.tasksMux.Lock()
defer s.tasksMux.Unlock()

s.tasks[taskID] = newTask

s.queuesMux.Lock()
defer s.queuesMux.Unlock()

s.pendingQueues[newTask.Priority] = append(s.pendingQueues[newTask.Priority], newTask)

return &proto.SubmitTaskResponse{TaskId: newTask.ID}, nil
}

func (s *Server) GetTaskStatus(ctx context.Context, req *proto.GetTaskStatusRequest) (*proto.GetTaskStatusResponse, error) {
s.tasksMux.RLock()
defer s.tasksMux.RUnlock()

task, exists := s.tasks[req.TaskId]
if !exists {
return nil, status.Errorf(codes.NotFound, "task %s not found", req.TaskId)
}

return &proto.GetTaskStatusResponse{Status: proto.TaskStatus(task.Status)}, nil
}

func (s *Server) GetTaskResult(ctx context.Context, req *proto.GetTaskResultRequest) (*proto.GetTaskResultResponse, error) {
s.tasksMux.RLock()
defer s.tasksMux.RUnlock()

task, exists := s.tasks[req.TaskId]
if !exists {
return nil, status.Errorf(codes.NotFound, "task %s not found", req.TaskId)
}

if task.Status != COMPLETED {
return nil, status.Errorf(codes.FailedPrecondition, "task %s not completed yet", req.TaskId)
}

return &proto.GetTaskResultResponse{Task: task.toProtoTask()}, nil
}
Loading