Distributed Concurrent Editor

Artifact [5028e361ba]
Login

Artifact [5028e361ba]

Artifact 5028e361ba8e0ed310ec99a313341e916dda5fa8fcefc3f4dc8857a5bc70c8d8:


package document_test

import (
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"sort"
	"testing"

	"github.com/matryer/is"
	"github.com/rs/zerolog"
	bolt "go.etcd.io/bbolt"
	"golang.org/x/term"
	"wellquite.org/actors"
	"wellquite.org/edist/document"
	protocol "wellquite.org/edist/protocol/go"
)

// documentTester bundles everything a document test needs: the test handle,
// an assertion helper, a logger, a seeded random source, and the database,
// actor manager and document factory that are created on demand.
//
// Fields:
//   - tb, is, log: the test handle, the assertion helper built from it, and
//     the logger built from it (see newDocumentTester).
//   - rng: the random source created from the seed passed to
//     newDocumentTester; helpers draw all their random choices from it.
//   - clientId: drawn from rng at construction; used as the ClientId of words
//     and versions created by mutating clients.
//   - dbPath, db: absolute path of the temporary database file and its open
//     handle; both are set by newDB and cleared by close.
//   - manager: set by spawnPlainManager, cleared by close.
//   - documentClientFactory: set by spawnDocument, cleared by closeDocument
//     and close.
type documentTester struct {
	tb       testing.TB
	is       *is.I
	log      zerolog.Logger
	rng      *rand.Rand
	clientId uint64

	dbPath                string
	db                    *bolt.DB
	manager               actors.ManagerClient
	documentClientFactory *document.DocumentClientFactory
}

// newDocumentTester builds a documentTester for tb. Every random choice the
// tester makes afterwards is derived from seed, and the seed is logged so a
// run can be reproduced. A fresh database is opened, and managerSpawnFun is
// then invoked with the tester so that it can install an actor manager.
func newDocumentTester(tb testing.TB, seed int64, managerSpawnFun func(*documentTester)) *documentTester {
	source := rand.NewSource(seed)
	tester := &documentTester{
		tb:  tb,
		is:  is.New(tb),
		log: NewTestLogger(tb),
		rng: rand.New(source),
	}
	// the client id is the very first value drawn from the seeded source
	tester.clientId = tester.rng.Uint64()

	tester.log.Info().Int64("seed", seed).Send()
	tester.newDB()
	managerSpawnFun(tester)
	return tester
}

// newDB creates an empty bbolt database in a uniquely named temporary file and
// records both the open handle and the file's absolute path on the tester. It
// fails the test if a database is already open or if any step errors.
//
// The test name is embedded in the file name so stray files are easy to
// attribute. Sub-test names contain '/', and os.CreateTemp rejects a pattern
// containing a path separator, so separators are replaced with '_' first. '*'
// is replaced as well, because os.CreateTemp substitutes its random string for
// the last '*' in the pattern rather than appending it.
func (self *documentTester) newDB() {
	self.is.True(self.dbPath == "" && self.db == nil) // it looks like a database is already open?

	// Byte-wise replacement is safe: the characters of interest are ASCII and
	// never occur inside a multi-byte UTF-8 sequence.
	safeName := []byte(self.tb.Name())
	for idx, char := range safeName {
		if os.IsPathSeparator(char) || char == '*' {
			safeName[idx] = '_'
		}
	}

	fh, err := os.CreateTemp("", fmt.Sprintf("edist-test-%s-", safeName))
	self.is.NoErr(err)
	name := fh.Name()
	// only the unique name is needed; bbolt opens the file itself
	err = fh.Close()
	self.is.NoErr(err)
	absPath, err := filepath.Abs(name)
	self.is.NoErr(err)
	db, err := bolt.Open(absPath, 0666, nil)
	self.is.NoErr(err)
	self.log.Info().Str("path", absPath).Msg("Opened database.")

	self.dbPath = absPath
	self.db = db
}

// plainManagerSpawnFun is the method expression for spawnPlainManager, i.e. a
// func(*documentTester), which is the shape newDocumentTester accepts as its
// managerSpawnFun argument.
var plainManagerSpawnFun = (*documentTester).spawnPlainManager

// spawnPlainManager spawns an actor manager named after the running test and
// stores it on the tester. The test fails if a manager is already present or
// if spawning returns an error.
func (self *documentTester) spawnPlainManager() {
	self.is.True(self.manager == nil) // a manager must not already exist

	managerName := self.tb.Name() + "Manager"
	spawned, err := actors.SpawnManager(self.log, managerName)
	self.is.NoErr(err)

	self.manager = spawned
}

