Distributed Concurrent Editor

registry.go at [6cd08b113e]
Login

File document/registry.go artifact 3349f08058 part of check-in 6cd08b113e


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
package document

import (
	"github.com/pkg/errors"
	"github.com/rs/zerolog"
	bolt "go.etcd.io/bbolt"
	"wellquite.org/actors"
	"wellquite.org/actors/mailbox"
)

// --- Client ---

// RegistryClientFactory creates RegistryClient values. It is the handle
// returned by SpawnRegistry.
type RegistryClientFactory struct {
	// factory is the underlying back-pressure client factory built in
	// SpawnRegistry from the client base that actors.Spawn returned.
	factory *actors.BackPressureClientBaseFactory
}

// NewClient returns a new RegistryClient wrapping a fresh client obtained
// from the underlying back-pressure client factory.
func (self *RegistryClientFactory) NewClient() *RegistryClient {
	base := self.factory.NewClient()
	return &RegistryClient{
		BackPressureClientBase: base,
	}
}

// RegistryClient is a client of the document registry. It embeds
// *actors.BackPressureClientBase and adds SubscribeToDocumentUpdates.
type RegistryClient struct {
	*actors.BackPressureClientBase
}

// Compile-time check that *RegistryClient implements actors.Client.
var _ actors.Client = (*RegistryClient)(nil)

// registrySubscribeMsg is the synchronous request sent by
// RegistryClient.SubscribeToDocumentUpdates and handled in
// registryServer.HandleMsg.
type registrySubscribeMsg struct {
	actors.MsgSyncBase
	// Request fields, filled in by the client before sending.
	documentName string
	fun          func(updates []byte)

	// Reply field, set by the registry server before it marks the
	// message as processed; read by the client once SendSync succeeds.
	subscription *DocumentSubscription
}

// SubscribeToDocumentUpdates asks the registry for a subscription to the
// document called documentName, passing fun along as the update callback.
//
// It returns the subscription filled in by the registry, or nil if the
// synchronous send did not succeed.
func (self *RegistryClient) SubscribeToDocumentUpdates(documentName string, fun func(updates []byte)) *DocumentSubscription {
	request := &registrySubscribeMsg{documentName: documentName, fun: fun}

	// The server stores its answer on the message itself, so there is
	// only something to read back when the send reports success.
	if !self.SendSync(request, true) {
		return nil
	}
	return request.subscription
}

// --- Server ---

// registryServer is the actor behind RegistryClient. It owns the bolt
// database handle and the manager under which document actors are
// spawned, and remembers a client factory for each document it has
// spawned.
type registryServer struct {
	actors.BackPressureServerBase
	// dbPath is the file path handed to bolt.Open in openDatabase.
	dbPath string
	// clientId is passed through to every SpawnDocument call.
	clientId uint64
	// db is set by openDatabase during Init and closed in Terminated.
	db *bolt.DB
	// manager is spawned in Init and passed to SpawnDocument; it is
	// terminated in Terminated.
	manager *actors.ManagerClientBase

	// openDocuments maps a document name to the client factory returned
	// by SpawnDocument for that name. Entries are overwritten when a
	// document is respawned; nothing in this file removes them.
	openDocuments map[string]*DocumentClientFactory
}

// Compile-time check that *registryServer implements actors.Server.
var _ actors.Server = (*registryServer)(nil)

// SpawnRegistry spawns the document registry actor (named "registry") and
// returns a factory for clients of it.
//
// dbPath is the bolt database file the registry opens during its Init;
// clientId is forwarded to every document the registry spawns. Any error
// reported by actors.Spawn is returned unchanged, with a nil factory.
func SpawnRegistry(log zerolog.Logger, dbPath string, clientId uint64) (*RegistryClientFactory, error) {
	registry := &registryServer{dbPath: dbPath, clientId: clientId}

	base, err := actors.Spawn(log, registry, "registry")
	if err != nil {
		return nil, err
	}

	clientFactory := &RegistryClientFactory{
		factory: actors.NewBackPressureClientBaseFactory(base),
	}
	return clientFactory, nil
}

