Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 232 additions & 26 deletions src/hyper/render.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,243 @@
[hyper.state :as state]
[hyper.utils :as utils]
[org.httpkit.server :as http]
[taoensso.telemere :as t]))
[taoensso.telemere :as t])
(:import (java.util.concurrent BlockingQueue LinkedBlockingQueue)))

;; ---------------------------------------------------------------------------
;; SSE writer actor (single-writer per tab)
;; ---------------------------------------------------------------------------

(def ^:private sse-headers
{"Content-Type" "text/event-stream"})

(def ^:private sse-headers-br
{"Content-Type" "text/event-stream"
"Content-Encoding" "br"})

(def ^:private sse-close-sentinel ::close)

(def ^:private default-sse-max-batch-messages 32)
(def ^:private default-sse-max-batch-chars (* 64 1024)) ;; 64KiB

(defn- sse-batch-limits
"Resolve batching limits for a new SSE writer actor.

Optional app-state* overrides:
- :sse-max-batch-messages
- :sse-max-batch-chars"
[app-state*]
{:max-messages (long (or (get @app-state* :sse-max-batch-messages)
default-sse-max-batch-messages))
:max-chars (long (or (get @app-state* :sse-max-batch-chars)
default-sse-max-batch-chars))})

(defn- try-http-send!
"Wrapper around http-kit send! that logs and returns boolean success."
[tab-id channel data close?]
(try
(boolean (http/send! channel data close?))
(catch Throwable e
(t/error! e {:id :hyper.error/send-sse
:data {:hyper/tab-id tab-id}})
false)))

(defn- send-sse-initial-response!
"Send the initial SSE response map (headers + body) for a channel.
Must happen exactly once per SSE connection."
[tab-id writer payload]
(let [channel (:channel writer)
headers (if (:br-stream writer) sse-headers-br sse-headers)]
(try-http-send! tab-id channel {:headers headers
:body payload}
false)))

(defn- send-sse-chunk!
"Send a subsequent SSE data chunk on an already-initialized channel."
[tab-id writer payload]
(try-http-send! tab-id (:channel writer) payload false))

(defn- close-sse-writer!
[tab-id writer]
;; Close resources on the actor thread to avoid brotli races.
(br/close-stream (:br-stream writer))
(when-let [channel (:channel writer)]
(when (instance? org.httpkit.server.AsyncChannel channel)
(t/catch->error! :hyper.error/close-sse-channel
(http/close channel))))
(t/log! {:level :debug
:id :hyper.event/sse-writer-close
:data {:hyper/tab-id tab-id}
:msg "SSE writer closed"})
nil)

(defn- drop-queued-messages!
"Drain and drop any remaining queued items (used after close)."
[^BlockingQueue queue]
(loop [m (.poll queue)]
(when m
(recur (.poll queue))))
nil)

(defn- coalesce-sse-messages
"Coalesce already-queued SSE messages into one string payload.

- Never blocks (uses `.poll`).
- Preserves ordering by returning a :pending message if a polled message
would exceed the batch limits.
- If a close sentinel is observed while polling, returns :close-after? true
so the actor closes *after* sending this batch.

Returns {:batch string :pending (or string nil) :close-after? boolean}."
[first-msg ^BlockingQueue queue {:keys [max-messages max-chars]}]
(let [first-str (str first-msg)
^StringBuilder sb (StringBuilder.)]
(.append sb first-str)
(loop [msg-count 1
char-count (count first-str)]
(cond
(>= msg-count max-messages)
{:batch (.toString sb) :pending nil :close-after? false}

(>= char-count max-chars)
{:batch (.toString sb) :pending nil :close-after? false}

:else
(let [next (.poll queue)]
(cond
(nil? next)
{:batch (.toString sb) :pending nil :close-after? false}

(= next sse-close-sentinel)
{:batch (.toString sb) :pending nil :close-after? true}

:else
(let [next-str (str next)
next-len (count next-str)]
(if (and (pos? max-chars)
(> (+ char-count next-len) max-chars))
{:batch (.toString sb) :pending next-str :close-after? false}
(do
(.append sb next-str)
(recur (inc msg-count) (+ char-count next-len)))))))))))