// spawnDocument spawns the document called documentName on top of the tester's
// database and manager, and keeps the resulting client factory. The database
// and manager must exist already, and no document may currently be open. The
// client id handed to the document is drawn from the tester's random source.
func (self *documentTester) spawnDocument(documentName string) {
	self.is.True(documentName != "")                // document name cannot be empty
	self.is.True(self.documentClientFactory == nil) // a document client factory must not already exist
	self.is.True(self.db != nil)                    // db must already be created
	self.is.True(self.manager != nil)               // manager must already be created

	documentClientId := self.rng.Uint64()
	spawned, err := document.SpawnDocument(self.db, self.manager, documentClientId, documentName)
	self.is.NoErr(err)

	self.documentClientFactory = spawned
}

// closeDocument terminates the currently open document by calling
// TerminateSync on a freshly created client, then forgets the factory so that
// spawnDocument may be called again. The test fails if no document is open.
func (self *documentTester) closeDocument() {
	self.is.True(self.documentClientFactory != nil) // we need a document to be open!

	terminator := self.documentClientFactory.NewClient()
	terminator.TerminateSync()
	self.documentClientFactory = nil
}

// close tears down whatever the tester currently holds, in this order: the
// document, the manager, the database handle, and finally the database file.
// Each resource is skipped when absent, so close can be called on a partially
// set-up tester. Errors from closing the database or removing its file fail
// the test.
func (self *documentTester) close() {
	if factory := self.documentClientFactory; factory != nil {
		factory.NewClient().TerminateSync()
		self.documentClientFactory = nil
	}

	if manager := self.manager; manager != nil {
		manager.TerminateSync()
		self.manager = nil
	}

	if db := self.db; db != nil {
		err := db.Close()
		self.is.NoErr(err)
		self.db = nil
	}

	if path := self.dbPath; path != "" {
		err := os.Remove(path)
		self.is.NoErr(err)
		self.dbPath = ""
	}
}

// alphabet holds the lower-case ASCII letters 'a' through 'z', in order. It is
// the pool from which randomWord picks letters.
var alphabet = func() []rune {
	letters := make([]rune, 0, 'z'-'a'+1)
	for letter := 'a'; letter <= 'z'; letter++ {
		letters = append(letters, letter)
	}
	return letters
}()

// randomWord returns a word of random lower-case letters drawn from the
// tester's random source. With allowEmpty the length is 0 to 10 letters;
// without it the length is 1 to 11 letters.
func (self *documentTester) randomWord(allowEmpty bool) string {
	letterCount := self.rng.Intn(11)
	if !allowEmpty {
		letterCount++
	}

	letters := make([]rune, 0, letterCount)
	for len(letters) < letterCount {
		letters = append(letters, alphabet[self.rng.Intn(len(alphabet))])
	}
	return string(letters)
}

// listeningClient is a read-only observer of a document. It embeds the
// documentTester that created it. documentEvolution yields one rendering of
// the whole document (as produced by the word manager's String method) per
// update received, and is closed when the underlying document client
// terminates; see newListeningClient.
type listeningClient struct {
	*documentTester

	documentEvolution <-chan string
}

// newListeningClient attaches an observer to the open document. Every batch of
// updates received is decoded, filtered down to the newer updates, applied to
// a private word manager and garbage collected; the resulting rendering of the
// document is then pushed onto the returned client's documentEvolution
// channel. The channel has a buffer of one, so the update callback blocks
// until earlier renderings have been consumed. The channel is closed when the
// document client terminates.
func (self *documentTester) newListeningClient() *listeningClient {
	self.is.True(self.documentClientFactory != nil) // document must be spawned first

	renderings := make(chan string, 1)
	listener := &listeningClient{
		documentTester:    self,
		documentEvolution: renderings,
	}

	client := self.documentClientFactory.NewClient()
	client.OnTermination(func(_ *actors.TerminationSubscription, _ error, _ interface{}) {
		close(renderings)
	})

	words := document.NewWordManager()
	onUpdates := func(updates []byte) {
		msg, err := protocol.MakeMessageFromBytes(updates)
		if err != nil {
			// can't use fatal as we'll be in the wrong go-routine
			listener.tb.Error(err)
			return
		}
		newer := words.FilterUpdatesForNewer(*msg.Updates)
		words.ApplyUpdates(newer)
		words.GC()
		renderings <- words.String()
	}
	client.SubscribeToDocumentUpdates(onUpdates)

	return listener
}

