package document import ( "errors" "fmt" "math" "sync/atomic" "github.com/rs/zerolog" bolt "go.etcd.io/bbolt" "wellquite.org/actors" "wellquite.org/actors/unboundedchan" protocol "wellquite.org/edist/protocol/go" ) // --- Client --- type DocumentClientFactory struct { factory *actors.BackPressureClientBaseFactory } func (self *DocumentClientFactory) NewClient() *DocumentClient { return &DocumentClient{BackPressureClientBase: self.factory.NewClient()} } type DocumentClient struct { *actors.BackPressureClientBase } var _ actors.Client = (*DocumentClient)(nil) type DocumentSubscription struct { fun func(updates []byte) Client *DocumentClient expired uint32 } func (self *DocumentSubscription) Cancel() { if atomic.CompareAndSwapUint32(&self.expired, 0, 1) { self.Client.Send((*documentUnsubscribeMsg)(self)) } } type documentSubscribeMsg struct { actors.MsgSyncBase subscription *DocumentSubscription } type documentUnsubscribeMsg DocumentSubscription func (self *DocumentClient) SubscribeToDocumentUpdates(fun func(updates []byte)) *DocumentSubscription { subscription := &DocumentSubscription{ fun: fun, Client: self, } msg := &documentSubscribeMsg{ subscription: subscription, } if self.SendSync(msg, true) { return subscription } else { return nil } } type documentProtocolMessage struct { protoMsg *protocol.Message client *DocumentClient } func (self *DocumentClient) Message(msg *protocol.Message) bool { return self.Send(&documentProtocolMessage{ protoMsg: msg, client: self, }) } // --- Server --- type documentServer struct { actors.BackPressureServerBase documentName string db *DB clientId uint64 subscribers map[*DocumentSubscription]struct{} wordManager *WordManager currentFrame *frame loadedEvents frames } var _ actors.Server = (*documentServer)(nil) func SpawnDocument(db *bolt.DB, manager actors.Manager, clientId uint64, documentName string) (*DocumentClientFactory, error) { server := &documentServer{ documentName: documentName, db: NewDB(db, documentName), clientId: clientId, } clientBase, err := manager.Spawn(server, fmt.Sprintf(`document %q`, documentName)) if err != nil { return nil, err } return &DocumentClientFactory{ factory: actors.NewBackPressureClientBaseFactory(clientBase), }, nil } func (self *documentServer) Init(log zerolog.Logger, chanReader *unboundedchan.UnboundedMsgChanReader, selfClient *actors.ClientBase) (err error) { if err := self.BackPressureServerBase.Init(log, chanReader, selfClient); err != nil { return err } self.subscribers = make(map[*DocumentSubscription]struct{}) self.loadEvents(math.MaxUint64) self.buildWordsFromEvents() if self.currentFrame == nil { self.Log.Info().Msg("Creating new document") self.maybeCreateCheckpoint() } else { self.Log.Info().Msg("Loaded existing document") } return nil } func (self *documentServer) HandleMsg(msg unboundedchan.Msg) (err error) { switch msgT := msg.(type) { case *documentProtocolMessage: protoMsg := msgT.protoMsg switch { case protoMsg.Updates != nil: return self.updates(msgT.client, protoMsg) case protoMsg.Undo != nil: return self.undo(protoMsg) case protoMsg.Redo != nil: return self.redo(protoMsg) default: return errors.New(`Illegal message: updates, undo, redo all nil`) } case *documentSubscribeMsg: msgT.MarkProcessed() self.subscribers[msgT.subscription] = struct{}{} self.Log.Debug().Int("subscription count", len(self.subscribers)).Msg("subscriber added") msgT.subscription.fun(self.wordManager.CreateCheckpoint().MarshalBebop()) return nil case *documentUnsubscribeMsg: delete(self.subscribers, (*DocumentSubscription)(msgT)) self.Log.Debug().Int("subscription count", len(self.subscribers)).Msg("subscriber removed") if len(self.subscribers) == 0 { return actors.ErrNormalActorTermination } else { return nil } default: return self.BackPressureServerBase.HandleMsg(msg) } } func (self *documentServer) updates(sender *DocumentClient, msg *protocol.Message) error { self.Log.Trace().Int("Updated words", len(*msg.Updates)).Msg("Received Updates") self.maybeCreateCheckpoint() filteredUpdates := self.wordManager.FilterUpdatesForNewer(*msg.Updates) self.wordManager.ApplyUpdates(filteredUpdates) self.wordManager.GC() updates := self.wordManager.FilterUpdatesForKnown(filteredUpdates) if len(updates) == 0 { return nil } msg.Updates = &updates eventNumber := self.db.WriteMessage(msg) msgBites := msg.MarshalBebop() for subscriber := range self.subscribers { if subscriber.Client != sender { subscriber.fun(msgBites) } } currentFrame := &frame{ eventNumber: eventNumber, message: msg, previous: self.currentFrame, } if self.currentFrame != nil { self.currentFrame.next = currentFrame } self.currentFrame = currentFrame self.loadedEvents = append(self.loadedEvents, currentFrame) return nil } func (self *documentServer) undo(msg *protocol.Message) error { if self.currentFrame == nil { // illegal undo. Ignore it. self.Log.Trace().Msg("Received illegal Undo.") return nil } self.loadEvents(self.currentFrame.eventNumber) self.buildWordsFromEvents() if self.currentFrame.previous == nil { // illegal undo. Ignore it. self.Log.Trace().Msg("Received illegal Undo.") return nil } self.Log.Trace().Msg("Received Undo.") oldWordManager := self.wordManager currentFrame := self.currentFrame currentFrame.undoCount++ self.loadedEvents = append(self.loadedEvents, currentFrame) self.buildWordsFromEvents() self.db.WriteMessage(msg) deltaMsgBites := self.wordManager.Delta(oldWordManager).MarshalBebop() for subscriber := range self.subscribers { subscriber.fun(deltaMsgBites) } return nil } func (self *documentServer) redo(msg *protocol.Message) error { if self.currentFrame == nil { // illegal redo. Ignore it. self.Log.Trace().Msg("Received illegal Redo") return nil } self.loadEvents(self.currentFrame.eventNumber) self.buildWordsFromEvents() currentFrame := self.currentFrame.next if currentFrame == nil { // illegal redo. Ignore it. self.Log.Trace().Msg("Received illegal Redo") return nil } self.Log.Trace().Msg("Received Redo") oldWordManager := self.wordManager currentFrame.redoCount++ self.loadedEvents = append(self.loadedEvents, currentFrame) self.buildWordsFromEvents() self.db.WriteMessage(msg) deltaMsgBites := self.wordManager.Delta(oldWordManager).MarshalBebop() for subscriber := range self.subscribers { subscriber.fun(deltaMsgBites) } return nil } func (self *documentServer) loadEvents(maxEventNumber uint64) { if len(self.loadedEvents) > 0 { frame0 := self.loadedEvents[0] if frame0.eventNumber == 1 { // can't go back any further anyway return } else if frame0.isCheckpoint && frame0.eventNumber < maxEventNumber { // already loaded enough return } } self.loadedEvents = self.db.LoadEvents(maxEventNumber) } type frameApplyState struct { updates []*UpdatePair undoCount uint64 redoCount uint64 undoNext bool } func (self *documentServer) buildWordsFromEvents() { wordMgr := NewWordManager() states := make(map[*frame]*frameApplyState, len(self.loadedEvents)) var currentFrame *frame for _, frame := range self.loadedEvents { fas, found := states[frame] if !found { if currentFrame != nil { currentFrame.next = frame frame.previous = currentFrame } currentFrame = frame // first time we've seen it, so this is the initial update fas = &frameApplyState{ updates: wordMgr.FilterUpdatesForNewer(*frame.message.Updates), undoCount: frame.undoCount, redoCount: frame.redoCount, undoNext: true, } states[frame] = fas // even if we don't end up applying this update, we still // need to keep the version applied correctly for _, update := range fas.updates { if update.updated.Version.GreaterThan(update.original.Version) { update.original.Version = update.updated.Version } } } else { if fas.undoNext { if currentFrame != frame { panic(fmt.Sprintf("undo. currentFrame %v, frame %v", currentFrame, frame)) } currentFrame = currentFrame.previous fas.undoCount-- } else { // redoNext currentFrame = currentFrame.next if currentFrame != frame { panic(fmt.Sprintf("redo. currentFrame %v, frame %v", currentFrame, frame)) } fas.redoCount-- } fas.undoNext = !fas.undoNext for _, update := range fas.updates { // only bump it if the update was a modification of an // existing word. If the update was adding this word, then // we don't bump it. if !update.isNewWord { update.original.Version.Bump(self.clientId) } } } if fas.undoCount == 0 && fas.redoCount == 0 && fas.undoNext { // simplest case is undoCount=0, redoCount=0. So undoNext will // be true. wordMgr.ApplyUpdates(fas.updates) } } wordMgr.GC() self.wordManager = wordMgr self.currentFrame = currentFrame } const checkpointEvery = 16 func (self *documentServer) maybeCreateCheckpoint() { if self.db.largestUsedEventNumber%checkpointEvery == 0 { self.Log.Trace().Msg("Creating checkpoint") checkpoint := self.wordManager.CreateCheckpoint() eventNumber := self.db.WriteCheckpoint(checkpoint) currentFrame := &frame{ eventNumber: eventNumber, message: checkpoint, isCheckpoint: true, } self.loadedEvents = frames{currentFrame} self.currentFrame = currentFrame } }