(defn- sse-actor-loop!
"Virtual-thread actor loop. Owns the channel + (optional) streaming brotli
state, enforcing single-writer semantics by construction."
[tab-id writer]
(let [^BlockingQueue queue (:queue writer)
limits (:batch-limits writer)]
(loop [started? false
pending nil]
(let [next-state
(try
(let [msg (or pending (.take queue))]
(cond
(= msg sse-close-sentinel)
nil

:else
(let [{batch :batch
pending-next :pending
close-after? :close-after?}
(coalesce-sse-messages msg queue limits)
payload (if-let [br-stream (:br-stream writer)]
(br/compress-stream (:br-out writer) br-stream batch)
batch)
sent? (if started?
(send-sse-chunk! tab-id writer payload)
(send-sse-initial-response! tab-id writer payload))]
(when (and sent? (not close-after?))
{:started? true
:pending pending-next}))))
(catch InterruptedException _
nil)
(catch Throwable e
(t/error! e {:id :hyper.error/sse-writer
:data {:hyper/tab-id tab-id}})
nil))]
(if next-state
(recur (:started? next-state) (:pending next-state))
(do
(close-sse-writer! tab-id writer)
(drop-queued-messages! queue)
nil))))))

(defn- start-sse-writer-thread!
"Start the SSE writer actor as a single virtual thread."
[tab-id writer]
(-> (Thread/ofVirtual)
(.name (str "hyper-sse-" tab-id))
(.start ^Runnable #(sse-actor-loop! tab-id writer))))

(defn- new-sse-writer
"Create and start a per-tab SSE writer actor.

- Producers enqueue quickly (send-sse!).
- The actor owns http-kit send!/close and the streaming brotli state."
[app-state* tab-id channel compress?]
(let [queue (LinkedBlockingQueue.)
out (when compress? (br/byte-array-out-stream))
br-stream (when out (br/compress-out-stream out :window-size 18))
writer (cond-> {:channel channel
:queue queue
:batch-limits (sse-batch-limits app-state*)}
compress? (assoc :br-out out
:br-stream br-stream))
thread (start-sse-writer-thread! tab-id writer)]
(assoc writer :thread thread)))

(defn- stop-sse-writer!
"Signal an SSE writer actor to close via a sentinel."
[writer]
(when writer
(.offer ^BlockingQueue (:queue writer) sse-close-sentinel))
nil)

(defn register-sse-channel!
"Register an SSE channel for a tab, optionally with a streaming brotli
compressor. When compress? is true, creates a compressor pair
(ByteArrayOutputStream + BrotliOutputStream) kept for the lifetime
of the SSE connection so the LZ77 window is shared across fragments."
[app-state* tab-id channel compress?]
(let [tab-updates (cond-> {:sse-channel channel}
compress? (merge (let [out (br/byte-array-out-stream)]
{:br-out out
:br-stream (br/compress-out-stream out :window-size 18)})))]
(swap! app-state* update-in [:tabs tab-id] merge tab-updates))
;; Reconnect safety: close any previous writer actor for this tab-id.
(when-let [old-writer (get-in @app-state* [:tabs tab-id :sse-writer])]
(stop-sse-writer! old-writer))

(let [writer (new-sse-writer app-state* tab-id channel compress?)]
(swap! app-state* update-in [:tabs tab-id] merge
{:sse-channel channel
:sse-writer writer}))
nil)

(defn unregister-sse-channel!
"Unregister an SSE channel and close the brotli stream for a tab."
"Unregister an SSE channel for a tab.

Enqueues a close sentinel so the *actor thread* performs:
- brotli stream close (if present)
- channel close

This avoids closing the brotli stream concurrently with an in-flight write."
[app-state* tab-id]
(let [tab-data (get-in @app-state* [:tabs tab-id])
writer (:sse-writer tab-data)
channel (:sse-channel tab-data)]
(br/close-stream (:br-stream tab-data))
(when (and channel (instance? org.httpkit.server.AsyncChannel channel))
(stop-sse-writer! writer)

;; Best-effort: if we somehow have a channel but no writer, close it.
(when (and (not writer)
channel
(instance? org.httpkit.server.AsyncChannel channel))
(t/catch->error! :hyper.error/close-sse-channel
(http/close channel))))

(swap! app-state* update-in [:tabs tab-id]
assoc :sse-channel nil :br-out nil :br-stream nil)
assoc
:sse-channel nil
:sse-writer nil
;; Legacy keys from pre-writer-actor versions
:br-out nil
:br-stream nil)
nil)

(defn get-sse-channel
Expand Down Expand Up @@ -131,23 +342,18 @@
"data: elements <script data-effect=\"el.remove()\">" js "</script>\n\n")))

(defn send-sse!
"Send an SSE message to a tab's channel.
If the tab has a streaming brotli compressor (client supports br),
compresses through it and sends raw bytes. The Content-Encoding: br
header is set once on the initial response — subsequent sends on the
async channel are just data frames in the same compressed stream."
"Enqueue an SSE message for a tab.

This function is intentionally non-blocking:
- it does *not* perform brotli compression inline
- it does *not* call http-kit send! inline

A per-tab SSE writer actor owns the channel + (optional) streaming brotli
compressor, guaranteeing single-writer semantics."
[app-state* tab-id message]
(let [tab-data (get-in @app-state* [:tabs tab-id])
channel (:sse-channel tab-data)
br-out (:br-out tab-data)
br-stream (:br-stream tab-data)]
(when channel
(or (t/catch->error! :hyper.error/send-sse
(if (and br-out br-stream)
(let [compressed (br/compress-stream br-out br-stream message)]
(http/send! channel compressed false))
(http/send! channel message false)))
false))))
(if-let [writer (get-in @app-state* [:tabs tab-id :sse-writer])]
(boolean (.offer ^BlockingQueue (:queue writer) message))
false))