// nextDocumentRendering blocks until the next rendering of the document is
// available and returns it. The boolean is false once the channel has been
// closed, in which case the string is empty.
func (self *listeningClient) nextDocumentRendering() (string, bool) {
	rendering, stillOpen := <-self.documentEvolution
	return rendering, stillOpen
}

// mutatingClient is a client that edits a document and keeps a local model of
// what the document should look like after each message it has sent. It embeds
// the documentTester that created it.
//
// Fields:
//   - documentClient: the client through which messages are sent.
//   - wordIdCounter: incremented before each new word is created and used as
//     the Counter of that word's id.
//   - sentGenerations: the generations already sent; send truncates the slice
//     at the new generation's number before appending it.
//   - unsentGeneration: the working copy that edits are applied to until it is
//     sent.
//   - maxVersions: a version per word id; bumpAll bumps every entry and send
//     stamps the entry onto each outgoing word.
type mutatingClient struct {
	*documentTester

	documentClient   *document.DocumentClient
	wordIdCounter    uint64
	sentGenerations  []*generation
	unsentGeneration *unsentGeneration
	maxVersions      map[protocol.WordId]protocol.Version
}

// newMutatingClient creates a mutating client for the open document.
// orderedWords is taken as the document's content at generation 0, in
// left-to-right order; each word's version seeds the client's maxVersions
// table. An unsent generation is prepared on top of generation 0, ready for
// edits.
func (self *documentTester) newMutatingClient(orderedWords ...*protocol.Word) *mutatingClient {
	self.is.True(self.documentClientFactory != nil) // document must be spawned first

	initial := &generation{
		generationNumber: 0,
		orderedWords:     orderedWords,
	}
	client := &mutatingClient{
		documentTester:  self,
		documentClient:  self.documentClientFactory.NewClient(),
		sentGenerations: []*generation{initial},
		maxVersions:     make(map[protocol.WordId]protocol.Version),
	}
	for _, word := range orderedWords {
		client.maxVersions[word.WordId] = word.Version
	}

	client.unsentGeneration = client.newUnsentGeneration(initial)
	return client
}

// newMutatingClientWithEmptyRoot creates a mutating client whose generation 0
// consists of a single word with id {ClientId: 0, Counter: 1}, no letters, a
// zero version and no links.
func (self *documentTester) newMutatingClientWithEmptyRoot() *mutatingClient {
	rootId := protocol.WordId{ClientId: 0, Counter: 1}
	return self.newMutatingClient(&protocol.Word{WordId: rootId})
}

// wordState pairs a word in an unsent generation with a flag recording whether
// the word has been changed since the generation was created. send transmits
// only the words whose isDirty flag is set.
type wordState struct {
	word    *protocol.Word
	isDirty bool
}

// generation is one state of the document as modelled by a mutating client.
// Generations are chained through previous and next, which is what canUndo,
// canRedo, sendUndo and sendRedo navigate. orderedWords holds the document's
// words in left-to-right order.
//
// NOTE(review): the generation 0 built by newMutatingClient leaves the
// embedded *mutatingClient nil, so promoted fields and methods must not be
// reached through that generation.
type generation struct {
	*mutatingClient
	previous         *generation
	next             *generation
	generationNumber int
	orderedWords     []*protocol.Word
}

// unsentGeneration is a generation that is still being edited. wordStateById
// indexes the generation's words by id and tracks which of them are dirty.
// isDirty records whether any edit has been made since the generation was
// created.
type unsentGeneration struct {
	generation
	wordStateById map[protocol.WordId]*wordState
	isDirty       bool
}

// newUnsentGeneration returns a clean working generation that follows
// previous. Every word of previous is copied, so that edits to the new
// generation leave previous untouched, and each copy is registered in
// wordStateById as not dirty. The new generation's number is one more than
// that of previous.
func (self *mutatingClient) newUnsentGeneration(previous *generation) *unsentGeneration {
	unsent := &unsentGeneration{
		generation: generation{
			mutatingClient:   self,
			previous:         previous,
			generationNumber: previous.generationNumber + 1,
			orderedWords:     make([]*protocol.Word, 0, len(previous.orderedWords)),
		},
		wordStateById: make(map[protocol.WordId]*wordState),
		isDirty:       false,
	}

	for _, original := range previous.orderedWords {
		clone := new(protocol.Word)
		*clone = *original
		unsent.orderedWords = append(unsent.orderedWords, clone)
		unsent.wordStateById[clone.WordId] = &wordState{word: clone, isDirty: false}
	}
	return unsent
}

