Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make UDP socket buffer size configurable #2336

Merged
merged 21 commits into from
Jul 17, 2020
Merged
5 changes: 4 additions & 1 deletion cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
const (
defaultQueueSize = 1000
defaultMaxPacketSize = 65000
defaultBufferSize = 4 * 1024
kbarukhov marked this conversation as resolved.
Show resolved Hide resolved
defaultServerWorkers = 10

jaegerModel Model = "jaeger"
Expand Down Expand Up @@ -90,6 +91,7 @@ type ProcessorConfiguration struct {
type ServerConfiguration struct {
QueueSize int `yaml:"queueSize"`
MaxPacketSize int `yaml:"maxPacketSize"`
BufferSize int `yaml:"bufferSize"`
kbarukhov marked this conversation as resolved.
Show resolved Hide resolved
HostPort string `yaml:"hostPort" validate:"nonzero"`
}

Expand Down Expand Up @@ -188,6 +190,7 @@ func (c *ProcessorConfiguration) applyDefaults() {
func (c *ServerConfiguration) applyDefaults() {
c.QueueSize = defaultInt(c.QueueSize, defaultQueueSize)
c.MaxPacketSize = defaultInt(c.MaxPacketSize, defaultMaxPacketSize)
c.BufferSize = defaultInt(c.BufferSize, defaultBufferSize)
}

// getUDPServer gets a TBufferedServer backed server using the server configuration
Expand All @@ -197,7 +200,7 @@ func (c *ServerConfiguration) getUDPServer(mFactory metrics.Factory) (servers.Se
if c.HostPort == "" {
return nil, fmt.Errorf("no host:port provided for udp server: %+v", *c)
}
transport, err := thriftudp.NewTUDPServerTransport(c.HostPort)
transport, err := thriftudp.NewTUDPServerTransport(c.HostPort, c.BufferSize)
if err != nil {
return nil, fmt.Errorf("cannot create UDPServerTransport: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
suffixWorkers = "workers"
suffixServerQueueSize = "server-queue-size"
suffixServerMaxPacketSize = "server-max-packet-size"
suffixServerBufferSize = "server-buffer-size"
suffixServerHostPort = "server-host-port"
httpServerHostPort = "http-server.host-port"
)
Expand All @@ -50,6 +51,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(prefix+suffixWorkers, defaultServerWorkers, "how many workers the processor should run")
flags.Int(prefix+suffixServerQueueSize, defaultQueueSize, "length of the queue for the UDP server")
flags.Int(prefix+suffixServerMaxPacketSize, defaultMaxPacketSize, "max packet size for the UDP server")
flags.Int(prefix+suffixServerBufferSize, defaultBufferSize, "buffer size for UDP packets")
kbarukhov marked this conversation as resolved.
Show resolved Hide resolved
flags.String(prefix+suffixServerHostPort, ":"+strconv.Itoa(p.port), "host:port for the UDP server")
}
flags.String(
Expand All @@ -66,6 +68,7 @@ func (b *Builder) InitFromViper(v *viper.Viper) *Builder {
p.Workers = v.GetInt(prefix + suffixWorkers)
p.Server.QueueSize = v.GetInt(prefix + suffixServerQueueSize)
p.Server.MaxPacketSize = v.GetInt(prefix + suffixServerMaxPacketSize)
p.Server.BufferSize = v.GetInt(prefix + suffixServerBufferSize)
p.Server.HostPort = portNumToHostPort(v.GetString(prefix + suffixServerHostPort))
b.Processors = append(b.Processors, *p)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/processors/thrift_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
)

func createProcessor(t *testing.T, mFactory metrics.Factory, tFactory thrift.TProtocolFactory, handler AgentProcessor) (string, Processor) {
transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0")
transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0", thriftudp.BufferSize)
require.NoError(t, err)

queueSize := 10
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/servers/tbuffered_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestTBufferedServer(t *testing.T) {
func testTBufferedServer(t *testing.T, queueSize int, testDroppedPackets bool) {
metricsFactory := metricstest.NewFactory(0)

transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0")
transport, err := thriftudp.NewTUDPServerTransport("127.0.0.1:0", thriftudp.BufferSize)
require.NoError(t, err)

maxPacketSize := 65000
Expand Down
17 changes: 17 additions & 0 deletions cmd/agent/app/servers/thriftudp/socket_buffer_linux_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// +build linux darwin

package thriftudp

import (
"net"
"syscall"
)

func setSocketBuffer(conn *net.UDPConn, bufferSize int) error {
file, err := conn.File()
if err != nil {
return err
}

return syscall.SetsockoptInt(int(file.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF, bufferSize)
}
12 changes: 12 additions & 0 deletions cmd/agent/app/servers/thriftudp/socket_buffer_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// +build windows

package thriftudp

import (
"net"
)

// Not supported on windows, so windows version just returns nil
func setSocketBuffer(_ *net.UDPConn, _ int) error {
return nil
}
12 changes: 10 additions & 2 deletions cmd/agent/app/servers/thriftudp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
)

//MaxLength of UDP packet
const MaxLength = 65000
const (
MaxLength = 65000
BufferSize = 4 * 1024
)

var errConnAlreadyClosed = errors.New("connection already closed")

Expand Down Expand Up @@ -71,7 +74,7 @@ func createClient(destAddr, locAddr *net.UDPAddr) (*TUDPTransport, error) {
// It will listen for incoming udp packets on the specified host/port
// Example:
// trans, err := thriftudp.NewTUDPClientTransport("localhost:9001")
func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) {
func NewTUDPServerTransport(hostPort string, bufferSize int) (*TUDPTransport, error) {
kbarukhov marked this conversation as resolved.
Show resolved Hide resolved
addr, err := net.ResolveUDPAddr("udp", hostPort)
if err != nil {
return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
Expand All @@ -80,6 +83,11 @@ func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) {
if err != nil {
return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error())
}

if err = setSocketBuffer(conn, bufferSize); err != nil {
return nil, err
}

return &TUDPTransport{addr: conn.LocalAddr(), conn: conn}, nil
}

Expand Down
16 changes: 8 additions & 8 deletions cmd/agent/app/servers/thriftudp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ func TestNewTUDPClientTransport(t *testing.T) {
}

func TestNewTUDPServerTransport(t *testing.T) {
_, err := NewTUDPServerTransport("fakeAddressAndPort")
_, err := NewTUDPServerTransport("fakeAddressAndPort", BufferSize)
require.NotNil(t, err)

trans, err := NewTUDPServerTransport(localListenAddr.String())
trans, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
require.True(t, trans.IsOpen())
require.Equal(t, ^uint64(0), trans.RemainingBytes())

//Ensure a second server can't be created on the same address
trans2, err := NewTUDPServerTransport(trans.Addr().String())
trans2, err := NewTUDPServerTransport(trans.Addr().String(), BufferSize)
if trans2 != nil {
//close the second server if one got created
trans2.Close()
Expand All @@ -77,10 +77,10 @@ func TestNewTUDPServerTransport(t *testing.T) {
}

func TestTUDPServerTransportIsOpen(t *testing.T) {
_, err := NewTUDPServerTransport("fakeAddressAndPort")
_, err := NewTUDPServerTransport("fakeAddressAndPort", BufferSize)
require.NotNil(t, err)

trans, err := NewTUDPServerTransport(localListenAddr.String())
trans, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
require.True(t, trans.IsOpen())
require.Equal(t, ^uint64(0), trans.RemainingBytes())
Expand All @@ -107,7 +107,7 @@ func TestTUDPServerTransportIsOpen(t *testing.T) {
}

func TestWriteRead(t *testing.T) {
server, err := NewTUDPServerTransport(localListenAddr.String())
server, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
defer server.Close()

Expand All @@ -133,7 +133,7 @@ func TestWriteRead(t *testing.T) {
}

func TestDoubleCloseError(t *testing.T) {
trans, err := NewTUDPServerTransport(localListenAddr.String())
trans, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
require.True(t, trans.IsOpen())

Expand All @@ -149,7 +149,7 @@ func TestDoubleCloseError(t *testing.T) {
}

func TestConnClosedReadWrite(t *testing.T) {
trans, err := NewTUDPServerTransport(localListenAddr.String())
trans, err := NewTUDPServerTransport(localListenAddr.String(), BufferSize)
require.Nil(t, err)
require.True(t, trans.IsOpen())
require.NoError(t, trans.Close())
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe
github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ=
github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
github.com/gocql/gocql v0.0.0-20200226121155-e5c8c1f505c5 h1:FDQYpzoJWwYzJ0pOMU+RqUFqT3N4BfCBGey9rP5708c=
github.com/gocql/gocql v0.0.0-20200226121155-e5c8c1f505c5/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb h1:H3tisfjQwq9FTyWqlKsZpgoYrsvn2pmTWvAiDHa5pho=
github.com/gocql/gocql v0.0.0-20200228163523-cd4b606dd2fb/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gogo/googleapis v1.0.1-0.20180501115203-b23578765ee5 h1:l3BMcdrtdBYa5PH99FBrPEWJGRODZFOjxHPnb2I7/98=
Expand Down