-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathrequests_std.go
More file actions
209 lines (166 loc) · 6.62 KB
/
requests_std.go
File metadata and controls
209 lines (166 loc) · 6.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package ctrl
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// -----
// methodHello is the handler for received hello messages.
//
// It writes a timestamped "Received hello from <node>" line to the file
// <SubscribersDataFolder>/<message.Directory>/<message.FromNode>/<message.FileName>,
// replacing any previous content of that file, and then hands the message
// over to the procFunc on proc.procFuncCh.
//
// It returns an acknowledgement built from node and message.ID. An error is
// returned when the folder tree cannot be created or the file cannot be
// opened. A failed write or sync is only reported through
// proc.errorKernel.errSend; the message is still forwarded and acknowledged.
func methodHello(proc process, message Message, node string) ([]byte, error) {
	data := fmt.Sprintf("%v, Received hello from %#v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), message.FromNode)

	fileName := message.FileName
	folderTree := filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.FromNode))

	// Check if folder structure exist, if not create it.
	if _, err := os.Stat(folderTree); os.IsNotExist(err) {
		err := os.MkdirAll(folderTree, 0770)
		if err != nil {
			return nil, fmt.Errorf("error: methodHello.handler: failed to create directory tree %v: %v", folderTree, err)
		}

		proc.errorKernel.logDebug("methodHello: Creating subscribers data folder at ", "foldertree", folderTree)
	}

	// Open file and write data. O_TRUNC is used so the file only ever holds
	// the most recent hello from the node.
	file := filepath.Join(folderTree, fileName)
	f, err := os.OpenFile(file, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0660)
	if err != nil {
		er := fmt.Errorf("error: methodHello.handler: failed to open file: %v", err)
		return nil, er
	}
	defer f.Close()

	// Write and sync failures are reported as warnings instead of failing
	// the handler, so the hello is still forwarded to the procFunc below.
	if _, err := f.Write([]byte(data)); err != nil {
		er := fmt.Errorf("error: methodHello.handler: failed to write to file: %v", err)
		proc.errorKernel.errSend(proc, message, er, logWarning)
	} else if err := f.Sync(); err != nil {
		er := fmt.Errorf("error: methodHello.handler: failed to sync file: %v", err)
		proc.errorKernel.errSend(proc, message, er, logWarning)
	}

	// The handling of the public key that is in the message.Data field is handled in the procfunc.
	proc.procFuncCh <- message

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
// procFuncHelloSubscriber is the procFunc used with the hello subscriber
// process. It runs as a long-lived loop next to the hello handler: the
// handler delivers every received hello message on procFuncCh, and this
// function picks each message up from there.
//
// The public key of a node is carried in the data field of the hello
// message, so each message is passed to proc.centralAuth.addPublicKey for
// key registration. Afterwards the hello related prometheus metrics are
// refreshed: the total is set to the current number of entries in the
// nodesAcked key map, and the last-contact time is set for the sending node.
//
// The function returns nil when ctx is done.
func procFuncHelloSubscriber(ctx context.Context, proc process, procFuncCh chan Message) error {
	// ackedKeyCount reads the size of the acked keys map while holding
	// the mutex that guards it.
	ackedKeyCount := func() int {
		proc.server.centralAuth.pki.nodesAcked.mu.Lock()
		defer proc.server.centralAuth.pki.nodesAcked.mu.Unlock()

		return len(proc.server.centralAuth.pki.nodesAcked.keysAndHash.Keys)
	}

	for {
		select {
		case <-ctx.Done():
			proc.errorKernel.logDebug("procFuncHelloSubscriber: stopped handleFunc for: subscriber", "subject", proc.subject.name())
			return nil

		case hello := <-procFuncCh:
			// A copy of the message sent from the method handler.
			proc.centralAuth.addPublicKey(proc, hello)

			// Update the prometheus metrics.
			proc.metrics.promHelloNodesTotal.Set(float64(ackedKeyCount()))

			from := prometheus.Labels{"nodeName": string(hello.FromNode)}
			proc.metrics.promHelloNodesContactLast.With(from).SetToCurrentTime()
		}
	}
}
// procFuncHelloPublisher is the procFunc used with the hello publisher
// process. It puts a hello message addressed to the configured central node
// on proc.newMessagesCh, first immediately and then once per interval, where
// the interval is proc.configuration.StartProcesses.StartPubHello seconds.
//
// The payload of each message is proc.server.nodeAuth.SignPublicKey, the
// ed25519 public key used for signing.
//
// The function returns nil when ctx is done, both while waiting for the next
// tick and while waiting for the message to be accepted on newMessagesCh.
// It returns an error if the configured interval is not greater than zero,
// since time.NewTicker panics on such a duration.
func procFuncHelloPublisher(ctx context.Context, proc process, procFuncCh chan Message) error {
	interval := time.Second * time.Duration(proc.configuration.StartProcesses.StartPubHello)
	if interval <= 0 {
		return fmt.Errorf("error: procFuncHelloPublisher: hello interval must be greater than zero, got %v", interval)
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		// Send the ed25519 public key used for signing as the payload of the message.
		// The key is read on every iteration so the current value is always sent.
		d := proc.server.nodeAuth.SignPublicKey

		m := Message{
			FileName:   "hello.log",
			Directory:  "hello-messages",
			ToNode:     Node(proc.configuration.CentralNodeName),
			FromNode:   Node(proc.node),
			Data:       []byte(d),
			Method:     Hello,
			ACKTimeout: proc.configuration.DefaultMessageTimeout,
			Retries:    1,
		}

		// Do not block forever on the send if the process is being stopped.
		select {
		case proc.newMessagesCh <- m:
		case <-ctx.Done():
			proc.errorKernel.logDebug("procFuncHelloPublisher: stopped handleFunc for: publisher", "subject", proc.subject.name())
			return nil
		}

		// Wait for the next tick, or stop when the context is done.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			proc.errorKernel.logDebug("procFuncHelloPublisher: stopped handleFunc for: publisher", "subject", proc.subject.name())
			return nil
		}
	}
}
// ---
// methodErrorLog handles the writing of error logs.
//
// It increments the received error messages metric, gets the file name and
// folder tree to use from selectFileNaming, creates the folder tree if it
// does not exist, and appends message.Data to the file.
//
// It returns an acknowledgement built from node and message.ID. An error is
// returned when the folder tree cannot be created or the file cannot be
// opened. A failed write or sync is only reported through
// proc.errorKernel.errSend; the message is still acknowledged.
func methodErrorLog(proc process, message Message, node string) ([]byte, error) {
	proc.metrics.promErrorMessagesReceivedTotal.Inc()

	// If it was a request type message we want to check what the initial messages
	// method, so we can use that in creating the file name to store the data.
	fileName, folderTree := selectFileNaming(message, proc)

	// Check if folder structure exist, if not create it.
	if _, err := os.Stat(folderTree); os.IsNotExist(err) {
		err := os.MkdirAll(folderTree, 0770)
		if err != nil {
			return nil, fmt.Errorf("error: failed to create errorLog directory tree %v: %v", folderTree, err)
		}

		proc.errorKernel.logDebug("methodErrorLog: Creating subscribers data folder", "foldertree", folderTree)
	}

	// Open file and write data. O_APPEND keeps earlier log entries.
	file := filepath.Join(folderTree, fileName)
	f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0660)
	if err != nil {
		er := fmt.Errorf("error: methodErrorLog.handler: failed to open file: %v", err)
		return nil, er
	}
	defer f.Close()

	// Write and sync failures are reported as warnings instead of failing
	// the handler, so the sender still gets its acknowledgement.
	if _, err := f.Write(message.Data); err != nil {
		er := fmt.Errorf("error: methodErrorLog.handler: failed to write to file: %v", err)
		proc.errorKernel.errSend(proc, message, er, logWarning)
	} else if err := f.Sync(); err != nil {
		er := fmt.Errorf("error: methodErrorLog.handler: failed to sync file: %v", err)
		proc.errorKernel.errSend(proc, message, er, logWarning)
	}

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
// ---
// methodConsole is the handler for writing message data directly to the
// console.
//
// message.Data is written followed by a newline. The output goes to stderr
// when the first element of message.MethodArgs is "stderr", and to stdout
// otherwise. The data and its trailing newline are always written to the
// same stream.
//
// It returns an acknowledgement built from node and message.ID, and never
// returns an error.
func methodConsole(proc process, message Message, node string) ([]byte, error) {
	out := os.Stdout

	if len(message.MethodArgs) > 0 && message.MethodArgs[0] == "stderr" {
		log.Printf("* DEBUG: MethodArgs: got stderr \n")
		out = os.Stderr
	}

	// Fprintln appends the newline on the selected stream, so stderr output
	// is not terminated by a newline printed on stdout.
	fmt.Fprintln(out, string(message.Data))

	ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
	return ackMsg, nil
}
// ---
// methodTest is a handler meant to be used as the reply method when testing
// requests. The data of the received message is offered on
// proc.errorKernel.testCh, where a running test can read and validate it.
//
// The offer is made from a separate goroutine and never blocks: when nothing
// is able to receive from testCh at that moment the data is dropped.
//
// It returns an acknowledgement built from node and message.ID, and never
// returns an error.
func methodTest(proc process, message Message, node string) ([]byte, error) {
	// offer makes a single non-blocking attempt to deliver the data.
	offer := func() {
		select {
		case proc.errorKernel.testCh <- message.Data:
			// A test picked up the data.
		default:
			// No test is listening, drop the data.
		}
	}
	go offer()

	reply := fmt.Sprintf("confirmed from: %s: %v", node, message.ID)
	return []byte(reply), nil
}