1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package document
import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
bolt "go.etcd.io/bbolt"
"wellquite.org/actors"
"wellquite.org/actors/mailbox"
)
// --- Client ---
// RegistryClientFactory creates RegistryClient values for talking to
// the registry actor. It wraps a back-pressure client factory from
// the actors package.
type RegistryClientFactory struct {
	// factory produces the back-pressure client base embedded in
	// every RegistryClient.
	factory *actors.BackPressureClientBaseFactory
}
// NewClient returns a fresh RegistryClient built around a new
// back-pressure client base obtained from the wrapped factory.
func (self *RegistryClientFactory) NewClient() *RegistryClient {
	base := self.factory.NewClient()
	client := &RegistryClient{BackPressureClientBase: base}
	return client
}
// RegistryClient is the client-side handle used to send messages to
// the registry actor. All of the generic client behaviour comes from
// the embedded back-pressure client base.
type RegistryClient struct {
	*actors.BackPressureClientBase
}

// Compile-time check that *RegistryClient satisfies actors.Client.
var _ actors.Client = (*RegistryClient)(nil)
// registrySubscribeMsg is the synchronous message a RegistryClient
// sends to request a subscription to a named document.
type registrySubscribeMsg struct {
	actors.MsgSyncBase

	// documentName identifies the document to subscribe to.
	documentName string
	// fun is the callback handed on to the document's own
	// SubscribeToDocumentUpdates call.
	fun func(updates []byte)
	// subscription is filled in by the registry server before it
	// marks the message as processed.
	subscription *DocumentSubscription
}
func (self *RegistryClient) SubscribeToDocumentUpdates(documentName string, fun func(updates []byte)) *DocumentSubscription {
msg := ®istrySubscribeMsg{
documentName: documentName,
fun: fun,
}
if self.SendSync(msg, true) {
return msg.subscription
} else {
return nil
}
}
// --- Server ---
// registryServer is the server side of the registry actor. It owns
// the bolt database handle, a manager actor for documents, and a map
// of the document client factories it has spawned so far.
type registryServer struct {
	actors.BackPressureServerBase

	// dbPath is the path handed to bolt.Open in openDatabase.
	dbPath string
	// clientId is passed through to every SpawnDocument call.
	clientId uint64
	// db is set by openDatabase and closed in Terminated.
	db *bolt.DB
	// manager is spawned in Init and passed to SpawnDocument.
	manager *actors.ManagerClientBase
	// openDocuments maps a document name to the client factory of
	// the document spawned for it.
	openDocuments map[string]*DocumentClientFactory
}

// Compile-time check that *registryServer satisfies actors.Server.
var _ actors.Server = (*registryServer)(nil)
func SpawnRegistry(log zerolog.Logger, dbPath string, clientId uint64) (*RegistryClientFactory, error) {
server := ®istryServer{
dbPath: dbPath,
clientId: clientId,
}
clientBase, err := actors.Spawn(log, server, "registry")
if err != nil {
return nil, err
}
return &RegistryClientFactory{
factory: actors.NewBackPressureClientBaseFactory(clientBase),
}, nil
}
// Init sets the registry up. In order it: initialises the embedded
// back-pressure server base, opens the database, spawns the
// "document manager" actor, registers a termination callback on that
// manager, and creates the empty openDocuments map.
//
// The first error encountered is returned and the remaining steps
// are skipped.
func (self *registryServer) Init(log zerolog.Logger, mailboxReader *mailbox.MailboxReader, selfClient *actors.ClientBase) (err error) {
	err = self.BackPressureServerBase.Init(log, mailboxReader, selfClient)
	if err != nil {
		return err
	}

	err = self.openDatabase()
	if err != nil {
		return err
	}

	documentManager, err := actors.SpawnManager(log, "document manager")
	if err != nil {
		return err
	}
	self.manager = documentManager

	// Should the manager shut down for whatever reason, the registry
	// shuts down as well. Normally the registry goes first, so this
	// mostly covers the manager (or one of its children) terminating
	// abnormally.
	onManagerTerminated := func(_ *actors.TerminationSubscription, _ error, _ interface{}) {
		selfClient.TerminateSync()
	}
	if documentManager.OnTermination(onManagerTerminated) == nil {
		return errors.New("Unable to create terminated subscription on document manager")
	}

	self.openDocuments = make(map[string]*DocumentClientFactory)
	self.Log.Debug().Uint64("clientId", self.clientId).Msg("Document registry spawned")
	return nil
}
// openDatabase opens the bolt database at self.dbPath with file mode
// 0666 and default options, logs the path, and stores the handle in
// self.db. An error from bolt.Open is returned as-is and self.db is
// left untouched.
func (self *registryServer) openDatabase() error {
	handle, openErr := bolt.Open(self.dbPath, 0666, nil)
	if openErr != nil {
		return openErr
	}

	self.Log.Info().Str("path", self.dbPath).Msg("Opened database.")
	self.db = handle
	return nil
}
// Terminated tears the registry down: it terminates the document
// manager, closes the database (logging, not returning, any close
// error), and then delegates to the embedded server base.
//
// Init can return an error before self.manager or self.db has been
// assigned, so both are checked for nil before use.
// NOTE(review): this matters only if the actor framework invokes
// Terminated after a failed Init - confirm against the actors docs.
func (self *registryServer) Terminated(err error, caughtPanic interface{}) {
	if self.manager != nil {
		self.manager.TerminateSync()
	}
	if self.db != nil {
		if dbErr := self.db.Close(); dbErr != nil {
			self.Log.Error().Err(dbErr).Msg("Error when closing database.")
		}
	}
	self.BackPressureServerBase.Terminated(err, caughtPanic)
}
// HandleMsg processes one message from the mailbox.
//
// A *registrySubscribeMsg is served by subscribing to the named
// document. If the document is already in openDocuments, a new client
// of it is asked to subscribe first. If there is no such entry, or
// that subscribe returns nil, a new document is spawned, recorded in
// openDocuments (replacing any previous entry), and subscribed to.
// On success the subscription is stored on the message and the
// message is marked processed.
//
// An error is returned if SpawnDocument fails or if subscribing to
// the freshly spawned document yields nil. Every other message type
// is forwarded to the embedded server base.
func (self *registryServer) HandleMsg(msg mailbox.Msg) (err error) {
	subscribeMsg, isSubscribe := msg.(*registrySubscribeMsg)
	if !isSubscribe {
		return self.BackPressureServerBase.HandleMsg(msg)
	}

	name := subscribeMsg.documentName

	// Try the already-open document first.
	// NOTE(review): a nil subscription here presumably means the
	// document actor has gone away - confirm. We fall through and
	// spawn a replacement either way.
	if existing, found := self.openDocuments[name]; found {
		if sub := existing.NewClient().SubscribeToDocumentUpdates(subscribeMsg.fun); sub != nil {
			subscribeMsg.subscription = sub
			subscribeMsg.MarkProcessed()
			return nil
		}
	}

	spawned, err := SpawnDocument(self.db, self.manager, self.clientId, name)
	if err != nil {
		return err
	}
	self.openDocuments[name] = spawned

	sub := spawned.NewClient().SubscribeToDocumentUpdates(subscribeMsg.fun)
	if sub == nil {
		return errors.Errorf(`Unable to create subscription to document %q`, name)
	}
	subscribeMsg.subscription = sub
	subscribeMsg.MarkProcessed()
	return nil
}
|