(defn render-error-fragment
"Render an error message as a fragment."
Expand Down
33 changes: 9 additions & 24 deletions src/hyper/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -79,37 +79,22 @@
{:on-open (fn [channel]
(state/get-or-create-tab! app-state* session-id tab-id)
(render/register-sse-channel! app-state* tab-id channel compress?)

;; Enqueue the initial connected event *before* any watchers can
;; enqueue render output. The SSE writer actor owns sending headers
;; (and brotli compression, if enabled).
(let [connected-msg (str "event: connected\n"
"data: {\"tab-id\":\"" tab-id "\"}\n\n")]
(render/send-sse! app-state* tab-id connected-msg))

(render/setup-watchers! app-state* session-id tab-id request-var)
;; Auto-watch the routes Var so title/route changes
;; trigger re-renders for all connected tabs
(when-let [routes-source (get @app-state* :routes-source)]
(when (var? routes-source)
(render/watch-source! app-state* session-id tab-id request-var routes-source)))
;; Set up route-level watches (:watches + Var :get handlers)
(render/setup-route-watches! app-state* session-id tab-id request-var)
(let [connected-msg (str "event: connected\n"
"data: {\"tab-id\":\"" tab-id "\"}\n\n")]
(if compress?
;; Brotli: send headers with the first chunk compressed
;; through the tab's streaming compressor so all bytes
;; on this connection form one contiguous brotli stream.
(let [tab-data (get-in @app-state* [:tabs tab-id])
compressed (br/compress-stream
(:br-out tab-data)
(:br-stream tab-data)
connected-msg)]
(http-kit/send!
channel
{:headers {"Content-Type" "text/event-stream"
"Content-Encoding" "br"}
:body compressed}
false))
;; No compression: plain text
(http-kit/send!
channel
{:headers {"Content-Type" "text/event-stream"}
:body connected-msg}
false))))
(render/setup-route-watches! app-state* session-id tab-id request-var))

:on-close (fn [_channel _status]
(t/log! {:level :info
Expand Down
Loading
Loading