Distributed Concurrent Editor

db.go at [8dc8cbfc1c]
Login

File document/db.go artifact 3ad44ef5c3 part of check-in 8dc8cbfc1c


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
package document

import (
	"encoding/binary"
	"errors"
	"fmt"

	bolt "go.etcd.io/bbolt"
	protocol "wellquite.org/edist/protocol/go"
)

// DB stores the events of a single document inside one bucket of a bbolt
// database.
type DB struct {
	// db is the underlying bbolt handle; it is supplied by the caller of NewDB.
	db                     *bolt.DB
	// documentName is the document's name as bytes. It is used as the name of
	// the bucket that holds this document's events.
	documentName           []byte
	// largestUsedEventNumber caches the bucket's sequence value as last
	// observed by NewDB, write, or LoadEvents.
	largestUsedEventNumber uint64
}

// NewDB returns a DB for the named document, backed by db. The bucket named
// after the document is created if it does not exist yet, and the bucket's
// current sequence value is recorded as the largest used event number.
//
// NewDB panics if the update transaction fails.
func NewDB(db *bolt.DB, documentName string) *DB {
	result := &DB{
		db:           db,
		documentName: []byte(documentName),
	}

	if err := db.Update(func(tx *bolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(result.documentName)
		if err != nil {
			return err
		}
		result.largestUsedEventNumber = bucket.Sequence()
		return nil
	}); err != nil {
		panic(err)
	}

	return result
}

// eventType tags the kind of event stored under a key. makeKey writes it as
// the final (9th) byte of the key, so the numeric values below are part of
// the stored key format.
type eventType byte

const (
	// messageEvent marks an event written through WriteMessage.
	messageEvent eventType = iota
	// checkpointEvent marks an event written through WriteCheckpoint.
	checkpointEvent
	// maxEventType is not a real event type: parseKey rejects any type byte
	// that is >= maxEventType, and returns maxEventType alongside its errors.
	maxEventType
)

// makeKey builds the 9-byte bucket key for an event: the event number as a
// big-endian uint64 in bytes 0-7, followed by the event type in byte 8.
// Big-endian encoding means that byte-wise key order matches event-number
// order.
func makeKey(eventNumber uint64, tipe eventType) []byte {
	var buf [9]byte
	binary.BigEndian.PutUint64(buf[:8], eventNumber)
	buf[8] = byte(tipe)
	return buf[:]
}

// WriteMessage stores message as a message event and returns the event
// number assigned to it. It panics if the underlying write fails.
func (self *DB) WriteMessage(message *protocol.Message) (eventNumber uint64) {
	eventNumber = self.write(messageEvent, message)
	return eventNumber
}

// WriteCheckpoint stores message as a checkpoint event and returns the event
// number assigned to it. It panics if the underlying write fails.
func (self *DB) WriteCheckpoint(message *protocol.Message) (eventNumber uint64) {
	eventNumber = self.write(checkpointEvent, message)
	return eventNumber
}

// write serialises message and stores it in the document's bucket under a
// key built from the bucket's next sequence number and tipe. The sequence
// number is returned as the event number and is also cached in
// self.largestUsedEventNumber.
//
// write panics if the batch transaction reports an error.
func (self *DB) write(tipe eventType, message *protocol.Message) (eventNumber uint64) {
	payload := message.MarshalBebop()

	batchErr := self.db.Batch(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(self.documentName)
		sequence, err := bucket.NextSequence()
		if err != nil {
			return err
		}
		// The closure only assigns captured variables, so the values from
		// its last run are the ones that remain. NOTE(review): bbolt
		// documents that Batch may call the function more than once —
		// keep this body free of other side effects.
		eventNumber = sequence
		return bucket.Put(makeKey(sequence, tipe), payload)
	})
	if batchErr != nil {
		panic(batchErr)
	}

	self.largestUsedEventNumber = eventNumber
	return eventNumber
}

