Distributed Concurrent Editor

db.go at [f83c1422bc]
Login

File document/db.go artifact 3ad44ef5c3 part of check-in f83c1422bc


package document

import (
	"encoding/binary"
	"errors"
	"fmt"

	bolt "go.etcd.io/bbolt"
	protocol "wellquite.org/edist/protocol/go"
)

// DB stores the events of a single document in one bucket of a bbolt
// database. The bucket is named after the document.
type DB struct {
	// db is the underlying bbolt handle supplied to NewDB.
	db                     *bolt.DB
	// documentName is the document's name as bytes; it is used as the
	// bucket name for every transaction.
	documentName           []byte
	// largestUsedEventNumber caches the bucket's sequence value as last
	// observed by NewDB, write, or LoadEvents.
	largestUsedEventNumber uint64
}

// NewDB returns a DB for the named document, creating the document's
// bucket if it does not exist yet and recording the bucket's current
// sequence value as the largest used event number.
//
// NewDB panics if the bucket cannot be created or the transaction fails.
func NewDB(db *bolt.DB, documentName string) *DB {
	result := &DB{
		db:           db,
		documentName: []byte(documentName),
	}

	ensureBucket := func(tx *bolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(result.documentName)
		if err != nil {
			return err
		}
		// Pick up where any previous run left off.
		result.largestUsedEventNumber = bucket.Sequence()
		return nil
	}
	if err := db.Update(ensureBucket); err != nil {
		panic(err)
	}

	return result
}

// eventType distinguishes the kinds of event stored in a document's
// bucket. It is stored as the final byte of every key (see makeKey).
type eventType byte

const (
	// messageEvent marks an event written by WriteMessage.
	messageEvent eventType = iota
	// checkpointEvent marks an event written by WriteCheckpoint.
	checkpointEvent
	// maxEventType is not a real event type: it is the exclusive upper
	// bound used by parseKey to validate a key, and the placeholder value
	// parseKey returns alongside an error.
	maxEventType
)

// makeKey builds the 9-byte bucket key for an event: the event number as
// a big-endian uint64 in the first eight bytes, followed by one byte
// holding the event type.
func makeKey(eventNumber uint64, tipe eventType) []byte {
	var buf [9]byte
	binary.BigEndian.PutUint64(buf[:8], eventNumber)
	buf[8] = byte(tipe)
	return buf[:]
}

// WriteMessage stores message as a message event and returns the event
// number it was assigned. It panics if the write fails (see write).
func (self *DB) WriteMessage(message *protocol.Message) (eventNumber uint64) {
	eventNumber = self.write(messageEvent, message)
	return eventNumber
}

// WriteCheckpoint stores message as a checkpoint event and returns the
// event number it was assigned. It panics if the write fails (see write).
func (self *DB) WriteCheckpoint(message *protocol.Message) (eventNumber uint64) {
	eventNumber = self.write(checkpointEvent, message)
	return eventNumber
}

// write serialises message, takes the next sequence number of the
// document's bucket as the event number, and stores the serialised bytes
// under the key built from that number and tipe.
//
// On success the new event number is also recorded as the largest used
// event number and returned. Any failure results in a panic.
func (self *DB) write(tipe eventType, message *protocol.Message) (eventNumber uint64) {
	payload := message.MarshalBebop()

	// The closure only overwrites eventNumber, so running it more than
	// once (which bbolt's Batch is documented as being allowed to do)
	// leaves the value from the final run in place.
	store := func(tx *bolt.Tx) error {
		bucket := tx.Bucket(self.documentName)
		assigned, err := bucket.NextSequence()
		eventNumber = assigned
		if err != nil {
			return err
		}
		return bucket.Put(makeKey(assigned, tipe), payload)
	}
	if err := self.db.Batch(store); err != nil {
		panic(err)
	}

	self.largestUsedEventNumber = eventNumber
	return eventNumber
}

