This repository has been archived by the owner on May 19, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
server.go
158 lines (119 loc) · 3.66 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package dispatcher
import (
"errors"
"github.com/gofort/dispatcher/log"
"sync"
)
// Server contains AMQP connection and creates publisher.
// Server is a parent of workers and publisher.
//
// A Server is built by NewServer, which also starts the goroutine that
// reacts to the notifyConnected and startGlobalShutoff channels below.
type Server struct {
	// con wraps the AMQP connection together with the channels used to
	// coordinate reconnecting and shutdown (workersFinished, stopReconnecting).
	con *amqpConnection
	// notifyConnected carries connection state changes: true leads to the
	// publisher being re-initialised and working workers being restarted,
	// false leads to the publisher being deactivated.
	notifyConnected chan bool
	// startGlobalShutoff signals that the publisher must be deactivated and
	// every working worker closed.
	startGlobalShutoff chan struct{}
	// log is the logger taken from the config, or the package default logger
	// when the config supplies none.
	log Log
	// workers holds the registered workers keyed by worker name.
	workers map[string]*Worker
	// publisher is embedded, so its fields and methods are promoted to Server.
	*publisher
}
// NewServer creates new server from config and connects to AMQP.
//
// cfg must be non-nil and must carry a non-empty Exchange and
// DefaultRoutingKey; otherwise an error is returned before anything is
// allocated or any goroutine is started. When cfg.Logger is nil the package
// default logger is used (honouring cfg.DebugMode).
//
// NewServer blocks until the connection routine sends its first notification,
// then initialises the publisher and bootstraps the exchange and the queues
// listed in cfg.InitQueues.
//
// It returns the server, the connectionBroken channel that was handed to the
// connection routine, and an error. On error the first two results are nil.
//
// NOTE(review): when publisher initialisation or bootstrap fails, the
// connection routine and the supervising goroutine started here are not
// stopped - confirm whether callers rely on that.
func NewServer(cfg *ServerConfig) (*Server, chan struct{}, error) {
	// Validate first: a nil config used to panic on the first field access,
	// and there is no point allocating channels for a config we will reject.
	if cfg == nil {
		return nil, nil, errors.New("Config parameter is required")
	}
	if cfg.Exchange == "" {
		return nil, nil, errors.New("Exchange parameter is required")
	}
	if cfg.DefaultRoutingKey == "" {
		return nil, nil, errors.New("DefaultRoutingKey parameter is required")
	}

	srv := &Server{
		publisher: &publisher{
			defaultRoutingKey: cfg.DefaultRoutingKey,
			defaultExchange:   cfg.Exchange,
		},
		startGlobalShutoff: make(chan struct{}),
		notifyConnected:    make(chan bool),
		workers:            make(map[string]*Worker),
		con: &amqpConnection{
			workersFinished:  make(chan struct{}),
			stopReconnecting: make(chan struct{}),
		},
	}

	if cfg.Logger == nil {
		srv.log = log.InitLogger(cfg.DebugMode)
	} else {
		srv.log = cfg.Logger
	}
	srv.publisher.log = srv.log

	connectionBroken := make(chan struct{})
	go srv.con.initConnection(srv.log, cfg, srv.notifyConnected, srv.startGlobalShutoff, connectionBroken)

	// Wait for the first notification from the connection routine before
	// anything tries to use the connection.
	<-srv.notifyConnected

	// Must be running before the publisher is initialised below, so that later
	// connection state changes and shutoff requests always have a receiver.
	go srv.superviseConnection()

	err := srv.publisher.init(srv.con.con)
	if err != nil {
		srv.log.Error(err)
		return nil, nil, err
	}

	if err = bootstrap(srv.publisher.ch, cfg.Exchange, cfg.InitQueues); err != nil {
		srv.log.Error(err)
		return nil, nil, err
	}

	return srv, connectionBroken, nil
}

// superviseConnection reacts to connection state changes and global shutoff requests.
//
// NOTE(review): the loop keeps running after a shutoff has been handled and
// has no exit path - confirm whether it should return once workers are drained.
func (s *Server) superviseConnection() {
	for {
		select {
		case connected := <-s.notifyConnected:
			if connected {
				s.resumeAfterReconnect()
			} else {
				s.publisher.deactivate()
			}
		case <-s.startGlobalShutoff:
			s.drainWorkers()
		}
	}
}

// resumeAfterReconnect re-initialises the publisher and restarts every worker that was working.
func (s *Server) resumeAfterReconnect() {
	// Failures are logged rather than returned: nobody is waiting on this
	// goroutine, and the remaining workers should still be given a chance.
	if err := s.publisher.init(s.con.con); err != nil {
		s.log.Error(err)
	}

	for _, w := range s.workers {
		if !w.working {
			continue
		}
		if err := w.Start(s); err != nil {
			s.log.Error(err)
		}
	}
}

// drainWorkers deactivates the publisher, closes all working workers concurrently and reports completion.
func (s *Server) drainWorkers() {
	s.log.Debug("Starting global shutoff: close publisher, stop workers consuming, wait for all tasks to be finished")
	s.publisher.deactivate()

	var wg sync.WaitGroup
	wg.Add(len(s.workers))
	for _, w := range s.workers {
		// The worker is passed as an argument so each goroutine closes its
		// own worker regardless of loop-variable semantics.
		go func(w *Worker) {
			defer wg.Done()
			if w.working {
				w.Close()
			}
		}(w)
	}

	s.log.Info("Waiting for all workers to be done")
	wg.Wait()
	s.log.Info("All workers finished their tasks and were closed!")

	// Tell the connection side that it is safe to proceed.
	s.con.workersFinished <- struct{}{}
}
// GetWorkerByName looks a worker up in the server's registry by its name.
// It returns the registered *Worker, or a nil worker and a "Worker not found"
// error when no worker is stored under that name.
func (s *Server) GetWorkerByName(name string) (*Worker, error) {
	if w, found := s.workers[name]; found {
		return w, nil
	}

	return nil, errors.New("Worker not found")
}
// Close gracefully shuts down everything the dispatcher owns: workers,
// publisher and the AMQP connection.
//
// The sequence is:
//  1. the reconnection process is told to stop (this send blocks until the
//     signal is received);
//  2. the connection's close routine is invoked with the server's logger and
//     the global-shutoff channel. Through that channel the publisher is
//     closed, all workers are closed and awaited until they finish their
//     tasks and close their channels, and finally the AMQP connection itself
//     is closed.
func (s *Server) Close() {
	conn := s.con

	// Reconnecting has to stop first; otherwise the teardown below could be
	// undone by a fresh connection attempt.
	conn.stopReconnecting <- struct{}{}

	conn.close(s.log, s.startGlobalShutoff)
}