-
Notifications
You must be signed in to change notification settings - Fork 0
/
websocketoutput.go
105 lines (90 loc) · 3.34 KB
/
websocketoutput.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package heka_websockets_output
import (
"fmt"
"github.com/mozilla-services/heka/message"
"github.com/mozilla-services/heka/pipeline"
"golang.org/x/net/websocket"
"net/http"
)
type connection struct {
ws *websocket.Conn
send chan *message.Message
}
type WebSocketsOutputConfig struct {
Address string `toml:"address"`
Handler string `toml:"handler"`
}
type WebSocketsOutput struct {
conf *WebSocketsOutputConfig
connections map[*connection]struct{}
register chan *connection
unregister chan *connection
broadcast chan *message.Message
}
func (wso *WebSocketsOutput) ConfigStruct() interface{} {
return &WebSocketsOutputConfig{}
}
func (wso *WebSocketsOutput) Init(config interface{}) error {
wso.conf = config.(*WebSocketsOutputConfig)
wso.connections = make(map[*connection]struct{})
wso.register = make(chan *connection)
wso.unregister = make(chan *connection)
wso.broadcast = make(chan *message.Message, 1028)
// Connections handler
go func() {
var conn *connection
var m *message.Message
for {
select {
case conn = <-wso.register:
wso.connections[conn] = struct{}{}
case conn = <-wso.unregister:
delete(wso.connections, conn)
close(conn.send)
case m = <-wso.broadcast:
for conn = range wso.connections {
select {
case conn.send <- m:
default:
delete(wso.connections, conn)
close(conn.send)
go conn.ws.Close()
}
}
}
}
}()
// Websocket server and connection handler
http.Handle(wso.conf.Handler, websocket.Handler(func(ws *websocket.Conn) {
c := &connection{ws, make(chan *message.Message, 1028)}
wso.register <- c
defer func() {
wso.unregister <- c
}()
var err error
for m := range c.send {
if err = websocket.JSON.Send(ws, m); err != nil {
fmt.Println("Websocket:", err.Error())
break
}
}
}))
go func() {
if err := http.ListenAndServe(wso.conf.Address, nil); err != nil {
fmt.Println("Http:", err.Error())
}
}()
return nil
}
func (wso *WebSocketsOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) error {
for pc := range or.InChan() {
wso.broadcast <- pc.Message
pc.Recycle(nil)
}
return nil
}
func init() {
pipeline.RegisterPlugin("WebSocketsOutput", func() interface{} {
return new(WebSocketsOutput)
})
}