package document
import (
"errors"
"fmt"
"math"
"sync/atomic"
"github.com/rs/zerolog"
bolt "go.etcd.io/bbolt"
"wellquite.org/actors"
"wellquite.org/actors/mailbox"
protocol "wellquite.org/edist/protocol/go"
)
// --- Client ---
type DocumentClientFactory struct {
factory *actors.BackPressureClientBaseFactory
}
func (self *DocumentClientFactory) NewClient() *DocumentClient {
return &DocumentClient{BackPressureClientBase: self.factory.NewClient()}
}
type DocumentClient struct {
*actors.BackPressureClientBase
}
var _ actors.Client = (*DocumentClient)(nil)
type DocumentSubscription struct {
fun func(updates []byte)
Client *DocumentClient
expired uint32
}
func (self *DocumentSubscription) Cancel() {
if atomic.CompareAndSwapUint32(&self.expired, 0, 1) {
self.Client.Send((*documentUnsubscribeMsg)(self))
}
}
type documentSubscribeMsg struct {
actors.MsgSyncBase
subscription *DocumentSubscription
}
type documentUnsubscribeMsg DocumentSubscription
func (self *DocumentClient) SubscribeToDocumentUpdates(fun func(updates []byte)) *DocumentSubscription {
subscription := &DocumentSubscription{
fun: fun,
Client: self,
}
msg := &documentSubscribeMsg{
subscription: subscription,
}
if self.SendSync(msg, true) {
return subscription
} else {
return nil
}
}
type documentProtocolMessage struct {
protoMsg *protocol.Message
client *DocumentClient
}
func (self *DocumentClient) Message(msg *protocol.Message) bool {
return self.Send(&documentProtocolMessage{
protoMsg: msg,
client: self,
})
}
// --- Server ---
type documentServer struct {
actors.BackPressureServerBase
documentName string
db *DB
clientId uint64
subscribers map[*DocumentSubscription]struct{}
wordManager *WordManager
currentFrame *frame
loadedEvents frames
}
var _ actors.Server = (*documentServer)(nil)
func SpawnDocument(db *bolt.DB, manager actors.ManagerClient, clientId uint64, documentName string) (*DocumentClientFactory, error) {
server := &documentServer{
documentName: documentName,
db: NewDB(db, documentName),
clientId: clientId,
}
clientBase, err := manager.Spawn(server, fmt.Sprintf(`document %q`, documentName))
if err != nil {
return nil, err
}
return &DocumentClientFactory{
factory: actors.NewBackPressureClientBaseFactory(clientBase),
}, nil
}
func (self *documentServer) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *actors.ClientBase) (err error) {
if err := self.BackPressureServerBase.Init(log, mailboxReader, selfClient); err != nil {
return err
}
self.subscribers = make(map[*DocumentSubscription]struct{})
self.loadEvents(math.MaxUint64)
self.buildWordsFromEvents()
if self.currentFrame == nil {
self.Log.Info().Msg("Creating new document")
self.maybeCreateCheckpoint()
} else {
self.Log.Info().Msg("Loaded existing document")
}
return nil
}
func (self *documentServer) HandleMsg(msg mailbox.Msg) (err error) {
switch msgT := msg.(type) {
case *documentProtocolMessage:
protoMsg := msgT.protoMsg
switch {
case protoMsg.Updates != nil:
return self.updates(msgT.client, protoMsg)
case protoMsg.Undo != nil:
return self.undo(protoMsg)
case protoMsg.Redo != nil:
return self.redo(protoMsg)
default:
return errors.New(`Illegal message: updates, undo, redo all nil`)
}
case *documentSubscribeMsg:
msgT.MarkProcessed()
self.subscribers[msgT.subscription] = struct{}{}
self.Log.Debug().Int("subscription count", len(self.subscribers)).Msg("subscriber added")
msgT.subscription.fun(self.wordManager.CreateCheckpoint().MarshalBebop())
return nil
case *documentUnsubscribeMsg:
delete(self.subscribers, (*DocumentSubscription)(msgT))
self.Log.Debug().Int("subscription count", len(self.subscribers)).Msg("subscriber removed")
if len(self.subscribers) == 0 {
return actors.ErrNormalActorTermination
} else {
return nil
}
default:
return self.BackPressureServerBase.HandleMsg(msg)
}
}
func (self *documentServer) updates(sender *DocumentClient, msg *protocol.Message) error {
self.Log.Trace().Int("Updated words", len(*msg.Updates)).Msg("Received Updates")
self.maybeCreateCheckpoint()
filteredUpdates := self.wordManager.FilterUpdatesForNewer(*msg.Updates)
self.wordManager.ApplyUpdates(filteredUpdates)
self.wordManager.GC()
updates := self.wordManager.FilterUpdatesForKnown(filteredUpdates)
if len(updates) == 0 {
return nil
}
msg.Updates = &updates
eventNumber := self.db.WriteMessage(msg)
msgBites := msg.MarshalBebop()
for subscriber := range self.subscribers {
if subscriber.Client != sender {
subscriber.fun(msgBites)
}
}
currentFrame := &frame{
eventNumber: eventNumber,
message: msg,
previous: self.currentFrame,
}
if self.currentFrame != nil {
self.currentFrame.next = currentFrame
}
self.currentFrame = currentFrame
self.loadedEvents = append(self.loadedEvents, currentFrame)
return nil
}
func (self *documentServer) undo(msg *protocol.Message) error {
if self.currentFrame == nil {
// illegal undo. Ignore it.
self.Log.Trace().Msg("Received illegal Undo.")
return nil
}
self.loadEvents(self.currentFrame.eventNumber)
self.buildWordsFromEvents()
if self.currentFrame.previous == nil {
// illegal undo. Ignore it.
self.Log.Trace().Msg("Received illegal Undo.")
return nil
}
self.Log.Trace().Msg("Received Undo.")
oldWordManager := self.wordManager
currentFrame := self.currentFrame
currentFrame.undoCount++
self.loadedEvents = append(self.loadedEvents, currentFrame)
self.buildWordsFromEvents()
self.db.WriteMessage(msg)
deltaMsgBites := self.wordManager.Delta(oldWordManager).MarshalBebop()
for subscriber := range self.subscribers {
subscriber.fun(deltaMsgBites)
}
return nil
}
func (self *documentServer) redo(msg *protocol.Message) error {
if self.currentFrame == nil {
// illegal redo. Ignore it.
self.Log.Trace().Msg("Received illegal Redo")
return nil
}
self.loadEvents(self.currentFrame.eventNumber)
self.buildWordsFromEvents()
currentFrame := self.currentFrame.next
if currentFrame == nil {
// illegal redo. Ignore it.
self.Log.Trace().Msg("Received illegal Redo")
return nil
}
self.Log.Trace().Msg("Received Redo")
oldWordManager := self.wordManager
currentFrame.redoCount++
self.loadedEvents = append(self.loadedEvents, currentFrame)
self.buildWordsFromEvents()
self.db.WriteMessage(msg)
deltaMsgBites := self.wordManager.Delta(oldWordManager).MarshalBebop()
for subscriber := range self.subscribers {
subscriber.fun(deltaMsgBites)
}
return nil
}
func (self *documentServer) loadEvents(maxEventNumber uint64) {
if len(self.loadedEvents) > 0 {
frame0 := self.loadedEvents[0]
if frame0.eventNumber == 1 {
// can't go back any further anyway
return
} else if frame0.isCheckpoint && frame0.eventNumber < maxEventNumber {
// already loaded enough
return
}
}
self.loadedEvents = self.db.LoadEvents(maxEventNumber)
}
type frameApplyState struct {
updates []*UpdatePair
undoCount uint64
redoCount uint64
undoNext bool
}
func (self *documentServer) buildWordsFromEvents() {
wordMgr := NewWordManager()
states := make(map[*frame]*frameApplyState, len(self.loadedEvents))
var currentFrame *frame
for _, frame := range self.loadedEvents {
fas, found := states[frame]
if !found {
if currentFrame != nil {
currentFrame.next = frame
frame.previous = currentFrame
}
currentFrame = frame
// first time we've seen it, so this is the initial update
fas = &frameApplyState{
updates: wordMgr.FilterUpdatesForNewer(*frame.message.Updates),
undoCount: frame.undoCount,
redoCount: frame.redoCount,
undoNext: true,
}
states[frame] = fas
// even if we don't end up applying this update, we still
// need to keep the version applied correctly
for _, update := range fas.updates {
if update.updated.Version.GreaterThan(update.original.Version) {
update.original.Version = update.updated.Version
}
}
} else {
if fas.undoNext {
if currentFrame != frame {
panic(fmt.Sprintf("undo. currentFrame %v, frame %v", currentFrame, frame))
}
currentFrame = currentFrame.previous
fas.undoCount--
} else { // redoNext
currentFrame = currentFrame.next
if currentFrame != frame {
panic(fmt.Sprintf("redo. currentFrame %v, frame %v", currentFrame, frame))
}
fas.redoCount--
}
fas.undoNext = !fas.undoNext
for _, update := range fas.updates {
// only bump it if the update was a modification of an
// existing word. If the update was adding this word, then
// we don't bump it.
if !update.isNewWord {
update.original.Version.Bump(self.clientId)
}
}
}
if fas.undoCount == 0 && fas.redoCount == 0 && fas.undoNext {
// simplest case is undoCount=0, redoCount=0. So undoNext will
// be true.
wordMgr.ApplyUpdates(fas.updates)
}
}
wordMgr.GC()
self.wordManager = wordMgr
self.currentFrame = currentFrame
}
const checkpointEvery = 16
func (self *documentServer) maybeCreateCheckpoint() {
if self.db.largestUsedEventNumber%checkpointEvery == 0 {
self.Log.Trace().Msg("Creating checkpoint")
checkpoint := self.wordManager.CreateCheckpoint()
eventNumber := self.db.WriteCheckpoint(checkpoint)
currentFrame := &frame{
eventNumber: eventNumber,
message: checkpoint,
isCheckpoint: true,
}
self.loadedEvents = frames{currentFrame}
self.currentFrame = currentFrame
}
}