// Init prepares the registry actor. In order, it:
//
//  1. initialises the embedded BackPressureServerBase,
//  2. opens the bolt database at self.dbPath,
//  3. spawns the "document manager" and subscribes to its termination,
//  4. creates the empty openDocuments map.
//
// The first error encountered is returned and the remaining steps are
// skipped, so self.db and self.manager may still be nil after a failure.
func (self *registryServer) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *actors.ClientBase) (err error) {
	err = self.BackPressureServerBase.Init(log, mailboxReader, selfClient)
	if err != nil {
		return err
	}

	err = self.openDatabase()
	if err != nil {
		return err
	}

	docManager, err := actors.SpawnManager(log, "document manager")
	if err != nil {
		return err
	}
	self.manager = docManager

	// The registry must not outlive its manager. Normally the registry
	// shuts down first, so this only matters when the manager (or one of
	// its children) goes away abnormally.
	onManagerTerminated := func(subscription *actors.TerminationSubscription, err error, caughtPanic interface{}) {
		selfClient.TerminateSync()
	}
	if docManager.OnTermination(onManagerTerminated) == nil {
		return errors.New("Unable to create terminated subscription on document manager")
	}

	self.openDocuments = make(map[string]*DocumentClientFactory)
	self.Log.Debug().
		Uint64("clientId", self.clientId).
		Msg("Document registry spawned")

	return nil
}

// openDatabase opens the bolt database at self.dbPath with file mode 0666
// and stores the handle in self.db. On failure the error from bolt.Open is
// returned as-is and self.db is left untouched.
//
// NOTE(review): options are nil, so bbolt's defaults apply; per the bbolt
// documentation a zero Timeout waits indefinitely for the file lock.
// Confirm that blocking here is intended if another process holds the file.
func (self *registryServer) openDatabase() error {
	handle, openErr := bolt.Open(self.dbPath, 0666, nil)
	if openErr != nil {
		return openErr
	}

	self.db = handle
	self.Log.Info().Str("path", self.dbPath).Msg("Opened database.")
	return nil
}

// Terminated releases the registry's resources: it terminates the
// document manager, closes the database (logging, not propagating, any
// close error), and then delegates to the embedded
// BackPressureServerBase.
//
// Init can return an error before self.db or self.manager have been
// assigned (openDatabase or SpawnManager failing), so both are checked
// for nil before use. Without the guards, a Terminated call following
// such a failed Init would panic on a nil pointer instead of reporting
// the original error.
// NOTE(review): assumes the actors framework may call Terminated after a
// failed Init; the guards are harmless if it does not.
func (self *registryServer) Terminated(err error, caughtPanic interface{}) {
	if self.manager != nil {
		self.manager.TerminateSync()
	}

	if self.db != nil {
		if dbErr := self.db.Close(); dbErr != nil {
			self.Log.Error().Err(dbErr).Msg("Error when closing database.")
		}
	}

	self.BackPressureServerBase.Terminated(err, caughtPanic)
}

// HandleMsg processes one message from the registry's mailbox.
//
// A *registrySubscribeMsg is answered by subscribing to the named
// document: first through the factory already recorded in openDocuments,
// if any, and otherwise (or if that attempt yields no subscription) by
// spawning the document and recording the new factory. On success the
// subscription is stored on the message and the message is marked
// processed. An error is returned if the document cannot be spawned or
// if subscribing to the freshly spawned document yields nil.
//
// Every other message type is passed to the embedded
// BackPressureServerBase.
func (self *registryServer) HandleMsg(msg mailbox.Msg) (err error) {
	request, isSubscribe := msg.(*registrySubscribeMsg)
	if !isSubscribe {
		return self.BackPressureServerBase.HandleMsg(msg)
	}

	name := request.documentName

	// Try the document we already know about, if there is one.
	if factory, known := self.openDocuments[name]; known {
		if sub := factory.NewClient().SubscribeToDocumentUpdates(request.fun); sub != nil {
			request.subscription = sub
			request.MarkProcessed()
			return nil
		}
		// A nil subscription falls through to spawning the document
		// again; the recorded factory is replaced below.
	}

	factory, err := SpawnDocument(self.db, self.manager, self.clientId, name)
	if err != nil {
		return err
	}
	self.openDocuments[name] = factory

	sub := factory.NewClient().SubscribeToDocumentUpdates(request.fun)
	if sub == nil {
		return errors.Errorf(`Unable to create subscription to document %q`, name)
	}
	request.subscription = sub
	request.MarkProcessed()
	return nil
}