Merged
91 changes: 54 additions & 37 deletions README.md
@@ -7,45 +7,27 @@

Ultra-low latency Go HTTP engine with a protocol-aware dual-architecture (io_uring & epoll) designed for high-throughput infrastructure and zero-allocation microservices. It provides a familiar route-group and middleware API similar to Gin and Echo, so teams can adopt it without learning a new programming model.

## Highlights

- **3.3M+ HTTP/2 requests/sec** on a single 8-vCPU machine (arm64 Graviton3)
- **590K+ HTTP/1.1 requests/sec** — 81% syscall-bound, zero allocations on the hot path
- **io_uring and epoll at parity** — both engines hit the same throughput
- **H2 is 5.7x faster than H1** thanks to stream multiplexing and inline handler execution
- **Zero hot-path allocations** for both H1 and H2

## Features

- **Tiered io_uring** — auto-selects the best io_uring feature set (multishot accept, provided buffers, SQ poll) for your kernel
- **Tiered io_uring** — auto-selects the best io_uring feature set (multishot accept/recv, provided buffers, SQ poll, fixed files) for your kernel
- **Edge-triggered epoll** — per-core event loops with CPU pinning
- **Adaptive meta-engine** — dynamically switches between io_uring and epoll based on runtime telemetry
- **SIMD HTTP parser** — SSE2 (amd64) and NEON (arm64) with generic SWAR fallback
- **HTTP/2 cleartext (h2c)** — full stream multiplexing, flow control, HPACK
- **HTTP/2 cleartext (h2c)** — full stream multiplexing, flow control, HPACK, inline handler execution, zero-alloc HEADERS fast path
- **Auto-detect** — protocol negotiation from the first bytes on the wire
- **Error-returning handlers** — `HandlerFunc` returns `error`; structured `HTTPError` for status codes
- **Serialization** — JSON and XML response methods (`JSON`, `XML`); Protocol Buffers available via [`github.com/goceleris/middlewares`](https://github.com/goceleris/middlewares); `Bind` auto-detects request format from Content-Type
- **net/http compatibility** — wrap existing `http.Handler` via `celeris.Adapt()`
- **Built-in metrics collector** — atomic counters, always-on `Server.Collector().Snapshot()`
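The auto-detect feature sniffs the first bytes on the wire to pick a protocol. As a standalone illustration of the idea (not Celeris's actual detector — the function and return values here are invented for the sketch), h2c connections always open with a fixed 24-byte client preface, so a prefix comparison is enough:

```go
package main

import (
	"bytes"
	"fmt"
)

// h2Preface is the fixed client connection preface that opens every
// HTTP/2 cleartext (h2c) connection (RFC 9113, section 3.4).
var h2Preface = []byte("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")

// detectProtocol guesses the protocol from the first bytes read off a
// new connection. Returns "h2c", "http/1.1", or "unknown" when more
// bytes are needed to decide.
func detectProtocol(first []byte) string {
	// Compare against as much of the preface as we have so far.
	n := len(first)
	if n > len(h2Preface) {
		n = len(h2Preface)
	}
	if bytes.Equal(first[:n], h2Preface[:n]) {
		if n == len(h2Preface) {
			return "h2c"
		}
		return "unknown" // prefix matches; need more bytes to be sure
	}
	return "http/1.1"
}

func main() {
	fmt.Println(detectProtocol([]byte("GET / HTTP/1.1\r\n")))
	fmt.Println(detectProtocol(h2Preface))
}
```

Any first byte other than `P` rules out h2c immediately, which is why detection adds no measurable latency to HTTP/1.1 traffic.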

## API Overview

| Type | Package | Description |
|------|---------|-------------|
| `Server` | `celeris` | Top-level entry point; owns config, router, engine |
| `Config` | `celeris` | Server configuration (addr, engine, protocol, timeouts) |
| `Context` | `celeris` | Per-request context with params, headers, body, response methods |
| `HandlerFunc` | `celeris` | `func(*Context) error` — handler/middleware signature |
| `HTTPError` | `celeris` | Structured error carrying HTTP status code and message |
| `RouteGroup` | `celeris` | Group of routes sharing a prefix and middleware |
| `Route` | `celeris` | Opaque handle to a registered route |
| `Collector` | `observe` | Lock-free request metrics aggregator |
| `Snapshot` | `observe` | Point-in-time copy of all collected metrics |

## Architecture

```mermaid
block-beta
columns 3
A["celeris (public API)"]:3
B["adaptive"]:1 C["observe"]:2
E["engine/iouring"]:1 F["engine/epoll"]:1 G["engine/std"]:1
H["protocol/h1"]:1 I["protocol/h2"]:1 J["protocol/detect"]:1
K["probe"]:1 L["resource"]:1 M["internal"]:1
```

## Quick Start

```
@@ -267,18 +249,53 @@ For Prometheus exposition and debug endpoints, use the [`middlewares/metrics`](h
| CPU pinning | yes | yes | no |
| Provided buffers | yes (5.19+) | no | no |
| Multishot accept | yes (5.19+) | no | no |
| Multishot recv | yes (6.0+) | no | no |
| Zero-alloc HEADERS | yes | yes | no |
| Inline H2 handlers | yes | yes | no |

## Benchmarks

Framework overhead on 8 vCPU (arm64 c6g.2xlarge, x86 c5a.2xlarge):
Cloud benchmarks on arm64 c7g.2xlarge (8 vCPU Graviton3), separate server and client machines:

- **<1.5%** overhead vs raw engine (balanced mode)
- All 3 engines within **0.3%** of each other (adaptive fully matches dedicated)
- Beats Fiber by **+1.4-2.1%** (arm64), **+0.7-1.3%** (x86)
- Beats echo/chi/gin/iris by **5-6%**
- H2 overhead: **1.5%** vs Go frameworks' **16.6%** (11x smaller)
| Protocol | Engine | Throughput |
|----------|--------|-----------|
| HTTP/2 | epoll | **3.33M rps** |
| HTTP/2 | io_uring | **3.30M rps** |
| HTTP/1.1 | epoll | **590K rps** |
| HTTP/1.1 | io_uring | **590K rps** |

Methodology: 27 server configurations (3 engines x 3 objectives x 3 protocols) tested with `wrk2` at fixed request rates. Full results and reproduction scripts are in the [benchmarks repo](https://github.com/goceleris/benchmarks).
- io_uring and epoll within **1%** of each other on both protocols
- H2 is **5.7x faster** than H1 (stream multiplexing + inline handlers)
- Zero allocations on the hot path for both H1 and H2
- All 3 engines within **0.3%** of each other in adaptive mode

Methodology: 14 server configurations (io_uring/epoll/std x latency/throughput/balanced x H1/H2) tested with `wrk` (H1, 16384 connections) and `h2load` (H2, 128 connections x 128 streams) in 9-pass interleaved runs. Full results and reproduction scripts are in the [benchmarks repo](https://github.com/goceleris/benchmarks).
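The `wrk` and `h2load` parameters stated above translate to invocations along these lines (host, port, duration, and thread count are placeholders, not taken from the benchmarks repo):

```shell
# HTTP/1.1: 16384 connections, spread over an illustrative 16 threads.
wrk -t 16 -c 16384 -d 60s http://server:8080/

# HTTP/2 (h2c): 128 client connections x 128 concurrent streams each.
h2load -c 128 -m 128 -D 60 http://server:8080/
```

Note that `wrk` requires the process file-descriptor limit to exceed the connection count, so a run at 16384 connections typically needs `ulimit -n` raised first.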

## API Overview

| Type | Package | Description |
|------|---------|-------------|
| `Server` | `celeris` | Top-level entry point; owns config, router, engine |
| `Config` | `celeris` | Server configuration (addr, engine, protocol, timeouts) |
| `Context` | `celeris` | Per-request context with params, headers, body, response methods |
| `HandlerFunc` | `celeris` | `func(*Context) error` — handler/middleware signature |
| `HTTPError` | `celeris` | Structured error carrying HTTP status code and message |
| `RouteGroup` | `celeris` | Group of routes sharing a prefix and middleware |
| `Route` | `celeris` | Opaque handle to a registered route |
| `Collector` | `observe` | Lock-free request metrics aggregator |
| `Snapshot` | `observe` | Point-in-time copy of all collected metrics |

## Architecture

```mermaid
block-beta
columns 3
A["celeris (public API)"]:3
B["adaptive"]:1 C["observe"]:2
E["engine/iouring"]:1 F["engine/epoll"]:1 G["engine/std"]:1
H["protocol/h1"]:1 I["protocol/h2"]:1 J["protocol/detect"]:1
K["probe"]:1 L["resource"]:1 M["internal"]:1
```

## Requirements

@@ -305,9 +322,9 @@ test/ Conformance, spec compliance, integration, benchmarks
```bash
go install github.com/magefile/mage@latest # one-time setup
mage build # build all targets
mage test # run tests
mage test # run tests with race detector
mage lint # run linters
mage bench # run benchmarks
mage check # full verification: lint + test + spec + build
```

Pull requests should target `main`.
11 changes: 6 additions & 5 deletions SECURITY.md
@@ -2,11 +2,12 @@

## Supported Versions

| Version | Supported |
|----------------|-----------|
| 1.x | Yes |
| < 1.0 | No |
| Beta / RC | No |
| Version | Supported |
|----------------|--------------------|
| >= 1.1.0 | Yes |
| < 1.1.0 | No |

Only the latest minor release receives security patches. Upgrade to the latest version to ensure you have all fixes.

## Reporting a Vulnerability

8 changes: 6 additions & 2 deletions context.go
@@ -83,8 +83,12 @@ func acquireContext(s *stream.Stream) *Context {
c = s.CachedCtx.(*Context)
} else {
c = contextPool.Get().(*Context)
// Cache on the stream for per-connection reuse (H1 keep-alive).
if s.CachedCtx == nil {
// Cache on H1 streams for per-connection reuse (keep-alive).
// H2 streams are ephemeral (released after one request), so caching
// would leak the context — releaseContext skips pool.Put for cached
// contexts, but stream.Release() nils CachedCtx, leaving the context
// unreachable. H2 inline handlers use InlineCachedCtx instead.
if s.IsH1() {
s.CachedCtx = c
}
}
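The scheme in this hunk — pool-backed contexts with an extra per-connection cache for keep-alive reuse — can be sketched in isolation. All names below are illustrative stand-ins, not Celeris internals:

```go
package main

import (
	"fmt"
	"sync"
)

// Context stands in for the per-request context object.
type Context struct{ used int }

var contextPool = sync.Pool{New: func() any { return new(Context) }}

// Conn stands in for a connection/stream that may cache one context.
type Conn struct {
	isH1      bool
	cachedCtx *Context
}

// acquire mirrors the diff's logic: reuse the connection-cached context
// when present; otherwise take one from the pool and cache it only on
// H1 connections, where the connection outlives many requests.
func acquire(c *Conn) *Context {
	if c.cachedCtx != nil {
		return c.cachedCtx
	}
	ctx := contextPool.Get().(*Context)
	if c.isH1 { // H2 streams are ephemeral; caching there would leak
		c.cachedCtx = ctx
	}
	return ctx
}

// release returns uncached contexts to the pool; cached ones stay
// attached to the connection for the next request.
func release(c *Conn, ctx *Context) {
	if c.cachedCtx != ctx {
		contextPool.Put(ctx)
	}
}

func main() {
	h1 := &Conn{isH1: true}
	a := acquire(h1)
	release(h1, a)
	b := acquire(h1)
	fmt.Println(a == b) // keep-alive reuse hands back the same object
}
```

The design choice the comment explains follows directly: because H2 releases the stream (and nils the cache slot) after one request, a cached-but-unpooled context would become unreachable, so only H1 connections take the caching branch.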
47 changes: 24 additions & 23 deletions engine/iouring/worker.go
@@ -377,6 +377,20 @@ func (w *Worker) run(ctx context.Context) {
w.hasBufReturns = false
}

// Drain H2 async write queues FIRST. Handler goroutines enqueue
// response frame bytes; draining them before the dirty list ensures
// SEND SQEs are queued as early as possible after CQE processing,
// reducing pipeline stalls for H2 multiplexed streams.
for _, fd := range w.h2Conns {
cs := w.conns[fd]
if cs != nil && cs.h2State != nil && cs.h2State.WriteQueuePending() {
cs.h2State.DrainWriteQueue(cs.writeFn)
if w.flushSend(cs) {
w.markDirty(cs)
}
}
}

// Retry pending sends and dropped recv arms on dirty connections
// (SQ ring was full earlier). Typically empty under normal load.
for cs := w.dirtyHead; cs != nil; {
@@ -395,30 +409,17 @@
cs = next
}

// Drain H2 async write queues. Handler goroutines enqueue response
// frame bytes; we drain them into writeBuf and flush to the wire.
for _, fd := range w.h2Conns {
cs := w.conns[fd]
if cs != nil && cs.h2State != nil && cs.h2State.WriteQueuePending() {
cs.h2State.DrainWriteQueue(cs.writeFn)
if w.flushSend(cs) {
w.markDirty(cs)
}
}
}

// Immediate submit (non-SQPOLL): flush all SENDs from CQE
// processing, dirty list retries, and H2 async drain to the
// kernel NOW instead of deferring to the next iteration's
// adaptive submit. Without this, responses sit in the SQ ring
// for an entire iteration (~50-200μs), creating pipeline stalls
// for H2 multiplexed streams where clients wait for responses
// before sending more requests. The extra Submit() syscall
// (~300ns) is far cheaper than the stall it prevents.
// Immediate submit (non-SQPOLL): flush SENDs to the kernel NOW
// instead of deferring to the next iteration's adaptive submit.
// Skip when CQEs are already visible — the next iteration wakes
// immediately (mode 1) and does Submit() there, saving ~300ns.
if !w.sqpoll && w.ring.Pending() > 0 {
if _, err := w.ring.Submit(); err != nil {
w.shutdown()
return
cqH, cqT := w.ring.BeginCQ()
if cqH == cqT {
if _, err := w.ring.Submit(); err != nil {
w.shutdown()
return
}
}
}

62 changes: 47 additions & 15 deletions internal/conn/h2.go
@@ -454,32 +454,64 @@ func ProcessH2(ctx context.Context, data []byte, state *H2State, _ stream.Handle

state.inBuf.Write(data)

maxFrameSize := state.processor.GetManager().GetMaxFrameSize()
for state.inBuf.hasCompleteFrame() {
// Tell the processor if more frames follow in this recv buffer.
// When true, canRunInline returns false to avoid sending the
// response before subsequent frames (WINDOW_UPDATE, PRIORITY)
// are processed — required for h2spec compliance.
state.processor.SetHasMoreFrames(state.inBuf.hasMoreThanOneFrame())
f, err := state.parser.ReadNextFrame()
if err != nil {
if err == io.EOF {
break
}
if ce, ok := err.(http2.ConnectionError); ok {
_ = state.processor.SendGoAway(
state.processor.GetManager().GetLastStreamID(),
http2.ErrCode(ce), []byte(ce.Error()))
flushOutBuf(state.outBuf, write)

// Fast path: peek at frame header to detect simple HEADERS frames
// (END_HEADERS set, no PADDED, no PRIORITY, not in CONTINUATION).
// Bypasses x/net framer's ReadFrame which allocates *HeadersFrame.
var processErr error
raw := state.inBuf.Bytes()
frameType := raw[3]
frameFlags := raw[4]
frameLen := uint32(raw[0])<<16 | uint32(raw[1])<<8 | uint32(raw[2])
const (
flagEndStream = 0x01
flagEndHeaders = 0x04
flagPadded = 0x08
flagPriority = 0x20
typeHeaders = 0x01
)
if frameType == typeHeaders &&
frameFlags&flagEndHeaders != 0 &&
frameFlags&flagPadded == 0 &&
frameFlags&flagPriority == 0 &&
frameLen <= maxFrameSize &&
!state.processor.IsExpectingContinuation() {
// Zero-alloc HEADERS fast path.
rf, consumed, _ := frame.ReadRawFrame(raw)
state.inBuf.Next(consumed) // advance past frame
processErr = state.processor.ProcessRawHeaders(
rf.StreamID, rf.HasEndStream(), rf.Payload)
} else {
// Standard path: delegate to x/net framer for complex frames.
f, err := state.parser.ReadNextFrame()
if err != nil {
if err == io.EOF {
break
}
if ce, ok := err.(http2.ConnectionError); ok {
_ = state.processor.SendGoAway(
state.processor.GetManager().GetLastStreamID(),
http2.ErrCode(ce), []byte(ce.Error()))
flushOutBuf(state.outBuf, write)
}
state.mu.Unlock()
state.DrainWriteQueue(write)
return fmt.Errorf("frame read error: %w", err)
}
state.mu.Unlock()
state.DrainWriteQueue(write)
return fmt.Errorf("frame read error: %w", err)
processErr = state.processor.ProcessFrame(ctx, f)
}
if err := state.processor.ProcessFrame(ctx, f); err != nil {
if processErr != nil {
flushOutBuf(state.outBuf, write)
state.mu.Unlock()
state.DrainWriteQueue(write)
return err
return processErr
}
flushOutBuf(state.outBuf, write)
// Drain inline handler responses immediately so they're sent in the
Expand Down
37 changes: 32 additions & 5 deletions internal/conn/response.go
@@ -13,13 +13,17 @@ import (
"github.com/goceleris/celeris/protocol/h2/stream"

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)

// cachedDatePtr holds the pre-formatted HTTP Date header line, updated every second.
// Using atomic.Pointer avoids the type assertion cost of atomic.Value (P6).
var cachedDatePtr atomic.Pointer[[]byte]

// cachedStatus200DatePtr holds "HTTP/1.1 200 OK\r\ndate: <RFC1123>\r\n" fused into
// a single cached block, updated every second alongside cachedDatePtr. For 200 OK
// responses (95%+ of API traffic), this replaces two separate appends with one.
var cachedStatus200DatePtr atomic.Pointer[[]byte]

func init() {
updateCachedDate()
go func() {
@@ -40,6 +44,15 @@ func updateCachedDate() {
cp := make([]byte, len(b))
copy(cp, b)
cachedDatePtr.Store(&cp)

// Fused "HTTP/1.1 200 OK\r\ndate: ...\r\n" block.
var buf2 [96]byte
b2 := buf2[:0]
b2 = append(b2, statusLine200...)
b2 = append(b2, cp...)
cp2 := make([]byte, len(b2))
copy(cp2, b2)
cachedStatus200DatePtr.Store(&cp2)
}

func appendCachedDate(buf []byte) []byte {
@@ -51,6 +64,16 @@
return append(buf, crlf...)
}

// appendCachedStatus200Date appends the fused "HTTP/1.1 200 OK\r\ndate: ...\r\n"
// block. Falls back to two separate appends if the cache isn't ready.
func appendCachedStatus200Date(buf []byte) []byte {
if p := cachedStatus200DatePtr.Load(); p != nil {
return append(buf, (*p)...)
}
buf = appendStatusLine(buf, 200)
return appendCachedDate(buf)
}

var (
statusLine200 = []byte("HTTP/1.1 200 OK\r\n")
statusLine201 = []byte("HTTP/1.1 201 Created\r\n")
@@ -128,9 +151,13 @@ func (a *h1ResponseAdapter) WriteResponse(_ *stream.Stream, status int, headers
// across requests on the same keep-alive connection.
buf := a.respBuf[:0]

buf = appendStatusLine(buf, status)

buf = appendCachedDate(buf)
// Fast path: status 200 uses fused status+date block (one append).
if status == 200 {
buf = appendCachedStatus200Date(buf)
} else {
buf = appendStatusLine(buf, status)
buf = appendCachedDate(buf)
}

// Fast path: exactly 2 headers from Blob() (content-type, content-length).
// This is the dominant case for API responses. Skip per-header string
@@ -380,7 +407,7 @@ func (a *h2ResponseAdapter) WriteResponse(s *stream.Stream, status int, headers
enc.hpackBuf.Reset()
enc.hpackBuf.Write(hpackStatus200)
enc.hpackBuf.Write(ctBlock)
_ = enc.hpackEnc.WriteField(hpack.HeaderField{Name: "content-length", Value: headers[1][1]})
appendHPACKContentLength(&enc.hpackBuf, headers[1][1])
headerBlock := enc.hpackBuf.Bytes()

endStream := len(body) == 0 || s.IsHEAD