diff --git a/src/hyper/render.clj b/src/hyper/render.clj index df0e7e2..4be21d1 100644 --- a/src/hyper/render.clj +++ b/src/hyper/render.clj @@ -12,7 +12,198 @@ [hyper.state :as state] [hyper.utils :as utils] [org.httpkit.server :as http] - [taoensso.telemere :as t])) + [taoensso.telemere :as t]) + (:import (java.util.concurrent BlockingQueue LinkedBlockingQueue))) + +;; --------------------------------------------------------------------------- +;; SSE writer actor (single-writer per tab) +;; --------------------------------------------------------------------------- + +(def ^:private sse-headers + {"Content-Type" "text/event-stream"}) + +(def ^:private sse-headers-br + {"Content-Type" "text/event-stream" + "Content-Encoding" "br"}) + +(def ^:private sse-close-sentinel ::close) + +(def ^:private default-sse-max-batch-messages 32) +(def ^:private default-sse-max-batch-chars (* 64 1024)) ;; 64KiB + +(defn- sse-batch-limits + "Resolve batching limits for a new SSE writer actor. + + Optional app-state* overrides: + - :sse-max-batch-messages + - :sse-max-batch-chars" + [app-state*] + {:max-messages (long (or (get @app-state* :sse-max-batch-messages) + default-sse-max-batch-messages)) + :max-chars (long (or (get @app-state* :sse-max-batch-chars) + default-sse-max-batch-chars))}) + +(defn- try-http-send! + "Wrapper around http-kit send! that logs and returns boolean success." + [tab-id channel data close?] + (try + (boolean (http/send! channel data close?)) + (catch Throwable e + (t/error! e {:id :hyper.error/send-sse + :data {:hyper/tab-id tab-id}}) + false))) + +(defn- send-sse-initial-response! + "Send the initial SSE response map (headers + body) for a channel. + Must happen exactly once per SSE connection." + [tab-id writer payload] + (let [channel (:channel writer) + headers (if (:br-stream writer) sse-headers-br sse-headers)] + (try-http-send! tab-id channel {:headers headers + :body payload} + false))) + +(defn- send-sse-chunk! + "Send a subsequent SSE data chunk on an already-initialized channel." + [tab-id writer payload] + (try-http-send! tab-id (:channel writer) payload false)) + +(defn- close-sse-writer! + [tab-id writer] + ;; Close resources on the actor thread to avoid brotli races. + (br/close-stream (:br-stream writer)) + (when-let [channel (:channel writer)] + (when (instance? org.httpkit.server.AsyncChannel channel) + (t/catch->error! :hyper.error/close-sse-channel + (http/close channel)))) + (t/log! {:level :debug + :id :hyper.event/sse-writer-close + :data {:hyper/tab-id tab-id} + :msg "SSE writer closed"}) + nil) + +(defn- drop-queued-messages! + "Drain and drop any remaining queued items (used after close)." + [^BlockingQueue queue] + (loop [m (.poll queue)] + (when m + (recur (.poll queue)))) + nil) + +(defn- coalesce-sse-messages + "Coalesce already-queued SSE messages into one string payload. + + - Never blocks (uses `.poll`). + - Preserves ordering by returning a :pending message if a polled message + would exceed the batch limits. + - If a close sentinel is observed while polling, returns :close-after? true + so the actor closes *after* sending this batch. + + Returns {:batch string :pending (or string nil) :close-after? boolean}." + [first-msg ^BlockingQueue queue {:keys [max-messages max-chars]}] + (let [first-str (str first-msg) + ^StringBuilder sb (StringBuilder.)] + (.append sb first-str) + (loop [msg-count 1 + char-count (count first-str)] + (cond + (>= msg-count max-messages) + {:batch (.toString sb) :pending nil :close-after? false} + + (>= char-count max-chars) + {:batch (.toString sb) :pending nil :close-after? false} + + :else + (let [next (.poll queue)] + (cond + (nil? next) + {:batch (.toString sb) :pending nil :close-after? false} + + (= next sse-close-sentinel) + {:batch (.toString sb) :pending nil :close-after? true} + + :else + (let [next-str (str next) + next-len (count next-str)] + (if (and (pos? max-chars) + (> (+ char-count next-len) max-chars)) + {:batch (.toString sb) :pending next-str :close-after? false} + (do + (.append sb next-str) + (recur (inc msg-count) (+ char-count next-len))))))))))) + +(defn- sse-actor-loop! + "Virtual-thread actor loop. Owns the channel + (optional) streaming brotli + state, enforcing single-writer semantics by construction." + [tab-id writer] + (let [^BlockingQueue queue (:queue writer) + limits (:batch-limits writer)] + (loop [started? false + pending nil] + (let [next-state + (try + (let [msg (or pending (.take queue))] + (cond + (= msg sse-close-sentinel) + nil + + :else + (let [{batch :batch + pending-next :pending + close-after? :close-after?} + (coalesce-sse-messages msg queue limits) + payload (if-let [br-stream (:br-stream writer)] + (br/compress-stream (:br-out writer) br-stream batch) + batch) + sent? (if started? + (send-sse-chunk! tab-id writer payload) + (send-sse-initial-response! tab-id writer payload))] + (when (and sent? (not close-after?)) + {:started? true + :pending pending-next})))) + (catch InterruptedException _ + nil) + (catch Throwable e + (t/error! e {:id :hyper.error/sse-writer + :data {:hyper/tab-id tab-id}}) + nil))] + (if next-state + (recur (:started? next-state) (:pending next-state)) + (do + (close-sse-writer! tab-id writer) + (drop-queued-messages! queue) + nil)))))) + +(defn- start-sse-writer-thread! + "Start the SSE writer actor as a single virtual thread." + [tab-id writer] + (-> (Thread/ofVirtual) + (.name (str "hyper-sse-" tab-id)) + (.start ^Runnable #(sse-actor-loop! tab-id writer)))) + +(defn- new-sse-writer + "Create and start a per-tab SSE writer actor. + + - Producers enqueue quickly (send-sse!). + - The actor owns http-kit send!/close and the streaming brotli state." + [app-state* tab-id channel compress?] + (let [queue (LinkedBlockingQueue.) + out (when compress? (br/byte-array-out-stream)) + br-stream (when out (br/compress-out-stream out :window-size 18)) + writer (cond-> {:channel channel + :queue queue + :batch-limits (sse-batch-limits app-state*)} + compress? (assoc :br-out out + :br-stream br-stream)) + thread (start-sse-writer-thread! tab-id writer)] + (assoc writer :thread thread))) + +(defn- stop-sse-writer! + "Signal an SSE writer actor to close via a sentinel." + [writer] + (when writer + (.offer ^BlockingQueue (:queue writer) sse-close-sentinel)) + nil) (defn register-sse-channel! "Register an SSE channel for a tab, optionally with a streaming brotli @@ -20,24 +211,44 @@ (ByteArrayOutputStream + BrotliOutputStream) kept for the lifetime of the SSE connection so the LZ77 window is shared across fragments." [app-state* tab-id channel compress?] - (let [tab-updates (cond-> {:sse-channel channel} - compress? (merge (let [out (br/byte-array-out-stream)] - {:br-out out - :br-stream (br/compress-out-stream out :window-size 18)})))] - (swap! app-state* update-in [:tabs tab-id] merge tab-updates)) + ;; Reconnect safety: close any previous writer actor for this tab-id. + (when-let [old-writer (get-in @app-state* [:tabs tab-id :sse-writer])] + (stop-sse-writer! old-writer)) + + (let [writer (new-sse-writer app-state* tab-id channel compress?)] + (swap! app-state* update-in [:tabs tab-id] merge + {:sse-channel channel + :sse-writer writer})) nil) (defn unregister-sse-channel! - "Unregister an SSE channel and close the brotli stream for a tab." + "Unregister an SSE channel for a tab. + + Enqueues a close sentinel so the *actor thread* performs: + - brotli stream close (if present) + - channel close + + This avoids closing the brotli stream concurrently with an in-flight write." [app-state* tab-id] (let [tab-data (get-in @app-state* [:tabs tab-id]) + writer (:sse-writer tab-data) channel (:sse-channel tab-data)] - (br/close-stream (:br-stream tab-data)) - (when (and channel (instance? org.httpkit.server.AsyncChannel channel)) + (stop-sse-writer! writer) + + ;; Best-effort: if we somehow have a channel but no writer, close it. + (when (and (not writer) + channel + (instance? org.httpkit.server.AsyncChannel channel)) (t/catch->error! :hyper.error/close-sse-channel (http/close channel)))) + (swap! app-state* update-in [:tabs tab-id] - assoc :sse-channel nil :br-out nil :br-stream nil) + assoc + :sse-channel nil + :sse-writer nil + ;; Legacy keys from pre-writer-actor versions + :br-out nil + :br-stream nil) nil) (defn get-sse-channel @@ -131,23 +342,18 @@ "data: elements \n\n"))) (defn send-sse! - "Send an SSE message to a tab's channel. - If the tab has a streaming brotli compressor (client supports br), - compresses through it and sends raw bytes. The Content-Encoding: br - header is set once on the initial response — subsequent sends on the - async channel are just data frames in the same compressed stream." + "Enqueue an SSE message for a tab. + + This function is intentionally non-blocking: + - it does *not* perform brotli compression inline + - it does *not* call http-kit send! inline + + A per-tab SSE writer actor owns the channel + (optional) streaming brotli + compressor, guaranteeing single-writer semantics." [app-state* tab-id message] - (let [tab-data (get-in @app-state* [:tabs tab-id]) - channel (:sse-channel tab-data) - br-out (:br-out tab-data) - br-stream (:br-stream tab-data)] - (when channel - (or (t/catch->error! :hyper.error/send-sse - (if (and br-out br-stream) - (let [compressed (br/compress-stream br-out br-stream message)] - (http/send! channel compressed false)) - (http/send! channel message false))) - false)))) + (if-let [writer (get-in @app-state* [:tabs tab-id :sse-writer])] + (boolean (.offer ^BlockingQueue (:queue writer) message)) + false)) (defn render-error-fragment "Render an error message as a fragment." diff --git a/src/hyper/server.clj b/src/hyper/server.clj index 026f3e0..49f8201 100644 --- a/src/hyper/server.clj +++ b/src/hyper/server.clj @@ -79,6 +79,14 @@ {:on-open (fn [channel] (state/get-or-create-tab! app-state* session-id tab-id) (render/register-sse-channel! app-state* tab-id channel compress?) + + ;; Enqueue the initial connected event *before* any watchers can + ;; enqueue render output. The SSE writer actor owns sending headers + ;; (and brotli compression, if enabled). + (let [connected-msg (str "event: connected\n" + "data: {\"tab-id\":\"" tab-id "\"}\n\n")] + (render/send-sse! app-state* tab-id connected-msg)) + (render/setup-watchers! app-state* session-id tab-id request-var) ;; Auto-watch the routes Var so title/route changes ;; trigger re-renders for all connected tabs @@ -86,30 +94,7 @@ (when (var? routes-source) (render/watch-source! app-state* session-id tab-id request-var routes-source))) ;; Set up route-level watches (:watches + Var :get handlers) - (render/setup-route-watches! app-state* session-id tab-id request-var) - (let [connected-msg (str "event: connected\n" - "data: {\"tab-id\":\"" tab-id "\"}\n\n")] - (if compress? - ;; Brotli: send headers with the first chunk compressed - ;; through the tab's streaming compressor so all bytes - ;; on this connection form one contiguous brotli stream. - (let [tab-data (get-in @app-state* [:tabs tab-id]) - compressed (br/compress-stream - (:br-out tab-data) - (:br-stream tab-data) - connected-msg)] - (http-kit/send! - channel - {:headers {"Content-Type" "text/event-stream" - "Content-Encoding" "br"} - :body compressed} - false)) - ;; No compression: plain text - (http-kit/send! - channel - {:headers {"Content-Type" "text/event-stream"} - :body connected-msg} - false)))) + (render/setup-route-watches! app-state* session-id tab-id request-var)) :on-close (fn [_channel _status] (t/log! {:level :info diff --git a/test/hyper/brotli_test.clj b/test/hyper/brotli_test.clj index 0594213..ff5e553 100644 --- a/test/hyper/brotli_test.clj +++ b/test/hyper/brotli_test.clj @@ -220,8 +220,9 @@ (render/register-sse-channel! app-state* tab-id {:mock true} true) (let [tab-data (get-in @app-state* [:tabs tab-id])] - (is (some? (:br-out tab-data))) - (is (some? (:br-stream tab-data))) + (is (some? (:sse-writer tab-data))) + (is (some? (get-in tab-data [:sse-writer :br-out]))) + (is (some? (get-in tab-data [:sse-writer :br-stream]))) (is (= {:mock true} (:sse-channel tab-data)))) (render/unregister-sse-channel! app-state* tab-id))) @@ -233,8 +234,9 @@ (render/register-sse-channel! app-state* tab-id {:mock true} false) (let [tab-data (get-in @app-state* [:tabs tab-id])] - (is (nil? (:br-out tab-data))) - (is (nil? (:br-stream tab-data))) + (is (some? (:sse-writer tab-data))) + (is (nil? (get-in tab-data [:sse-writer :br-out]))) + (is (nil? (get-in tab-data [:sse-writer :br-stream]))) (is (= {:mock true} (:sse-channel tab-data)))) (render/unregister-sse-channel! app-state* tab-id))) @@ -246,52 +248,66 @@ (render/register-sse-channel! app-state* tab-id {:mock true} true) ;; Verify streams exist - (is (some? (get-in @app-state* [:tabs tab-id :br-stream]))) + (is (some? (get-in @app-state* [:tabs tab-id :sse-writer :br-stream]))) (render/unregister-sse-channel! app-state* tab-id) - (is (nil? (get-in @app-state* [:tabs tab-id :br-out]))) - (is (nil? (get-in @app-state* [:tabs tab-id :br-stream])))))) + (is (nil? (get-in @app-state* [:tabs tab-id :sse-writer]))) + (is (nil? (get-in @app-state* [:tabs tab-id :sse-channel])))))) (deftest test-send-sse-with-brotli (testing "send-sse! compresses when brotli streams are present" (let [app-state* (atom (state/init-state)) tab-id "test-br-send-1" - sent (atom [])] + sent (atom []) + sent-latch (java.util.concurrent.CountDownLatch. 1)] (state/get-or-create-tab! app-state* "sess" tab-id) (render/register-sse-channel! app-state* tab-id {:mock true} true) (with-redefs [org.httpkit.server/send! (fn [_ch data _close?] (swap! sent conj data) + (.countDown sent-latch) true)] (let [message "event: datastar-patch-elements\ndata: elements