Skip to content

Commit

Permalink
move go-tcp-transport here
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Apr 22, 2022
2 parents 456b36f + 9d3f10b commit e2ceabf
Show file tree
Hide file tree
Showing 8 changed files with 741 additions and 0 deletions.
266 changes: 266 additions & 0 deletions p2p/transport/tcp/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
//go:build !windows
// +build !windows

package tcp

import (
"strings"
"sync"
"time"

"github.com/marten-seemann/tcp"
"github.com/mikioh/tcpinfo"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
)

var (
newConns *prometheus.CounterVec
closedConns *prometheus.CounterVec
segsSentDesc *prometheus.Desc
segsRcvdDesc *prometheus.Desc
bytesSentDesc *prometheus.Desc
bytesRcvdDesc *prometheus.Desc
)

const collectFrequency = 10 * time.Second

var collector *aggregatingCollector

func init() {
segsSentDesc = prometheus.NewDesc("tcp_sent_segments_total", "TCP segments sent", nil, nil)
segsRcvdDesc = prometheus.NewDesc("tcp_rcvd_segments_total", "TCP segments received", nil, nil)
bytesSentDesc = prometheus.NewDesc("tcp_sent_bytes", "TCP bytes sent", nil, nil)
bytesRcvdDesc = prometheus.NewDesc("tcp_rcvd_bytes", "TCP bytes received", nil, nil)

collector = newAggregatingCollector()
prometheus.MustRegister(collector)

const direction = "direction"

newConns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tcp_connections_new_total",
Help: "TCP new connections",
},
[]string{direction},
)
prometheus.MustRegister(newConns)
closedConns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tcp_connections_closed_total",
Help: "TCP connections closed",
},
[]string{direction},
)
prometheus.MustRegister(closedConns)
}

type aggregatingCollector struct {
cronOnce sync.Once

mutex sync.Mutex
highestID uint64
conns map[uint64] /* id */ *tracingConn
rtts prometheus.Histogram
connDurations prometheus.Histogram
segsSent, segsRcvd uint64
bytesSent, bytesRcvd uint64
}

var _ prometheus.Collector = &aggregatingCollector{}

func newAggregatingCollector() *aggregatingCollector {
c := &aggregatingCollector{
conns: make(map[uint64]*tracingConn),
rtts: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tcp_rtt",
Help: "TCP round trip time",
Buckets: prometheus.ExponentialBuckets(0.001, 1.25, 40), // 1ms to ~6000ms
}),
connDurations: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tcp_connection_duration",
Help: "TCP Connection Duration",
Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
}),
}
return c
}

func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 {
c.mutex.Lock()
defer c.mutex.Unlock()
c.highestID++
c.conns[c.highestID] = t
return c.highestID
}

func (c *aggregatingCollector) removeConn(id uint64) {
delete(c.conns, id)
}

func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) {
descs <- c.rtts.Desc()
descs <- c.connDurations.Desc()
if hasSegmentCounter {
descs <- segsSentDesc
descs <- segsRcvdDesc
}
if hasByteCounter {
descs <- bytesSentDesc
descs <- bytesRcvdDesc
}
}

func (c *aggregatingCollector) cron() {
ticker := time.NewTicker(collectFrequency)
defer ticker.Stop()

for now := range ticker.C {
c.gatherMetrics(now)
}
}

func (c *aggregatingCollector) gatherMetrics(now time.Time) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.segsSent = 0
c.segsRcvd = 0
c.bytesSent = 0
c.bytesRcvd = 0
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
continue
}
log.Errorf("Failed to get TCP info: %s", err)
continue
}
if hasSegmentCounter {
c.segsSent += getSegmentsSent(info)
c.segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
c.bytesSent += getBytesSent(info)
c.bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
}
}

func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
// Start collecting the metrics collection the first time Collect is called.
c.cronOnce.Do(func() {
c.gatherMetrics(time.Now())
go c.cron()
})

c.mutex.Lock()
defer c.mutex.Unlock()

metrics <- c.rtts
metrics <- c.connDurations
if hasSegmentCounter {
segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(c.segsSent))
if err != nil {
log.Errorf("creating tcp_sent_segments_total metric failed: %v", err)
return
}
segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(c.segsRcvd))
if err != nil {
log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err)
return
}
metrics <- segsSentMetric
metrics <- segsRcvdMetric
}
if hasByteCounter {
bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(c.bytesSent))
if err != nil {
log.Errorf("creating tcp_sent_bytes metric failed: %v", err)
return
}
bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(c.bytesRcvd))
if err != nil {
log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err)
return
}
metrics <- bytesSentMetric
metrics <- bytesRcvdMetric
}
}

func (c *aggregatingCollector) ClosedConn(conn *tracingConn, direction string) {
c.mutex.Lock()
collector.removeConn(conn.id)
c.mutex.Unlock()
closedConns.WithLabelValues(direction).Inc()
}

