package http
import (
	"context"
	"io"

	"github.com/rs/zerolog"
	"nhooyr.io/websocket"
	"wellquite.org/edist/document"
	protocol "wellquite.org/edist/protocol/go"
)
// Connection ties one websocket connection to one document subscription.
type Connection struct {
// server is told via ConnectionClosed when ReadLoop returns.
server *Server
// log receives the connection's lifecycle and error messages.
log zerolog.Logger
// subscription is cancelled when ReadLoop returns; decoded incoming
// messages are handed to its Client.
subscription *document.DocumentSubscription
// wsConn is the underlying websocket that ReadLoop reads from and Close closes.
wsConn *websocket.Conn
}
// NewConnection subscribes wsConn to updates of the named document and
// returns a Connection wrapping it.
//
// A new client is obtained from server.registry and subscribed with a
// callback that forwards every update to the websocket as a binary message.
// NewConnection returns nil when SubscribeToDocumentUpdates yields no
// subscription.
func NewConnection(server *Server, log zerolog.Logger, documentName string, wsConn *websocket.Conn) *Connection {
	background := context.Background()
	subscription := server.registry.NewClient().SubscribeToDocumentUpdates(documentName, func(updates []byte) {
		// The callback cannot return an error, so a failed write is logged
		// instead of being silently discarded.
		if err := wsConn.Write(background, websocket.MessageBinary, updates); err != nil {
			log.Info().Err(err).Msg(`Error when writing to websocket connection.`)
		}
	})
	if subscription == nil {
		return nil
	}
	return &Connection{
		server:       server,
		log:          log,
		subscription: subscription,
		wsConn:       wsConn,
	}
}
// ReadLoop reads messages from the websocket until an error occurs or the
// subscription's client rejects a message.
//
// Each message must be binary and must decode as a protocol.Message; the
// decoded message is passed to self.subscription.Client.Message, and a false
// result ends the loop. On return the subscription is cancelled and then the
// server is notified through ConnectionClosed (deferred calls run in reverse
// order of registration).
func (self *Connection) ReadLoop() {
	self.log.Info().Msg("New websocket connection established.")
	defer self.server.ConnectionClosed(self)
	defer self.subscription.Cancel()
	background := context.Background()
	for {
		msgType, msgReader, err := self.wsConn.Reader(background)
		if err != nil {
			self.log.Info().Err(err).Msg(`Error when reading from websocket connection. Closing connection.`)
			return
		}
		if msgType != websocket.MessageBinary {
			self.log.Info().Str(`message type`, msgType.String()).Msg(`Only binary messages are expected on websocket connection. Closing connection.`)
			return
		}
		var msg protocol.Message
		if err = msg.DecodeBebop(msgReader); err != nil {
			self.log.Info().Err(err).Msg(`Unable to decode message received on websocket connection. Closing connection.`)
			return
		}
		if !self.subscription.Client.Message(&msg) {
			return
		}
		// The websocket library requires each message reader to be read to
		// EOF before the next Reader call. Decoding stops at the end of the
		// encoded message, so discard whatever is left of this one.
		if _, err = io.Copy(io.Discard, msgReader); err != nil {
			self.log.Info().Err(err).Msg(`Error when draining message from websocket connection. Closing connection.`)
			return
		}
	}
}
// Close closes the underlying websocket with the "going away" status and a
// server-shutdown reason, returning whatever error the websocket reports.
func (self *Connection) Close() error {
	const reason = `Server shutdown.`
	conn := self.wsConn
	return conn.Close(websocket.StatusGoingAway, reason)
}