// LoadEvents reads the document's events inside a read-only transaction and
// returns the frames built from them, in the reverse of the order in which
// they were collected.
//
// The bucket is walked from its last key towards its first. The walk stops at
// the first checkpoint event whose event number is below maxEventNumber,
// provided the frame stack reports no message-less frames at that moment;
// that checkpoint is collected as a frame and, after the final reversal, is
// the first element of the result. Checkpoints that do not satisfy this
// condition are skipped. Message events are dispatched on which of Updates,
// Redo, or Undo is set.
//
// As a side effect the bucket's current sequence value is cached in
// self.largestUsedEventNumber.
//
// LoadEvents panics if the transaction fails, if a key or value cannot be
// decoded, if a message has none of Updates, Redo, or Undo set, or if the
// frame stack still has message-less frames once the walk ends.
func (self *DB) LoadEvents(maxEventNumber uint64) frames {
	var (
		collected frames
		sequence  uint64
	)

	viewErr := self.db.View(func(tx *bolt.Tx) error {
		depth := 0
		stack := newFrameStack()

		bucket := tx.Bucket(self.documentName)
		sequence = bucket.Sequence()

		cursor := bucket.Cursor()
	scan:
		for key, value := cursor.Last(); key != nil; key, value = cursor.Prev() {
			eventNumber, tipe, err := self.parseKey(key)
			if err != nil {
				return err
			}

			message, err := protocol.MakeMessageFromBytes(value)
			if err != nil {
				return err
			}

			switch tipe {
			case checkpointEvent:
				// Only a checkpoint that is old enough, and that is reached
				// while no frame is still waiting for its message, ends
				// the walk.
				if eventNumber >= maxEventNumber || stack.hasMessagelessFrames() {
					continue scan
				}
				collected = append(collected, &frame{
					eventNumber:  eventNumber,
					isCheckpoint: true,
					message:      &message,
				})
				break scan

			case messageEvent:
				switch {
				case message.Updates != nil:
					entry := stack.ensureMessagelessFrame(depth)
					entry.eventNumber = eventNumber
					entry.message = &message
					collected = append(collected, entry)
					depth--

				case message.Redo != nil:
					entry := stack.ensureMessagelessFrame(depth)
					entry.redoCount++
					collected = append(collected, entry)
					depth--

				case message.Undo != nil:
					// Unlike the other cases, the index moves before the
					// frame is looked up.
					depth++
					entry := stack.ensureMessagelessFrame(depth)
					entry.undoCount++
					collected = append(collected, entry)

				default:
					return errors.New(`Illegal message: nil Updates, Redo, and Undo`)
				}
			}
		}

		if stack.hasMessagelessFrames() {
			return errors.New(`Illegal sequence of messages loaded; frame stack still has message-less frames`)
		}

		return nil
	})
	if viewErr != nil {
		panic(viewErr)
	}
	self.largestUsedEventNumber = sequence

	// Frames were collected while walking backwards; flip them in place.
	for lo, hi := 0, len(collected)-1; lo < hi; lo, hi = lo+1, hi-1 {
		collected[lo], collected[hi] = collected[hi], collected[lo]
	}
	return collected
}

// parseKey splits a bucket key produced by makeKey back into its event number
// (big-endian uint64 in bytes 0-7) and event type (byte 8).
//
// An error is returned if the key is not exactly 9 bytes long or if the type
// byte is not below maxEventType; in both cases the other results are 0 and
// maxEventType.
func (self *DB) parseKey(key []byte) (eventNumber uint64, tipe eventType, err error) {
	const keyLength = 9
	if got := len(key); got != keyLength {
		return 0, maxEventType, fmt.Errorf(`Illegal key length. Expected keys of length 9, but got %d. %x`, got, key)
	}

	number, kind := binary.BigEndian.Uint64(key[:8]), eventType(key[8])
	if kind < maxEventType {
		return number, kind, nil
	}
	return 0, maxEventType, fmt.Errorf(`Illegal event type: %x`, kind)
}