Skip to content

Commit

Permalink
Clean up code and rename PipeRequest to PipeUpstream
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Wirth authored and xthexder committed Jun 5, 2018
1 parent c75bec4 commit 0e85ee6
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 24 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# 2.2.0 (unreleased)

* Add bidirectional toxics #132
* Add support for bidirectional toxics (for protocol-aware toxics) #132

# 2.1.3

Expand Down
6 changes: 3 additions & 3 deletions CREATING_TOXICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Bidirectional toxics allow state to be shared for the `upstream` and `downstream
toxic implementation. They also ensure direction-specific code is always run on the correct pipe
(a toxic that only works on the `upstream` can't be added to the `downstream`).

Creating a bidirectional toxic is done by implementing a second `Pipe()` function called `PipeRequest()`.
Creating a bidirectional toxic is done by implementing a second `Pipe()` function called `PipeUpstream()`.
The implementation is same as a regular toxic, and can be paired with other types such as a stateful toxic.

One use case of a bidirectional toxic is to mock out the backend server entirely, which is shown below:
Expand All @@ -164,8 +164,8 @@ type EchoToxicState struct {
Request chan *stream.StreamChunk
}

// PipeRequest handles the upstream direction
func (t *EchoToxic) PipeRequest(stub *toxics.ToxicStub) {
// PipeUpstream handles the upstream direction
func (t *EchoToxic) PipeUpstream(stub *toxics.ToxicStub) {
state := stub.State.(*EchoToxicState)

for {
Expand Down
34 changes: 18 additions & 16 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
link.input.Close()
}()
for i, toxic := range link.toxics.chain[link.direction] {
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil {
link.stubs[i].State = stateful.NewState()
} else {
link.stubs[i].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State
link.stubs[i].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity
}
}
link.InitPairState(toxic)

go link.stubs[i].Run(toxic)
}
Expand All @@ -93,6 +86,22 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
}()
}

func (link *ToxicLink) InitPairState(toxic *toxics.ToxicWrapper) {
// If the toxic is stateful, create a state object or copy it from the paired link.
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil {
link.stubs[toxic.Index].State = stateful.NewState()
} else {
link.stubs[toxic.Index].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State
}
}

// If the toxic is paired, synchronize the toxicity so they are always in the same state.
if toxic.PairedToxic != nil {
link.stubs[toxic.Index].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity
}
}

// Add a toxic to the end of the chain.
func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
i := len(link.stubs)
Expand All @@ -104,14 +113,7 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
if link.stubs[i-1].InterruptToxic() {
link.stubs[i-1].Output = newin

if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil {
link.stubs[i].State = stateful.NewState()
} else {
link.stubs[i].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State
link.stubs[i].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity
}
}
link.InitPairState(toxic)

go link.stubs[i].Run(toxic)
go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1])
Expand Down
2 changes: 1 addition & 1 deletion toxics/bidirectional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type EchoToxicState struct {
UpstreamToxicity float32
}

func (t *EchoToxic) PipeRequest(stub *toxics.ToxicStub) {
func (t *EchoToxic) PipeUpstream(stub *toxics.ToxicStub) {
state := stub.State.(*EchoToxicState)
state.UpstreamToxicity = stub.Toxicity

Expand Down
6 changes: 3 additions & 3 deletions toxics/toxic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type StatefulToxic interface {
}

// Bidirectional toxics operate on both TCP streams and allow state to be shared.
// PipeRequest() will oparate on the upstream, while Pipe() will operate on the downstream.
// PipeUpstream() will oparate on the upstream, while Pipe() will operate on the downstream.
type BidirectionalToxic interface {
// Defines the packet flow through an upstream ToxicStub. Operates the same as Pipe().
PipeRequest(*ToxicStub)
PipeUpstream(*ToxicStub)
}

type ToxicWrapper struct {
Expand Down Expand Up @@ -95,7 +95,7 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) {
toxic.Pipe(s)
} else {
bidirectional := toxic.Toxic.(BidirectionalToxic)
bidirectional.PipeRequest(s)
bidirectional.PipeUpstream(s)
}
} else {
new(NoopToxic).Pipe(s)
Expand Down

0 comments on commit 0e85ee6

Please sign in to comment.