Distributed Concurrent Editor

Artifact [e2a25da011]
Login

Artifact [e2a25da011]

Artifact e2a25da011cf8b3c0eb95f6d3f7178d6fc4cf66a5f4941986ff248b023a407ec:


package http

import (
	"context"

	"github.com/rs/zerolog"
	"nhooyr.io/websocket"
	"wellquite.org/edist/document"
	protocol "wellquite.org/edist/protocol/go"
)

// Connection ties a single websocket connection to a subscription for
// updates to one document.
type Connection struct {
	// server is the owning Server; it is told via ConnectionClosed when
	// ReadLoop exits.
	server *Server
	// log is the logger used for connection lifecycle and error messages.
	log zerolog.Logger
	// subscription is the document-update subscription created in
	// NewConnection. Incoming messages are passed to its Client, and it is
	// cancelled when ReadLoop exits.
	subscription *document.DocumentSubscription
	// wsConn is the underlying websocket connection that messages are read
	// from and that Close shuts down.
	wsConn *websocket.Conn
}

// NewConnection creates a new registry client on server, subscribes it to
// updates for documentName, and bundles the resulting subscription with wsConn
// into a Connection.
//
// Each batch of updates handed to the subscription callback is written to
// wsConn as a single binary websocket message. If that write fails the error
// is logged (it was previously discarded); the callback takes no other action.
//
// NewConnection returns nil when SubscribeToDocumentUpdates returns a nil
// subscription.
func NewConnection(server *Server, log zerolog.Logger, documentName string, wsConn *websocket.Conn) *Connection {
	background := context.Background()
	subscription := server.registry.NewClient().SubscribeToDocumentUpdates(documentName, func(updates []byte) {
		// Surface write failures instead of dropping them silently, so that
		// updates which never reach the client are visible in the logs.
		if err := wsConn.Write(background, websocket.MessageBinary, updates); err != nil {
			log.Info().Err(err).Msg(`Error when writing updates to websocket connection.`)
		}
	})
	if subscription == nil {
		return nil
	}
	return &Connection{
		server:       server,
		log:          log,
		subscription: subscription,
		wsConn:       wsConn,
	}
}

// ReadLoop reads messages from the websocket connection until something goes
// wrong, passing each decoded protocol message to the subscription's client.
//
// The loop stops when reading from the websocket fails, when a non-binary
// message arrives, when a message cannot be decoded, or when the client's
// Message method returns false. On exit the subscription is cancelled and
// then the server is told that this connection has closed.
func (self *Connection) ReadLoop() {
	self.log.Info().Msg("New websocket connection established.")

	// Deferred calls run last-in-first-out: Cancel runs first, then
	// ConnectionClosed.
	defer self.server.ConnectionClosed(self)
	defer self.subscription.Cancel()

	ctx := context.Background()
	for {
		kind, payload, err := self.wsConn.Reader(ctx)
		switch {
		case err != nil:
			self.log.Info().Err(err).Msg(`Error when reading from websocket connection. Closing connection.`)
			return
		case kind != websocket.MessageBinary:
			self.log.Info().Str(`message type`, kind.String()).Msg(`Only binary messages are expected on websocket connection. Closing connection.`)
			return
		}

		var incoming protocol.Message
		err = incoming.DecodeBebop(payload)
		if err != nil {
			self.log.Info().Err(err).Msg(`Unable to decode message received on websocket connection. Closing connection.`)
			return
		}

		// A false result from the client ends the loop.
		if accepted := self.subscription.Client.Message(&incoming); !accepted {
			return
		}
	}
}

// Close closes the underlying websocket connection with the StatusGoingAway
// status code and a server-shutdown reason. It returns whatever error the
// websocket Close call reports.
func (self *Connection) Close() error {
	const reason = "Server shutdown."
	return self.wsConn.Close(websocket.StatusGoingAway, reason)
}