Distributed Concurrent Editor

registry.go at [6cd08b113e]
Login

File document/registry.go artifact 3349f08058 part of check-in 6cd08b113e


     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
   100
   101
   102
   103
   104
   105
   106
   107
   108
   109
   110
   111
   112
   113
   114
   115
   116
   117
   118
   119
   120
   121
   122
   123
   124
   125
   126
   127
   128
   129
   130
   131
   132
   133
   134
   135
   136
   137
   138
   139
   140
   141
   142
   143
   144
   145
   146
   147
   148
   149
   150
   151
   152
   153
   154
   155
   156
   157
package document

import (
	"github.com/pkg/errors"
	"github.com/rs/zerolog"
	bolt "go.etcd.io/bbolt"
	"wellquite.org/actors"
	"wellquite.org/actors/mailbox"
)

// --- Client ---

// RegistryClientFactory creates RegistryClient values. It wraps the
// back-pressure client-base factory obtained when the registry actor
// is spawned (see SpawnRegistry).
type RegistryClientFactory struct {
	// factory supplies the BackPressureClientBase embedded in each new
	// RegistryClient.
	factory *actors.BackPressureClientBaseFactory
}

// NewClient returns a fresh RegistryClient whose embedded
// BackPressureClientBase comes from the wrapped factory.
func (self *RegistryClientFactory) NewClient() *RegistryClient {
	client := &RegistryClient{}
	client.BackPressureClientBase = self.factory.NewClient()
	return client
}

// RegistryClient is the client-side handle used to send messages to
// the registry actor. It embeds a BackPressureClientBase and adds
// SubscribeToDocumentUpdates on top of it.
type RegistryClient struct {
	*actors.BackPressureClientBase
}

// Compile-time check that *RegistryClient satisfies actors.Client.
var _ actors.Client = (*RegistryClient)(nil)

// registrySubscribeMsg is the synchronous request sent from a
// RegistryClient to the registry server asking for a subscription to
// a named document.
type registrySubscribeMsg struct {
	actors.MsgSyncBase
	// documentName identifies the document to subscribe to.
	documentName string
	// fun is handed to the document client's SubscribeToDocumentUpdates
	// by the server.
	fun          func(updates []byte)

	// subscription is the reply: the server fills it in before marking
	// the message as processed.
	subscription *DocumentSubscription
}

// SubscribeToDocumentUpdates asks the registry for a subscription to
// the document called documentName, passing fun along as the update
// callback. It sends a synchronous message to the registry and returns
// the subscription the server stored in it, or nil if SendSync
// reports failure.
func (self *RegistryClient) SubscribeToDocumentUpdates(documentName string, fun func(updates []byte)) *DocumentSubscription {
	request := &registrySubscribeMsg{documentName: documentName, fun: fun}
	if !self.SendSync(request, true) {
		return nil
	}
	return request.subscription
}

// --- Server ---

// registryServer is the actor behind RegistryClient. It owns the bolt
// database handle and a manager actor for documents, and keeps track
// of the documents it has spawned so far.
type registryServer struct {
	actors.BackPressureServerBase
	// dbPath is the file path passed to bolt.Open.
	dbPath   string
	// clientId is forwarded to every document spawned by this registry.
	clientId uint64
	// db is opened in Init (via openDatabase) and closed in Terminated.
	db       *bolt.DB
	// manager is spawned in Init and terminated in Terminated; it is
	// passed to SpawnDocument for each document.
	manager  *actors.ManagerClientBase

	// openDocuments maps a document name to the client factory of the
	// document actor most recently spawned for that name.
	openDocuments map[string]*DocumentClientFactory
}

// Compile-time check that *registryServer satisfies actors.Server.
var _ actors.Server = (*registryServer)(nil)

// SpawnRegistry starts the registry actor under the name "registry".
// dbPath is the bolt database file the actor opens during Init, and
// clientId is recorded on the server for use when spawning documents.
//
// On success it returns a factory for clients of the new actor; if
// actors.Spawn fails, that error is returned unchanged.
func SpawnRegistry(log zerolog.Logger, dbPath string, clientId uint64) (*RegistryClientFactory, error) {
	registry := &registryServer{}
	registry.dbPath = dbPath
	registry.clientId = clientId

	base, spawnErr := actors.Spawn(log, registry, "registry")
	if spawnErr != nil {
		return nil, spawnErr
	}

	result := &RegistryClientFactory{}
	result.factory = actors.NewBackPressureClientBaseFactory(base)
	return result, nil
}

