Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: CI

on:
push:
branches: [main]
pull_request:

jobs:
build:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '1.25'

- name: Setup Buf
uses: bufbuild/buf-setup-action@v1
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
version: 1.66.0

- name: Generate protobuf stubs
run: buf generate buf.build/agynio/api --path agynio/api/agent_state/v1

- name: Go build
run: go build ./...

- name: Go test
run: go test ./...
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.local
.env
.DS_Store
gen/
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,40 @@
# Agent State Service

Go implementation of AgentStateService (agynio/api).
Go implementation of `AgentStateService` from [`agynio/api`](https://github.com/agynio/api).

## Prerequisites

- Go 1.25+
- Docker (the e2e tests start Postgres via docker-compose)
- [Buf CLI](https://buf.build/docs/installation) for protobuf code generation

## Getting started

```bash
# Install dependencies
go mod tidy

# Generate protobuf stubs
buf generate buf.build/agynio/api --path agynio/api/agent_state/v1

# Start Postgres locally (listens on localhost:55432)
docker compose up -d

# Apply migrations and run the gRPC server
DATABASE_URL="postgres://agentstate:agentstate@localhost:55432/agentstate?sslmode=disable" \
go run ./cmd/agent-state-service
```

## Testing

End-to-end coverage is provided by Go tests. The suite automatically ensures a
docker-compose binary is available (downloading one if required).

```bash
go test ./...
```

## Continuous Integration

GitHub Actions run `buf generate`, `go build`, and the full test suite (including
docker-backed e2e tests) on every push and pull request.
14 changes: 14 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: v1

plugins:
- plugin: buf.build/protocolbuffers/go
out: gen/go
opt:
- paths=source_relative
- Magynio/api/agent_state/v1/agent_state.proto=github.com/agynio/agent-state/gen/go/agynio/api/agent_state/v1
- plugin: buf.build/grpc/go
out: gen/go
opt:
- paths=source_relative
- require_unimplemented_servers=false
- Magynio/api/agent_state/v1/agent_state.proto=github.com/agynio/agent-state/gen/go/agynio/api/agent_state/v1
8 changes: 8 additions & 0 deletions buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Generated by buf. DO NOT EDIT.
version: v1
deps:
- remote: buf.build
owner: agynio
repository: api
commit: eb49ad04e04c4462bc2b55cf73ccbb2d
digest: shake256:ffbbb8e96a41cad041a2ebaa6d1ceb3a1f6588d09c945e7647acefbaad938ae9edb1146ed03509ca3a931a7eb0537b6cefdfb2cee6d228cb0524dfb16898c6f6
4 changes: 4 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
version: v1

deps:
- buf.build/agynio/api
74 changes: 74 additions & 0 deletions cmd/agent-state-service/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"

agentstatev1 "github.com/agynio/agent-state/gen/go/agynio/api/agent_state/v1"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/grpc"

"github.com/agynio/agent-state/internal/config"
"github.com/agynio/agent-state/internal/db"
"github.com/agynio/agent-state/internal/server"
"github.com/agynio/agent-state/internal/state"
)

func main() {
if err := run(); err != nil {
log.Fatalf("agent-state-service: %v", err)
}
}

func run() error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

cfg, err := config.FromEnv()
if err != nil {
return err
}

poolCfg, err := pgxpool.ParseConfig(cfg.DatabaseURL)
if err != nil {
return fmt.Errorf("parse database url: %w", err)
}
pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
if err != nil {
return fmt.Errorf("create connection pool: %w", err)
}
defer pool.Close()

if err := db.ApplyMigrations(ctx, pool); err != nil {
return fmt.Errorf("apply migrations: %w", err)
}

grpcServer := grpc.NewServer()
agentstatev1.RegisterAgentStateServiceServer(grpcServer, server.New(state.NewStore(pool)))

lis, err := net.Listen("tcp", cfg.GRPCAddress)
if err != nil {
return fmt.Errorf("listen on %s: %w", cfg.GRPCAddress, err)
}

go func() {
<-ctx.Done()
grpcServer.GracefulStop()
}()

log.Printf("AgentStateService listening on %s", cfg.GRPCAddress)

if err := grpcServer.Serve(lis); err != nil {
if errors.Is(err, grpc.ErrServerStopped) {
return nil
}
return fmt.Errorf("serve: %w", err)
}
return nil
}
22 changes: 22 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: '3.9'

services:
postgres:
image: public.ecr.aws/docker/library/postgres:16-alpine
environment:
POSTGRES_USER: agentstate
POSTGRES_PASSWORD: agentstate
POSTGRES_DB: agentstate
ports:
- '55432:5432'
healthcheck:
test: ["CMD-SHELL", "pg_isready -U agentstate"]
interval: 1s
timeout: 5s
retries: 30
restart: unless-stopped
volumes:
- pgdata:/var/lib/postgresql/data

volumes:
pgdata: {}
27 changes: 27 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module github.com/agynio/agent-state

go 1.25.7

require (
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.8.0
github.com/stretchr/testify v1.11.1
google.golang.org/grpc v1.79.1
google.golang.org/protobuf v1.36.11
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/text v0.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
71 changes: 71 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU=
golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY=
google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
24 changes: 24 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package config

import (
"fmt"
"os"
)

type Config struct {
GRPCAddress string
DatabaseURL string
}

func FromEnv() (Config, error) {
cfg := Config{}
cfg.GRPCAddress = os.Getenv("GRPC_ADDRESS")
if cfg.GRPCAddress == "" {
cfg.GRPCAddress = ":50051"
}
cfg.DatabaseURL = os.Getenv("DATABASE_URL")
if cfg.DatabaseURL == "" {
return Config{}, fmt.Errorf("DATABASE_URL must be set")
}
return cfg, nil
}
57 changes: 57 additions & 0 deletions internal/db/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package db

import (
"context"
"fmt"
"io/fs"
"sort"

"github.com/agynio/agent-state/migrations"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

func ApplyMigrations(ctx context.Context, pool *pgxpool.Pool) error {
conn, err := pool.Acquire(ctx)
if err != nil {
return fmt.Errorf("acquire connection: %w", err)
}
defer conn.Release()

return pgx.BeginFunc(ctx, conn.Conn(), func(tx pgx.Tx) error {
if _, err := tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS schema_migrations (version TEXT PRIMARY KEY, applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW())`); err != nil {
return fmt.Errorf("ensure schema_migrations: %w", err)
}

entries, err := fs.ReadDir(migrations.Files, ".")
if err != nil {
return fmt.Errorf("read migrations: %w", err)
}
sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })

for _, entry := range entries {
if entry.IsDir() {
continue
}
version := entry.Name()
var applied bool
if err := tx.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM schema_migrations WHERE version = $1)`, version).Scan(&applied); err != nil {
return fmt.Errorf("check migration %s: %w", version, err)
}
if applied {
continue
}
content, err := migrations.Files.ReadFile(version)
if err != nil {
return fmt.Errorf("read migration %s: %w", version, err)
}
if _, err := tx.Exec(ctx, string(content)); err != nil {
return fmt.Errorf("apply migration %s: %w", version, err)
}
if _, err := tx.Exec(ctx, `INSERT INTO schema_migrations (version) VALUES ($1)`, version); err != nil {
return fmt.Errorf("record migration %s: %w", version, err)
}
}
return nil
})
}
Loading