Distributed Concurrent Editor

Artifact [3349f08058]
Login

Artifact 3349f08058803c6e8f2e5b4a0579f8de66e999944acfa16de02036d4fbc61060:


package document

import (
	"github.com/pkg/errors"
	"github.com/rs/zerolog"
	bolt "go.etcd.io/bbolt"
	"wellquite.org/actors"
	"wellquite.org/actors/mailbox"
)

// --- Client ---

// RegistryClientFactory hands out RegistryClient values. Each client it
// creates is built on a fresh client obtained from the wrapped
// BackPressureClientBaseFactory.
type RegistryClientFactory struct {
	// factory supplies the BackPressureClientBase that NewClient embeds in
	// every RegistryClient it returns.
	factory *actors.BackPressureClientBaseFactory
}

// NewClient returns a new RegistryClient wrapping a client freshly obtained
// from the underlying back-pressure client factory.
func (self *RegistryClientFactory) NewClient() *RegistryClient {
	client := new(RegistryClient)
	client.BackPressureClientBase = self.factory.NewClient()
	return client
}

// RegistryClient is the client-side handle used to talk to the document
// registry actor. It embeds *actors.BackPressureClientBase and adds
// SubscribeToDocumentUpdates on top of it.
type RegistryClient struct {
	*actors.BackPressureClientBase
}

// Compile-time check that *RegistryClient satisfies actors.Client.
var _ actors.Client = (*RegistryClient)(nil)

// registrySubscribeMsg is the synchronous request that
// RegistryClient.SubscribeToDocumentUpdates sends to the registry. The
// first two fields are filled in by the sender; subscription carries the
// reply.
type registrySubscribeMsg struct {
	actors.MsgSyncBase
	// documentName identifies the document to subscribe to.
	documentName string
	// fun is the callback handed on to the document client's own
	// SubscribeToDocumentUpdates.
	fun          func(updates []byte)

	// subscription is set by the registry's HandleMsg before it marks the
	// message as processed.
	subscription *DocumentSubscription
}

// SubscribeToDocumentUpdates asks the registry for a subscription to the
// document called documentName, passing fun along as the update callback.
//
// The request is sent with SendSync. If SendSync reports failure, nil is
// returned; otherwise the subscription the registry stored on the message
// is returned.
func (self *RegistryClient) SubscribeToDocumentUpdates(documentName string, fun func(updates []byte)) *DocumentSubscription {
	request := new(registrySubscribeMsg)
	request.documentName = documentName
	request.fun = fun

	if !self.SendSync(request, true) {
		return nil
	}
	return request.subscription
}

// --- Server ---

// registryServer is the server side of the document registry actor. It owns
// the bolt database handle and the manager under which document actors are
// spawned, and it remembers which documents it has already spawned.
type registryServer struct {
	actors.BackPressureServerBase
	// dbPath is the filesystem path handed to bolt.Open in openDatabase.
	dbPath   string
	// clientId is passed to every SpawnDocument call and logged in Init.
	clientId uint64
	// db is set by openDatabase and closed in Terminated.
	db       *bolt.DB
	// manager is spawned in Init, passed to SpawnDocument, and terminated
	// in Terminated.
	manager  *actors.ManagerClientBase

	// openDocuments maps a document name to the client factory returned by
	// SpawnDocument for it. Created in Init, populated by HandleMsg.
	openDocuments map[string]*DocumentClientFactory
}

// Compile-time check that *registryServer satisfies actors.Server.
var _ actors.Server = (*registryServer)(nil)

// SpawnRegistry starts the document registry actor under the name
// "registry" and returns a factory for clients that talk to it.
//
// dbPath is the bolt database file the registry opens during Init, and
// clientId is forwarded to every document the registry spawns. Any error
// from actors.Spawn is returned unchanged, with a nil factory.
func SpawnRegistry(log zerolog.Logger, dbPath string, clientId uint64) (*RegistryClientFactory, error) {
	registry := new(registryServer)
	registry.dbPath = dbPath
	registry.clientId = clientId

	base, spawnErr := actors.Spawn(log, registry, "registry")
	if spawnErr != nil {
		return nil, spawnErr
	}

	clientFactory := &RegistryClientFactory{
		factory: actors.NewBackPressureClientBaseFactory(base),
	}
	return clientFactory, nil
}

