Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- ## [Unreleased] -->
## [Unreleased]

## [v1.11.0] - 2025-12-20
### Added
- Added full Server-Sent Events (SSE) support for both client and server:
- **Client-side**: `sse_callback` keyword argument for `HTTP.request` to parse SSE streams on
successful responses, invoking a callback with `HTTP.SSEEvent` for each event received.
- **Server-side**: `HTTP.sse_stream(response) do stream ... end` helper to write
`HTTP.SSEEvent`s and automatically close the stream when the block finishes (or use
`HTTP.sse_stream(response)` for manual management).
- `HTTP.SSEEvent` struct for representing SSE events with `data`, `event`, `id`, and `retry` fields.

## [v1.10.1] - 2023-11-28
### Changed
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "HTTP"
uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3"
authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"]
version = "1.10.19"
version = "1.11.0"

[deps]
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ HTTP.open(:GET, "https://tinyurl.com/bach-cello-suite-1-ogg") do http
end
```

Handle [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) (SSE) streams by passing an `sse_callback` function to `HTTP.request`:

```julia
events = HTTP.SSEEvent[]
HTTP.request("GET", "http://127.0.0.1:8080/events"; sse_callback = event -> push!(events, event))
```

Each callback receives an `HTTP.SSEEvent` with the parsed `data`, `event`, `id`, `retry`, and `fields` from the stream.

## Server Examples

[`HTTP.Servers.listen`](https://juliaweb.github.io/HTTP.jl/stable/index.html#HTTP.Servers.listen):
Expand Down
92 changes: 78 additions & 14 deletions docs/examples/server_sent_events.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,50 @@ loosely following [this tutorial](https://developer.mozilla.org/en-US/docs/Web/A
```julia
using HTTP, JSON

# Using sse_callback for automatic SSE parsing:
HTTP.request("GET", "http://127.0.0.1:8080/api/events"; sse_callback = (stream, event) -> begin
@info "Received event" data=event.data event_type=event.event id=event.id
end)

# Or using HTTP.open for raw streaming:
HTTP.open("GET", "http://127.0.0.1:8080/api/events") do io
while !eof(io)
println(String(readavailable(io)))
end
end
```

### Server code:
### Server code (using HTTP.sse_stream - recommended):
"""
using HTTP, Sockets, JSON

# Simple SSE server using the HTTP.sse_stream helper
function simple_sse_server()
server = HTTP.serve!(listenany=true) do request
response = HTTP.Response(200)
# Add CORS headers for browser clients
HTTP.setheader(response, "Access-Control-Allow-Origin" => "*")

# Create SSE stream - automatically sets Content-Type and Cache-Control
HTTP.sse_stream(response) do stream
for i in 1:10
# Write a ping event with timestamp
write(stream, HTTP.SSEEvent(string(round(Int, time())); event="ping"))

# Occasionally write a data event
if rand(Bool)
write(stream, HTTP.SSEEvent(string(rand())))
end
sleep(1)
end
end

return response
end
return server
end

# More complex example with Router
const ROUTER = HTTP.Router()

function getItems(req::HTTP.Request)
Expand All @@ -62,17 +95,41 @@ function getItems(req::HTTP.Request)
return HTTP.Response(200, headers, JSON.json(rand(2)))
end

function events(stream::HTTP.Stream)
# Using HTTP.sse_stream with a request handler
function events_handler(req::HTTP.Request)
if HTTP.method(req) == "OPTIONS"
return HTTP.Response(200, [
"Access-Control-Allow-Origin" => "*",
"Access-Control-Allow-Methods" => "GET, OPTIONS"
])
end

response = HTTP.Response(200)
HTTP.setheader(response, "Access-Control-Allow-Origin" => "*")
HTTP.sse_stream(response) do stream
while true
write(stream, HTTP.SSEEvent(string(round(Int, time())); event="ping"))
if rand(Bool)
write(stream, HTTP.SSEEvent(string(rand())))
end
sleep(1)
end
end

return response
end

# Alternative: manual SSE using stream handler (lower-level approach)
function events_stream(stream::HTTP.Stream)
HTTP.setheader(stream, "Access-Control-Allow-Origin" => "*")
HTTP.setheader(stream, "Access-Control-Allow-Methods" => "GET, OPTIONS")
HTTP.setheader(stream, "Content-Type" => "text/event-stream")
HTTP.setheader(stream, "Cache-Control" => "no-cache")

if HTTP.method(stream.message) == "OPTIONS"
return nothing
end

HTTP.setheader(stream, "Content-Type" => "text/event-stream")
HTTP.setheader(stream, "Cache-Control" => "no-cache")
while true
write(stream, "event: ping\ndata: $(round(Int, time()))\n\n")
if rand(Bool)
Expand All @@ -83,23 +140,30 @@ function events(stream::HTTP.Stream)
return nothing
end

HTTP.register!(ROUTER, "GET", "/api/getItems", HTTP.streamhandler(getItems))
HTTP.register!(ROUTER, "/api/events", events)
HTTP.register!(ROUTER, "GET", "/api/getItems", getItems)
HTTP.register!(ROUTER, "GET", "/api/events", events_handler)

server = HTTP.serve!(ROUTER, "127.0.0.1", 8080; stream=true)
# Start the server in the normal request-handler mode
server = HTTP.serve!(ROUTER, "127.0.0.1", 8080)

# Julia usage
resp = HTTP.get("http://localhost:8080/api/getItems")
# To run the manual stream-handler variant instead, start a separate server:
# stream_server = HTTP.serve!(events_stream, "127.0.0.1", 8081; stream=true)

close = Ref(false)
@async HTTP.open("GET", "http://127.0.0.1:8080/api/events") do io
while !eof(io) && !close[]
println(String(readavailable(io)))
# Julia client usage with sse_callback
stop = Ref(false)
@async begin
try
HTTP.request("GET", "http://127.0.0.1:8080/api/events"; sse_callback = (stream, event) -> begin
println("Event: ", event.event, " | Data: ", event.data)
stop[] && close(stream)
end)
catch e
# Connection closed or stopped
end
end

# run the following to stop the streaming client request
close[] = true
stop[] = true

# close the server which will stop the HTTP server from listening
close(server)
Expand Down
28 changes: 28 additions & 0 deletions docs/src/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,34 @@ end -> HTTP.Response

Where the `io` argument provided to the function body is an `HTTP.Stream` object, a custom `IO` that represents an open connection that is ready to be written to in order to send the request body, and/or read from to receive the response body. Note that `startread(io)` should be called before calling `readavailable` to ensure the response status line and headers are received and parsed appropriately. Calling `eof(io)` will return true until the response body has been completely received. Note that the returned `HTTP.Response` from `HTTP.open` will _not_ have a `.body` field since the body was read in the function body.

### Server-Sent Events

HTTP.jl can parse [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) streams directly via the `sse_callback` keyword on `HTTP.request`. When this keyword is supplied, HTTP.jl incrementally parses the incoming bytes as an event stream and invokes the callback with an `HTTP.SSEEvent` struct for every event:

```julia
using HTTP