// String renders the generation as the letters of its words, in order, joined
// by single spaces. A generation without words renders as the empty string.
func (self *generation) String() string {
	rendered := ""
	for idx, word := range self.orderedWords {
		if idx > 0 {
			rendered += " "
		}
		rendered += word.Letters
	}
	return rendered
}

// editExistingWord picks one of the generation's words at random (the first
// word included), replaces its letters with a fresh random word, which may be
// empty, and marks both the word and the generation dirty.
func (self *unsentGeneration) editExistingWord() {
	chosenIdx := self.rng.Intn(len(self.orderedWords))
	chosen := self.orderedWords[chosenIdx]

	chosen.Letters = self.randomWord(true)

	self.wordStateById[chosen.WordId].isDirty = true
	self.isDirty = true
}

// deleteWord removes a randomly chosen word, never the first/left-most one,
// from the generation. The neighbours of the removed word are linked to each
// other and marked dirty, and the generation is marked dirty.
//
// The generation must hold at least two words. This is asserted up front,
// because rand.Intn panics when given 0, which is what a generation holding
// only the first word would otherwise lead to.
//
// The removed word's entry is left in wordStateById, so if it was already
// dirty it is still included in the next send.
func (self *unsentGeneration) deleteWord() {
	self.is.True(len(self.orderedWords) > 1) // there must be a word other than the first/left-most one to delete

	// make sure we never delete the first/left-most word
	index := 1 + self.rng.Intn(len(self.orderedWords)-1)
	deletedWord := self.orderedWords[index]
	self.orderedWords = append(self.orderedWords[:index], self.orderedWords[index+1:]...)
	// index >= 1, so the deleted word always has a previous word
	previous, previousFound := self.wordStateById[*deletedWord.Links.Previous]
	self.is.True(previousFound)

	// next may not exist if we're deleting the last word
	if deletedWord.Links.Next == nil {
		previous.word.Links.Next = nil
		previous.isDirty = true

	} else {
		next, nextFound := self.wordStateById[*deletedWord.Links.Next]
		self.is.True(nextFound)

		previous.word.Links.Next = &next.word.WordId
		previous.isDirty = true

		next.word.Links.Previous = &previous.word.WordId
		next.isDirty = true
	}
	self.isDirty = true
}

// addWord inserts a new word, with random (possibly empty) letters, at a
// random position after the first word; the position may be the very end. The
// new word gets the next word id of this client and version {Counter: 1}. It
// is linked to its left neighbour and, when there is one, its right neighbour.
// The new word, every neighbour whose links changed, and the generation itself
// are marked dirty.
func (self *unsentGeneration) addWord() {
	// position is where in orderedWords the new word will end up
	position := 1 + self.rng.Intn(len(self.orderedWords))
	self.wordIdCounter++

	left := self.orderedWords[position-1]
	newId := protocol.WordId{ClientId: self.clientId, Counter: self.wordIdCounter}
	fresh := &protocol.Word{
		WordId:  newId,
		Version: protocol.Version{Counter: 1, ClientId: self.clientId},
		Letters: self.randomWord(true),
		Links:   protocol.Links{Previous: &left.WordId},
	}
	self.maxVersions[newId] = fresh.Version
	self.wordStateById[newId] = &wordState{word: fresh, isDirty: true}

	left.Links.Next = &fresh.WordId
	self.wordStateById[left.WordId].isDirty = true
	self.isDirty = true

	if position < len(self.orderedWords) {
		// not appending to the end, so need to link to/from the right neighbour
		right := self.orderedWords[position]
		right.Links.Previous = &fresh.WordId
		fresh.Links.Next = &right.WordId
		self.wordStateById[right.WordId].isDirty = true
	}

	// grow by one slot, shift the tail one place right, then fill the gap
	self.orderedWords = append(self.orderedWords, nil)
	copy(self.orderedWords[position+1:], self.orderedWords[position:])
	self.orderedWords[position] = fresh
}

