From 0b9328f1a5b810ec799268bb816e7c3bb0763d79 Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Tue, 16 Dec 2025 09:46:11 -0700 Subject: [PATCH] Client & server support for text/event-stream, SSE --- CHANGELOG.md | 12 +- Project.toml | 2 +- README.md | 9 + docs/examples/server_sent_events.jl | 92 +++++- docs/src/client.md | 28 ++ docs/src/reference.md | 10 +- docs/src/server.md | 73 ++++- src/HTTP.jl | 12 +- src/SSE.jl | 470 ++++++++++++++++++++++++++++ src/Servers.jl | 5 +- src/Streams.jl | 6 + src/clientlayers/StreamRequest.jl | 14 +- test/client.jl | 3 +- test/runtests.jl | 1 + test/sse.jl | 355 +++++++++++++++++++++ test/websockets/autobahn.jl | 31 +- 16 files changed, 1094 insertions(+), 29 deletions(-) create mode 100644 src/SSE.jl create mode 100644 test/sse.jl diff --git a/CHANGELOG.md b/CHANGELOG.md index b32569b11..68da04e1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - +## [Unreleased] + +## [v1.11.0] - 2025-12-20 +### Added +- Added full Server-Sent Events (SSE) support for both client and server: + - **Client-side**: `sse_callback` keyword argument for `HTTP.request` to parse SSE streams on + successful responses, invoking a callback with `HTTP.SSEEvent` for each event received. + - **Server-side**: `HTTP.sse_stream(response) do stream ... end` helper to write + `HTTP.SSEEvent`s and automatically close the stream when the block finishes (or use + `HTTP.sse_stream(response)` for manual management). + - `HTTP.SSEEvent` struct for representing SSE events with `data`, `event`, `id`, and `retry` fields. 
## [v1.10.1] - 2023-11-28 ### Changed diff --git a/Project.toml b/Project.toml index 114ba3a98..48793ab45 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "HTTP" uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3" authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"] -version = "1.10.19" +version = "1.11.0" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" diff --git a/README.md b/README.md index accf0e70e..6b2d796ff 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,15 @@ HTTP.open(:GET, "https://tinyurl.com/bach-cello-suite-1-ogg") do http end ``` +Handle [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) (SSE) streams by passing an `sse_callback` function to `HTTP.request`: + +```julia +events = HTTP.SSEEvent[] +HTTP.request("GET", "http://127.0.0.1:8080/events"; sse_callback = event -> push!(events, event)) +``` + +Each callback receives an `HTTP.SSEEvent` with the parsed `data`, `event`, `id`, `retry`, and `fields` from the stream. 
+ ## Server Examples [`HTTP.Servers.listen`](https://juliaweb.github.io/HTTP.jl/stable/index.html#HTTP.Servers.listen): diff --git a/docs/examples/server_sent_events.jl b/docs/examples/server_sent_events.jl index fa3750af7..bba54f62f 100644 --- a/docs/examples/server_sent_events.jl +++ b/docs/examples/server_sent_events.jl @@ -38,6 +38,12 @@ loosely following [this tutorial](https://developer.mozilla.org/en-US/docs/Web/A ```julia using HTTP, JSON +# Using sse_callback for automatic SSE parsing: +HTTP.request("GET", "http://127.0.0.1:8080/api/events"; sse_callback = (stream, event) -> begin + @info "Received event" data=event.data event_type=event.event id=event.id +end) + +# Or using HTTP.open for raw streaming: HTTP.open("GET", "http://127.0.0.1:8080/api/events") do io while !eof(io) println(String(readavailable(io))) @@ -45,10 +51,37 @@ HTTP.open("GET", "http://127.0.0.1:8080/api/events") do io end ``` -### Server code: +### Server code (using HTTP.sse_stream - recommended): """ using HTTP, Sockets, JSON +# Simple SSE server using the HTTP.sse_stream helper +function simple_sse_server() + server = HTTP.serve!(listenany=true) do request + response = HTTP.Response(200) + # Add CORS headers for browser clients + HTTP.setheader(response, "Access-Control-Allow-Origin" => "*") + + # Create SSE stream - automatically sets Content-Type and Cache-Control + HTTP.sse_stream(response) do stream + for i in 1:10 + # Write a ping event with timestamp + write(stream, HTTP.SSEEvent(string(round(Int, time())); event="ping")) + + # Occasionally write a data event + if rand(Bool) + write(stream, HTTP.SSEEvent(string(rand()))) + end + sleep(1) + end + end + + return response + end + return server +end + +# More complex example with Router const ROUTER = HTTP.Router() function getItems(req::HTTP.Request) @@ -62,17 +95,41 @@ function getItems(req::HTTP.Request) return HTTP.Response(200, headers, JSON.json(rand(2))) end -function events(stream::HTTP.Stream) +# Using HTTP.sse_stream with 
a request handler +function events_handler(req::HTTP.Request) + if HTTP.method(req) == "OPTIONS" + return HTTP.Response(200, [ + "Access-Control-Allow-Origin" => "*", + "Access-Control-Allow-Methods" => "GET, OPTIONS" + ]) + end + + response = HTTP.Response(200) + HTTP.setheader(response, "Access-Control-Allow-Origin" => "*") + HTTP.sse_stream(response) do stream + while true + write(stream, HTTP.SSEEvent(string(round(Int, time())); event="ping")) + if rand(Bool) + write(stream, HTTP.SSEEvent(string(rand()))) + end + sleep(1) + end + end + + return response +end + +# Alternative: manual SSE using stream handler (lower-level approach) +function events_stream(stream::HTTP.Stream) HTTP.setheader(stream, "Access-Control-Allow-Origin" => "*") HTTP.setheader(stream, "Access-Control-Allow-Methods" => "GET, OPTIONS") HTTP.setheader(stream, "Content-Type" => "text/event-stream") + HTTP.setheader(stream, "Cache-Control" => "no-cache") if HTTP.method(stream.message) == "OPTIONS" return nothing end - HTTP.setheader(stream, "Content-Type" => "text/event-stream") - HTTP.setheader(stream, "Cache-Control" => "no-cache") while true write(stream, "event: ping\ndata: $(round(Int, time()))\n\n") if rand(Bool) @@ -83,23 +140,30 @@ function events(stream::HTTP.Stream) return nothing end -HTTP.register!(ROUTER, "GET", "/api/getItems", HTTP.streamhandler(getItems)) -HTTP.register!(ROUTER, "/api/events", events) +HTTP.register!(ROUTER, "GET", "/api/getItems", getItems) +HTTP.register!(ROUTER, "GET", "/api/events", events_handler) -server = HTTP.serve!(ROUTER, "127.0.0.1", 8080; stream=true) +# Start the server in the normal request-handler mode +server = HTTP.serve!(ROUTER, "127.0.0.1", 8080) -# Julia usage -resp = HTTP.get("http://localhost:8080/api/getItems") +# To run the manual stream-handler variant instead, start a separate server: +# stream_server = HTTP.serve!(events_stream, "127.0.0.1", 8081; stream=true) -close = Ref(false) -@async HTTP.open("GET", 
"http://127.0.0.1:8080/api/events") do io - while !eof(io) && !close[] - println(String(readavailable(io))) +# Julia client usage with sse_callback +stop = Ref(false) +@async begin + try + HTTP.request("GET", "http://127.0.0.1:8080/api/events"; sse_callback = (stream, event) -> begin + println("Event: ", event.event, " | Data: ", event.data) + stop[] && close(stream) + end) + catch e + # Connection closed or stopped end end # run the following to stop the streaming client request -close[] = true +stop[] = true # close the server which will stop the HTTP server from listening close(server) diff --git a/docs/src/client.md b/docs/src/client.md index f523e6167..8c3e63bed 100644 --- a/docs/src/client.md +++ b/docs/src/client.md @@ -233,6 +233,34 @@ end -> HTTP.Response Where the `io` argument provided to the function body is an `HTTP.Stream` object, a custom `IO` that represents an open connection that is ready to be written to in order to send the request body, and/or read from to receive the response body. Note that `startread(io)` should be called before calling `readavailable` to ensure the response status line and headers are received and parsed appropriately. Calling `eof(io)` will return true until the response body has been completely received. Note that the returned `HTTP.Response` from `HTTP.open` will _not_ have a `.body` field since the body was read in the function body. +### Server-Sent Events + +HTTP.jl can parse [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) streams directly via the `sse_callback` keyword on `HTTP.request`. 
When this keyword is supplied, HTTP.jl incrementally parses the incoming bytes as an event stream and invokes the callback with an `HTTP.SSEEvent` struct for every event: + +```julia +using HTTP + +events = HTTP.SSEEvent[] +HTTP.request("GET", "http://127.0.0.1:8080/events"; sse_callback = (stream, event) -> begin + @info "event" data=event.data id=event.id event_type=event.event retry_after=event.retry + push!(events, event) +end) +``` + +The callback can be `f(event)` or `f(stream, event)`. The two-argument form allows cancelling the request by calling `close(stream)` (for example, in response to a specific event). + +Each callback receives a `SSEEvent` with the following fields: + +- `data::String`: newline-joined `data:` payload for the event (with the trailing newline removed). +- `event::Union{Nothing,String}`: the most recent `event:` field, or `nothing` when not provided (equivalent to the default `"message"` type). +- `id::Union{Nothing,String}`: the last `id:` value observed, automatically persisted between events per the SSE specification. +- `retry::Union{Nothing,Int}`: the last `retry:` directive in milliseconds, propagated to subsequent events until another `retry:` value is parsed. +- `fields::Dict{String,String}`: newline-joined string values for every field encountered since the previous event, including custom non-standard fields. + +Because HTTP.jl streams the response directly to the callback, the returned `HTTP.Response` will always have `response.body === HTTP.nobody`. The `sse_callback` keyword cannot be combined with `response_stream` or a custom `iofunction`. The callback is only invoked for non-error responses; error responses are read like a normal request, and `status_exception` behavior applies. Parsing or callback errors surface as regular request errors (`HTTP.RequestError`) with the underlying exception in `err.error`. Compressed streams are supported automatically unless `decompress=false` is explicitly set. 
+ +For a full end-to-end example, see [`docs/examples/server_sent_events.jl`](https://github.com/JuliaWeb/HTTP.jl/blob/master/docs/examples/server_sent_events.jl). + ### Download A [`download`](@ref) function is provided for similar functionality to `Downloads.download`. diff --git a/docs/src/reference.md b/docs/src/reference.md index 9a9630bc4..efab5fc28 100644 --- a/docs/src/reference.md +++ b/docs/src/reference.md @@ -89,6 +89,14 @@ HTTP.WebSockets.isclosed HTTP.WebSockets.isok ``` +### Server-Sent Events + +```@docs +HTTP.SSEEvent +HTTP.SSEStream +HTTP.sse_stream +``` + ## Utilities ```@docs @@ -193,4 +201,4 @@ HTTP.Parsers.parse_status_line! HTTP.Parsers.parse_request_line! HTTP.Parsers.parse_header_field HTTP.Parsers.parse_chunk_size -``` \ No newline at end of file +``` diff --git a/docs/src/server.md b/docs/src/server.md index 17e62a76b..a81ddb5f9 100644 --- a/docs/src/server.md +++ b/docs/src/server.md @@ -108,8 +108,79 @@ Lower-level core server functionality that only operates on `HTTP.Stream`. Provi Nginx-style log formatting is supported via the [`HTTP.@logfmt_str`](@ref) macro and can be passed via the `access_log` keyword argument for [`HTTP.listen`](@ref) or [`HTTP.serve`](@ref). +## Server-Sent Events (SSE) + +HTTP.jl provides built-in support for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events), a standard for pushing real-time updates from server to client over HTTP. + +### Creating an SSE Response + +Use [`HTTP.sse_stream`](@ref) to create an SSE stream from a response object: + +```julia +using HTTP + +HTTP.serve() do request + response = HTTP.Response(200) + HTTP.sse_stream(response) do stream + for i in 1:5 + write(stream, HTTP.SSEEvent("Event $i")) + sleep(1) + end + end + + return response +end +``` + +The `sse_stream` function: +1. Creates an `SSEStream` for writing events +2. Sets the response body to the stream +3. 
Adds required headers: `Content-Type: text/event-stream` and `Cache-Control: no-cache` +4. Uses a bounded internal buffer (configurable via `max_len`, default 16 MiB) to provide backpressure if the client is slow to read +5. Spawns a task to run the body of the do-block asynchronously +6. Closes the stream when the do-block completes + +### Writing Events + +Write events using `write(stream, HTTP.SSEEvent(...))`: + +```julia +# Simple data-only event +write(stream, HTTP.SSEEvent("Hello, world!")) + +# Event with type (for client-side addEventListener) +write(stream, HTTP.SSEEvent("User logged in"; event="login")) + +# Event with ID (for client reconnection tracking) +write(stream, HTTP.SSEEvent("Message content"; id="msg-123")) + +# Event with retry hint (milliseconds) +write(stream, HTTP.SSEEvent("Reconnect hint"; retry=5000)) + +# Event with all fields +write(stream, HTTP.SSEEvent("Full event"; event="update", id="42", retry=3000)) + +# Multiline data is automatically handled +write(stream, HTTP.SSEEvent("Line 1\nLine 2\nLine 3")) +``` + +### SSEEvent Fields + +The `HTTP.SSEEvent` struct supports: +- `data::String`: The event payload (required) +- `event::Union{Nothing,String}`: Event type name (maps to `addEventListener` on client) +- `id::Union{Nothing,String}`: Event ID for reconnection tracking +- `retry::Union{Nothing,Int}`: Suggested reconnection delay in milliseconds + +### Important Notes + +- The do-block spawns a task where events will be written asynchronously +- The handler must return the response while events are written asynchronously +- Events will not actually be sent to the client until the handler has returned the response +- For client-side SSE consumption, see the [Client documentation](client.md#Server-Sent-Events) + ## Serving on the interactive thead pool Beginning in Julia 1.9, the main server loop is spawned on the [interactive threadpool](https://docs.julialang.org/en/v1.9/manual/multi-threading/#man-threadpools) by default. 
If users do a Threads.@spawn from a handler, those threaded tasks should run elsewhere and not in the interactive threadpool, keeping the web server responsive. -Note that just having a reserved interactive thread doesn’t guarantee CPU cycles, so users need to properly configure their running Julia session appropriately (i.e. ensuring non-interactive threads available to run tasks, etc). \ No newline at end of file +Note that just having a reserved interactive thread doesn’t guarantee CPU cycles, so users need to properly configure their running Julia session appropriately (i.e. ensuring non-interactive threads available to run tasks, etc). diff --git a/src/HTTP.jl b/src/HTTP.jl index cbbf1be70..41f91fc37 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -44,6 +44,7 @@ include("StatusCodes.jl") ;using .StatusCodes include("Messages.jl") ;using .Messages include("cookies.jl") ;using .Cookies include("Streams.jl") ;using .Streams +include("SSE.jl") ;using .SSE getrequest(r::Request) = r getrequest(s::Stream) = s.message.request @@ -131,10 +132,17 @@ shorthand for `HTTP.request("GET", ...)`, etc. Supported optional keyword arguments: - - `query = nothing`, a `Pair` or `Dict` of key => values to be included in the url - - `response_stream = nothing`, a writeable `IO` stream or any `IO`-like +- `query = nothing`, a `Pair` or `Dict` of key => values to be included in the url +- `response_stream = nothing`, a writeable `IO` stream or any `IO`-like type `T` for which `write(T, AbstractVector{UInt8})` is defined. The response body will be written to this stream instead of returned as a `Vector{UInt8}`. +- `sse_callback = nothing`, provide a function `f(event)` or `f(stream, event)` to incrementally + consume Server-Sent Events responses. When set, HTTP.jl parses the response body as an event + stream, invokes the callback for each `HTTP.SSEEvent`, and returns the final `HTTP.Response` + with `response.body === HTTP.nobody`. 
module SSE

export SSEEvent, SSEStream, sse_stream

using CodecZlib
using SimpleBufferStream: BufferStream
using ..IOExtras, ..Messages, ..Streams, ..Strings
using ..Exceptions: @try
import ..HTTP

const LF = UInt8('\n')
const CR = UInt8('\r')
const COLON = UInt8(':')
const SPACE = UInt8(' ')
# Shared sentinel for "no bytes"; it is only ever read, never mutated or handed to `String`.
const EMPTY_LINE = UInt8[]
const DEFAULT_SSE_STREAM_MAX_LEN = 16 * 1024 * 1024 # 16 MiB

struct SSEEvent
    data::String
    event::Union{Nothing,String}
    id::Union{Nothing,String}
    retry::Union{Nothing,Int}
    fields::Dict{String,String}
end

"""
    SSEEvent(data; event=nothing, id=nothing, retry=nothing)

Construct an SSE event for server-side emission. The `fields` dictionary of the
resulting event is empty; it is only populated by the client-side parser.

# Arguments
- `data::AbstractString`: The event data (required)
- `event::Union{Nothing,AbstractString}=nothing`: Optional event type name
- `id::Union{Nothing,AbstractString}=nothing`: Optional event ID
- `retry::Union{Nothing,Integer}=nothing`: Optional retry timeout in milliseconds
"""
function SSEEvent(data::AbstractString;
                  event::Union{Nothing,AbstractString}=nothing,
                  id::Union{Nothing,AbstractString}=nothing,
                  retry::Union{Nothing,Integer}=nothing)
    return SSEEvent(
        String(data),
        event === nothing ? nothing : String(event),
        id === nothing ? nothing : String(id),
        retry === nothing ? nothing : Int(retry),
        Dict{String,String}()
    )
end

# Server-side SSE support

"""
    SSEStream <: IO

A stream for writing Server-Sent Events (SSE) to an HTTP response.

Create an SSEStream using [`sse_stream`](@ref) which sets up the response
with the correct content type and returns the stream for writing events.

# Example
```julia
HTTP.serve() do request
    response = HTTP.Response(200)
    HTTP.sse_stream(response) do stream
        for i in 1:5
            write(stream, HTTP.SSEEvent("Event \$i"))
            sleep(1)
        end
    end
    return response
end
```
"""
struct SSEStream <: IO
    buffer::BufferStream     # bytes queued for the response body
    iobuffer::IOBuffer       # scratch buffer used to serialize one event at a time
    lock::ReentrantLock      # guards `iobuffer` while an event is being serialized
end

SSEStream(; max_len::Integer=DEFAULT_SSE_STREAM_MAX_LEN) =
    SSEStream(BufferStream(Int(max_len)), IOBuffer(), ReentrantLock())

Base.isopen(s::SSEStream) = isopen(s.buffer)
Base.eof(s::SSEStream) = eof(s.buffer)
Base.close(s::SSEStream) = close(s.buffer)
Base.readavailable(s::SSEStream) = readavailable(s.buffer)

# Reject values that would terminate the field line early on the wire and thereby
# inject additional fields (or a premature event boundary) into the stream.
function check_single_line(name::String, value::Union{Nothing,String})
    value === nothing && return
    if any(c -> c == '\n' || c == '\r', value)
        throw(ArgumentError("SSE `$name` field must not contain line breaks"))
    end
    return
end

"""
    write(stream::SSEStream, event::SSEEvent)

Write an SSE event to the stream in the standard SSE wire format.

The event is formatted according to the SSE specification:
- `event:` line (if event type is set)
- `id:` line (if id is set)
- `retry:` line (if retry is set)
- `data:` lines (one per line in the data, supporting multiline data)
- Empty line to signal end of event

Line breaks in `data` may be `\\n`, `\\r\\n` or `\\r`; each produces a separate
`data:` line. Throws `ArgumentError` if `event.event` or `event.id` contains a
line break, since those fields cannot span lines.
"""
function Base.write(s::SSEStream, event::SSEEvent)
    check_single_line("event", event.event)
    check_single_line("id", event.id)
    bytes = lock(s.lock) do
        buf = s.iobuffer
        truncate(buf, 0)
        if event.event !== nothing
            write(buf, "event: ", event.event, "\n")
        end
        if event.id !== nothing
            write(buf, "id: ", event.id, "\n")
        end
        if event.retry !== nothing
            write(buf, "retry: ", string(event.retry), "\n")
        end
        # A lone CR is also a line terminator for SSE parsers, so split on every
        # terminator form; otherwise the text after a CR would be read as a new field.
        for line in split(event.data, r"\r\n|\n|\r"; keepempty=true)
            write(buf, "data: ", line, "\n")
        end
        write(buf, "\n")
        return take!(buf)
    end
    # The serialized bytes are pushed outside the lock so a blocked (full) buffer
    # does not hold the serialization lock.
    return write(s.buffer, bytes)
end

"""
    sse_stream(response::HTTP.Response; max_len=16*1024*1024) -> SSEStream
    sse_stream(response::HTTP.Response, f::Function; max_len=16*1024*1024) -> SSEStream

Create an SSEStream and configure the response for Server-Sent Events.

This function:
1. Creates an SSEStream
2. Sets it as the response body
3. Adds the `Content-Type: text/event-stream` header
4. Adds `Cache-Control: no-cache` header (recommended for SSE)

The `do`-block form runs the writer in a background task and closes the stream
when `f` completes. Errors thrown by `f` (other than `InterruptException`) are
logged with `@error` and the stream is still closed.

# Example
```julia
HTTP.serve() do request
    response = HTTP.Response(200)
    HTTP.sse_stream(response) do stream
        for i in 1:5
            write(stream, HTTP.SSEEvent("Event \$i"))
            sleep(1)
        end
    end
    return response
end
```
"""
function sse_stream(response::Response; max_len::Integer=DEFAULT_SSE_STREAM_MAX_LEN)
    stream = SSEStream(; max_len=max_len)
    response.body = stream
    Messages.setheader(response, "Content-Type" => "text/event-stream")
    Messages.setheader(response, "Cache-Control" => "no-cache")
    return stream
end

function sse_stream(response::Response, f::Function; max_len::Integer=DEFAULT_SSE_STREAM_MAX_LEN)
    stream = sse_stream(response; max_len=max_len)
    Threads.@spawn begin
        try
            f(stream)
        catch err
            err isa InterruptException && rethrow()
            @error "SSE stream handler error" exception=(err, catch_backtrace())
        finally
            # Always close so the reader of the response body sees EOF.
            close(stream)
        end
    end
    return stream
end

# Function-first method so that `sse_stream(response) do stream ... end` works.
function sse_stream(f::Function, response::Response; max_len::Integer=DEFAULT_SSE_STREAM_MAX_LEN)
    return sse_stream(response, f; max_len=max_len)
end

# Client-side SSE parsing

# Mutable parser state carried across lines and chunks of one response.
mutable struct SSEState
    data_lines::Vector{String}              # `data:` values of the event being assembled
    has_data::Bool                          # whether any `data:` line was seen for it
    bom_checked::Bool                       # whether the first non-empty line was checked for a BOM
    saw_evidence::Bool                      # whether a comment or known field has been seen
    event_name::Union{Nothing,String}       # `event:` value of the event being assembled
    last_event_id::Union{Nothing,String}    # persists across events
    last_retry::Union{Nothing,Int}          # persists across events
    fields::Dict{String,Vector{String}}     # every field value seen since the last dispatch
end

function SSEState()
    return SSEState(String[], false, false, false, nothing, nothing, nothing, Dict{String,Vector{String}}())
end

"""
    handle_sse_stream(stream, callback; decompress=nothing, context_lock=nothing)

Read the response body of `stream` as a Server-Sent Events stream, invoking
`callback` for every parsed event. Sets `response.body` to `HTTP.nobody` and
adds the number of bytes read to `response.request.context[:nbytes]` (under
`context_lock` when one is given). On any error the stream is closed and the
error is rethrown.
"""
function handle_sse_stream(stream::Stream{<:Response}, callback::Function;
                           decompress::Union{Nothing,Bool}=nothing, context_lock::Union{Nothing,ReentrantLock}=nothing)
    callback isa Function || throw(ArgumentError("`sse_callback` must be a callable"))
    response = stream.message
    io, tsk = wrap_stream(stream, response, decompress)
    response.body = HTTP.nobody
    wrapped_callback = wrap_callback(callback, stream)
    bytes_read = 0
    try
        bytes_read = parse_stream!(io, wrapped_callback)
        tsk === nothing || wait(tsk)
    catch
        # Closing the stream also lets a pending decompression task finish.
        @try Base.IOError EOFError close(stream)
        tsk === nothing || @try Base.IOError EOFError wait(tsk)
        rethrow()
    end
    if context_lock === nothing
        response.request.context[:nbytes] = get(response.request.context, :nbytes, 0) + bytes_read
    else
        Base.@lock context_lock begin
            response.request.context[:nbytes] = get(response.request.context, :nbytes, 0) + bytes_read
        end
    end
    return
end

# Throw unless the response's Content-Type (ignoring parameters) is text/event-stream.
function ensure_sse_content_type(response::Response)
    ctype = header(response, "Content-Type")
    isempty(ctype) && throw(ErrorException("Response Content-Type is not text/event-stream"))
    base = strip(first(split(ctype, ';')))
    ascii_lc_isequal(base, "text/event-stream") ||
        throw(ErrorException("Response Content-Type is not text/event-stream"))
end

# Adapt a two-argument `f(stream, event)` callback to the one-argument form used
# by the parser; a one-argument callback is returned unchanged.
function wrap_callback(callback::Function, stream::Stream{<:Response})
    dummy = SSEEvent("", nothing, nothing, nothing, Dict{String,String}())
    if applicable(callback, stream, dummy)
        return event -> callback(stream, event)
    else
        return callback
    end
end

# Return `(io, task)`: the IO to parse from and, when gzip decompression is in use,
# the task that pumps `stream` through the decompressor (otherwise `nothing`).
function wrap_stream(stream::Stream{<:Response}, response::Response,
                     decompress::Union{Nothing,Bool})
    encoding = header(response, "Content-Encoding")
    should_decompress = decompress === true ||
        (decompress === nothing && ascii_lc_isequal(encoding, "gzip"))
    if should_decompress
        buf = BufferStream()
        gzstream = GzipDecompressorStream(buf)
        tsk = @async begin
            try
                write(gzstream, stream)
            finally
                close(gzstream)
            end
        end
        return buf, tsk
    else
        return stream, nothing
    end
end

# Parse `io` until EOF, dispatching events to `callback`. Returns the number of
# bytes consumed. Throws if, after `detect_bytes` bytes (or at EOF), nothing that
# looks like SSE content has been seen.
function parse_stream!(io, callback::Function)
    state = SSEState()
    partial = Vector{UInt8}()   # bytes of the current, not yet terminated line
    total = 0
    detect_bytes = 8 * 1024
    while !eof(io)
        chunk = readavailable(io)
        isempty(chunk) && continue
        total += length(chunk)
        process_chunk!(state, partial, chunk, callback)
        if total >= detect_bytes && !state.saw_evidence && !looks_like_sse_prefix(partial)
            throw(ErrorException("Response does not appear to be a Server-Sent Events stream"))
        end
    end
    if !isempty(partial)
        # The final line was not newline-terminated; process it anyway.
        process_line!(state, partial, callback)
        empty!(partial)
    end
    total > 0 && !state.saw_evidence &&
        throw(ErrorException("Response does not appear to be a Server-Sent Events stream"))
    dispatch_pending!(state, callback)
    return total
end

# Whether the unterminated line in `partial` starts (after an optional UTF-8 BOM)
# with a comment marker or a standard SSE field name.
function looks_like_sse_prefix(partial::Vector{UInt8})
    isempty(partial) && return false
    idx = 1
    if length(partial) >= 3 && partial[1] == 0xEF && partial[2] == 0xBB && partial[3] == 0xBF
        idx = 4
    end
    idx > length(partial) && return false
    b = partial[idx]
    b == COLON && return true
    return startswith_known_field(partial, idx)
end

# Whether `bytes[idx:end]` begins with `data`, `event`, `id` or `retry`.
function startswith_known_field(bytes::Vector{UInt8}, idx::Int)
    remaining = length(bytes) - idx + 1
    remaining >= 4 && bytes[idx] == UInt8('d') && bytes[idx + 1] == UInt8('a') &&
        bytes[idx + 2] == UInt8('t') && bytes[idx + 3] == UInt8('a') && return true
    remaining >= 5 && bytes[idx] == UInt8('e') && bytes[idx + 1] == UInt8('v') &&
        bytes[idx + 2] == UInt8('e') && bytes[idx + 3] == UInt8('n') && bytes[idx + 4] == UInt8('t') &&
        return true
    remaining >= 2 && bytes[idx] == UInt8('i') && bytes[idx + 1] == UInt8('d') && return true
    remaining >= 5 && bytes[idx] == UInt8('r') && bytes[idx + 1] == UInt8('e') &&
        bytes[idx + 2] == UInt8('t') && bytes[idx + 3] == UInt8('r') && bytes[idx + 4] == UInt8('y') &&
        return true
    return false
end

# Split `chunk` on LF and feed each complete line to `process_line!`. Bytes after
# the last LF are appended to `partial` and completed by a later chunk.
# NOTE(review): lines are split on LF only (a trailing CR is trimmed later), so a
# stream using lone-CR terminators is not split into lines here.
function process_chunk!(state::SSEState, partial::Vector{UInt8}, chunk::AbstractVector{UInt8}, callback::Function)
    start = 1
    len = length(chunk)
    while start <= len
        nl = findnext(isequal(LF), chunk, start)
        nl === nothing && break
        stop = nl - 1
        if isempty(partial)
            # Fast path: the whole line lies inside this chunk, so parse a view of it.
            if stop >= start
                process_line!(state, @view(chunk[start:stop]), callback)
            else
                process_line!(state, EMPTY_LINE, callback)
            end
        else
            if stop >= start
                append!(partial, @view(chunk[start:stop]))
            end
            process_line!(state, partial, callback)
            empty!(partial)
        end
        start = nl + 1
    end
    if start <= len
        append!(partial, @view(chunk[start:len]))
    end
end

# Interpret one line (without its LF): an empty line dispatches the pending event,
# a line starting with ':' is a comment, anything else is `field[:[ ]value]`.
function process_line!(state::SSEState, raw::AbstractVector{UInt8}, callback::Function)
    line_len = trim_length(raw)
    if line_len == 0
        dispatch_event!(state, callback)
        return
    end
    line = line_len == length(raw) ? raw : @view raw[1:line_len]
    if !state.bom_checked
        state.bom_checked = true
        # Strip a UTF-8 byte order mark from the first non-empty line.
        if length(line) >= 3 && line[1] == 0xEF && line[2] == 0xBB && line[3] == 0xBF
            line = length(line) == 3 ? EMPTY_LINE : @view line[4:length(line)]
            isempty(line) && return
        end
    end
    firstbyte = line[1]
    if firstbyte == COLON
        state.saw_evidence = true
        return
    end
    colon_idx = findfirst(isequal(COLON), line)
    if colon_idx === nothing
        # No colon: the whole line is the field name and the value is empty.
        field = bytes_to_string(line)
        value = ""
    else
        field = colon_idx == 1 ? "" : bytes_to_string(@view line[1:colon_idx-1])
        value_slice = colon_idx == length(line) ? EMPTY_LINE : @view line[colon_idx+1:length(line)]
        # A single space directly after the colon is not part of the value.
        if !isempty(value_slice) && value_slice[1] == SPACE
            value_slice = length(value_slice) == 1 ? EMPTY_LINE : @view value_slice[2:length(value_slice)]
        end
        value = bytes_to_string(value_slice)
    end
    process_field!(state, field, value)
end

# Length of `bytes` once trailing CR bytes are dropped.
function trim_length(bytes::AbstractVector{UInt8})
    len = length(bytes)
    while len > 0 && bytes[len] == CR
        len -= 1
    end
    return len
end

# Convert `bytes` to a `String`, throwing `ErrorException` when they are not valid
# UTF-8. `String(bytes)` itself never validates, so the check is done explicitly.
# As with any `String(::Vector{UInt8})` call, a `Vector{UInt8}` argument is
# emptied by the conversion; callers only pass views or a buffer they reset next.
function bytes_to_string(bytes::AbstractVector{UInt8})
    isempty(bytes) && return ""
    str = String(bytes)
    isvalid(str) || throw(ErrorException("SSE stream emitted invalid UTF-8 data"))
    return str
end

# Record one parsed field in `state`. Only the standard fields and comments count
# as evidence that the body really is an SSE stream.
function process_field!(state::SSEState, field::String, value::String)
    if field == "data"
        state.saw_evidence = true
        append_field!(state, field, value)
        push!(state.data_lines, value)
        state.has_data = true
    elseif field == "event"
        state.saw_evidence = true
        append_field!(state, field, value)
        state.event_name = value
    elseif field == "id"
        state.saw_evidence = true
        # An id containing NUL is ignored entirely.
        occursin('\0', value) && return
        append_field!(state, field, value)
        state.last_event_id = value
    elseif field == "retry"
        state.saw_evidence = true
        retry = parse_retry(value)
        append_field!(state, field, value)
        # Only update last_retry if the value was valid
        retry !== nothing && (state.last_retry = retry)
    else
        append_field!(state, field, value)
    end
end

function append_field!(state::SSEState, field::String, value::String)
    vec = get!(state.fields, field) do
        String[]
    end
    push!(vec, value)
end

# Parse a `retry:` value. Returns `nothing` (field ignored) unless the value,
# after stripping surrounding whitespace, consists solely of ASCII digits that fit
# in an `Int`. This rejects forms such as "+5", "-1" or "0x10", which a bare
# `tryparse(Int, ...)` would accept or need a separate sign check for.
function parse_retry(value::String)
    digits = strip(value)
    (isempty(digits) || !all(isdigit, digits)) && return nothing
    # `tryparse` still yields `nothing` when the digits overflow `Int`.
    return tryparse(Int, digits)
end

# Blank line seen: emit the pending event if it has data, then reset per-event state.
function dispatch_event!(state::SSEState, callback::Function)
    if state.has_data
        emit_event(state, callback)
    end
    reset_current_event!(state)
end

function emit_event(state::SSEState, callback::Function)
    data = isempty(state.data_lines) ? "" : join(state.data_lines, "\n")
    fields = build_fields(state)
    event = SSEEvent(data, state.event_name, state.last_event_id, state.last_retry, fields)
    callback(event)
end

# Collapse the per-field value lists into newline-joined strings, skipping fields
# with no values since the last dispatch.
function build_fields(state::SSEState)
    out = Dict{String,String}()
    for (k, entries) in state.fields
        isempty(entries) && continue
        out[k] = length(entries) == 1 ? entries[1] : join(entries, "\n")
    end
    return out
end

# Clear per-event state. `last_event_id` and `last_retry` are intentionally kept.
function reset_current_event!(state::SSEState)
    empty!(state.data_lines)
    state.has_data = false
    state.event_name = nothing
    for vec in values(state.fields)
        empty!(vec)
    end
end

# End of stream: emit an event whose terminating blank line never arrived, if it
# has data, then reset.
function dispatch_pending!(state::SSEState, callback::Function)
    state.has_data && emit_event(state, callback)
    reset_current_event!(state)
end

end # module
$msg" end request - if isopen(http) && !iswritable(http) + if !client_closed && isopen(http) && !iswritable(http) request.response.status = 500 startwrite(http) closewrite(http) diff --git a/src/Streams.jl b/src/Streams.jl index 26ce305b3..46ea4b988 100644 --- a/src/Streams.jl +++ b/src/Streams.jl @@ -54,6 +54,12 @@ end IOExtras.isopen(http::Stream) = isopen(http.stream) +function Base.close(http::Stream) + http.ntoread = 0 + @try Base.IOError close(http.stream) + return +end + # Writing HTTP Messages messagetowrite(http::Stream{<:Response}) = http.message.request::Request diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index 9d8c48a40..29d242dfc 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -1,6 +1,6 @@ module StreamRequest -using ..IOExtras, ..Messages, ..Streams, ..Connections, ..Strings, ..RedirectRequest, ..Exceptions +using ..IOExtras, ..Messages, ..Streams, ..Connections, ..Strings, ..RedirectRequest, ..Exceptions, ..SSE using CodecZlib, URIs using SimpleBufferStream: BufferStream using ConcurrentUtilities: @samethreadpool_spawn @@ -18,9 +18,13 @@ immediately so that the transmission can be aborted if the `Response` status indicates that the server does not wish to receive the message body. [RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5). 
""" -function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, logtag=nothing, timedout=nothing, kw...)::Response +function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, + logerrors::Bool=false, logtag=nothing, timedout=nothing, sse_callback=nothing, kw...)::Response response = stream.message req = response.request + if sse_callback !== nothing && (iofunction !== nothing || !isbytes(response.body)) + throw(ArgumentError("`sse_callback` cannot be combined with `response_stream` or a custom `iofunction`")) + end @debug sprintcompact(req) @debug "client startwrite" write_start = time() @@ -51,7 +55,11 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi @debug "client startread" startread(stream) if !isaborted(stream) - readbody(stream, response, decompress, lock) + if sse_callback !== nothing && !iserror(stream.message) + SSE.handle_sse_stream(stream, sse_callback; decompress=decompress, context_lock=lock) + else + readbody(stream, response, decompress, lock) + end end finally Base.@lock lock begin diff --git a/test/client.jl b/test/client.jl index ad76ad9f6..6494941f5 100644 --- a/test/client.jl +++ b/test/client.jl @@ -366,7 +366,8 @@ end @test_throws HTTP.TimeoutError begin HTTP.get("http://$httpbin/delay/5"; readtimeout=1, retry=false) end - HTTP.get("http://$httpbin/delay/1"; readtimeout=2, retry=false) + # allow extra slack for external server/network jitter + HTTP.get("http://$httpbin/delay/1"; readtimeout=10, retry=false) end @testset "connect_timeout does not include the time needed to acquire a connection from the pool" begin diff --git a/test/runtests.jl b/test/runtests.jl index d8cd6c2c7..a7d10f903 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -16,6 +16,7 @@ isok(r) = r.status == 200 "chunking.jl", "utils.jl", "client.jl", + "sse.jl", # "download.jl", "multipart.jl", "parsemultipart.jl", diff --git 
a/test/sse.jl b/test/sse.jl new file mode 100644 index 000000000..49203e843 --- /dev/null +++ b/test/sse.jl @@ -0,0 +1,355 @@ +module test_sse + +using HTTP, Test, CodecZlib + +const HOST = "127.0.0.1" + +function with_sse_server(f::Function, chunks::Vector; content_type::AbstractString="text/event-stream", + status::Int=200, headers::Vector{Pair{String,String}}=Pair{String,String}[]) + stream_handler = stream -> begin + HTTP.setstatus(stream, status) + HTTP.setheader(stream, "Content-Type" => content_type) + HTTP.setheader(stream, "Cache-Control" => "no-cache") + for h in headers + HTTP.setheader(stream, h) + end + startwrite(stream) + for chunk in chunks + write(stream, chunk) + end + end + server = HTTP.serve!(stream_handler; stream=true, listenany=true) + try + port = HTTP.port(server) + return f(port) + finally + close(server) + end +end + +@testset "Server Sent Events" begin + payload = [ + "data: hello\n", + "data: world\n", + "id: 42\n", + "\n", + ": keep-alive\n", + "event: update\n", + "data: done\n", + "retry: 1500\n", + "foo: bar\n", + "foo: baz\n", + "\n", + "data: closing\n" + ] + with_sse_server(payload) do port + events = HTTP.SSEEvent[] + resp = HTTP.request("GET", "http://$HOST:$port/stream"; sse_callback = event -> push!(events, event)) + @test resp.status == 200 + @test resp.body === HTTP.nobody + @test length(events) == 3 + first_event, second_event, third_event = events + @test first_event.event === nothing + @test first_event.id == "42" + @test first_event.retry === nothing + @test first_event.data == "hello\nworld" + @test first_event.fields["data"] == "hello\nworld" + @test first_event.fields["id"] == "42" + @test !haskey(first_event.fields, "retry") + @test second_event.event == "update" + @test second_event.id == "42" + @test second_event.retry == 1500 + @test second_event.fields["foo"] == "bar\nbaz" + @test second_event.fields["retry"] == "1500" + @test third_event.event === nothing + @test third_event.data == "closing" + @test 
third_event.retry == 1500 + @test third_event.id == "42" + end + + with_sse_server(["data: hi\n", "\n"], content_type="text/plain") do port + events = HTTP.SSEEvent[] + resp = HTTP.request("GET", "http://$HOST:$port/stream"; sse_callback = event -> push!(events, event)) + @test resp.status == 200 + @test resp.body === HTTP.nobody + @test length(events) == 1 + @test events[1].data == "hi" + end + + with_sse_server(["{\"error\":\"nope\"}"], content_type="application/json", status=400) do port + captured = Ref{Union{Nothing,HTTP.Request}}(nothing) + called = Ref(false) + capturelayer(handler) = function(req; kw...) + captured[] = req + return handler(req; kw...) + end + stack = HTTP.stack(false, [capturelayer]) + pool = HTTP.Pool(1) + err = try + HTTP.request(stack, "GET", "http://$HOST:$port/stream"; sse_callback = _ -> (called[] = true), pool=pool) + nothing + catch e + e + end + @test err isa HTTP.StatusError + @test called[] == false + @test captured[] !== nothing + resp = captured[].response + @test resp.status == 400 + @test String(resp.body) == "{\"error\":\"nope\"}" + end + + with_sse_server(["data: boom\n", "\n"]) do port + err = try + HTTP.request("GET", "http://$HOST:$port/stream"; sse_callback = _ -> error("boom")) + nothing + catch e + e + end + @test err isa HTTP.RequestError + @test err.error isa ErrorException + @test occursin("boom", sprint(showerror, err.error)) + @test err.request.response.body === HTTP.nobody + end + + # UTF-8 BOM at the start of the stream should be ignored + with_sse_server([UInt8[0xEF], UInt8[0xBB], UInt8[0xBF], "data: bom\n\n"]) do port + events = HTTP.SSEEvent[] + resp = HTTP.request("GET", "http://$HOST:$port/stream"; sse_callback = e -> push!(events, e)) + @test resp.status == 200 + @test length(events) == 1 + @test events[1].data == "bom" + end + + # Per SSE spec, invalid retry values should be ignored, not cause an error + with_sse_server(["retry: nope\n", "data: test\n", "\n"]) do port + events = HTTP.SSEEvent[] + resp = 
HTTP.request("GET", "http://$HOST:$port/stream"; sse_callback = e -> push!(events, e)) + @test resp.status == 200 + @test length(events) == 1 + @test events[1].data == "test" + @test events[1].retry === nothing # Invalid retry was ignored + end + + # Also test that negative retry values are ignored + with_sse_server(["retry: -100\n", "data: test\n", "\n"]) do port + events = HTTP.SSEEvent[] + resp = HTTP.request("GET", "http://$HOST:$port/stream"; sse_callback = e -> push!(events, e)) + @test resp.status == 200 + @test length(events) == 1 + @test events[1].retry === nothing # Negative retry was ignored + end + + compressed = read(GzipCompressorStream(IOBuffer("data: zipped\n\n"))) + with_sse_server([compressed]; headers=["Content-Encoding" => "gzip"]) do port + events = HTTP.SSEEvent[] + resp = HTTP.request("GET", "http://$HOST:$port/stream"; sse_callback = e -> push!(events, e)) + @test resp.status == 200 + @test length(events) == 1 + @test events[1].data == "zipped" + end + + with_sse_server(String[]) do port + events = HTTP.SSEEvent[] + resp = HTTP.request("GET", "http://$HOST:$port/stream"; sse_callback = e -> push!(events, e)) + @test resp.status == 200 + @test isempty(events) + end + + with_sse_server(["data: body\n", "\n"]) do port + io = IOBuffer() + err = try + HTTP.request("GET", "http://$HOST:$port/stream"; response_stream=io, sse_callback = _ -> nothing) + nothing + catch e + e + end + @test err isa HTTP.RequestError + @test err.error isa ArgumentError + + err = try + HTTP.request("GET", "http://$HOST:$port/stream"; iofunction = _ -> nothing, sse_callback = _ -> nothing) + nothing + catch e + e + end + @test err isa HTTP.RequestError + @test err.error isa ArgumentError + end + + @testset "Client cancellation" begin + server = HTTP.serve!(listenany=true) do request + response = HTTP.Response(200) + HTTP.sse_stream(response) do stream + write(stream, HTTP.SSEEvent("first")) + sleep(0.1) + write(stream, HTTP.SSEEvent("second")) + end + return response + 
end + + try + port = HTTP.port(server) + events = HTTP.SSEEvent[] + resp = HTTP.request("GET", "http://$HOST:$port/"; sse_callback = (s, e) -> begin + push!(events, e) + close(s) + end) + @test resp.status == 200 + @test resp.body === HTTP.nobody + @test length(events) == 1 + @test events[1].data == "first" + finally + close(server) + end + end +end + +@testset "Server-Side SSE" begin + # Test SSEEvent constructor with keyword arguments + @testset "SSEEvent constructor" begin + # Simple event with just data + e = HTTP.SSEEvent("hello") + @test e.data == "hello" + @test e.event === nothing + @test e.id === nothing + @test e.retry === nothing + + # Event with all optional fields + e = HTTP.SSEEvent("data"; event="message", id="123", retry=5000) + @test e.data == "data" + @test e.event == "message" + @test e.id == "123" + @test e.retry == 5000 + end + + # Test SSEStream write formatting + @testset "SSEStream write formatting" begin + stream = HTTP.SSEStream() + + # Simple data event + write(stream, HTTP.SSEEvent("hello")) + @test String(readavailable(stream)) == "data: hello\n\n" + + # Event with event type + write(stream, HTTP.SSEEvent("data"; event="update")) + @test String(readavailable(stream)) == "event: update\ndata: data\n\n" + + # Event with id + write(stream, HTTP.SSEEvent("data"; id="42")) + @test String(readavailable(stream)) == "id: 42\ndata: data\n\n" + + # Event with retry + write(stream, HTTP.SSEEvent("data"; retry=3000)) + @test String(readavailable(stream)) == "retry: 3000\ndata: data\n\n" + + # Event with all fields + write(stream, HTTP.SSEEvent("payload"; event="msg", id="99", retry=1000)) + @test String(readavailable(stream)) == "event: msg\nid: 99\nretry: 1000\ndata: payload\n\n" + + # Multiline data + write(stream, HTTP.SSEEvent("line1\nline2\nline3")) + @test String(readavailable(stream)) == "data: line1\ndata: line2\ndata: line3\n\n" + + close(stream) + end + + # Test sse_stream helper + @testset "sse_stream helper" begin + response = 
HTTP.Response(200) + stream = HTTP.sse_stream(response) + + @test response.body === stream + @test HTTP.header(response, "Content-Type") == "text/event-stream" + @test HTTP.header(response, "Cache-Control") == "no-cache" + + close(stream) + end + + # Integration test: server-side SSE with client consumption + @testset "Server-side SSE integration" begin + server = HTTP.serve!(listenany=true) do request + response = HTTP.Response(200) + HTTP.sse_stream(response) do stream + write(stream, HTTP.SSEEvent("first")) + write(stream, HTTP.SSEEvent("second"; event="update", id="2")) + write(stream, HTTP.SSEEvent("multi\nline\ndata")) + end + return response + end + + try + port = HTTP.port(server) + events = HTTP.SSEEvent[] + resp = HTTP.request("GET", "http://$HOST:$port/"; sse_callback = event -> push!(events, event)) + + @test resp.status == 200 + @test resp.body === HTTP.nobody + @test length(events) == 3 + + @test events[1].data == "first" + @test events[1].event === nothing + @test events[1].id === nothing + + @test events[2].data == "second" + @test events[2].event == "update" + @test events[2].id == "2" + + @test events[3].data == "multi\nline\ndata" + finally + close(server) + end + end + + # Test that retry values are preserved through roundtrip + @testset "SSE retry roundtrip" begin + server = HTTP.serve!(listenany=true) do request + response = HTTP.Response(200) + HTTP.sse_stream(response) do stream + write(stream, HTTP.SSEEvent("test"; retry=2500)) + end + return response + end + + try + port = HTTP.port(server) + events = HTTP.SSEEvent[] + HTTP.request("GET", "http://$HOST:$port/"; sse_callback = event -> push!(events, event)) + + @test length(events) == 1 + @test events[1].retry == 2500 + finally + close(server) + end + end + + # Test streaming multiple events over time + @testset "SSE streaming timing" begin + server = HTTP.serve!(listenany=true) do request + response = HTTP.Response(200) + HTTP.sse_stream(response) do stream + for i in 1:3 + write(stream, 
HTTP.SSEEvent("event $i"; id=string(i))) + sleep(0.05) # Small delay between events + end + end + return response + end + + try + port = HTTP.port(server) + events = HTTP.SSEEvent[] + HTTP.request("GET", "http://$HOST:$port/"; sse_callback = event -> push!(events, event)) + + @test length(events) == 3 + for i in 1:3 + @test events[i].data == "event $i" + @test events[i].id == string(i) + end + finally + close(server) + end + end +end + +end # module diff --git a/test/websockets/autobahn.jl b/test/websockets/autobahn.jl index 8aea00ce0..af98217b6 100644 --- a/test/websockets/autobahn.jl +++ b/test/websockets/autobahn.jl @@ -77,23 +77,48 @@ end end # @testset "Autobahn testsuite" @testset "Server" begin - server = WebSockets.listen!(9002; suppress_close_error=true) do ws + server_host = Sys.islinux() ? Sockets.localhost : "0.0.0.0" + server_url = Sys.islinux() ? "ws://127.0.0.1:9002" : "ws://host.docker.internal:9002" + config_dir = Sys.islinux() ? joinpath(DIR, "config") : mktempdir() + config_path = joinpath(config_dir, "fuzzingclient.json") + if !Sys.islinux() + cfg = Dict( + "outdir" => "./reports/server", + "servers" => [Dict("agent" => "main", "url" => server_url)], + "cases" => ["*"], + "exclude-cases" => ["9.*"], + "exclude-agent-cases" => Dict{String,Any}() + ) + write(config_path, JSON.json(cfg)) + end + docker_net = Sys.islinux() ? 
["--net=host"] : String[] + server = WebSockets.listen!(server_host, 9002; suppress_close_error=true) do ws for msg in ws send(ws, msg) end end try _remove_report(DIR, "server/index.json") - @test success(run(Cmd(`docker run --rm --net="host" -v "$DIR/config:/config" -v "$DIR/reports:/reports" --name fuzzingclient crossbario/autobahn-testsuite wstest -m fuzzingclient -s /config/fuzzingclient.json`; dir=DIR), stdin, stdout, stdout; wait=false)) + docker_args = vcat( + "docker", + "run", + "--rm", + docker_net, + ["-v", "$config_dir:/config", "-v", "$DIR/reports:/reports", "--name", "fuzzingclient", + "crossbario/autobahn-testsuite", "wstest", "-m", "fuzzingclient", "-s", "/config/fuzzingclient.json"] + ) + docker_cmd = Cmd(Cmd(docker_args); dir=DIR) + @test success(run(docker_cmd, stdin, stdout, stdout; wait=false)) report = JSON.parsefile(joinpath(DIR, "reports/server/index.json")) for (k, v) in pairs(report["main"]) @test v["behavior"] in ("OK", "NON-STRICT", "INFORMATIONAL", "UNIMPLEMENTED") end finally close(server) + Sys.islinux() || rm(config_dir; recursive=true, force=true) end end end # @testset "WebSockets" -end # 64-bit only \ No newline at end of file +end # 64-bit only