events = HTTP.SSEEvent[]
HTTP.request("GET", "http://127.0.0.1:8080/events"; sse_callback = (stream, event) -> begin
@info "event" data=event.data id=event.id event_type=event.event retry_after=event.retry
push!(events, event)
end)
```

The callback can be `f(event)` or `f(stream, event)`. The two-argument form allows cancelling the request by calling `close(stream)` (for example, in response to a specific event).

Each callback receives a `SSEEvent` with the following fields:

- `data::String`: newline-joined `data:` payload for the event (with the trailing newline removed).
- `event::Union{Nothing,String}`: the most recent `event:` field, or `nothing` when not provided (equivalent to the default `"message"` type).
- `id::Union{Nothing,String}`: the last `id:` value observed, automatically persisted between events per the SSE specification.
- `retry::Union{Nothing,Int}`: the last `retry:` directive in milliseconds, propagated to subsequent events until another `retry:` value is parsed.
- `fields::Dict{String,String}`: newline-joined string values for every field encountered since the previous event, including custom non-standard fields.

Because HTTP.jl streams the response directly to the callback, the returned `HTTP.Response` will always have `response.body === HTTP.nobody`. The `sse_callback` keyword cannot be combined with `response_stream` or a custom `iofunction`. The callback is only invoked for non-error responses; error responses are read like a normal request, and `status_exception` behavior applies. Parsing or callback errors surface as regular request errors (`HTTP.RequestError`) with the underlying exception in `err.error`. Compressed streams are supported automatically unless `decompress=false` is explicitly set.

For a full end-to-end example, see [`docs/examples/server_sent_events.jl`](https://github.com/JuliaWeb/HTTP.jl/blob/master/docs/examples/server_sent_events.jl).

### Download

A [`download`](@ref) function is provided for similar functionality to `Downloads.download`.
Expand Down
10 changes: 9 additions & 1 deletion docs/src/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ HTTP.WebSockets.isclosed
HTTP.WebSockets.isok
```

### Server-Sent Events

```@docs
HTTP.SSEEvent
HTTP.SSEStream
HTTP.sse_stream
```

## Utilities

```@docs
Expand Down Expand Up @@ -193,4 +201,4 @@ HTTP.Parsers.parse_status_line!
HTTP.Parsers.parse_request_line!
HTTP.Parsers.parse_header_field
HTTP.Parsers.parse_chunk_size
```
```
73 changes: 72 additions & 1 deletion docs/src/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,79 @@ Lower-level core server functionality that only operates on `HTTP.Stream`. Provi

Nginx-style log formatting is supported via the [`HTTP.@logfmt_str`](@ref) macro and can be passed via the `access_log` keyword argument for [`HTTP.listen`](@ref) or [`HTTP.serve`](@ref).

## Server-Sent Events (SSE)

HTTP.jl provides built-in support for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events), a standard for pushing real-time updates from server to client over HTTP.

### Creating an SSE Response

Use [`HTTP.sse_stream`](@ref) to create an SSE stream from a response object:

```julia
using HTTP

