Distributed Concurrent Editor

connection.go at [6cd08b113e]
Login

File http/connection.go artifact e2a25da011 part of check-in 6cd08b113e


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
package http

import (
	"context"

	"github.com/rs/zerolog"
	"nhooyr.io/websocket"
	"wellquite.org/edist/document"
	protocol "wellquite.org/edist/protocol/go"
)

// Connection ties a single websocket connection to a document subscription:
// messages read from the websocket are handed to the subscription's client,
// and the connection reports back to its server when its read loop ends.
type Connection struct {
	// server is told (via ConnectionClosed) when ReadLoop returns.
	server       *Server
	// log receives connection lifecycle and error messages.
	log          zerolog.Logger
	// subscription is the document subscription created in NewConnection;
	// it is cancelled when ReadLoop returns, and its Client receives every
	// message decoded from the websocket.
	subscription *document.DocumentSubscription
	// wsConn is the underlying websocket connection.
	wsConn       *websocket.Conn
}

// NewConnection creates a new registry client on server, subscribes it to
// updates for documentName, and wraps the result together with wsConn in a
// Connection.
//
// Every batch of updates delivered by the subscription is written to wsConn
// as a single binary websocket message. A failed write is logged rather than
// silently dropped; it is not otherwise acted upon here.
//
// NewConnection returns nil if SubscribeToDocumentUpdates returns a nil
// subscription.
func NewConnection(server *Server, log zerolog.Logger, documentName string, wsConn *websocket.Conn) *Connection {
	background := context.Background()
	subscription := server.registry.NewClient().SubscribeToDocumentUpdates(documentName, func(updates []byte) {
		// Surface write failures: previously the error was discarded, so a
		// client that stopped receiving updates left no trace in the logs.
		if err := wsConn.Write(background, websocket.MessageBinary, updates); err != nil {
			log.Info().Err(err).Msg(`Error when writing document updates to websocket connection.`)
		}
	})
	if subscription == nil {
		return nil
	}
	return &Connection{
		server:       server,
		log:          log,
		subscription: subscription,
		wsConn:       wsConn,
	}
}

// ReadLoop reads messages from the websocket until something goes wrong or
// the subscription's client declines a message. Each message must be binary
// and must decode as a protocol.Message; the decoded message is then passed
// to the subscription's client.
//
// The loop ends when reading fails, a non-binary message arrives, decoding
// fails, or the client's Message method returns false. On the way out the
// subscription is cancelled and then the server is notified via
// ConnectionClosed.
func (self *Connection) ReadLoop() {
	self.log.Info().Msg("New websocket connection established.")

	// Deferred calls run in reverse order: the subscription is cancelled
	// first, then the server is informed that this connection has closed.
	defer self.server.ConnectionClosed(self)
	defer self.subscription.Cancel()

	ctx := context.Background()
	for accepted := true; accepted; {
		kind, reader, err := self.wsConn.Reader(ctx)
		switch {
		case err != nil:
			self.log.Info().Err(err).Msg("Error when reading from websocket connection. Closing connection.")
			return
		case kind != websocket.MessageBinary:
			self.log.Info().Str("message type", kind.String()).Msg("Only binary messages are expected on websocket connection. Closing connection.")
			return
		}

		incoming := new(protocol.Message)
		if err := incoming.DecodeBebop(reader); err != nil {
			self.log.Info().Err(err).Msg("Unable to decode message received on websocket connection. Closing connection.")
			return
		}

		// A false result from the client ends the loop without logging.
		accepted = self.subscription.Client.Message(incoming)
	}
}

// Close closes the underlying websocket connection with status
// StatusGoingAway and the reason "Server shutdown.", returning whatever
// error the websocket close reports.
func (self *Connection) Close() error {
	const reason = "Server shutdown."
	err := self.wsConn.Close(websocket.StatusGoingAway, reason)
	return err
}