Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.12
github.com/prometheus/client_golang v1.23.2
github.com/r3labs/sse/v2 v2.10.0
github.com/riandyrn/otelchi v0.12.2
github.com/rogpeppe/go-internal v1.14.1
github.com/rs/cors v1.11.1
Expand All @@ -89,6 +88,7 @@ require (
github.com/thejerf/suture/v4 v4.0.6
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
github.com/tmaxmax/go-sse v0.11.0
github.com/tus/tusd/v2 v2.9.2
github.com/unrolled/secure v1.16.0
github.com/vmihailenco/msgpack/v5 v5.4.1
Expand Down Expand Up @@ -394,7 +394,6 @@ require (
golang.org/x/tools v0.42.0 // indirect
google.golang.org/genproto v0.0.0-20260128011058-8636f8732409 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect
Expand Down
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1059,8 +1059,6 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9
github.com/prometheus/statsd_exporter v0.22.8 h1:Qo2D9ZzaQG+id9i5NYNGmbf1aa/KxKbB9aKfMS+Yib0=
github.com/prometheus/statsd_exporter v0.22.8/go.mod h1:/DzwbTEaFTE0Ojz5PqcSk6+PFHOPWGxdXVr6yC8eFOM=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
Expand Down Expand Up @@ -1217,6 +1215,8 @@ github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYI
github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI=
github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw=
github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ=
github.com/tmaxmax/go-sse v0.11.0 h1:nogmJM6rJUoOLoAwEKeQe5XlVpt9l7N82SS1jI7lWFg=
github.com/tmaxmax/go-sse v0.11.0/go.mod h1:u/2kZQR1tyngo1lKaNCj1mJmhXGZWS1Zs5yiSOD+Eg8=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/toorop/go-dkim v0.0.0-20201103131630-e1cd1a0a5208 h1:PM5hJF7HVfNWmCjMdEfbuOBNXSVF2cMFGgQTPdKCbwM=
github.com/toorop/go-dkim v0.0.0-20201103131630-e1cd1a0a5208/go.mod h1:BzWtXXrXzZUvMacR0oF/fbDDgUPO8L36tDMmRAf14ns=
Expand Down Expand Up @@ -1420,7 +1420,6 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -1747,8 +1746,6 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
18 changes: 12 additions & 6 deletions services/sse/pkg/server/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package http

import (
"fmt"

stdhttp "net/http"

"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/google/uuid"
"github.com/riandyrn/otelchi"
"go-micro.dev/v4"

"github.com/opencloud-eu/reva/v2/pkg/events"

"github.com/opencloud-eu/opencloud/pkg/account"
"github.com/opencloud-eu/opencloud/pkg/cors"
"github.com/opencloud-eu/opencloud/pkg/middleware"
"github.com/opencloud-eu/opencloud/pkg/service/http"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
svc "github.com/opencloud-eu/opencloud/services/sse/pkg/service"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/riandyrn/otelchi"
"go-micro.dev/v4"
)

// Service is the service interface
Expand Down Expand Up @@ -82,12 +83,17 @@ func Server(opts ...Option) (http.Service, error) {
return http.Service{}, err
}

handle, err := svc.NewSSE(options.Config, options.Logger, ch, mux)
sseHandler, err := svc.NewSSEHandler(options.Context, options.Config, options.Logger, ch)
if err != nil {
return http.Service{}, err
}

svcHandler, err := svc.New(mux, sseHandler)
if err != nil {
return http.Service{}, err
}

if err := micro.RegisterHandler(service.Server(), handle); err != nil {
if err := micro.RegisterHandler(service.Server(), svcHandler); err != nil {
return http.Service{}, err
}

Expand Down
98 changes: 9 additions & 89 deletions services/sse/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,102 +2,22 @@ package service

import (
"net/http"
"time"

"github.com/go-chi/chi/v5"
"github.com/r3labs/sse/v2"

revactx "github.com/opencloud-eu/reva/v2/pkg/ctx"
"github.com/opencloud-eu/reva/v2/pkg/events"

"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/sse/pkg/config"
)

// SSE defines implements the business logic for Service.
type SSE struct {
c *config.Config
l log.Logger
m *chi.Mux
sse *sse.Server
evChannel <-chan events.Event
}

// NewSSE returns a service implementation for Service.
func NewSSE(c *config.Config, l log.Logger, ch <-chan events.Event, mux *chi.Mux) (SSE, error) {
s := SSE{
c: c,
l: l,
m: mux,
sse: sse.New(),
evChannel: ch,
}
mux.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) {
r.Get("/sse", s.HandleSSE)
})

go s.ListenForEvents()

return s, nil
type Service struct {
handler http.Handler
}

// ServeHTTP fulfills Handler interface
func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.m.ServeHTTP(w, r)
}
func New(mux *chi.Mux, sseHandler http.Handler) (Service, error) {
mux.Get("/ocs/v2.php/apps/notifications/api/v1/notifications/sse", sseHandler.ServeHTTP)

// ListenForEvents listens for events
func (s SSE) ListenForEvents() {
for e := range s.evChannel {
switch ev := e.Event.(type) {
default:
s.l.Error().Interface("event", ev).Msg("unhandled event")
case events.SendSSE:
for _, uid := range ev.UserIDs {
s.sse.Publish(uid, &sse.Event{
Event: []byte(ev.Type),
Data: ev.Message,
})
}
}
}
return Service{
handler: mux,
}, nil
}

// HandleSSE is the GET handler for events
func (s SSE) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
if !ok {
s.l.Error().Msg("sse: no user in context")
w.WriteHeader(http.StatusInternalServerError)
return
}

uid := u.GetId().GetOpaqueId()
if uid == "" {
s.l.Error().Msg("sse: user in context is broken")
w.WriteHeader(http.StatusInternalServerError)
return
}

stream := s.sse.CreateStream(uid)
stream.AutoReplay = false

if s.c.KeepAliveInterval != 0 {
ticker := time.NewTicker(s.c.KeepAliveInterval)
defer ticker.Stop()
go func() {
for range ticker.C {
s.sse.Publish(uid, &sse.Event{
Comment: []byte("keepalive"),
})
}
}()
}

// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()

s.sse.ServeHTTP(w, r)
func (s Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}
114 changes: 114 additions & 0 deletions services/sse/pkg/service/sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package service

import (
"context"
"net/http"
"time"

"github.com/tmaxmax/go-sse"

revactx "github.com/opencloud-eu/reva/v2/pkg/ctx"
"github.com/opencloud-eu/reva/v2/pkg/events"

"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/sse/pkg/config"
)

const (
SSETopicAllUsers = "all"
)

// SSEHandler defines implements the business logic for Service.
type SSEHandler struct {
conf *config.Config
logger log.Logger
server *sse.Server
channel <-chan events.Event
}

// NewSSEHandler returns a service implementation for Service.
func NewSSEHandler(ctx context.Context, conf *config.Config, logger log.Logger, ch <-chan events.Event) (SSEHandler, error) {
handler := SSEHandler{
conf: conf,
logger: logger,
channel: ch,
}

handler.server = &sse.Server{
OnSession: func(_ http.ResponseWriter, r *http.Request) (topics []string, allowed bool) {
return handler.topics(r)
},
}

go func() {
select {
case <-ctx.Done():
if err := handler.server.Shutdown(ctx); err != nil {
logger.Error().Err(err).Msg("failed to shutdown SSE handler")
}
return
}
Comment on lines +44 to +50
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 MEDIUM RISK

Suggestion: Using the already-cancelled ctx for Shutdown will prevent the server from waiting for active connections to drain. Use a fresh context with a timeout.

Suggested change
select {
case <-ctx.Done():
if err := handler.server.Shutdown(ctx); err != nil {
logger.Error().Err(err).Msg("failed to shutdown SSE handler")
}
return
}
case <-ctx.Done():
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if err := handler.server.Shutdown(shutdownCtx); err != nil {
logger.Error().Err(err).Msg("failed to shutdown SSE handler")
}
return

}()

go handler.listen()

return handler, nil
}

// ServeHTTP fulfills Handler interface
func (h SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
topics, ok := h.topics(r)
if !ok {
h.logger.Error().Msg("sse: failed to get topics")
w.WriteHeader(http.StatusInternalServerError)
return
}

if h.conf.KeepAliveInterval != 0 {
ticker := time.NewTicker(h.conf.KeepAliveInterval)
defer ticker.Stop()
go func() {
for range ticker.C {
m := &sse.Message{}
m.AppendData("keep-alive")
if err := h.server.Publish(m, topics...); err != nil {
h.logger.Error().Err(err).Msg("sse: failed to publish message")
}
}
}()
}
Comment on lines +67 to +79
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 HIGH RISK

The current keep-alive implementation has significant performance and resource issues:

  1. Complexity: Spawning a ticker per request results in $O(N^2)$ message volume.
  2. Leaks: The goroutine will leak because range ticker.C does not exit when the ticker is stopped.
  3. Protocol: Heartbeats should use SSE comments (:keep-alive) instead of data fields to avoid triggering client-side message events.

Refactoring Suggestion: Move the keep-alive logic to a single background goroutine in NewSSEHandler that publishes an SSE comment to the SSETopicAllUsers topic once per interval. Use a context-aware loop to ensure it terminates when the service shuts down.


h.server.ServeHTTP(w, r)
}

// ListenForEvents listens for events
func (h SSEHandler) listen() {
for e := range h.channel {
switch ev := e.Event.(type) {
default:
h.logger.Error().Interface("event", ev).Msg("unhandled event")
case events.SendSSE:
m := &sse.Message{
Type: sse.Type(ev.Type),
}
m.AppendData(string(ev.Message))
if err := h.server.Publish(m, ev.UserIDs...); err != nil {
h.logger.Error().Err(err).Msg("sse: failed to publish message")
}
}
}
Comment on lines +85 to +99
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 MEDIUM RISK

The listen() loop (lines 85-99) is critical for message delivery but lacks context-aware termination and is entirely uncovered by tests. If the event channel isn't closed, this goroutine will leak upon service shutdown. Ensure this loop selects on ctx.Done() and add unit tests to verify that events.SendSSE events are correctly routed to the server.

}

func (h SSEHandler) topics(r *http.Request) ([]string, bool) {
u, ok := revactx.ContextGetUser(r.Context())
if !ok {
return nil, false
}

uid := u.GetId().GetOpaqueId()
if uid == "" {
return nil, false
}

return append([]string{SSETopicAllUsers}, uid), true
}
Loading