Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset peer toxic #333

Merged
merged 2 commits into from
Oct 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
/usr/bin/toxiproxy-server
```
(#331, @miry)
* A new toxic to simulate TCP RESET (Connection reset by peer) on the connections by closing
the stub Input immediately or after a timeout. (#247 and #333, @chaosbox)

# [2.1.7]

Expand Down
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ stopping you from creating a client in any other language (see
3. [Bandwidth](#bandwidth)
4. [Slow close](#slow_close)
5. [Timeout](#timeout)
6. [Slicer](#slicer)
6. [Reset peer](#reset_peer)
7. [Slicer](#slicer)
6. [HTTP API](#http-api)
1. [Proxy fields](#proxy-fields)
2. [Toxic fields](#toxic-fields)
Expand Down Expand Up @@ -404,6 +405,15 @@ Stops all data from getting through, and closes the connection after `timeout`.
`timeout` is 0, the connection won't close, and data will be delayed until the
toxic is removed.

Attributes:

- `timeout`: time in milliseconds

#### reset_peer

Simulate TCP RESET (Connection reset by peer) on the connections by closing the stub Input
immediately or after a `timeout`.

Attributes:

- `timeout`: time in milliseconds
Expand Down
10 changes: 10 additions & 0 deletions bin/e2e
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ go test -bench=. ./testing -v

echo -e "-----------------\n"

echo "=== Reset peer toxic"

./dist/toxiproxy-cli toxic add --type reset_peer --toxicName "reset_peer" \
--attribute "timeout=2000" \
--toxicity 1.0 shopify_http
./dist/toxiproxy-cli inspect shopify_http
./dist/toxiproxy-cli toxic delete --toxicName "reset_peer" shopify_http

echo -e "-----------------\n"

echo "== Teardown"

./dist/toxiproxy-cli delete shopify_http
Expand Down
4 changes: 4 additions & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ var toxicDescription = `
timeout: stop all data and close after timeout
timeout=<ms>

reset_peer: simulate TCP RESET (Connection reset by peer) on the connections by closing
the stub Input immediately or after a timeout
timeout=<ms>

slicer: slice data into bits with optional delay
average_size=<bytes>,size_variation=<bytes>,delay=<microseconds>

Expand Down
24 changes: 23 additions & 1 deletion link.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package toxiproxy

import (
"io"
"net"

"github.com/sirupsen/logrus"

"github.com/Shopify/toxiproxy/v2/stream"
"github.com/Shopify/toxiproxy/v2/toxics"
"github.com/sirupsen/logrus"
)

// ToxicLinks are single direction pipelines that connects an input and output via
Expand Down Expand Up @@ -73,13 +75,33 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
}
link.input.Close()
}()

for i, toxic := range link.toxics.chain[link.direction] {
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
link.stubs[i].State = stateful.NewState()
}

if _, ok := toxic.Toxic.(*toxics.ResetToxic); ok {
if err := source.(*net.TCPConn).SetLinger(0); err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"toxic": toxic.Type,
"err": err,
}).Error("source: Unable to setLinger(ms)")
}

if err := dest.(*net.TCPConn).SetLinger(0); err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"toxic": toxic.Type,
"err": err,
}).Error("dest: Unable to setLinger(ms)")
}
}

go link.stubs[i].Run(toxic)
}

go func() {
bytes, err := io.Copy(dest, link.output)
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions toxics/reset_peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package toxics

import (
"time"
)

/*
The ResetToxic sends closes the connection abruptly after a timeout (in ms).
The behavior of Close is set to discard any unsent/unacknowledged data by setting SetLinger to 0,
~= sets TCP RST flag and resets the connection.
If the timeout is set to 0, then the connection will be reset immediately.

Drop data since it will initiate a graceful close by sending the FIN/ACK. (io.EOF)
*/

type ResetToxic struct {
// Timeout in milliseconds
Timeout int64 `json:"timeout"`
}

func (t *ResetToxic) Pipe(stub *ToxicStub) {
timeout := time.Duration(t.Timeout) * time.Millisecond

for {
select {
case <-stub.Interrupt:
return
case <-stub.Input:
<-time.After(timeout)
stub.Close()
return
}
}
}

func init() {
Register("reset_peer", new(ResetToxic))
}
96 changes: 96 additions & 0 deletions toxics/reset_peer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package toxics_test

import (
"bufio"
"io"
"net"
"os"
"syscall"
"testing"
"time"

"github.com/Shopify/toxiproxy/v2/toxics"
)

const msg = "reset toxic payload\n"

func TestResetToxicNoTimeout(t *testing.T) {
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "upstream", &toxics.ResetToxic{}))
}

func TestResetToxicWithTimeout(t *testing.T) {
start := time.Now()
resetToxic := toxics.ResetToxic{Timeout: 100}
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "upstream", &resetToxic))
AssertDeltaTime(t,
"Reset after timeout",
time.Since(start),
time.Duration(resetToxic.Timeout)*time.Millisecond,
time.Duration(resetToxic.Timeout+10)*time.Millisecond,
)
}

func TestResetToxicWithTimeoutDownstream(t *testing.T) {
start := time.Now()
resetToxic := toxics.ResetToxic{Timeout: 100}
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "downstream", &resetToxic))
AssertDeltaTime(t,
"Reset after timeout",
time.Since(start),
time.Duration(resetToxic.Timeout)*time.Millisecond,
time.Duration(resetToxic.Timeout+10)*time.Millisecond,
)
}

func checkConnectionState(t *testing.T, listenAddress string) {
conn, err := net.Dial("tcp", listenAddress)
if err != nil {
t.Error("Unable to dial TCP server", err)
}
if _, err := conn.Write([]byte(msg)); err != nil {
t.Error("Failed writing TCP payload", err)
}
tmp := make([]byte, 1000)
_, err = conn.Read(tmp)
defer conn.Close()
if opErr, ok := err.(*net.OpError); ok {
syscallErr, _ := opErr.Err.(*os.SyscallError)
if !(syscallErr.Err == syscall.ECONNRESET) {
t.Error("Expected: connection reset by peer. Got:", err)
}
} else {
t.Error(
"Expected: connection reset by peer. Got:",
err, "conn:", conn.RemoteAddr(), conn.LocalAddr(),
)
}
_, err = conn.Read(tmp)
if err != io.EOF {
t.Error("expected EOF from closed connection")
}
}

func resetTCPHelper(t *testing.T, toxicJSON io.Reader) {
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal("Failed to create TCP server", err)
}
defer ln.Close()
proxy := NewTestProxy("test", ln.Addr().String())
proxy.Start()
proxy.Toxics.AddToxicJson(toxicJSON)
defer proxy.Stop()

go func() {
conn, err := ln.Accept()
if err != nil {
t.Error("Unable to accept TCP connection", err)
}
defer ln.Close()
scan := bufio.NewScanner(conn)
if scan.Scan() {
conn.Write([]byte(msg))
}
}()
checkConnectionState(t, proxy.Listen)
}