Skip to content

feat(sse): support multiple topic subscriptions#2650

Open
fschade wants to merge 1 commit into
opencloud-eu:mainfrom
fschade:sse-service-n-topic
Open

feat(sse): support multiple topic subscriptions#2650
fschade wants to merge 1 commit into
opencloud-eu:mainfrom
fschade:sse-service-n-topic

Conversation

@fschade
Copy link
Copy Markdown
Member

@fschade fschade commented Apr 22, 2026

the previous SSE library only supported subscriptions to a single topic. with the console, we need to support sending SSE messages to multiple topics (e.g., "all_users") instead of addressing each user individually (which is still possible).

Testing

  • its a drop in replacement, no other changes are needed.
  • you have to use web to test the SSE behavior.

REF: https://github.com/opencloud-eu/console/issues/287 (1/3)

@fschade fschade force-pushed the sse-service-n-topic branch from 6ef5d23 to ade868d Compare May 5, 2026 14:42
@fschade fschade self-assigned this May 5, 2026
@github-project-automation github-project-automation Bot moved this to Qualification in OpenCloud Team Board May 5, 2026
@fschade fschade moved this from Qualification to In Progress in OpenCloud Team Board May 5, 2026
@fschade fschade marked this pull request as ready for review May 5, 2026 14:52
replaced SSE library since the previous one only supported a single topic.
@fschade fschade force-pushed the sse-service-n-topic branch from ade868d to 2e5559d Compare May 6, 2026 06:06
@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud Bot commented May 6, 2026

Quality Gate Failed Quality Gate failed

Failed conditions
12.0% Duplication on New Code (required ≤ 3%)

See analysis details on SonarQube Cloud

@fschade fschade requested a review from aduffeck May 6, 2026 06:11
@codacy-production
Copy link
Copy Markdown

Up to standards ✅

🟢 Issues 0 issues

Results:
0 new issues

View in Codacy

🟢 Metrics 33 complexity · 0 duplication

Metric Results
Complexity 33
Duplication 0

View in Codacy

AI Reviewer: first review requested successfully. AI can make mistakes. Always validate suggestions.

Run reviewer

TIP This summary will be updated as you push new changes.

Copy link
Copy Markdown

@codacy-production codacy-production Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

The PR successfully migrates to tmaxmax/go-sse to support multi-topic subscriptions; however, there are several critical issues that must be addressed before merging. The most significant concern is the keep-alive implementation, which currently exhibits $O(N^2)$ message complexity and triggers goroutine leaks on every connection.

Additionally, the Go toolchain version in go.mod is flagged for multiple critical vulnerabilities (CVEs). While Codacy reports the PR is 'up to standards', the core logic in services/sse/pkg/service/sse.go remains complex and largely uncovered by tests, specifically the event listening loop. You must resolve the resource leaks and the heartbeat architecture to prevent production performance degradation.

About this PR

  • There is a discrepancy between the code (using 'all') and the PR description (using 'all_users') for the broadcast topic. Please ensure this aligns with external event producers. Additionally, multiple goroutines are spawned without adequate lifecycle management (keep-alives and event listeners), which will lead to memory exhaustion under load.
1 comment outside of the diff
go.mod

line 3 🔴 HIGH RISK
Update the Go version to at least 1.25.10 to resolve critical security vulnerabilities in the standard library, including TLS session resumption issues and potential DoS vectors.

go 1.25.10

Test suggestions

  • SSEHandler initialization with valid configuration and context
  • SSE connection failure when user context is missing from the request
  • Client receives SSE messages targeted at their specific user ID
  • Client receives broadcast messages targeted at the global 'all' topic
  • Verify graceful shutdown of the SSE server and event listeners upon context cancellation
  • Verify periodic keep-alive heartbeats are successfully sent to the client
  • Verify listen() loop correctly processes events.SendSSE and publishes to the server
Prompt proposal for missing tests
Consider implementing these tests if applicable:
1. Verify graceful shutdown of the SSE server and event listeners upon context cancellation
2. Verify periodic keep-alive heartbeats are successfully sent to the client
3. Verify `listen()` loop correctly processes `events.SendSSE` and publishes to the server

TIP Improve review quality by adding custom instructions
TIP How was this review? Give us feedback

Comment on lines +67 to +79
if h.conf.KeepAliveInterval != 0 {
ticker := time.NewTicker(h.conf.KeepAliveInterval)
defer ticker.Stop()
go func() {
for range ticker.C {
m := &sse.Message{}
m.AppendData("keep-alive")
if err := h.server.Publish(m, topics...); err != nil {
h.logger.Error().Err(err).Msg("sse: failed to publish message")
}
}
}()
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 HIGH RISK

The current keep-alive implementation has significant performance and resource issues:

  1. Complexity: Spawning a ticker per request results in $O(N^2)$ message volume.
  2. Leaks: The goroutine will leak because range ticker.C does not exit when the ticker is stopped.
  3. Protocol: Heartbeats should use SSE comments (:keep-alive) instead of data fields to avoid triggering client-side message events.

Refactoring Suggestion: Move the keep-alive logic to a single background goroutine in NewSSEHandler that publishes an SSE comment to the SSETopicAllUsers topic once per interval. Use a context-aware loop to ensure it terminates when the service shuts down.

Comment on lines +85 to +99
func (h SSEHandler) listen() {
for e := range h.channel {
switch ev := e.Event.(type) {
default:
h.logger.Error().Interface("event", ev).Msg("unhandled event")
case events.SendSSE:
m := &sse.Message{
Type: sse.Type(ev.Type),
}
m.AppendData(string(ev.Message))
if err := h.server.Publish(m, ev.UserIDs...); err != nil {
h.logger.Error().Err(err).Msg("sse: failed to publish message")
}
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 MEDIUM RISK

The listen() loop (lines 85-99) is critical for message delivery but lacks context-aware termination and is entirely uncovered by tests. If the event channel isn't closed, this goroutine will leak upon service shutdown. Ensure this loop selects on ctx.Done() and add unit tests to verify that events.SendSSE events are correctly routed to the server.

Comment on lines +44 to +50
select {
case <-ctx.Done():
if err := handler.server.Shutdown(ctx); err != nil {
logger.Error().Err(err).Msg("failed to shutdown SSE handler")
}
return
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 MEDIUM RISK

Suggestion: Using the already-cancelled ctx for Shutdown will prevent the server from waiting for active connections to drain. Use a fresh context with a timeout.

Suggested change
select {
case <-ctx.Done():
if err := handler.server.Shutdown(ctx); err != nil {
logger.Error().Err(err).Msg("failed to shutdown SSE handler")
}
return
}
case <-ctx.Done():
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if err := handler.server.Shutdown(shutdownCtx); err != nil {
logger.Error().Err(err).Msg("failed to shutdown SSE handler")
}
return

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

1 participant