From 085c802113c3bde495a11fe2723a1eeaa7671abb Mon Sep 17 00:00:00 2001
From: Cameron Kingsbury
Date: Fri, 27 Feb 2026 14:56:45 -0500
Subject: [PATCH] fix: Brotli stream corruption via concurrent writes to compressor
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- **Introduced a per-tab “single writer” actor** (a dedicated virtual thread) that *owns*:
  - the http-kit `AsyncChannel`
  - the tab’s streaming Brotli state (`br-out` + `br-stream`, when compression is enabled)
- Changed `send-sse!` from “compress + `http/send!` right now” to **“enqueue message and return immediately”**:
  - Messages go into a `LinkedBlockingQueue`
  - The actor thread drains the queue, coalescing already-queued messages into a single batch (bounded by `:sse-max-batch-messages` / `:sse-max-batch-chars`), and performs the actual compression + network writes sequentially
- Ensured the SSE response headers are sent **exactly once**:
  - The actor’s *first* send writes the `{:headers ... :body ...}` map (and includes `Content-Encoding: br` when applicable)
  - Subsequent sends are raw chunks on the same connection
- Made teardown safer:
  - `unregister-sse-channel!` now **signals the actor to close via a sentinel**, so the actor closes the Brotli stream/channel on its own thread (avoiding “close while writing” races)
  - Re-registering a tab now stops any previous writer first (reconnect safety)
- Updated/added tests to match and validate the new behavior:
  - Tests now account for the async actor (latches/timeouts)
  - Added coverage that `send-sse!` **doesn’t block on Brotli compression** and that Brotli compression is **never invoked concurrently** for a tab.
