Distributed Concurrent Editor

document.go at [f83c1422bc]
Login

document.go at [f83c1422bc]

File document/document.go artifact d27057e3a3 part of check-in f83c1422bc


package document

import (
	"errors"
	"fmt"
	"math"
	"sync/atomic"

	"github.com/rs/zerolog"
	bolt "go.etcd.io/bbolt"
	"wellquite.org/actors"
	"wellquite.org/actors/mailbox"
	protocol "wellquite.org/edist/protocol/go"
)

// --- Client ---

type DocumentClientFactory struct {
	factory *actors.BackPressureClientBaseFactory
}

func (self *DocumentClientFactory) NewClient() *DocumentClient {
	return &DocumentClient{BackPressureClientBase: self.factory.NewClient()}
}

type DocumentClient struct {
	*actors.BackPressureClientBase
}

var _ actors.Client = (*DocumentClient)(nil)

type DocumentSubscription struct {
	fun     func(updates []byte)
	Client  *DocumentClient
	expired uint32
}

func (self *DocumentSubscription) Cancel() {
	if atomic.CompareAndSwapUint32(&self.expired, 0, 1) {
		self.Client.Send((*documentUnsubscribeMsg)(self))
	}
}

type documentSubscribeMsg struct {
	actors.MsgSyncBase
	subscription *DocumentSubscription
}
type documentUnsubscribeMsg DocumentSubscription

func (self *DocumentClient) SubscribeToDocumentUpdates(fun func(updates []byte)) *DocumentSubscription {
	subscription := &DocumentSubscription{
		fun:    fun,
		Client: self,
	}
	msg := &documentSubscribeMsg{
		subscription: subscription,
	}
	if self.SendSync(msg, true) {
		return subscription
	} else {
		return nil
	}
}

type documentProtocolMessage struct {
	protoMsg *protocol.Message
	client   *DocumentClient
}

func (self *DocumentClient) Message(msg *protocol.Message) bool {
	return self.Send(&documentProtocolMessage{
		protoMsg: msg,
		client:   self,
	})
}

// --- Server ---

type documentServer struct {
	actors.BackPressureServerBase
	documentName string
	db           *DB
	clientId     uint64

	subscribers map[*DocumentSubscription]struct{}

	wordManager *WordManager

	currentFrame *frame
	loadedEvents frames
}

var _ actors.Server = (*documentServer)(nil)

func SpawnDocument(db *bolt.DB, manager actors.ManagerClient, clientId uint64, documentName string) (*DocumentClientFactory, error) {
	server := &documentServer{
		documentName: documentName,
		db:           NewDB(db, documentName),
		clientId:     clientId,
	}
	clientBase, err := manager.Spawn(server, fmt.Sprintf(`document %q`, documentName))
	if err != nil {
		return nil, err
	}
	return &DocumentClientFactory{
		factory: actors.NewBackPressureClientBaseFactory(clientBase),
	}, nil
}

func (self *documentServer) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *actors.ClientBase) (err error) {
	if err := self.BackPressureServerBase.Init(log, mailboxReader, selfClient); err != nil {
		return err
	}

	self.subscribers = make(map[*DocumentSubscription]struct{})
	self.loadEvents(math.MaxUint64)
	self.buildWordsFromEvents()

	if self.currentFrame == nil {
		self.Log.Info().Msg("Creating new document")
		self.maybeCreateCheckpoint()
	} else {
		self.Log.Info().Msg("Loaded existing document")
	}
	return nil
}

func (self *documentServer) HandleMsg(msg mailbox.Msg) (err error) {
	switch msgT := msg.(type) {
	case *documentProtocolMessage:
		protoMsg := msgT.protoMsg
		switch {
		case protoMsg.Updates != nil:
			return self.updates(msgT.client, protoMsg)

		case protoMsg.Undo != nil:
			return self.undo(protoMsg)

		case protoMsg.Redo != nil:
			return self.redo(protoMsg)

		default:
			return errors.New(`Illegal message: updates, undo, redo all nil`)
		}

	case *documentSubscribeMsg:
		msgT.MarkProcessed()
		self.subscribers[msgT.subscription] = struct{}{}
		self.Log.Debug().Int("subscription count", len(self.subscribers)).Msg("subscriber added")
		msgT.subscription.fun(self.wordManager.CreateCheckpoint().MarshalBebop())
		return nil

	case *documentUnsubscribeMsg:
		delete(self.subscribers, (*DocumentSubscription)(msgT))
		self.Log.Debug().Int("subscription count", len(self.subscribers)).Msg("subscriber removed")
		if len(self.subscribers) == 0 {
			return actors.ErrNormalActorTermination
		} else {
			return nil
		}

	default:
		return self.BackPressureServerBase.HandleMsg(msg)
	}
}

func (self *documentServer) updates(sender *DocumentClient, msg *protocol.Message) error {
	self.Log.Trace().Int("Updated words", len(*msg.Updates)).Msg("Received Updates")

	self.maybeCreateCheckpoint()

	filteredUpdates := self.wordManager.FilterUpdatesForNewer(*msg.Updates)
	self.wordManager.ApplyUpdates(filteredUpdates)
	self.wordManager.GC()
	updates := self.wordManager.FilterUpdatesForKnown(filteredUpdates)

	if len(updates) == 0 {
		return nil
	}

	msg.Updates = &updates
	eventNumber := self.db.WriteMessage(msg)

	msgBites := msg.MarshalBebop()
	for subscriber := range self.subscribers {
		if subscriber.Client != sender {
			subscriber.fun(msgBites)
		}
	}

	currentFrame := &frame{
		eventNumber: eventNumber,
		message:     msg,
		previous:    self.currentFrame,
	}
	if self.currentFrame != nil {
		self.currentFrame.next = currentFrame
	}
	self.currentFrame = currentFrame
	self.loadedEvents = append(self.loadedEvents, currentFrame)

	return nil
}