HTTP.serve() do request
response = HTTP.Response(200)
HTTP.sse_stream(response) do stream
for i in 1:5
write(stream, HTTP.SSEEvent("Event $i"))
sleep(1)
end
end

return response
end
```

The `sse_stream` function:
1. Creates an `SSEStream` for writing events
2. Sets the response body to the stream
3. Adds required headers: `Content-Type: text/event-stream` and `Cache-Control: no-cache`
4. Uses a bounded internal buffer (configurable via `max_len`, default 16 MiB) to provide backpressure if the client is slow to read
5. Spawns a task to run the body of the do-block asynchronously
6. Closes the stream when the do-block completes

### Writing Events

Write events using `write(stream, HTTP.SSEEvent(...))`:

```julia
# Simple data-only event
write(stream, HTTP.SSEEvent("Hello, world!"))

# Event with type (for client-side addEventListener)
write(stream, HTTP.SSEEvent("User logged in"; event="login"))

# Event with ID (for client reconnection tracking)
write(stream, HTTP.SSEEvent("Message content"; id="msg-123"))

# Event with retry hint (milliseconds)
write(stream, HTTP.SSEEvent("Reconnect hint"; retry=5000))

# Event with all fields
write(stream, HTTP.SSEEvent("Full event"; event="update", id="42", retry=3000))

# Multiline data is automatically handled
write(stream, HTTP.SSEEvent("Line 1\nLine 2\nLine 3"))
```

### SSEEvent Fields

The `HTTP.SSEEvent` struct supports:
- `data::String`: The event payload (required)
- `event::Union{Nothing,String}`: Event type name (maps to `addEventListener` on client)
- `id::Union{Nothing,String}`: Event ID for reconnection tracking
- `retry::Union{Nothing,Int}`: Suggested reconnection delay in milliseconds

### Important Notes

- The do-block spawns a task where events will be written asynchronously
- The handler must return the response while events are written asynchronously
- Events will not actually be sent to the client until the handler has returned the response
- For client-side SSE consumption, see the [Client documentation](client.md#Server-Sent-Events)

## Serving on the interactive thead pool

Beginning in Julia 1.9, the main server loop is spawned on the [interactive threadpool](https://docs.julialang.org/en/v1.9/manual/multi-threading/#man-threadpools) by default. If users do a Threads.@spawn from a handler, those threaded tasks should run elsewhere and not in the interactive threadpool, keeping the web server responsive.

Note that just having a reserved interactive thread doesn’t guarantee CPU cycles, so users need to properly configure their running Julia session appropriately (i.e. ensuring non-interactive threads available to run tasks, etc).
Note that just having a reserved interactive thread doesn’t guarantee CPU cycles, so users need to properly configure their running Julia session appropriately (i.e. ensuring non-interactive threads available to run tasks, etc).
12 changes: 10 additions & 2 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ include("StatusCodes.jl") ;using .StatusCodes
include("Messages.jl") ;using .Messages
include("cookies.jl") ;using .Cookies
include("Streams.jl") ;using .Streams
include("SSE.jl") ;using .SSE

getrequest(r::Request) = r
getrequest(s::Stream) = s.message.request
Expand Down Expand Up @@ -131,10 +132,17 @@ shorthand for `HTTP.request("GET", ...)`, etc.

Supported optional keyword arguments:

- `query = nothing`, a `Pair` or `Dict` of key => values to be included in the url
- `response_stream = nothing`, a writeable `IO` stream or any `IO`-like
- `query = nothing`, a `Pair` or `Dict` of key => values to be included in the url
- `response_stream = nothing`, a writeable `IO` stream or any `IO`-like
type `T` for which `write(T, AbstractVector{UInt8})` is defined. The response body
will be written to this stream instead of returned as a `Vector{UInt8}`.
- `sse_callback = nothing`, provide a function `f(event)` or `f(stream, event)` to incrementally
consume Server-Sent Events responses. When set, HTTP.jl parses the response body as an event
stream, invokes the callback for each `HTTP.SSEEvent`, and returns the final `HTTP.Response`
with `response.body === HTTP.nobody`. The two-argument form can cancel the request by calling
`close(stream)`. The callback is only invoked for non-error responses; error responses are read
normally and `status_exception` behavior applies. This keyword is mutually exclusive with custom
`iofunction` or `response_stream` handling.
- `verbose = 0`, set to `1` or `2` for increasingly verbose logging of the
request and response process
- `connect_timeout = 30`, close the connection after this many seconds if it
Expand Down
Loading
Loading