package document
import (
"encoding/binary"
"errors"
"fmt"
bolt "go.etcd.io/bbolt"
protocol "wellquite.org/edist/protocol/go"
)
// DB stores the events of one document inside a single bucket of a bolt
// database.
type DB struct {
// db is the bolt database that holds the document's bucket.
db *bolt.DB
// documentName is the name of the bucket used for this document.
documentName []byte
// largestUsedEventNumber caches the bucket's sequence value as last observed
// by NewDB, write or LoadEvents.
largestUsedEventNumber uint64
}
// NewDB returns a DB that keeps the events of documentName in a bucket of
// the same name inside db. The bucket is created when it does not exist yet,
// and the bucket's current sequence value is recorded as the largest used
// event number.
//
// NewDB panics if the bucket cannot be created or the transaction fails.
func NewDB(db *bolt.DB, documentName string) *DB {
	store := &DB{
		db:           db,
		documentName: []byte(documentName),
	}
	prepare := func(tx *bolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(store.documentName)
		if err != nil {
			return err
		}
		store.largestUsedEventNumber = bucket.Sequence()
		return nil
	}
	if err := db.Update(prepare); err != nil {
		panic(err)
	}
	return store
}
// eventType identifies what kind of event a stored entry holds. makeKey
// writes it as the final byte of every key, so these values are part of the
// stored key format.
type eventType byte
const (
// messageEvent tags entries written through WriteMessage.
messageEvent eventType = iota
// checkpointEvent tags entries written through WriteCheckpoint.
checkpointEvent
// maxEventType is a bound rather than a real event type: parseKey rejects
// any type byte at or above it and returns it alongside every error.
maxEventType
)
// makeKey builds the 9-byte bucket key for an event: the event number as a
// big-endian uint64 in bytes 0-7, followed by the event type in byte 8.
func makeKey(eventNumber uint64, tipe eventType) []byte {
	var buf [9]byte
	binary.BigEndian.PutUint64(buf[:8], eventNumber)
	buf[8] = byte(tipe)
	return buf[:]
}
// WriteMessage stores message as a message event and returns the event
// number it was stored under. It panics if the underlying write fails.
func (self *DB) WriteMessage(message *protocol.Message) (eventNumber uint64) {
	eventNumber = self.write(messageEvent, message)
	return eventNumber
}
// WriteCheckpoint stores message as a checkpoint event and returns the event
// number it was stored under. It panics if the underlying write fails.
func (self *DB) WriteCheckpoint(message *protocol.Message) (eventNumber uint64) {
	eventNumber = self.write(checkpointEvent, message)
	return eventNumber
}
// write marshals message, takes the next sequence number of the document's
// bucket as the event number, and stores the marshalled bytes under the key
// built from that number and tipe. The event number is also recorded as the
// largest used event number and returned.
//
// write panics if the batch transaction returns an error.
func (self *DB) write(tipe eventType, message *protocol.Message) (eventNumber uint64) {
	payload := message.MarshalBebop()
	store := func(tx *bolt.Tx) error {
		bucket := tx.Bucket(self.documentName)
		sequence, err := bucket.NextSequence()
		if err != nil {
			return err
		}
		// Only the captured result is overwritten here, so running this
		// function again simply replaces the value with the newer one.
		eventNumber = sequence
		return bucket.Put(makeKey(sequence, tipe), payload)
	}
	if err := self.db.Batch(store); err != nil {
		panic(err)
	}
	self.largestUsedEventNumber = eventNumber
	return eventNumber
}
// LoadEvents reads the document's bucket inside a read-only transaction,
// visiting keys from the last to the first, and returns the frames it
// collects in the opposite order (the earliest visited key's frame last
// collected, first returned).
//
// Every visited key is parsed and every value decoded. The scan stops at the
// first checkpoint whose event number is below maxEventNumber while the
// frame stack holds no message-less frames; that checkpoint becomes a
// checkpoint frame. Checkpoints that do not satisfy this are skipped. Message
// events are attached to frames obtained from the frame stack according to
// whether they carry Updates, Redo or Undo.
//
// The bucket's sequence value is recorded as the largest used event number.
//
// LoadEvents panics if a key or value cannot be decoded, if a message has
// none of Updates, Redo and Undo set, or if message-less frames remain once
// the scan ends.
func (self *DB) LoadEvents(maxEventNumber uint64) frames {
	var collected frames
	var sequence uint64
	scan := func(tx *bolt.Tx) error {
		depth := 0
		stack := newFrameStack()
		bucket := tx.Bucket(self.documentName)
		sequence = bucket.Sequence()
		cursor := bucket.Cursor()
	walk:
		for key, value := cursor.Last(); key != nil; key, value = cursor.Prev() {
			eventNumber, tipe, err := self.parseKey(key)
			if err != nil {
				return err
			}
			message, err := protocol.MakeMessageFromBytes(value)
			if err != nil {
				return err
			}
			switch tipe {
			case checkpointEvent:
				// The stack is only consulted once the event number test
				// has passed.
				if eventNumber < maxEventNumber && !stack.hasMessagelessFrames() {
					collected = append(collected, &frame{
						eventNumber:  eventNumber,
						isCheckpoint: true,
						message:      &message,
					})
					break walk
				}
			case messageEvent:
				if message.Updates != nil {
					current := stack.ensureMessagelessFrame(depth)
					current.eventNumber = eventNumber
					current.message = &message
					collected = append(collected, current)
					depth--
				} else if message.Redo != nil {
					current := stack.ensureMessagelessFrame(depth)
					current.redoCount++
					collected = append(collected, current)
					depth--
				} else if message.Undo != nil {
					// Unlike the other two cases, the index moves before the
					// frame is looked up.
					depth++
					current := stack.ensureMessagelessFrame(depth)
					current.undoCount++
					collected = append(collected, current)
				} else {
					return errors.New(`Illegal message: nil Updates, Redo, and Undo`)
				}
			}
		}
		if stack.hasMessagelessFrames() {
			return errors.New(`Illegal sequence of messages loaded; frame stack still has message-less frames`)
		}
		return nil
	}
	if err := self.db.View(scan); err != nil {
		panic(err)
	}
	self.largestUsedEventNumber = sequence
	// Frames were gathered newest key first; flip the slice in place.
	for lo, hi := 0, len(collected)-1; lo < hi; lo, hi = lo+1, hi-1 {
		collected[lo], collected[hi] = collected[hi], collected[lo]
	}
	return collected
}
// parseKey splits a bucket key into its event number (bytes 0-7, big-endian)
// and event type (byte 8).
//
// It returns an error, together with a zero event number and maxEventType,
// when the key is not exactly 9 bytes long or when the type byte is not
// below maxEventType.
func (self *DB) parseKey(key []byte) (eventNumber uint64, tipe eventType, err error) {
	if keyLen := len(key); keyLen != 9 {
		err = fmt.Errorf(`Illegal key length. Expected keys of length 9, but got %d. %x`, keyLen, key)
		return 0, maxEventType, err
	}
	number, kind := binary.BigEndian.Uint64(key[:8]), eventType(key[8])
	if kind >= maxEventType {
		return 0, maxEventType, fmt.Errorf(`Illegal event type: %x`, kind)
	}
	return number, kind, nil
}