func (self *documentServer) undo(msg *protocol.Message) error {
	if self.currentFrame == nil {
		// illegal undo. Ignore it.
		self.Log.Trace().Msg("Received illegal Undo.")

		return nil
	}

	self.loadEvents(self.currentFrame.eventNumber)
	self.buildWordsFromEvents()
	if self.currentFrame.previous == nil {
		// illegal undo. Ignore it.
		self.Log.Trace().Msg("Received illegal Undo.")
		return nil
	}
	self.Log.Trace().Msg("Received Undo.")

	oldWordManager := self.wordManager

	currentFrame := self.currentFrame
	currentFrame.undoCount++
	self.loadedEvents = append(self.loadedEvents, currentFrame)
	self.buildWordsFromEvents()

	self.db.WriteMessage(msg)

	deltaMsgBites := self.wordManager.Delta(oldWordManager).MarshalBebop()
	for subscriber := range self.subscribers {
		subscriber.fun(deltaMsgBites)
	}

	return nil
}

func (self *documentServer) redo(msg *protocol.Message) error {
	if self.currentFrame == nil {
		// illegal redo. Ignore it.
		self.Log.Trace().Msg("Received illegal Redo")
		return nil
	}

	self.loadEvents(self.currentFrame.eventNumber)
	self.buildWordsFromEvents()

	currentFrame := self.currentFrame.next
	if currentFrame == nil {
		// illegal redo. Ignore it.
		self.Log.Trace().Msg("Received illegal Redo")
		return nil
	}
	self.Log.Trace().Msg("Received Redo")

	oldWordManager := self.wordManager

	currentFrame.redoCount++
	self.loadedEvents = append(self.loadedEvents, currentFrame)
	self.buildWordsFromEvents()

	self.db.WriteMessage(msg)

	deltaMsgBites := self.wordManager.Delta(oldWordManager).MarshalBebop()
	for subscriber := range self.subscribers {
		subscriber.fun(deltaMsgBites)
	}

	return nil
}

func (self *documentServer) loadEvents(maxEventNumber uint64) {
	if len(self.loadedEvents) > 0 {
		frame0 := self.loadedEvents[0]
		if frame0.eventNumber == 1 {
			// can't go back any further anyway
			return
		} else if frame0.isCheckpoint && frame0.eventNumber < maxEventNumber {
			// already loaded enough
			return
		}
	}
	self.loadedEvents = self.db.LoadEvents(maxEventNumber)
}

type frameApplyState struct {
	updates   []*UpdatePair
	undoCount uint64
	redoCount uint64
	undoNext  bool
}

func (self *documentServer) buildWordsFromEvents() {
	wordMgr := NewWordManager()
	states := make(map[*frame]*frameApplyState, len(self.loadedEvents))

	var currentFrame *frame
	for _, frame := range self.loadedEvents {
		fas, found := states[frame]
		if !found {
			if currentFrame != nil {
				currentFrame.next = frame
				frame.previous = currentFrame
			}
			currentFrame = frame

			// first time we've seen it, so this is the initial update
			fas = &frameApplyState{
				updates:   wordMgr.FilterUpdatesForNewer(*frame.message.Updates),
				undoCount: frame.undoCount,
				redoCount: frame.redoCount,
				undoNext:  true,
			}
			states[frame] = fas
			// even if we don't end up applying this update, we still
			// need to keep the version applied correctly
			for _, update := range fas.updates {
				if update.updated.Version.GreaterThan(update.original.Version) {
					update.original.Version = update.updated.Version
				}
			}

		} else {
			if fas.undoNext {
				if currentFrame != frame {
					panic(fmt.Sprintf("undo. currentFrame %v, frame %v", currentFrame, frame))
				}
				currentFrame = currentFrame.previous
				fas.undoCount--

			} else { // redoNext
				currentFrame = currentFrame.next
				if currentFrame != frame {
					panic(fmt.Sprintf("redo. currentFrame %v, frame %v", currentFrame, frame))
				}
				fas.redoCount--
			}
			fas.undoNext = !fas.undoNext

			for _, update := range fas.updates {
				// only bump it if the update was a modification of an
				// existing word. If the update was adding this word, then
				// we don't bump it.
				if !update.isNewWord {
					update.original.Version.Bump(self.clientId)
				}
			}
		}

		if fas.undoCount == 0 && fas.redoCount == 0 && fas.undoNext {
			// simplest case is undoCount=0, redoCount=0. So undoNext will
			// be true.
			wordMgr.ApplyUpdates(fas.updates)
		}
	}

	wordMgr.GC()
	self.wordManager = wordMgr
	self.currentFrame = currentFrame
}

const checkpointEvery = 16

func (self *documentServer) maybeCreateCheckpoint() {
	if self.db.largestUsedEventNumber%checkpointEvery == 0 {
		self.Log.Trace().Msg("Creating checkpoint")
		checkpoint := self.wordManager.CreateCheckpoint()
		eventNumber := self.db.WriteCheckpoint(checkpoint)

		currentFrame := &frame{
			eventNumber:  eventNumber,
			message:      checkpoint,
			isCheckpoint: true,
		}
		self.loadedEvents = frames{currentFrame}
		self.currentFrame = currentFrame
	}
}