// Init prepares the registry actor. In order, it initialises the
// embedded BackPressureServerBase, opens the database, spawns the
// "document manager" actor, subscribes to that manager's termination,
// and creates the empty openDocuments map. The first step that fails
// aborts Init and its error is returned.
func (self *registryServer) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *actors.ClientBase) (err error) {
	err = self.BackPressureServerBase.Init(log, mailboxReader, selfClient)
	if err != nil {
		return err
	}

	err = self.openDatabase()
	if err != nil {
		return err
	}

	documentManager, err := actors.SpawnManager(log, "document manager")
	if err != nil {
		return err
	}
	self.manager = documentManager

	// Whenever the manager goes away, for whatever reason, the registry
	// follows. Normally the registry shuts down first, so this only
	// matters if the manager (or one of its children) terminates
	// abnormally.
	onManagerTerminated := func(_ *actors.TerminationSubscription, _ error, _ interface{}) {
		selfClient.TerminateSync()
	}
	if documentManager.OnTermination(onManagerTerminated) == nil {
		return errors.New("Unable to create terminated subscription on document manager")
	}

	self.openDocuments = make(map[string]*DocumentClientFactory)
	self.Log.Debug().Uint64("clientId", self.clientId).Msg("Document registry spawned")
	return nil
}

// openDatabase opens the bolt database at self.dbPath with file mode
// 0666 and default options, and stores the handle in self.db. An
// error from bolt.Open is returned unchanged and leaves self.db
// untouched.
func (self *registryServer) openDatabase() error {
	database, openErr := bolt.Open(self.dbPath, 0666, nil)
	if openErr != nil {
		return openErr
	}

	self.db = database
	self.Log.Info().Str("path", self.dbPath).Msg("Opened database.")
	return nil
}

// Terminated runs when the registry actor shuts down. It terminates
// the document manager, closes the database (logging, not returning,
// any close error), and finally delegates to the embedded
// BackPressureServerBase with the same err and caughtPanic.
//
// Init can return an error before the manager has been spawned or the
// database opened, in which case self.manager and/or self.db are
// still nil. Both are therefore checked before use so that cleaning
// up after a failed Init cannot itself panic on a nil pointer.
// NOTE(review): this assumes the actors runtime invokes Terminated
// after a failed Init — confirm against the actors package docs.
func (self *registryServer) Terminated(err error, caughtPanic interface{}) {
	if self.manager != nil {
		self.manager.TerminateSync()
	}

	if self.db != nil {
		if dbErr := self.db.Close(); dbErr != nil {
			self.Log.Error().Err(dbErr).Msg("Error when closing database.")
		}
	}

	self.BackPressureServerBase.Terminated(err, caughtPanic)
}

// HandleMsg processes one message from the registry's mailbox.
//
// A *registrySubscribeMsg is answered with a subscription to the named
// document: an already-known document is tried first, and if there is
// none, or subscribing to it yields nil, a new document actor is
// spawned, recorded in openDocuments, and subscribed to instead. An
// error is returned if spawning fails or if the freshly spawned
// document also yields a nil subscription. Every other message type is
// passed to the embedded BackPressureServerBase.
func (self *registryServer) HandleMsg(msg mailbox.Msg) (err error) {
	subscribeMsg, isSubscribe := msg.(*registrySubscribeMsg)
	if !isSubscribe {
		return self.BackPressureServerBase.HandleMsg(msg)
	}

	name := subscribeMsg.documentName

	// Fast path: reuse the document we already know about.
	if existing, open := self.openDocuments[name]; open {
		if sub := existing.NewClient().SubscribeToDocumentUpdates(subscribeMsg.fun); sub != nil {
			subscribeMsg.subscription = sub
			subscribeMsg.MarkProcessed()
			return nil
		}
		// A nil subscription means the known document could not be
		// used (presumably its actor has gone away — verify); fall
		// through and replace it.
	}

	spawned, err := SpawnDocument(self.db, self.manager, self.clientId, name)
	if err != nil {
		return err
	}
	self.openDocuments[name] = spawned

	sub := spawned.NewClient().SubscribeToDocumentUpdates(subscribeMsg.fun)
	if sub == nil {
		return errors.Errorf(`Unable to create subscription to document %q`, name)
	}
	subscribeMsg.subscription = sub
	subscribeMsg.MarkProcessed()
	return nil
}