Skip to content

Commit

Permalink
Add tests and address PR review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ed Snible <snible@us.ibm.com>
  • Loading branch information
esnible committed Mar 22, 2022
1 parent 712a10d commit b8098e3
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 50 deletions.
16 changes: 9 additions & 7 deletions api/traces/v1/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
)

const (
ReadTimeout = 15 * time.Minute
WriteTimeout = time.Minute
ReadTimeout = 15 * time.Minute
)

type handlerConfiguration struct {
Expand All @@ -47,7 +46,7 @@ func Logger(logger log.Logger) HandlerOption {
}
}

// WithJaegerQueryV3 adds a custom Jaeger query for the handler to use.
// WithRegistry sets a custom Prometheus registry for the handler to use.
func WithRegistry(r *prometheus.Registry) HandlerOption {
return func(h *handlerConfiguration) {
h.registry = r
Expand Down Expand Up @@ -94,6 +93,10 @@ func (n nopInstrumentHandler) NewHandler(labels prometheus.Labels, handler http.

// NewV2APIHandler creates a trace query handler for Jaeger V2 HTTP queries
func NewV2APIHandler(read *url.URL, opts ...HandlerOption) http.Handler {
if read == nil {
panic("missing Jaeger read url")
}

c := &handlerConfiguration{
logger: log.NewNopLogger(),
registry: prometheus.NewRegistry(),
Expand Down Expand Up @@ -123,10 +126,9 @@ func NewV2APIHandler(read *url.URL, opts ...HandlerOption) http.Handler {
}

proxyRead = &httputil.ReverseProxy{
Director: middlewares,
ErrorLog: proxy.Logger(c.logger),
Transport: otelhttp.NewTransport(t),
ErrorHandler: func(rw http.ResponseWriter, r *http.Request, e error) {},
Director: middlewares,
ErrorLog: proxy.Logger(c.logger),
Transport: otelhttp.NewTransport(t),
}
}
r.Group(func(r chi.Router) {
Expand Down
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func main() {
}

// Traces.
if cfg.traces.enabled {
if cfg.traces.enabled && cfg.traces.readEndpoint != nil {
r.Group(func(r chi.Router) {
r.Use(authentication.WithTenantMiddlewares(pm.Middlewares))
r.Use(authentication.WithTenantHeader(cfg.traces.tenantHeader, tenantIDs))
Expand Down Expand Up @@ -1027,18 +1027,18 @@ func parseFlags() (config, error) {
cfg.logs.writeEndpoint = logsWriteEndpoint
}

if rawTracesWriteEndpoint != "" {
if rawTracesReadEndpoint != "" {
cfg.traces.enabled = true

tracesReadEndpoint, err := url.ParseRequestURI(rawTracesReadEndpoint)
if err != nil {
return cfg, fmt.Errorf("--traces.read.endpoint is invalid, raw %s: %w", rawTracesReadEndpoint, err)
return cfg, fmt.Errorf("--traces.read.endpoint %q is invalid: %w", rawTracesReadEndpoint, err)
}

cfg.traces.readEndpoint = tracesReadEndpoint
}

if rawTracesReadEndpoint != "" {
if rawTracesWriteEndpoint != "" {
cfg.traces.enabled = true

_, _, err := net.SplitHostPort(rawTracesWriteEndpoint)
Expand Down
5 changes: 5 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func Middlewares(middlewares ...Middleware) func(r *http.Request) {
}

func MiddlewareSetUpstream(upstream *url.URL) Middleware {
// Verify that the middleware upstream is set now, rather than failing later while processing a request.
if upstream == nil {
panic("no upstream")
}

return func(r *http.Request) {
r.URL.Scheme = upstream.Scheme
r.URL.Host = upstream.Host
Expand Down
6 changes: 4 additions & 2 deletions test/e2e/interactive_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build interactive
// +build interactive

package e2e
Expand All @@ -23,7 +24,7 @@ func TestInteractiveSetup(t *testing.T) {
readEndpoint, writeEndpoint, readExtEndpoint := startServicesForMetrics(t, e)
logsEndpoint, logsExtEndpoint := startServicesForLogs(t, e)
rulesEndpoint := startServicesForRules(t, e)
internalOtlpEndpoint, httpQueryEndpoint := startServicesForTraces(t, e)
internalOtlpEndpoint, httpExternalQueryEndpoint, httpInternalQueryEndpoint := startServicesForTraces(t, e)

api, err := newObservatoriumAPIService(
e,
Expand All @@ -33,6 +34,7 @@ func TestInteractiveSetup(t *testing.T) {
withRateLimiter(rateLimiterAddr),
withGRPCListenEndpoint(":8317"),
withOtelTraceEndpoint(internalOtlpEndpoint),
withJaegerEndpoint("http://"+httpInternalQueryEndpoint),
)
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(api))
Expand All @@ -57,7 +59,7 @@ func TestInteractiveSetup(t *testing.T) {
fmt.Printf("Thanos Query on host machine: %s \n", readExtEndpoint)
fmt.Printf("Loki on host machine: %s \n", logsExtEndpoint)
fmt.Printf("Observatorium gRPC API on host machine: %s\n", api.Endpoint("grpc"))
fmt.Printf("Jaeger Query on host machine (HTTP): %s\n", httpQueryEndpoint)
fmt.Printf("Jaeger Query on host machine (HTTP): %s\n", httpExternalQueryEndpoint)

fmt.Printf("API Token: %s \n\n", token)

Expand Down
16 changes: 14 additions & 2 deletions test/e2e/services.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build integration || interactive
// +build integration interactive

package e2e
Expand Down Expand Up @@ -112,7 +113,7 @@ func startServicesForLogs(t *testing.T, e e2e.Environment) (
return loki.InternalEndpoint("http"), loki.Endpoint("http")
}

func startServicesForTraces(t *testing.T, e e2e.Environment) (otlpGRPCEndpoint string, jaegerHttpEndpoint string) {
func startServicesForTraces(t *testing.T, e e2e.Environment) (otlpGRPCEndpoint, jaegerExternalHttpEndpoint, jaegerInternalHttpEndpoint string) {
jaeger := e.Runnable("jaeger").
WithPorts(
map[string]int{
Expand Down Expand Up @@ -146,7 +147,7 @@ func startServicesForTraces(t *testing.T, e e2e.Environment) (otlpGRPCEndpoint s
testutil.Ok(t, e2e.StartAndWaitReady(jaeger))
testutil.Ok(t, e2e.StartAndWaitReady(otel))

return otel.InternalEndpoint("grpc"), jaeger.Endpoint("http.query")
return otel.InternalEndpoint("grpc"), jaeger.Endpoint("http.query"), jaeger.InternalEndpoint("http.query")
}

// startBaseServices starts and waits until all base services required for the test are ready.
Expand Down Expand Up @@ -312,6 +313,7 @@ type apiOptions struct {
ratelimiterAddr string
tracesWriteEndpoint string
gRPCListenEndpoint string
jaegerQueryEndpoint string
}

type apiOption func(*apiOptions)
Expand Down Expand Up @@ -341,6 +343,12 @@ func withGRPCListenEndpoint(listenEndpoint string) apiOption {
}
}

// withJaegerEndpoint returns an apiOption that stores jaegerQueryEndpoint in
// the jaegerQueryEndpoint field of the apiOptions it is applied to.
func withJaegerEndpoint(jaegerQueryEndpoint string) apiOption {
	setEndpoint := func(opts *apiOptions) {
		opts.jaegerQueryEndpoint = jaegerQueryEndpoint
	}

	return setEndpoint
}

func withRulesEndpoint(rulesEndpoint string) apiOption {
return func(o *apiOptions) {
o.metricsRulesEndpoint = rulesEndpoint
Expand Down Expand Up @@ -402,6 +410,10 @@ func newObservatoriumAPIService(
args = append(args, "--traces.write.endpoint="+opts.tracesWriteEndpoint)
}

if opts.jaegerQueryEndpoint != "" {
args = append(args, "--traces.read.endpoint="+opts.jaegerQueryEndpoint)
}

if opts.gRPCListenEndpoint != "" {
gRPCChunks := strings.SplitN(opts.gRPCListenEndpoint, ":", 2)
if len(gRPCChunks) != 2 {
Expand Down
125 changes: 90 additions & 35 deletions test/e2e/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package e2e
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -53,6 +54,14 @@ const (
}
]
}`

//nolint:lll
// queriedV3Trace is traceJSON returned through Jaeger's V3 API
queriedV3Trace = `{"result":{"resourceSpans":[{"resource":{"attributes":[{"key":"host.name","value":{"stringValue":"testHost"}}]},"instrumentationLibrarySpans":[{"instrumentationLibrary":{},"spans":[{"traceId":"W47/95gDgQPSabYzgT/GDA==","spanId":"7uGbfsPBsXM=","parentSpanId":"AAAAAAAAAAA=","name":"testSpan","kind":"SPAN_KIND_INTERNAL","startTimeUnixNano":"1544712660000000000","endTimeUnixNano":"1544712661000000000","attributes":[{"key":"attr1","value":{"intValue":"55"}},{"key":"internal.span.format","value":{"stringValue":"proto"}}]}]}]}]}}`

//nolint:lll
// queriedV2Trace is traceJSON returned through Jaeger's V2 API
queriedV2Trace = `{"data":[{"traceID":"5b8efff798038103d269b633813fc60c","spans":[{"traceID":"5b8efff798038103d269b633813fc60c","spanID":"eee19b7ec3c1b173","operationName":"testSpan","references":[],"startTime":1544712660000000,"duration":1000000,"tags":[{"key":"attr1","type":"int64","value":55},{"key":"internal.span.format","type":"string","value":"proto"}],"logs":[],"processID":"p1","warnings":null}],"processes":{"p1":{"serviceName":"","tags":[{"key":"host.name","type":"string","value":"testHost"}]}},"warnings":null}],"total":0,"limit":0,"offset":0,"errors":null}`
)

func TestTracesExport(t *testing.T) {
Expand All @@ -64,12 +73,13 @@ func TestTracesExport(t *testing.T) {

prepareConfigsAndCerts(t, traces, e)
_, token, _ := startBaseServices(t, e, traces)
internalOtlpEndpoint, httpQueryEndpoint := startServicesForTraces(t, e)
internalOtlpEndpoint, httpExternalQueryEndpoint, httpInternalQueryEndpoint := startServicesForTraces(t, e)

api, err := newObservatoriumAPIService(
e,
withGRPCListenEndpoint(":8317"),
withOtelTraceEndpoint(internalOtlpEndpoint),
withJaegerEndpoint("http://"+httpInternalQueryEndpoint),

// This test doesn't actually write logs, but we need
// this because Observatorium currently MUST see a logs or metrics endpoints
Expand Down Expand Up @@ -125,45 +135,90 @@ func TestTracesExport(t *testing.T) {

testutil.Equals(t, http.StatusOK, response.StatusCode)

request, err = http.NewRequest(
"GET",
fmt.Sprintf("http://%s/api/v3/traces/%s", httpQueryEndpoint, "5B8EFFF798038103D269B633813FC60C"),
nil)
testutil.Ok(t, err)
returnedTrace := queryForTraceDirectV3(t,
httpExternalQueryEndpoint, "5B8EFFF798038103D269B633813FC60C")
assertResponse(t, returnedTrace, queriedV3Trace)

returnedTrace = queryForTraceV2(t,
fmt.Sprintf("http://%s/api/traces", httpExternalQueryEndpoint), "5B8EFFF798038103D269B633813FC60C",
false, "")
assertResponse(t, returnedTrace, queriedV2Trace)

httpObservatoriumQueryEndpoint := fmt.Sprintf("https://%s/api/traces/v1/test-oidc/api/traces", api.Endpoint("https"))
// We skip TLS verification because Observatorium will present a cert for "e2e_traces_read_export-api",
// but we contact it using "localhost"
returnedTrace = queryForTraceV2(t,
httpObservatoriumQueryEndpoint, "5B8EFFF798038103D269B633813FC60C",
true, fmt.Sprintf("bearer %s", token))
assertResponse(t, returnedTrace, queriedV2Trace)
})
}

// Read from Jaeger to verify the trace is there. Retry in case
// there is a short delay with trace storage.
ctx := context.Background()
b := backoff.New(ctx, backoff.Config{
Min: 500 * time.Millisecond,
Max: 5 * time.Second,
MaxRetries: 10,
})
for b.Reset(); b.Ongoing(); {
response, err = client.Do(request)
// Retry if we have a connection problem (timeout, etc)
if err != nil {
b.Wait()
continue
}
func queryForTraceDirectV3(t *testing.T, httpQueryEndpoint, traceID string) string {
t.Helper()

// Jaeger might give a 404 or 500 before the trace is there. Retry.
if response.StatusCode != http.StatusOK {
b.Wait()
continue
}
request, err := http.NewRequest(
"GET",
fmt.Sprintf("http://%s/api/v3/traces/%s", httpQueryEndpoint, traceID),
nil)
testutil.Ok(t, err)

// We got a 200 response. Verify the trace appears as expected.
defer response.Body.Close()
return requestWithRetry(t, &http.Client{}, request)
}

body, err = ioutil.ReadAll(response.Body)
testutil.Ok(t, err)
func queryForTraceV2(t *testing.T, httpQueryURL, traceID string, insecureSkipVerify bool, authHeader string) string {
t.Helper()

bodyStr = string(body)
//nolint:lll
assertResponse(t, bodyStr, `{"result":{"resourceSpans":[{"resource":{"attributes":[{"key":"host.name","value":{"stringValue":"testHost"}}]},"instrumentationLibrarySpans":[{"instrumentationLibrary":{},"spans":[{"traceId":"W47/95gDgQPSabYzgT/GDA==","spanId":"7uGbfsPBsXM=","parentSpanId":"AAAAAAAAAAA=","name":"testSpan","kind":"SPAN_KIND_INTERNAL","startTimeUnixNano":"1544712660000000000","endTimeUnixNano":"1544712661000000000","attributes":[{"key":"attr1","value":{"intValue":"55"}},{"key":"internal.span.format","value":{"stringValue":"proto"}}]}]}]}]}}`)
request, err := http.NewRequest(
"GET",
fmt.Sprintf("%s/%s", httpQueryURL, traceID),
nil)
testutil.Ok(t, err)
request.Header.Set("authorization", authHeader)

break
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
},
}

return requestWithRetry(t, client, request)
}

// requestWithRetry sends request with client until an HTTP 200 response is
// received, then returns that response's body as a string.
//
// Connection errors and non-200 statuses are retried, because there may be a
// short delay before a freshly written trace can be read back. Retries are
// paced by a backoff configured with a 500ms minimum, a 5s maximum and 10
// retries. If the backoff finishes without a 200 response, testutil.Assert is
// called with a false condition and "" is returned.
func requestWithRetry(t *testing.T, client *http.Client, request *http.Request) string {
	t.Helper()

	ctx := context.Background()
	b := backoff.New(ctx, backoff.Config{
		Min:        500 * time.Millisecond,
		Max:        5 * time.Second,
		MaxRetries: 10,
	})
	for b.Reset(); b.Ongoing(); {
		response, err := client.Do(request)
		// Retry if we have a connection problem (timeout, etc)
		if err != nil {
			b.Wait()
			continue
		}

		// Jaeger might give a 404 or 500 before the trace is there. Retry.
		if response.StatusCode != http.StatusOK {
			// Close the body of the rejected response before retrying so
			// failed attempts do not leak it.
			response.Body.Close()
			b.Wait()
			continue
		}

		// We got a 200 response. The defer is safe inside the loop because
		// this iteration always returns.
		defer response.Body.Close()

		body, err := ioutil.ReadAll(response.Body)
		testutil.Ok(t, err)

		return string(body)
	}

	testutil.Assert(t, false, "HTTP 200 response not received within time limit")
	return ""
}

0 comments on commit b8098e3

Please sign in to comment.