package http import ( "context" "github.com/rs/zerolog" "nhooyr.io/websocket" "wellquite.org/edist/document" protocol "wellquite.org/edist/protocol/go" ) type Connection struct { server *Server log zerolog.Logger subscription *document.DocumentSubscription wsConn *websocket.Conn } func NewConnection(server *Server, log zerolog.Logger, documentName string, wsConn *websocket.Conn) *Connection { background := context.Background() subscription := server.registry.NewClient().SubscribeToDocumentUpdates(documentName, func(updates []byte) { wsConn.Write(background, websocket.MessageBinary, updates) }) if subscription == nil { return nil } return &Connection{ server: server, log: log, subscription: subscription, wsConn: wsConn, } } func (self *Connection) ReadLoop() { self.log.Info().Msg("New websocket connection established.") defer self.server.ConnectionClosed(self) defer self.subscription.Cancel() background := context.Background() for { msgType, msgReader, err := self.wsConn.Reader(background) if err != nil { self.log.Info().Err(err).Msg(`Error when reading from websocket connection. Closing connection.`) return } if msgType != websocket.MessageBinary { self.log.Info().Str(`message type`, msgType.String()).Msg(`Only binary messages are expected on websocket connection. Closing connection.`) return } var msg protocol.Message if err = msg.DecodeBebop(msgReader); err != nil { self.log.Info().Err(err).Msg(`Unable to decode message received on websocket connection. Closing connection.`) return } if !self.subscription.Client.Message(&msg) { return } } } func (self *Connection) Close() error { return self.wsConn.Close(websocket.StatusGoingAway, `Server shutdown.`) }