// send transmits the dirty words of this generation as a single update message
// and then promotes the generation to "sent". The generation must be dirty.
//
// All versions are bumped first, and every outgoing word is a copy stamped
// with its entry from maxVersions. The words are sorted by id before being
// shuffled with the tester's random source; presumably this makes the final
// order depend only on the seed and not on map iteration order.
//
// Afterwards a new unsent generation is started on top of this one, this
// generation is linked in as the next of its predecessor, and sentGenerations
// is truncated at this generation's number before it is appended.
func (self *unsentGeneration) send() {
	self.is.True(self.isDirty)
	self.bumpAll()

	dirtyWords := make([]protocol.Word, 0, len(self.wordStateById))
	for id, state := range self.wordStateById {
		if !state.isDirty {
			continue
		}
		outgoing := *state.word
		outgoing.Version = self.maxVersions[id]
		dirtyWords = append(dirtyWords, outgoing)
	}
	self.is.True(len(dirtyWords) > 0)

	sort.Slice(dirtyWords, func(i, j int) bool {
		return dirtyWords[j].WordId.GreaterThan(dirtyWords[i].WordId)
	})
	self.rng.Shuffle(len(dirtyWords), func(i, j int) {
		dirtyWords[i], dirtyWords[j] = dirtyWords[j], dirtyWords[i]
	})

	message := &protocol.Message{Updates: &dirtyWords}
	self.is.True(self.documentClient.Message(message))

	sent := &self.generation
	self.unsentGeneration = self.newUnsentGeneration(sent)
	self.previous.next = sent
	self.sentGenerations = append(self.sentGenerations[:sent.generationNumber], sent)
}

// tt is an addressable true: sendUndo and sendRedo pass &tt as the Undo and
// Redo fields of protocol.Message.
var tt = true

// canUndo reports whether an undo may be sent: there must be no unsent edits,
// and the generation the working copy is based on must itself have a
// predecessor to go back to.
func (self *mutatingClient) canUndo() bool {
	if self.unsentGeneration.isDirty {
		return false
	}
	base := self.unsentGeneration.previous
	return base != nil && base.previous != nil
}

// sendUndo sends an undo message, rebases the working copy onto the generation
// before the current base, and bumps all versions. canUndo must hold.
func (self *mutatingClient) sendUndo() {
	self.is.True(self.canUndo())
	message := &protocol.Message{Undo: &tt}
	self.is.True(self.documentClient.Message(message))

	undoneTo := self.unsentGeneration.previous.previous
	self.unsentGeneration = self.newUnsentGeneration(undoneTo)
	self.bumpAll()
}

// canRedo reports whether a redo may be sent: there must be no unsent edits,
// and the generation the working copy is based on must have a successor to go
// forward to.
func (self *mutatingClient) canRedo() bool {
	if self.unsentGeneration.isDirty {
		return false
	}
	base := self.unsentGeneration.previous
	return base != nil && base.next != nil
}

// sendRedo sends a redo message, rebases the working copy onto the generation
// after the current base, and bumps all versions. canRedo must hold.
func (self *mutatingClient) sendRedo() {
	self.is.True(self.canRedo())
	message := &protocol.Message{Redo: &tt}
	self.is.True(self.documentClient.Message(message))

	redoneTo := self.unsentGeneration.previous.next
	self.unsentGeneration = self.newUnsentGeneration(redoneTo)
	self.bumpAll()
}

// bumpAll calls Bump with the tester's client id on the version of every word
// in maxVersions and stores the result back. Map values are copies, hence the
// explicit write-back.
func (self *mutatingClient) bumpAll() {
	for wordId := range self.maxVersions {
		bumped := self.maxVersions[wordId]
		bumped.Bump(self.clientId)
		self.maxVersions[wordId] = bumped
	}
}

// NewTestLogger returns a timestamping zerolog logger whose console writer is
// configured with zerolog.ConsoleTestWriter(tb). Colour is used only when
// standard output is a terminal.
func NewTestLogger(tb testing.TB) zerolog.Logger {
	writer := zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(tb))
	stdoutFd := int(os.Stdout.Fd())
	writer.NoColor = !term.IsTerminal(stdoutFd)

	logger := zerolog.New(writer)
	return logger.With().Timestamp().Logger()
}

// setGlobalLogLevelDebug requests debug-level logging via setGlobalLogLevel,
// which only honours the request when tests run in verbose mode.
func setGlobalLogLevelDebug() {
	setGlobalLogLevel(zerolog.DebugLevel)
}

// setGlobalLogLevel sets zerolog's global level to level when tests run in
// verbose mode (testing.Verbose), and disables logging altogether otherwise.
func setGlobalLogLevel(level zerolog.Level) {
	effective := zerolog.Disabled
	if testing.Verbose() {
		effective = level
	}
	zerolog.SetGlobalLevel(effective)
}