package document
import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
bolt "go.etcd.io/bbolt"
"wellquite.org/actors"
"wellquite.org/actors/mailbox"
)
// --- Client ---
// RegistryClientFactory hands out RegistryClient values for a document
// registry actor. It is returned by SpawnRegistry.
type RegistryClientFactory struct {
// factory is the underlying back-pressure client factory from which
// each RegistryClient's embedded client base is created.
factory *actors.BackPressureClientBaseFactory
}
// NewClient returns a new RegistryClient wrapping a fresh client base
// obtained from the underlying back-pressure client factory.
func (self *RegistryClientFactory) NewClient() *RegistryClient {
	base := self.factory.NewClient()
	client := RegistryClient{BackPressureClientBase: base}
	return &client
}
// RegistryClient is a client handle for the document registry actor. It
// embeds *actors.BackPressureClientBase and adds the registry-specific
// SubscribeToDocumentUpdates request.
type RegistryClient struct {
*actors.BackPressureClientBase
}
// Compile-time check that *RegistryClient satisfies actors.Client.
var _ actors.Client = (*RegistryClient)(nil)
// registrySubscribeMsg is the synchronous request built by
// RegistryClient.SubscribeToDocumentUpdates and handled by
// registryServer.HandleMsg.
type registrySubscribeMsg struct {
actors.MsgSyncBase
// documentName is the name of the document to subscribe to.
documentName string
// fun is the callback handed on to the document client's
// SubscribeToDocumentUpdates.
fun func(updates []byte)
// subscription is the reply: the server sets it before marking the
// message as processed.
subscription *DocumentSubscription
}
func (self *RegistryClient) SubscribeToDocumentUpdates(documentName string, fun func(updates []byte)) *DocumentSubscription {
msg := ®istrySubscribeMsg{
documentName: documentName,
fun: fun,
}
if self.SendSync(msg, true) {
return msg.subscription
} else {
return nil
}
}
// --- Server ---
// registryServer is the actor behind RegistryClient. It owns the bolt
// database handle and the document manager, and keeps track of the
// documents it has spawned.
type registryServer struct {
actors.BackPressureServerBase
// dbPath is the file path passed to bolt.Open.
dbPath string
// clientId is passed to every SpawnDocument call and logged on start-up.
clientId uint64
// db is opened by openDatabase during Init and closed in Terminated.
db *bolt.DB
// manager is the "document manager" spawned in Init; it is passed to
// SpawnDocument and terminated in Terminated.
manager *actors.ManagerClientBase
// openDocuments maps a document name to the client factory of the
// document spawned for it.
openDocuments map[string]*DocumentClientFactory
}
// Compile-time check that *registryServer satisfies actors.Server.
var _ actors.Server = (*registryServer)(nil)
func SpawnRegistry(log zerolog.Logger, dbPath string, clientId uint64) (*RegistryClientFactory, error) {
server := ®istryServer{
dbPath: dbPath,
clientId: clientId,
}
clientBase, err := actors.Spawn(log, server, "registry")
if err != nil {
return nil, err
}
return &RegistryClientFactory{
factory: actors.NewBackPressureClientBaseFactory(clientBase),
}, nil
}
// Init prepares the registry actor. In order, it:
//
//  1. runs the embedded BackPressureServerBase.Init,
//  2. opens the bolt database at dbPath,
//  3. spawns the "document manager" and stores it in self.manager,
//  4. registers a termination callback on that manager,
//  5. creates the empty openDocuments map.
//
// The first failing step aborts Init and its error is returned; later
// steps are not attempted.
func (self *registryServer) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *actors.ClientBase) (err error) {
	// The base Init must run before anything that uses self.Log
	// (presumably it is what sets the logger up - confirm).
	err = self.BackPressureServerBase.Init(log, mailboxReader, selfClient)
	if err != nil {
		return err
	}

	err = self.openDatabase()
	if err != nil {
		return err
	}

	docManager, spawnErr := actors.SpawnManager(log, "document manager")
	if spawnErr != nil {
		return spawnErr
	}
	self.manager = docManager

	// Should the manager ever terminate, for whatever reason, the registry
	// terminates as well. Normally the registry goes down first, so this
	// only matters when the manager (or one of its children) shuts down
	// abnormally.
	onManagerTerminated := func(_ *actors.TerminationSubscription, _ error, _ interface{}) {
		selfClient.TerminateSync()
	}
	if docManager.OnTermination(onManagerTerminated) == nil {
		return errors.New("Unable to create terminated subscription on document manager")
	}

	self.openDocuments = make(map[string]*DocumentClientFactory)
	self.Log.Debug().Uint64("clientId", self.clientId).Msg("Document registry spawned")
	return nil
}
// openDatabase opens the bolt database file at self.dbPath with mode 0666
// and default options. On success the handle is stored in self.db and the
// path is logged; on failure the error from bolt.Open is returned and
// self.db is left untouched.
func (self *registryServer) openDatabase() error {
	handle, openErr := bolt.Open(self.dbPath, 0666, nil)
	if openErr == nil {
		self.Log.Info().Str("path", self.dbPath).Msg("Opened database.")
		self.db = handle
	}
	return openErr
}
// Terminated releases the registry's resources: it terminates the document
// manager, closes the bolt database (logging, not returning, any close
// error), and finally delegates to BackPressureServerBase.Terminated with
// the same err and caughtPanic.
//
// Both self.manager and self.db are only assigned part-way through Init,
// so each is checked for nil before use.
// NOTE(review): this matters if the actors framework calls Terminated
// after Init has returned an error - confirm against the actors docs.
func (self *registryServer) Terminated(err error, caughtPanic interface{}) {
	if self.manager != nil {
		self.manager.TerminateSync()
	}
	if self.db != nil {
		if dbErr := self.db.Close(); dbErr != nil {
			self.Log.Error().Err(dbErr).Msg("Error when closing database.")
		}
	}
	self.BackPressureServerBase.Terminated(err, caughtPanic)
}
// HandleMsg processes a single message received by the registry.
//
// For a *registrySubscribeMsg it subscribes the message's callback to the
// named document. A factory already recorded in openDocuments is tried
// first. If there is none, or subscribing through it yields nil, the
// document is spawned with SpawnDocument, the new factory replaces the
// openDocuments entry, and the subscription is attempted once more. A
// spawn error, or a nil subscription from the freshly spawned document,
// is returned as an error. On success the subscription is stored on the
// message and the message is marked processed.
//
// Any other message is delegated to BackPressureServerBase.HandleMsg.
func (self *registryServer) HandleMsg(msg mailbox.Msg) (err error) {
	subscribeMsg, isSubscribe := msg.(*registrySubscribeMsg)
	if !isSubscribe {
		return self.BackPressureServerBase.HandleMsg(msg)
	}

	docName := subscribeMsg.documentName

	// First try a document that is already known to the registry.
	var subscription *DocumentSubscription
	if cached, known := self.openDocuments[docName]; known {
		subscription = cached.NewClient().SubscribeToDocumentUpdates(subscribeMsg.fun)
	}

	// Unknown document, or the known one would not give us a
	// subscription: spawn it and try again.
	if subscription == nil {
		spawned, spawnErr := SpawnDocument(self.db, self.manager, self.clientId, docName)
		if spawnErr != nil {
			return spawnErr
		}
		self.openDocuments[docName] = spawned

		subscription = spawned.NewClient().SubscribeToDocumentUpdates(subscribeMsg.fun)
		if subscription == nil {
			return errors.Errorf(`Unable to create subscription to document %q`, docName)
		}
	}

	subscribeMsg.subscription = subscription
	subscribeMsg.MarkProcessed()
	return nil
}