package document import ( "github.com/pkg/errors" "github.com/rs/zerolog" bolt "go.etcd.io/bbolt" "wellquite.org/actors" "wellquite.org/actors/mailbox" ) // --- Client --- type RegistryClientFactory struct { factory *actors.BackPressureClientBaseFactory } func (self *RegistryClientFactory) NewClient() *RegistryClient { return &RegistryClient{BackPressureClientBase: self.factory.NewClient()} } type RegistryClient struct { *actors.BackPressureClientBase } var _ actors.Client = (*RegistryClient)(nil) type registrySubscribeMsg struct { actors.MsgSyncBase documentName string fun func(updates []byte) subscription *DocumentSubscription } func (self *RegistryClient) SubscribeToDocumentUpdates(documentName string, fun func(updates []byte)) *DocumentSubscription { msg := ®istrySubscribeMsg{ documentName: documentName, fun: fun, } if self.SendSync(msg, true) { return msg.subscription } else { return nil } } // --- Server --- type registryServer struct { actors.BackPressureServerBase dbPath string clientId uint64 db *bolt.DB manager *actors.ManagerClientBase openDocuments map[string]*DocumentClientFactory } var _ actors.Server = (*registryServer)(nil) func SpawnRegistry(log zerolog.Logger, dbPath string, clientId uint64) (*RegistryClientFactory, error) { server := ®istryServer{ dbPath: dbPath, clientId: clientId, } clientBase, err := actors.Spawn(log, server, "registry") if err != nil { return nil, err } return &RegistryClientFactory{ factory: actors.NewBackPressureClientBaseFactory(clientBase), }, nil } func (self *registryServer) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *actors.ClientBase) (err error) { if err = self.BackPressureServerBase.Init(log, mailboxReader, selfClient); err != nil { return err } if err = self.openDatabase(); err != nil { return err } manager, err := actors.SpawnManager(log, "document manager") if err != nil { return err } self.manager = manager subscription := manager.OnTermination(func(subscription *actors.TerminationSubscription, err error, caughtPanic interface{}) { // if the manager shuts down for any reason at all, we shut down // too. In "normal" operation, we'll always be shutting down // first, so this is more just in case the manager (or any of // its children) shut down abnormally. selfClient.TerminateSync() }) if subscription == nil { return errors.New("Unable to create terminated subscription on document manager") } self.openDocuments = make(map[string]*DocumentClientFactory) self.Log.Debug().Uint64("clientId", self.clientId).Msg("Document registry spawned") return nil } func (self *registryServer) openDatabase() error { db, err := bolt.Open(self.dbPath, 0666, nil) if err != nil { return err } self.Log.Info().Str("path", self.dbPath).Msg("Opened database.") self.db = db return nil } func (self *registryServer) Terminated(err error, caughtPanic interface{}) { self.manager.TerminateSync() if dbErr := self.db.Close(); dbErr != nil { self.Log.Error().Err(dbErr).Msg("Error when closing database.") } self.BackPressureServerBase.Terminated(err, caughtPanic) } func (self *registryServer) HandleMsg(msg mailbox.Msg) (err error) { switch msgT := msg.(type) { case *registrySubscribeMsg: clientFactory, found := self.openDocuments[msgT.documentName] if found { client := clientFactory.NewClient() if subscription := client.SubscribeToDocumentUpdates(msgT.fun); subscription != nil { msgT.subscription = subscription msgT.MarkProcessed() return nil } } clientFactory, err := SpawnDocument(self.db, self.manager, self.clientId, msgT.documentName) if err != nil { return err } self.openDocuments[msgT.documentName] = clientFactory client := clientFactory.NewClient() subscription := client.SubscribeToDocumentUpdates(msgT.fun) if subscription == nil { return errors.Errorf(`Unable to create subscription to document %q`, msgT.documentName) } msgT.subscription = subscription msgT.MarkProcessed() return nil default: return self.BackPressureServerBase.HandleMsg(msg) } }