Distributed Concurrent Editor

connection.go at [6cd08b113e]
Login

connection.go at [6cd08b113e]

File http/connection.go artifact e2a25da011 part of check-in 6cd08b113e


     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
package http

import (
	"context"

	"github.com/rs/zerolog"
	"nhooyr.io/websocket"
	"wellquite.org/edist/document"
	protocol "wellquite.org/edist/protocol/go"
)

// Connection ties a single websocket connection to a document
// subscription: messages read from the websocket are handed to the
// subscription's client (see ReadLoop), and the websocket can be shut
// down via Close.
type Connection struct {
	// server is notified through ConnectionClosed when ReadLoop exits.
	server       *Server
	// log receives the connection's lifecycle and error messages.
	log          zerolog.Logger
	// subscription is the document subscription created in NewConnection;
	// it is cancelled when ReadLoop returns.
	subscription *document.DocumentSubscription
	// wsConn is the underlying websocket connection.
	wsConn       *websocket.Conn
}

// NewConnection creates a new registry client on server, subscribes it to
// updates for documentName, and bundles the resulting subscription with
// wsConn into a Connection.
//
// The callback handed to SubscribeToDocumentUpdates forwards the bytes it
// receives to wsConn as a binary websocket message. A failed write is
// logged via log; it does not tear the connection down here.
//
// NewConnection returns nil when SubscribeToDocumentUpdates returns nil.
func NewConnection(server *Server, log zerolog.Logger, documentName string, wsConn *websocket.Conn) *Connection {
	background := context.Background()
	subscription := server.registry.NewClient().SubscribeToDocumentUpdates(documentName, func(updates []byte) {
		// Log write failures rather than discarding them, so that updates
		// which never reached the client are visible in the logs.
		if err := wsConn.Write(background, websocket.MessageBinary, updates); err != nil {
			log.Info().Err(err).Msg(`Unable to write document updates to websocket connection.`)
		}
	})
	if subscription == nil {
		return nil
	}
	return &Connection{
		server:       server,
		log:          log,
		subscription: subscription,
		wsConn:       wsConn,
	}
}

// ReadLoop reads messages from the websocket until something goes wrong,
// decoding each one as a protocol.Message and passing it to the
// subscription's client.
//
// The loop stops when reading from the websocket fails, when a non-binary
// message arrives, when a message cannot be decoded, or when the client's
// Message method returns false. On exit the subscription is cancelled and
// the server is informed through ConnectionClosed.
func (self *Connection) ReadLoop() {
	self.log.Info().Msg("New websocket connection established.")

	// Deferred calls run in reverse order: the subscription is cancelled
	// first, then the server is told that this connection has closed.
	defer self.server.ConnectionClosed(self)
	defer self.subscription.Cancel()

	ctx := context.Background()
	for {
		kind, reader, readErr := self.wsConn.Reader(ctx)
		switch {
		case readErr != nil:
			self.log.Info().Err(readErr).Msg(`Error when reading from websocket connection. Closing connection.`)
			return
		case kind != websocket.MessageBinary:
			self.log.Info().Str(`message type`, kind.String()).Msg(`Only binary messages are expected on websocket connection. Closing connection.`)
			return
		}

		var incoming protocol.Message
		if decodeErr := incoming.DecodeBebop(reader); decodeErr != nil {
			self.log.Info().Err(decodeErr).Msg(`Unable to decode message received on websocket connection. Closing connection.`)
			return
		}

		// A false result from the client ends the loop.
		if accepted := self.subscription.Client.Message(&incoming); !accepted {
			return
		}
	}
}

// Close closes the underlying websocket with status StatusGoingAway and a
// server-shutdown reason, returning whatever error the websocket reports.
func (self *Connection) Close() error {
	const reason = `Server shutdown.`
	return self.wsConn.Close(websocket.StatusGoingAway, reason)
}