// LoadEvents reads the document's events back from the bucket and
// returns them as frames, in the reverse of the order in which they were
// visited.
//
// The bucket is walked from its last key towards its first. The walk
// stops at the first checkpoint event whose event number is below
// maxEventNumber, provided the frame stack has no message-less frames at
// that moment; that checkpoint becomes a checkpoint frame. Checkpoints
// that do not satisfy both conditions are skipped. Every message event
// visited contributes a frame entry according to whether it carries
// Updates, a Redo or an Undo.
//
// The cached largest used event number is refreshed from the bucket's
// sequence. LoadEvents panics if a key or value cannot be decoded, if a
// message carries none of Updates, Redo and Undo, or if message-less
// frames remain once the walk has finished.
func (self *DB) LoadEvents(maxEventNumber uint64) frames {
	var (
		collected frames
		sequence  uint64
	)

	scan := func(tx *bolt.Tx) error {
		depth := 0
		stack := newFrameStack()

		bucket := tx.Bucket(self.documentName)
		sequence = bucket.Sequence()

		cursor := bucket.Cursor()
		for key, value := cursor.Last(); key != nil; key, value = cursor.Prev() {
			eventNumber, tipe, err := self.parseKey(key)
			if err != nil {
				return err
			}

			// NOTE(review): bbolt only guarantees value for the life of
			// the transaction; presumably MakeMessageFromBytes copies what
			// it keeps, since the message outlives this closure - confirm.
			message, err := protocol.MakeMessageFromBytes(value)
			if err != nil {
				return err
			}

			if tipe == checkpointEvent {
				if eventNumber < maxEventNumber && !stack.hasMessagelessFrames() {
					collected = append(collected, &frame{
						eventNumber:  eventNumber,
						isCheckpoint: true,
						message:      &message,
					})
					// Nothing before a usable checkpoint is needed.
					break
				}
				// Checkpoint cannot be used here; keep walking backwards.
				continue
			}
			if tipe != messageEvent {
				continue
			}

			switch {
			case message.Updates != nil:
				slot := stack.ensureMessagelessFrame(depth)
				slot.eventNumber = eventNumber
				slot.message = &message
				depth--
				collected = append(collected, slot)

			case message.Redo != nil:
				slot := stack.ensureMessagelessFrame(depth)
				slot.redoCount++
				depth--
				collected = append(collected, slot)

			case message.Undo != nil:
				// Unlike the other cases, the index moves before the
				// frame is looked up.
				depth++
				slot := stack.ensureMessagelessFrame(depth)
				slot.undoCount++
				collected = append(collected, slot)

			default:
				return errors.New(`Illegal message: nil Updates, Redo, and Undo`)
			}
		}

		if stack.hasMessagelessFrames() {
			return errors.New(`Illegal sequence of messages loaded; frame stack still has message-less frames`)
		}
		return nil
	}
	if err := self.db.View(scan); err != nil {
		panic(err)
	}
	self.largestUsedEventNumber = sequence

	// Frames were gathered last-key-first; flip them in place.
	for lo, hi := 0, len(collected)-1; lo < hi; lo, hi = lo+1, hi-1 {
		collected[lo], collected[hi] = collected[hi], collected[lo]
	}
	return collected
}

// parseKey splits a bucket key, as produced by makeKey, into its event
// number and event type.
//
// It returns an error, together with a zero event number and
// maxEventType, when the key is not exactly 9 bytes long or when its
// final byte is not a known event type.
func (self *DB) parseKey(key []byte) (eventNumber uint64, tipe eventType, err error) {
	if n := len(key); n != 9 {
		return 0, maxEventType, fmt.Errorf(`Illegal key length. Expected keys of length 9, but got %d. %x`, n, key)
	}

	parsed := eventType(key[8])
	if parsed < maxEventType {
		return binary.BigEndian.Uint64(key[:8]), parsed, nil
	}
	return 0, maxEventType, fmt.Errorf(`Illegal event type: %x`, parsed)
}