type tracingConn struct {
id uint64

startTime time.Time
isClient bool

manet.Conn
tcpConn *tcp.Conn
}

func newTracingConn(c manet.Conn, isClient bool) (*tracingConn, error) {
conn, err := tcp.NewConn(c)
if err != nil {
return nil, err
}
tc := &tracingConn{
startTime: time.Now(),
isClient: isClient,
Conn: c,
tcpConn: conn,
}
tc.id = collector.AddConn(tc)
newConns.WithLabelValues(tc.getDirection()).Inc()
return tc, nil
}

func (c *tracingConn) getDirection() string {
if c.isClient {
return "outgoing"
}
return "incoming"
}

func (c *tracingConn) Close() error {
collector.ClosedConn(c, c.getDirection())
return c.Conn.Close()
}

func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {
var o tcpinfo.Info
var b [256]byte
i, err := c.tcpConn.Option(o.Level(), o.Name(), b[:])
if err != nil {
return nil, err
}
info := i.(*tcpinfo.Info)
return info, nil
}

type tracingListener struct {
manet.Listener
}

func newTracingListener(l manet.Listener) *tracingListener {
return &tracingListener{Listener: l}
}

func (l *tracingListener) Accept() (manet.Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return newTracingConn(conn, false)
}
16 changes: 16 additions & 0 deletions p2p/transport/tcp/metrics_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//go:build darwin
// +build darwin

package tcp

import "github.com/mikioh/tcpinfo"

const (
hasSegmentCounter = true
hasByteCounter = true
)

func getSegmentsSent(info *tcpinfo.Info) uint64 { return info.Sys.SegsSent }
func getSegmentsRcvd(info *tcpinfo.Info) uint64 { return info.Sys.SegsReceived }
func getBytesSent(info *tcpinfo.Info) uint64 { return info.Sys.BytesSent }
func getBytesRcvd(info *tcpinfo.Info) uint64 { return info.Sys.BytesReceived }
16 changes: 16 additions & 0 deletions p2p/transport/tcp/metrics_general.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//go:build !linux && !darwin && !windows
// +build !linux,!darwin,!windows

package tcp

import "github.com/mikioh/tcpinfo"

const (
hasSegmentCounter = false
hasByteCounter = false
)

func getSegmentsSent(info *tcpinfo.Info) uint64 { return 0 }
func getSegmentsRcvd(info *tcpinfo.Info) uint64 { return 0 }
func getBytesSent(info *tcpinfo.Info) uint64 { return 0 }
func getBytesRcvd(info *tcpinfo.Info) uint64 { return 0 }
16 changes: 16 additions & 0 deletions p2p/transport/tcp/metrics_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//go:build linux
// +build linux

package tcp

import "github.com/mikioh/tcpinfo"

const (
hasSegmentCounter = true
hasByteCounter = false
)

func getSegmentsSent(info *tcpinfo.Info) uint64 { return uint64(info.Sys.SegsOut) }
func getSegmentsRcvd(info *tcpinfo.Info) uint64 { return uint64(info.Sys.SegsIn) }
func getBytesSent(info *tcpinfo.Info) uint64 { return 0 }
func getBytesRcvd(info *tcpinfo.Info) uint64 { return 0 }
9 changes: 9 additions & 0 deletions p2p/transport/tcp/metrics_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//go:build windows
// +build windows

package tcp

import manet "github.com/multiformats/go-multiaddr/net"

func newTracingConn(c manet.Conn, _ bool) (manet.Conn, error) { return c, nil }
func newTracingListener(l manet.Listener) manet.Listener { return l }
35 changes: 35 additions & 0 deletions p2p/transport/tcp/reuseport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package tcp

import (
"os"
"strings"

"github.com/libp2p/go-reuseport"
)

// envReuseport is the env variable name used to turn off reuse port.
// It default to true.
const envReuseport = "LIBP2P_TCP_REUSEPORT"

// envReuseportVal stores the value of envReuseport. defaults to true.
var envReuseportVal = true

func init() {
v := strings.ToLower(os.Getenv(envReuseport))
if v == "false" || v == "f" || v == "0" {
envReuseportVal = false
log.Infof("REUSEPORT disabled (LIBP2P_TCP_REUSEPORT=%s)", v)
}
}

// ReuseportIsAvailable returns whether reuseport is available to be used. This
// is here because we want to be able to turn reuseport on and off selectively.
// For now we use an ENV variable, as this handles our pressing need:
//
// LIBP2P_TCP_REUSEPORT=false ipfs daemon
//
// If this becomes a sought after feature, we could add this to the config.
// In the end, reuseport is a stop-gap.
func ReuseportIsAvailable() bool {
return envReuseportVal && reuseport.Available()
}
Loading

0 comments on commit e2ceabf

Please sign in to comment.