forked from burke/zeus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
slavemonitor.go
104 lines (88 loc) · 2.4 KB
/
slavemonitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package processtree
import (
"math/rand"
"os"
"strconv"
"syscall"
"github.com/burke/zeus/go/messages"
slog "github.com/burke/zeus/go/shinylog"
"github.com/burke/zeus/go/unixsocket"
)
type SlaveMonitor struct {
tree *ProcessTree
remoteMasterFile *os.File
}
func Error(err string) {
// TODO
println(err)
}
func StartSlaveMonitor(tree *ProcessTree, done chan bool) chan bool {
quit := make(chan bool)
go func() {
localMasterFile, remoteMasterFile, err := unixsocket.Socketpair(syscall.SOCK_DGRAM)
if err != nil {
Error("Couldn't create socketpair")
}
monitor := &SlaveMonitor{tree, remoteMasterFile}
defer monitor.cleanupChildren()
localMasterSocket, err := unixsocket.NewFromFile(localMasterFile)
if err != nil {
Error("Couldn't Open UNIXSocket")
}
// We just want this unix socket to be a channel so we can select on it...
registeringFds := make(chan int, 3)
go func() {
for {
if fd, err := localMasterSocket.ReadFD(); err != nil {
slog.Error(err)
} else {
registeringFds <- fd
}
}
}()
for _, slave := range monitor.tree.SlavesByName {
go slave.Run(monitor)
}
for {
select {
case <-quit:
done <- true
return
case fd := <-registeringFds:
go monitor.slaveDidBeginRegistration(fd)
}
}
}()
return quit
}
func (mon *SlaveMonitor) cleanupChildren() {
for _, slave := range mon.tree.SlavesByName {
slave.ForceKill()
}
}
func (mon *SlaveMonitor) slaveDidBeginRegistration(fd int) {
// Having just started the process, we expect an IO, which we convert to a UNIX domain socket
fileName := strconv.Itoa(rand.Int())
slaveFile := os.NewFile(uintptr(fd), fileName)
slaveUsock, err := unixsocket.NewFromFile(slaveFile)
if err != nil {
slog.Error(err)
}
// We now expect the slave to use this fd they send us to send a Pid&Identifier Message
msg, err := slaveUsock.ReadMessage()
if err != nil {
slog.Error(err)
}
pid, identifier, err := messages.ParsePidMessage(msg)
// And the last step before executing its action, the slave sends us a pipe it will later use to
// send us all the features it's loaded.
featurePipeFd, err := slaveUsock.ReadFD()
if err != nil {
slog.Error(err)
}
slaveNode := mon.tree.FindSlaveByName(identifier)
if slaveNode == nil {
Error("slavemonitor.go:slaveDidBeginRegistration:Unknown identifier:" + identifier)
}
slaveNode.SlaveWasInitialized(pid, slaveUsock, featurePipeFd)
}