Skip to content

Commit

Permalink
Rewind the request body for retries (#194)
Browse files Browse the repository at this point in the history
Fixes #193 

`prepareRequest` in HTTP Sender was returning a `http.Request` https://github.com/open-telemetry/opamp-go/blob/deb33889ff1aac0d542a9fd08f946461cccfc1c7/client/internal/httpsender.go#L201 

We have the backoff retry strategy for failed requests; however, underlying transport would close the request body and should be rewound for any subsequent attempt. 

This change creates a thin wrapper around the `http.Request` that helps to rewind the body before making a request.

In the test we hijack and close the underlying connection to simulate the connection error case.
  • Loading branch information
srikanthccv authored Sep 27, 2023
1 parent b24302d commit 816bc48
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 7 deletions.
35 changes: 28 additions & 7 deletions client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ const defaultPollingIntervalMs = 30 * 1000 // default interval is 30 seconds.
const headerContentEncoding = "Content-Encoding"
const encodingTypeGZip = "gzip"

type requestWrapper struct {
*http.Request

bodyReader func() io.ReadCloser
}

func bodyReader(buf []byte) func() io.ReadCloser {
return func() io.ReadCloser {
return io.NopCloser(bytes.NewReader(buf))
}
}

func (r *requestWrapper) rewind(ctx context.Context) {
r.Body = r.bodyReader()
r.Request = r.Request.WithContext(ctx)
}

// HTTPSender allows scheduling messages to send. Once run, it will loop through
// a request/response cycle for each message to send and will process all received
// responses using a receivedProcessor. If there are no pending messages to send
Expand Down Expand Up @@ -156,7 +173,8 @@ func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response
select {
case <-timer.C:
{
resp, err := h.client.Do(req)
req.rewind(ctx)
resp, err := h.client.Do(req.Request)
if err == nil {
switch resp.StatusCode {
case http.StatusOK:
Expand Down Expand Up @@ -198,7 +216,7 @@ func recalculateInterval(interval time.Duration, resp *http.Response) time.Durat
return interval
}

func (h *HTTPSender) prepareRequest(ctx context.Context) (*http.Request, error) {
func (h *HTTPSender) prepareRequest(ctx context.Context) (*requestWrapper, error) {
msgToSend := h.nextMessage.PopPending()
if msgToSend == nil || proto.Equal(msgToSend, &protobufs.AgentToServer{}) {
// There is no pending message or the message is empty.
Expand All @@ -211,7 +229,11 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*http.Request, error)
return nil, err
}

var body io.Reader
r, err := http.NewRequestWithContext(ctx, OpAMPPlainHTTPMethod, h.url, nil)
if err != nil {
return nil, err
}
req := requestWrapper{Request: r}

if h.compressionEnabled {
var buf bytes.Buffer
Expand All @@ -224,17 +246,16 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*http.Request, error)
h.logger.Errorf("Failed to close the writer: %v", err)
return nil, err
}
body = &buf
req.bodyReader = bodyReader(buf.Bytes())
} else {
body = bytes.NewReader(data)
req.bodyReader = bodyReader(data)
}
req, err := http.NewRequestWithContext(ctx, OpAMPPlainHTTPMethod, h.url, body)
if err != nil {
return nil, err
}

req.Header = h.requestHeader
return req, nil
return &req, nil
}

func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) {
Expand Down
74 changes: 74 additions & 0 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ package internal
import (
"context"
"crypto/tls"
"io"
"net"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/open-telemetry/opamp-go/client/types"
sharedinternal "github.com/open-telemetry/opamp-go/internal"
"github.com/open-telemetry/opamp-go/internal/testhelpers"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -98,3 +103,72 @@ EKTcWGekdmdDPsHloRNtsiCa697B2O9IFA==

return cert, nil
}

func TestHTTPSenderRetryForFailedRequests(t *testing.T) {

srv, m := newMockServer(t)
address := testhelpers.GetAvailableLocalAddress()
var connectionAttempts int64

var buf []byte
srv.OnRequest = func(w http.ResponseWriter, r *http.Request) {
attempt := atomic.AddInt64(&connectionAttempts, 1)
if attempt == 1 {
hj, ok := w.(http.Hijacker)
if !ok {
t.Error("server doesn't support hijacking")
return
}
conn, _, err := hj.Hijack()
if err != nil {
t.Error(err)
return
}
conn.Close()
} else {
buf, _ = io.ReadAll(r.Body)
w.WriteHeader(http.StatusOK)
}
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
url := "http://" + address
sender := NewHTTPSender(&sharedinternal.NopLogger{})
sender.NextMessage().Update(func(msg *protobufs.AgentToServer) {
msg.AgentDescription = &protobufs.AgentDescription{
IdentifyingAttributes: []*protobufs.KeyValue{{
Key: "service.name",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{StringValue: "test-service"},
},
}},
}
})
sender.callbacks = types.CallbacksStruct{
OnConnectFunc: func() {
},
OnConnectFailedFunc: func(_ error) {
},
}
sender.url = url
var wg sync.WaitGroup
wg.Add(2)
go func() {
sender.sendRequestWithRetries(ctx)
wg.Done()
}()
go func() {
l, err := net.Listen("tcp", address)
assert.NoError(t, err)
ts := httptest.NewUnstartedServer(m)
ts.Listener.Close()
ts.Listener = l
ts.Start()
srv.srv = ts
wg.Done()
}()
wg.Wait()
assert.True(t, len(buf) > 0)
assert.Contains(t, string(buf), "test-service")
cancel()
srv.Close()
}

0 comments on commit 816bc48

Please sign in to comment.