diff --git a/cmd/queryer/main.go b/cmd/queryer/main.go index 1b07857..da56fcf 100644 --- a/cmd/queryer/main.go +++ b/cmd/queryer/main.go @@ -13,6 +13,7 @@ import ( "github.com/PRYVT/utils/pkg/auth" "github.com/PRYVT/utils/pkg/eventpolling" utilsRepo "github.com/PRYVT/utils/pkg/store/repository" + "github.com/PRYVT/utils/pkg/websocket" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -35,13 +36,13 @@ func main() { } eventRepo := utilsRepo.NewEventRepository(conn) userRepo := repository.NewUserRepository(conn) - userEventHandler := eventhandling.NewPostEventHandler(userRepo) - uc := controller.NewPostController(userRepo, userEventHandler) + postEventHandler := eventhandling.NewPostEventHandler(userRepo) + uc := controller.NewPostController(userRepo) aut := auth.NewAuthMiddleware() - wsH := controller.NewWsController(userEventHandler) + wsH := websocket.NewWsController(postEventHandler) h := httphandler.NewHttpHandler(uc, aut, wsH) - eventPolling := eventpolling.NewEventPolling(c, eventRepo, userEventHandler) + eventPolling := eventpolling.NewEventPolling(c, eventRepo, postEventHandler) tcpC, err := tcpClient.NewTcpEventClient() if err != nil { diff --git a/go.mod b/go.mod index 8313942..70aab36 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.1 require ( github.com/L4B0MB4/EVTSRC v0.5.4 // indirect - github.com/PRYVT/utils v0.3.0-rc // indirect + github.com/PRYVT/utils v0.4.0 // indirect github.com/bytedance/sonic v1.12.2 // indirect github.com/bytedance/sonic/loader v0.2.0 // indirect github.com/cloudwego/base64x v0.1.4 // indirect diff --git a/go.sum b/go.sum index efacb0d..c35965e 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,12 @@ github.com/PRYVT/utils v0.2.1 h1:GiTbziM3lqRLc4EWGV28+T/aKaY+B80KTqnkBklf9q0= github.com/PRYVT/utils v0.2.1/go.mod h1:j61GmoyWWXgnCq/laZTIJm4yhD0PreLDMZnYQqjSv7w= github.com/PRYVT/utils v0.3.0-rc h1:q5PlfgI0pu7Pv6b1A30BC/3lGIIhth2oggAxPpf/r40= github.com/PRYVT/utils v0.3.0-rc/go.mod h1:j61GmoyWWXgnCq/laZTIJm4yhD0PreLDMZnYQqjSv7w= +github.com/PRYVT/utils v0.4.0-rc-1 h1:leMBNSdBBuWJR2rgubzgGXwfJOEBN+3qxy6OtWNNKrs= +github.com/PRYVT/utils v0.4.0-rc-1/go.mod h1:G48oYenFuXUKVVA0sE/lmJlOwHGRacKH/9dNNae0DTk= +github.com/PRYVT/utils v0.4.0-rc-2 h1:rk+PNUrQmZssp/FtmpcYP/NfdqjZqvoB5aCaBruLxBM= +github.com/PRYVT/utils v0.4.0-rc-2/go.mod h1:G48oYenFuXUKVVA0sE/lmJlOwHGRacKH/9dNNae0DTk= +github.com/PRYVT/utils v0.4.0 h1:RQ7u6sunsic1EfZqOiaZDZvPEJwyx/DlPWqdFlgZHwM= +github.com/PRYVT/utils v0.4.0/go.mod h1:G48oYenFuXUKVVA0sE/lmJlOwHGRacKH/9dNNae0DTk= github.com/bytedance/sonic v1.12.2 h1:oaMFuRTpMHYLpCntGca65YWt5ny+wAceDERTkT2L9lg= github.com/bytedance/sonic v1.12.2/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= diff --git a/pkg/query/eventhandling/post.go b/pkg/query/eventhandling/post.go index 4485767..a2dc00d 100644 --- a/pkg/query/eventhandling/post.go +++ b/pkg/query/eventhandling/post.go @@ -6,34 +6,34 @@ import ( "github.com/L4B0MB4/EVTSRC/pkg/models" "github.com/PRYVT/posting/pkg/aggregates" "github.com/PRYVT/posting/pkg/query/store/repository" - ws "github.com/PRYVT/posting/pkg/query/websocket" + "github.com/PRYVT/utils/pkg/interfaces" "github.com/google/uuid" "github.com/rs/zerolog/log" ) type PostEventHandler struct { postRepo *repository.PostRepository - wsConnections []*ws.WebsocketConnection + wsConnections []interfaces.WebsocketConnecter mu sync.Mutex } func NewPostEventHandler(postRepo *repository.PostRepository) *PostEventHandler { return &PostEventHandler{ postRepo: postRepo, - wsConnections: []*ws.WebsocketConnection{}, + wsConnections: []interfaces.WebsocketConnecter{}, } } -func (eh *PostEventHandler) AddWebsocketConnection(conn *ws.WebsocketConnection) { +func (eh *PostEventHandler) AddWebsocketConnection(conn interfaces.WebsocketConnecter) { eh.mu.Lock() defer eh.mu.Unlock() eh.wsConnections = append(eh.wsConnections, conn) } -func removeDisconnectedSockets(slice []*ws.WebsocketConnection) []*ws.WebsocketConnection { - output := []*ws.WebsocketConnection{} +func removeDisconnectedSockets(slice []interfaces.WebsocketConnecter) []interfaces.WebsocketConnecter { + output := []interfaces.WebsocketConnecter{} for _, element := range slice { - if element.IsConnected { + if element.IsConnected() { output = append(output, element) } } @@ -55,7 +55,7 @@ func (eh *PostEventHandler) HandleEvent(event models.Event) error { return err } for _, conn := range eh.wsConnections { - if !conn.IsAuthenticated { + if !conn.IsAuthenticated() { continue } err := conn.WriteJSON(p) diff --git a/pkg/query/httphandler/controller/post_controller.go b/pkg/query/httphandler/controller/post_controller.go index f85e46c..727d0c6 100644 --- a/pkg/query/httphandler/controller/post_controller.go +++ b/pkg/query/httphandler/controller/post_controller.go @@ -6,17 +6,15 @@ import ( "github.com/PRYVT/posting/pkg/models/query" "github.com/PRYVT/posting/pkg/query/store/repository" "github.com/PRYVT/posting/pkg/query/utils" - "github.com/PRYVT/utils/pkg/eventpolling" "github.com/gin-gonic/gin" ) type PostController struct { - postRepo *repository.PostRepository - userEventH eventpolling.EventHanlder + postRepo *repository.PostRepository } -func NewPostController(userRepo *repository.PostRepository, userEventH eventpolling.EventHanlder) *PostController { - return &PostController{postRepo: userRepo, userEventH: userEventH} +func NewPostController(userRepo *repository.PostRepository) *PostController { + return &PostController{postRepo: userRepo} } func (ctrl *PostController) GetPost(c *gin.Context) { diff --git a/pkg/query/httphandler/controller/websocket_controller.go b/pkg/query/httphandler/controller/websocket_controller.go deleted file mode 100644 index 5d2f367..0000000 --- a/pkg/query/httphandler/controller/websocket_controller.go +++ /dev/null @@ -1,36 +0,0 @@ -package controller - -import ( - "net/http" - - "github.com/PRYVT/posting/pkg/query/eventhandling" - ws "github.com/PRYVT/posting/pkg/query/websocket" - "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" - "github.com/rs/zerolog/log" -) - -type WSController struct { - userEventH *eventhandling.PostEventHandler -} - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, -} - -func NewWsController(userEventH *eventhandling.PostEventHandler) *WSController { - - upgrader.CheckOrigin = func(r *http.Request) bool { return true } - return &WSController{userEventH: userEventH} -} - -func (w *WSController) OnRequest(c *gin.Context) { - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - log.Warn().Err(err).Msg("Error while upgrading connection") - - } else { - w.userEventH.AddWebsocketConnection(ws.NewWebsocketConnection(conn)) - } -} diff --git a/pkg/query/httphandler/handler.go b/pkg/query/httphandler/handler.go index 8c2588a..1011219 100644 --- a/pkg/query/httphandler/handler.go +++ b/pkg/query/httphandler/handler.go @@ -6,6 +6,7 @@ import ( "github.com/PRYVT/posting/pkg/query/httphandler/controller" "github.com/PRYVT/utils/pkg/auth" + ws "github.com/PRYVT/utils/pkg/websocket" "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" ) @@ -15,10 +16,10 @@ type HttpHandler struct { router *gin.Engine postController *controller.PostController authMiddleware *auth.AuthMiddleware - wsController *controller.WSController + wsController *ws.WSController } -func NewHttpHandler(c *controller.PostController, am *auth.AuthMiddleware, wsController *controller.WSController) *HttpHandler { +func NewHttpHandler(c *controller.PostController, am *auth.AuthMiddleware, wsController *ws.WSController) *HttpHandler { r := gin.Default() srv := &http.Server{ Addr: "0.0.0.0" + ":" + "5520", diff --git a/pkg/query/websocket/auth_req.go b/pkg/query/websocket/auth_req.go deleted file mode 100644 index 41d45a2..0000000 --- a/pkg/query/websocket/auth_req.go +++ /dev/null @@ -1,9 +0,0 @@ -package websocket - -import "encoding/json" - -type AuthRequest struct { - Token string `json:"token"` - Type string `json:"type"` - Data json.RawMessage -} diff --git a/pkg/query/websocket/websocket_connection.go b/pkg/query/websocket/websocket_connection.go deleted file mode 100644 index 0f34768..0000000 --- a/pkg/query/websocket/websocket_connection.go +++ /dev/null @@ -1,69 +0,0 @@ -package websocket - -import ( - "fmt" - - "github.com/PRYVT/utils/pkg/auth" - "github.com/google/uuid" - "github.com/gorilla/websocket" - "github.com/rs/zerolog/log" -) - -type WebsocketConnection struct { - connection *websocket.Conn - IsConnected bool - IsAuthenticated bool - userUuid uuid.UUID -} - -func NewWebsocketConnection(conn *websocket.Conn) *WebsocketConnection { - wC := &WebsocketConnection{connection: conn} - go wC.ReadForDisconnect() - return wC -} - -func (wC *WebsocketConnection) WriteJSON(v interface{}) error { - if !wC.IsAuthenticated { - return fmt.Errorf("WebsocketConnection is not connected or authenticated") - } - err := wC.connection.WriteJSON(v) - if err != nil { - log.Warn().Err(err).Msg("Error while writing WriteJSON") - } - return err -} - -func (wC *WebsocketConnection) ReadForDisconnect() { - wC.IsConnected = true - for { - authRequest := AuthRequest{} - err := wC.connection.ReadJSON(&authRequest) - if err != nil { - log.Debug().Err(err).Msg("Error while reading from websocket connection") - wC.IsAuthenticated = false - wC.connection.Close() - wC.IsConnected = false - break - } else { - log.Debug().Interface("authReq", authRequest).Msg("Received auth request") - _, err = auth.VerifyToken(authRequest.Token) - if err != nil { - log.Debug().Err(err).Msg("Error while verifying token") - wC.IsAuthenticated = false - wC.connection.Close() - wC.IsConnected = false - break - } - uuid, err := auth.GetUserUuidFromToken(authRequest.Token) - if err != nil { - log.Debug().Err(err).Msg("Error while getting user uuid from token") - wC.IsAuthenticated = false - wC.connection.Close() - wC.IsConnected = false - break - } - wC.userUuid = uuid - wC.IsAuthenticated = true - } - } -}