--- src/hyper/render.clj | 258 +++++++++++++++++++++++++++++++++---- src/hyper/server.clj | 33 ++--- test/hyper/brotli_test.clj | 52 +++++--- test/hyper/render_test.clj | 123 +++++++++++++++++- 4 files changed, 395 insertions(+), 71 deletions(-) diff --git a/src/hyper/render.clj b/src/hyper/render.clj index df0e7e2..4be21d1 100644 --- a/src/hyper/render.clj +++ b/src/hyper/render.clj @@ -12,7 +12,198 @@ [hyper.state :as state] [hyper.utils :as utils] [org.httpkit.server :as http] - [taoensso.telemere :as t])) + [taoensso.telemere :as t]) + (:import (java.util.concurrent BlockingQueue LinkedBlockingQueue))) + +;; --------------------------------------------------------------------------- +;; SSE writer actor (single-writer per tab) +;; --------------------------------------------------------------------------- + +(def ^:private sse-headers + {"Content-Type" "text/event-stream"}) + +(def ^:private sse-headers-br + {"Content-Type" "text/event-stream" + "Content-Encoding" "br"}) + +(def ^:private sse-close-sentinel ::close) + +(def ^:private default-sse-max-batch-messages 32) +(def ^:private default-sse-max-batch-chars (* 64 1024)) ;; 64KiB + +(defn- sse-batch-limits + "Resolve batching limits for a new SSE writer actor. + + Optional app-state* overrides: + - :sse-max-batch-messages + - :sse-max-batch-chars" + [app-state*] + {:max-messages (long (or (get @app-state* :sse-max-batch-messages) + default-sse-max-batch-messages)) + :max-chars (long (or (get @app-state* :sse-max-batch-chars) + default-sse-max-batch-chars))}) + +(defn- try-http-send! + "Wrapper around http-kit send! that logs and returns boolean success." + [tab-id channel data close?] + (try + (boolean (http/send! channel data close?)) + (catch Throwable e + (t/error! e {:id :hyper.error/send-sse + :data {:hyper/tab-id tab-id}}) + false))) + +(defn- send-sse-initial-response! + "Send the initial SSE response map (headers + body) for a channel. + Must happen exactly once per SSE connection." 
+ [tab-id writer payload] + (let [channel (:channel writer) + headers (if (:br-stream writer) sse-headers-br sse-headers)] + (try-http-send! tab-id channel {:headers headers + :body payload} + false))) + +(defn- send-sse-chunk! + "Send a subsequent SSE data chunk on an already-initialized channel." + [tab-id writer payload] + (try-http-send! tab-id (:channel writer) payload false)) + +(defn- close-sse-writer! + [tab-id writer] + ;; Close resources on the actor thread to avoid brotli races. + (br/close-stream (:br-stream writer)) + (when-let [channel (:channel writer)] + (when (instance? org.httpkit.server.AsyncChannel channel) + (t/catch->error! :hyper.error/close-sse-channel + (http/close channel)))) + (t/log! {:level :debug + :id :hyper.event/sse-writer-close + :data {:hyper/tab-id tab-id} + :msg "SSE writer closed"}) + nil) + +(defn- drop-queued-messages! + "Drain and drop any remaining queued items (used after close)." + [^BlockingQueue queue] + (loop [m (.poll queue)] + (when m + (recur (.poll queue)))) + nil) + +(defn- coalesce-sse-messages + "Coalesce already-queued SSE messages into one string payload. + + - Never blocks (uses `.poll`). + - Preserves ordering by returning a :pending message if a polled message + would exceed the batch limits. + - If a close sentinel is observed while polling, returns :close-after? true + so the actor closes *after* sending this batch. + + Returns {:batch string :pending (or string nil) :close-after? boolean}." + [first-msg ^BlockingQueue queue {:keys [max-messages max-chars]}] + (let [first-str (str first-msg) + ^StringBuilder sb (StringBuilder.)] + (.append sb first-str) + (loop [msg-count 1 + char-count (count first-str)] + (cond + (>= msg-count max-messages) + {:batch (.toString sb) :pending nil :close-after? false} + + (>= char-count max-chars) + {:batch (.toString sb) :pending nil :close-after? false} + + :else + (let [next (.poll queue)] + (cond + (nil? next) + {:batch (.toString sb) :pending nil :close-after? 
false} + + (= next sse-close-sentinel) + {:batch (.toString sb) :pending nil :close-after? true} + + :else + (let [next-str (str next) + next-len (count next-str)] + (if (and (pos? max-chars) + (> (+ char-count next-len) max-chars)) + {:batch (.toString sb) :pending next-str :close-after? false} + (do + (.append sb next-str) + (recur (inc msg-count) (+ char-count next-len))))))))))) + +(defn- sse-actor-loop! + "Virtual-thread actor loop. Owns the channel + (optional) streaming brotli + state, enforcing single-writer semantics by construction." + [tab-id writer] + (let [^BlockingQueue queue (:queue writer) + limits (:batch-limits writer)] + (loop [started? false + pending nil] + (let [next-state + (try + (let [msg (or pending (.take queue))] + (cond + (= msg sse-close-sentinel) + nil + + :else + (let [{batch :batch + pending-next :pending + close-after? :close-after?} + (coalesce-sse-messages msg queue limits) + payload (if-let [br-stream (:br-stream writer)] + (br/compress-stream (:br-out writer) br-stream batch) + batch) + sent? (if started? + (send-sse-chunk! tab-id writer payload) + (send-sse-initial-response! tab-id writer payload))] + (when (and sent? (not close-after?)) + {:started? true + :pending pending-next})))) + (catch InterruptedException _ + nil) + (catch Throwable e + (t/error! e {:id :hyper.error/sse-writer + :data {:hyper/tab-id tab-id}}) + nil))] + (if next-state + (recur (:started? next-state) (:pending next-state)) + (do + (close-sse-writer! tab-id writer) + (drop-queued-messages! queue) + nil)))))) + +(defn- start-sse-writer-thread! + "Start the SSE writer actor as a single virtual thread." + [tab-id writer] + (-> (Thread/ofVirtual) + (.name (str "hyper-sse-" tab-id)) + (.start ^Runnable #(sse-actor-loop! tab-id writer)))) + +(defn- new-sse-writer + "Create and start a per-tab SSE writer actor. + + - Producers enqueue quickly (send-sse!). + - The actor owns http-kit send!/close and the streaming brotli state." 
+ [app-state* tab-id channel compress?] + (let [queue (LinkedBlockingQueue.) + out (when compress? (br/byte-array-out-stream)) + br-stream (when out (br/compress-out-stream out :window-size 18)) + writer (cond-> {:channel channel + :queue queue + :batch-limits (sse-batch-limits app-state*)} + compress? (assoc :br-out out + :br-stream br-stream)) + thread (start-sse-writer-thread! tab-id writer)] + (assoc writer :thread thread))) + +(defn- stop-sse-writer! + "Signal an SSE writer actor to close via a sentinel." + [writer] + (when writer + (.offer ^BlockingQueue (:queue writer) sse-close-sentinel)) + nil) (defn register-sse-channel! "Register an SSE channel for a tab, optionally with a streaming brotli @@ -20,24 +211,44 @@ (ByteArrayOutputStream + BrotliOutputStream) kept for the lifetime of the SSE connection so the LZ77 window is shared across fragments." [app-state* tab-id channel compress?] - (let [tab-updates (cond-> {:sse-channel channel} - compress? (merge (let [out (br/byte-array-out-stream)] - {:br-out out - :br-stream (br/compress-out-stream out :window-size 18)})))] - (swap! app-state* update-in [:tabs tab-id] merge tab-updates)) + ;; Reconnect safety: close any previous writer actor for this tab-id. + (when-let [old-writer (get-in @app-state* [:tabs tab-id :sse-writer])] + (stop-sse-writer! old-writer)) + + (let [writer (new-sse-writer app-state* tab-id channel compress?)] + (swap! app-state* update-in [:tabs tab-id] merge + {:sse-channel channel + :sse-writer writer})) nil) (defn unregister-sse-channel! - "Unregister an SSE channel and close the brotli stream for a tab." + "Unregister an SSE channel for a tab. + + Enqueues a close sentinel so the *actor thread* performs: + - brotli stream close (if present) + - channel close + + This avoids closing the brotli stream concurrently with an in-flight write." 
[app-state* tab-id] (let [tab-data (get-in @app-state* [:tabs tab-id]) + writer (:sse-writer tab-data) channel (:sse-channel tab-data)] - (br/close-stream (:br-stream tab-data)) - (when (and channel (instance? org.httpkit.server.AsyncChannel channel)) + (stop-sse-writer! writer) + + ;; Best-effort: if we somehow have a channel but no writer, close it. + (when (and (not writer) + channel + (instance? org.httpkit.server.AsyncChannel channel)) (t/catch->error! :hyper.error/close-sse-channel (http/close channel)))) + (swap! app-state* update-in [:tabs tab-id] - assoc :sse-channel nil :br-out nil :br-stream nil) + assoc + :sse-channel nil + :sse-writer nil + ;; Legacy keys from pre-writer-actor versions + :br-out nil + :br-stream nil) nil) (defn get-sse-channel @@ -131,23 +342,18 @@ "data: elements \n\n"))) (defn send-sse! - "Send an SSE message to a tab's channel. - If the tab has a streaming brotli compressor (client supports br), - compresses through it and sends raw bytes. The Content-Encoding: br - header is set once on the initial response — subsequent sends on the - async channel are just data frames in the same compressed stream." + "Enqueue an SSE message for a tab. + + This function is intentionally non-blocking: + - it does *not* perform brotli compression inline + - it does *not* call http-kit send! inline + + A per-tab SSE writer actor owns the channel + (optional) streaming brotli + compressor, guaranteeing single-writer semantics." [app-state* tab-id message] - (let [tab-data (get-in @app-state* [:tabs tab-id]) - channel (:sse-channel tab-data) - br-out (:br-out tab-data) - br-stream (:br-stream tab-data)] - (when channel - (or (t/catch->error! :hyper.error/send-sse - (if (and br-out br-stream) - (let [compressed (br/compress-stream br-out br-stream message)] - (http/send! channel compressed false)) - (http/send! 
channel message false))) - false)))) + (if-let [writer (get-in @app-state* [:tabs tab-id :sse-writer])] + (boolean (.offer ^BlockingQueue (:queue writer) message)) + false)) (defn render-error-fragment "Render an error message as a fragment." diff --git a/src/hyper/server.clj b/src/hyper/server.clj index 026f3e0..49f8201 100644 --- a/src/hyper/server.clj +++ b/src/hyper/server.clj @@ -79,6 +79,14 @@ {:on-open (fn [channel] (state/get-or-create-tab! app-state* session-id tab-id) (render/register-sse-channel! app-state* tab-id channel compress?) + + ;; Enqueue the initial connected event *before* any watchers can + ;; enqueue render output. The SSE writer actor owns sending headers + ;; (and brotli compression, if enabled). + (let [connected-msg (str "event: connected\n" + "data: {\"tab-id\":\"" tab-id "\"}\n\n")] + (render/send-sse! app-state* tab-id connected-msg)) + (render/setup-watchers! app-state* session-id tab-id request-var) ;; Auto-watch the routes Var so title/route changes ;; trigger re-renders for all connected tabs @@ -86,30 +94,7 @@ (when (var? routes-source) (render/watch-source! app-state* session-id tab-id request-var routes-source))) ;; Set up route-level watches (:watches + Var :get handlers) - (render/setup-route-watches! app-state* session-id tab-id request-var) - (let [connected-msg (str "event: connected\n" - "data: {\"tab-id\":\"" tab-id "\"}\n\n")] - (if compress? - ;; Brotli: send headers with the first chunk compressed - ;; through the tab's streaming compressor so all bytes - ;; on this connection form one contiguous brotli stream. - (let [tab-data (get-in @app-state* [:tabs tab-id]) - compressed (br/compress-stream - (:br-out tab-data) - (:br-stream tab-data) - connected-msg)] - (http-kit/send! - channel - {:headers {"Content-Type" "text/event-stream" - "Content-Encoding" "br"} - :body compressed} - false)) - ;; No compression: plain text - (http-kit/send! 
- channel - {:headers {"Content-Type" "text/event-stream"} - :body connected-msg} - false)))) + (render/setup-route-watches! app-state* session-id tab-id request-var)) :on-close (fn [_channel _status] (t/log! {:level :info diff --git a/test/hyper/brotli_test.clj b/test/hyper/brotli_test.clj index 0594213..ff5e553 100644 --- a/test/hyper/brotli_test.clj +++ b/test/hyper/brotli_test.clj @@ -220,8 +220,9 @@ (render/register-sse-channel! app-state* tab-id {:mock true} true) (let [tab-data (get-in @app-state* [:tabs tab-id])] - (is (some? (:br-out tab-data))) - (is (some? (:br-stream tab-data))) + (is (some? (:sse-writer tab-data))) + (is (some? (get-in tab-data [:sse-writer :br-out]))) + (is (some? (get-in tab-data [:sse-writer :br-stream]))) (is (= {:mock true} (:sse-channel tab-data)))) (render/unregister-sse-channel! app-state* tab-id))) @@ -233,8 +234,9 @@ (render/register-sse-channel! app-state* tab-id {:mock true} false) (let [tab-data (get-in @app-state* [:tabs tab-id])] - (is (nil? (:br-out tab-data))) - (is (nil? (:br-stream tab-data))) + (is (some? (:sse-writer tab-data))) + (is (nil? (get-in tab-data [:sse-writer :br-out]))) + (is (nil? (get-in tab-data [:sse-writer :br-stream]))) (is (= {:mock true} (:sse-channel tab-data)))) (render/unregister-sse-channel! app-state* tab-id))) @@ -246,52 +248,66 @@ (render/register-sse-channel! app-state* tab-id {:mock true} true) ;; Verify streams exist - (is (some? (get-in @app-state* [:tabs tab-id :br-stream]))) + (is (some? (get-in @app-state* [:tabs tab-id :sse-writer :br-stream]))) (render/unregister-sse-channel! app-state* tab-id) - (is (nil? (get-in @app-state* [:tabs tab-id :br-out]))) - (is (nil? (get-in @app-state* [:tabs tab-id :br-stream])))))) + (is (nil? (get-in @app-state* [:tabs tab-id :sse-writer]))) + (is (nil? (get-in @app-state* [:tabs tab-id :sse-channel])))))) (deftest test-send-sse-with-brotli (testing "send-sse! 
compresses when brotli streams are present" (let [app-state* (atom (state/init-state)) tab-id "test-br-send-1" - sent (atom [])] + sent (atom []) + sent-latch (java.util.concurrent.CountDownLatch. 1)] (state/get-or-create-tab! app-state* "sess" tab-id) (render/register-sse-channel! app-state* tab-id {:mock true} true) (with-redefs [org.httpkit.server/send! (fn [_ch data _close?] (swap! sent conj data) + (.countDown sent-latch) true)] (let [message "event: datastar-patch-elements\ndata: elements
Hello
\n\n"] - (render/send-sse! app-state* tab-id message) + (is (true? (render/send-sse! app-state* tab-id message))) + + (is (.await sent-latch 2 java.util.concurrent.TimeUnit/SECONDS) + "timed out waiting for SSE send") (is (= 1 (count @sent))) (let [payload (first @sent)] - ;; Should be raw compressed bytes — the Content-Encoding header - ;; was set on the initial response, subsequent sends are just - ;; data frames in the same brotli stream. - (is (bytes? payload)) - (is (pos? (alength ^bytes payload)))))) + ;; First send initializes the SSE response, so it should be a map. + (is (map? payload)) + (is (= "br" (get-in payload [:headers "Content-Encoding"]))) + (is (= "text/event-stream" (get-in payload [:headers "Content-Type"]))) + (is (bytes? (:body payload))) + (is (pos? (alength ^bytes (:body payload))))))) (render/unregister-sse-channel! app-state* tab-id))) (testing "send-sse! sends plain text when no brotli streams" (let [app-state* (atom (state/init-state)) tab-id "test-br-send-2" - sent (atom [])] + sent (atom []) + sent-latch (java.util.concurrent.CountDownLatch. 1)] (state/get-or-create-tab! app-state* "sess" tab-id) (render/register-sse-channel! app-state* tab-id {:mock true} false) (with-redefs [org.httpkit.server/send! (fn [_ch data _close?] (swap! sent conj data) + (.countDown sent-latch) true)] (let [message "event: datastar-patch-elements\ndata: elements
Hello
\n\n"] - (render/send-sse! app-state* tab-id message) + (is (true? (render/send-sse! app-state* tab-id message))) + + (is (.await sent-latch 2 java.util.concurrent.TimeUnit/SECONDS) + "timed out waiting for SSE send") (is (= 1 (count @sent))) - ;; Should be the raw string, not a map - (is (= message (first @sent))))) + (let [payload (first @sent)] + ;; First send initializes the SSE response, so it should be a map. + (is (map? payload)) + (is (= "text/event-stream" (get-in payload [:headers "Content-Type"]))) + (is (= message (:body payload)))))) (render/unregister-sse-channel! app-state* tab-id)))) diff --git a/test/hyper/render_test.clj b/test/hyper/render_test.clj index 237c7e0..3186ae5 100644 --- a/test/hyper/render_test.clj +++ b/test/hyper/render_test.clj @@ -26,6 +26,119 @@ ;; Verify it's gone (is (nil? (render/get-sse-channel app-state* tab-id)))))) +(deftest test-send-sse-does-not-block-on-brotli + (testing "send-sse! should not invoke streaming brotli compression on the caller thread" + (let [executor (java.util.concurrent.Executors/newSingleThreadExecutor) + app-state* (atom (assoc (state/init-state) :executor executor)) + session-id "test-session-sse-async" + tab-id "test-tab-sse-async" + channel {:mock true} + allow (promise) + sent (java.util.concurrent.CountDownLatch. 1)] + (try + (state/get-or-create-tab! app-state* session-id tab-id) + + (with-redefs [hyper.brotli/byte-array-out-stream (fn [] ::out) + hyper.brotli/compress-out-stream (fn [_out & _] ::stream) + hyper.brotli/compress-stream (fn [_out _stream _message] + ;; Block until the test releases us. + @allow + (.getBytes "ok" "UTF-8")) + org.httpkit.server/send! (fn [_ch _data _close?] + (.countDown sent) + true)] + (render/register-sse-channel! app-state* tab-id channel true) + + ;; If send-sse! calls brotli inline (current behavior), this future will block. + ;; Under the writer-actor implementation it should return immediately. + (let [f (future (render/send-sse! 
app-state* tab-id "hello"))] + (is (not= ::timeout (deref f 50 ::timeout)) + "send-sse! blocked on brotli compression; expected it to enqueue and return") + + ;; Always release the compressor so we don't leak a stuck thread if the assertion fails. + (deliver allow true) + + ;; Drain should eventually send. + (is (.await sent 2 java.util.concurrent.TimeUnit/SECONDS) + "expected an SSE send after unblocking brotli"))) + (finally + (render/unregister-sse-channel! app-state* tab-id) + (.shutdownNow executor)))))) + +(deftest test-sse-brotli-stream-is-single-writer + (testing "streaming brotli compression must never be invoked concurrently for a tab" + (let [app-executor (java.util.concurrent.Executors/newSingleThreadExecutor) + send-executor (java.util.concurrent.Executors/newFixedThreadPool 2) + app-state* (atom (assoc (state/init-state) :executor app-executor)) + session-id "test-session-sse-single-writer" + tab-id "test-tab-sse-single-writer" + channel {:mock true} + in-flight* (atom 0) + max-in-flight* (atom 0) + sent* (atom []) + sent-any (java.util.concurrent.CountDownLatch. 1) + start (java.util.concurrent.CountDownLatch. 1) + done (java.util.concurrent.CountDownLatch. 2)] + (try + (state/get-or-create-tab! app-state* session-id tab-id) + + (with-redefs [hyper.brotli/byte-array-out-stream (fn [] ::out) + hyper.brotli/compress-out-stream (fn [_out & _] ::stream) + hyper.brotli/compress-stream (fn [_out _stream message] + (let [n (swap! in-flight* inc)] + (swap! max-in-flight* max n) + ;; Make the call long-lived so overlaps are detectable. + (Thread/sleep 50) + (swap! in-flight* dec)) + (.getBytes (str message) "UTF-8")) + org.httpkit.server/send! (fn [_ch data _close?] + (let [payload (if (map? data) (:body data) data) + s (if (bytes? payload) + (String. ^bytes payload "UTF-8") + (str payload))] + (swap! sent* conj s)) + (.countDown sent-any) + true)] + (render/register-sse-channel! 
app-state* tab-id channel true) + + (doseq [i (range 2)] + (.submit send-executor + ^Runnable + (fn [] + (.await start) + (render/send-sse! app-state* tab-id (str "msg-" i)) + (.countDown done)))) + + (.countDown start) + + (is (.await done 2 java.util.concurrent.TimeUnit/SECONDS) + "timed out waiting for send-sse! calls to return") + + ;; The actor may batch multiple messages into a single send. + (is (.await sent-any 2 java.util.concurrent.TimeUnit/SECONDS) + "timed out waiting for SSE sends") + + ;; Wait until we observe both messages (either batched or separate). + (is (let [deadline (+ (System/currentTimeMillis) 2000)] + (loop [] + (let [combined (apply str @sent*)] + (cond + (and (.contains combined "msg-0") + (.contains combined "msg-1")) true + (< (System/currentTimeMillis) deadline) (do (Thread/sleep 10) (recur)) + :else false)))) + "timed out waiting for both SSE messages to be sent") + + (is (<= @max-in-flight* 1) + (str "brotli compress-stream was invoked concurrently (max in-flight = " + @max-in-flight* ")")) + + ;; Stop the actor so it doesn't leak a thread across the test suite. + (render/unregister-sse-channel! app-state* tab-id)) + (finally + (.shutdownNow send-executor) + (.shutdownNow app-executor)))))) + (deftest test-render-fn-registration (testing "Render function registration and retrieval" (let [app-state* (atom (state/init-state)) @@ -122,7 +235,8 @@ (is (>= (count @sent-messages) 3)) ;; Clean up watchers - (render/remove-watchers! app-state* tab-id))))) + (render/remove-watchers! app-state* tab-id) + (render/unregister-sse-channel! app-state* tab-id))))) (deftest test-cleanup (testing "Cleanup removes all tab resources" @@ -208,7 +322,9 @@ ;; After throttle period, render succeeds (Thread/sleep 20) (render/throttled-render-and-send! app-state* session-id tab-id #'context/*request*) - (is (= 2 @render-count))))) + (is (= 2 @render-count))) + + (render/unregister-sse-channel! 
app-state* tab-id))) (testing "cleanup removes last-render-ms tracking" (let [app-state* (atom (state/init-state)) @@ -307,4 +423,5 @@ (is (= 1 (tab-actions)) "Stale actions should be cleaned up, only 1 remaining")) - (render/remove-watchers! app-state* tab-id))))) + (render/remove-watchers! app-state* tab-id) + (render/unregister-sse-channel! app-state* tab-id)))))