// Init prepares the registry actor. In order, it:
//
//  1. initialises the embedded BackPressureServerBase,
//  2. opens the bolt database,
//  3. spawns the "document manager" and stores it in self.manager,
//  4. registers a termination callback on that manager, and
//  5. creates the empty openDocuments map.
//
// The first failing step aborts Init and its error is returned; later
// steps are not attempted.
func (self *registryServer) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *actors.ClientBase) (err error) {
	err = self.BackPressureServerBase.Init(log, mailboxReader, selfClient)
	if err != nil {
		return err
	}

	err = self.openDatabase()
	if err != nil {
		return err
	}

	documentManager, err := actors.SpawnManager(log, "document manager")
	if err != nil {
		return err
	}
	self.manager = documentManager

	// Whatever the reason the manager goes away, the registry follows it.
	// Normally the registry shuts down first, so this mostly covers the
	// manager (or one of its children) terminating abnormally.
	onManagerTerminated := func(_ *actors.TerminationSubscription, _ error, _ interface{}) {
		selfClient.TerminateSync()
	}
	if documentManager.OnTermination(onManagerTerminated) == nil {
		return errors.New("Unable to create terminated subscription on document manager")
	}

	self.openDocuments = make(map[string]*DocumentClientFactory)
	self.Log.Debug().Uint64("clientId", self.clientId).Msg("Document registry spawned")
	return nil
}

// openDatabase opens the bolt database at self.dbPath (file mode 0666, nil
// options) and stores the handle in self.db. On failure the error from
// bolt.Open is returned and self.db is left untouched.
func (self *registryServer) openDatabase() error {
	handle, openErr := bolt.Open(self.dbPath, 0666, nil)
	if openErr != nil {
		return openErr
	}

	self.db = handle
	self.Log.Info().Str("path", self.dbPath).Msg("Opened database.")
	return nil
}

// Terminated tears the registry down: it terminates the document manager,
// closes the bolt database (logging, not returning, any close error), and
// finally delegates to the embedded BackPressureServerBase.
//
// err and caughtPanic are passed straight through to the base
// implementation.
func (self *registryServer) Terminated(err error, caughtPanic interface{}) {
	// Init can return an error before self.manager and/or self.db have been
	// assigned. Guard both so that termination after a failed Init does not
	// panic on a nil pointer.
	// NOTE(review): assumes the actors framework may call Terminated after
	// Init fails; confirm against the actors.Server contract.
	if self.manager != nil {
		self.manager.TerminateSync()
	}

	if self.db != nil {
		if dbErr := self.db.Close(); dbErr != nil {
			self.Log.Error().Err(dbErr).Msg("Error when closing database.")
		}
	}

	self.BackPressureServerBase.Terminated(err, caughtPanic)
}

// HandleMsg processes one message from the registry's mailbox.
//
// Anything other than a *registrySubscribeMsg is delegated to the embedded
// BackPressureServerBase. For a subscribe request:
//
//   - If a client factory is already recorded for the document, a
//     subscription is attempted through it. A non-nil subscription is
//     stored on the message, which is then marked processed.
//   - Otherwise (no factory recorded, or that attempt yielded nil) a new
//     document is spawned, recorded in openDocuments under the document
//     name, and subscribed to.
//
// A non-nil error is returned when SpawnDocument fails or when the freshly
// spawned document yields a nil subscription; in both cases the message is
// not marked processed.
func (self *registryServer) HandleMsg(msg mailbox.Msg) (err error) {
	subscribeMsg, isSubscribe := msg.(*registrySubscribeMsg)
	if !isSubscribe {
		return self.BackPressureServerBase.HandleMsg(msg)
	}

	name := subscribeMsg.documentName

	// First try the factory already on record for this document.
	if existing, known := self.openDocuments[name]; known {
		if sub := existing.NewClient().SubscribeToDocumentUpdates(subscribeMsg.fun); sub != nil {
			subscribeMsg.subscription = sub
			subscribeMsg.MarkProcessed()
			return nil
		}
	}

	// Nothing usable on record: spawn the document and replace any
	// previous entry for this name.
	spawned, spawnErr := SpawnDocument(self.db, self.manager, self.clientId, name)
	if spawnErr != nil {
		return spawnErr
	}
	self.openDocuments[name] = spawned

	sub := spawned.NewClient().SubscribeToDocumentUpdates(subscribeMsg.fun)
	if sub == nil {
		return errors.Errorf(`Unable to create subscription to document %q`, name)
	}
	subscribeMsg.subscription = sub
	subscribeMsg.MarkProcessed()
	return nil
}