Skip to content

Commit

Permalink
Split agent's HTTP server and handler (#1996)
Browse files Browse the repository at this point in the history
* Split agent's HTTP server and handler

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Make handler public and switch to Gorilla mux

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Move handler to a shared package

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Delint

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Replace agent.GetServer() with GetHTTPRouter()

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Remove import

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Remove race condition

Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
yurishkuro committed Dec 31, 2019
1 parent 775b1d1 commit c6677ca
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 45 deletions.
7 changes: 4 additions & 3 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"sync/atomic"

"github.com/gorilla/mux"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
Expand Down Expand Up @@ -50,9 +51,9 @@ func NewAgent(
return a
}

// GetServer returns HTTP server used by the agent.
func (a *Agent) GetServer() *http.Server {
return a.httpServer
// GetHTTPRouter returns Gorilla HTTP router used by the agent's HTTP server.
func (a *Agent) GetHTTPRouter() *mux.Router {
return a.httpServer.Handler.(*mux.Router)
}

// Run runs all of agent UDP and HTTP servers in separate go-routines.
Expand Down
9 changes: 4 additions & 5 deletions cmd/agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,21 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) {
},
}
logger, logBuf := testutils.NewLogger()
//f, _ := cfg.Metrics.CreateMetricsFactory("jaeger")
mBldr := &jmetrics.Builder{HTTPRoute: "/metrics", Backend: "prometheus"}
mFactory, err := mBldr.CreateMetricsFactory("jaeger")
require.NoError(t, err)
agent, err := cfg.CreateAgent(fakeCollectorProxy{}, logger, mFactory)
require.NoError(t, err)
if h := mBldr.Handler(); mFactory != nil && h != nil {
logger.Info("Registering metrics handler with HTTP server", zap.String("route", mBldr.HTTPRoute))
agent.GetHTTPRouter().Handle(mBldr.HTTPRoute, h).Methods(http.MethodGet)
}
ch := make(chan error, 2)
go func() {
if err := agent.Run(); err != nil {
t.Errorf("error from agent.Run(): %s", err)
ch <- err
}
if h := mBldr.Handler(); mFactory != nil && h != nil {
logger.Info("Registering metrics handler with HTTP server", zap.String("route", mBldr.HTTPRoute))
agent.GetServer().Handler.(*http.ServeMux).Handle(mBldr.HTTPRoute, h)
}
close(ch)
}()

Expand Down
39 changes: 39 additions & 0 deletions cmd/agent/app/httpserver/srv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"net/http"

"github.com/gorilla/mux"
"github.com/uber/jaeger-lib/metrics"

"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
"github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
)

// NewHTTPServer creates a new server that hosts an HTTP/JSON endpoint for clients
// to query for sampling strategies and baggage restrictions.
func NewHTTPServer(hostPort string, manager configmanager.ClientConfigManager, mFactory metrics.Factory) *http.Server {
handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{
ConfigManager: manager,
MetricsFactory: mFactory,
LegacySamplingEndpoint: true,
})
r := mux.NewRouter()
handler.RegisterRoutes(r)
return &http.Server{Addr: hostPort, Handler: r}
}
27 changes: 27 additions & 0 deletions cmd/agent/app/httpserver/srv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestHTTPServer(t *testing.T) {
s := NewHTTPServer(":1", nil, nil)
assert.NotNil(t, s)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver
package clientcfghttp

import (
"encoding/json"
Expand All @@ -22,6 +22,7 @@ import (
"net/http"
"strings"

"github.com/gorilla/mux"
"github.com/uber/jaeger-lib/metrics"

"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
Expand All @@ -34,32 +35,19 @@ var (
errBadRequest = errors.New("bad request")
)

// NewHTTPServer creates a new server that hosts an HTTP/JSON endpoint for clients
// to query for sampling strategies and baggage restrictions.
func NewHTTPServer(hostPort string, manager configmanager.ClientConfigManager, mFactory metrics.Factory) *http.Server {
handler := newHTTPHandler(manager, mFactory)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
handler.serveSamplingHTTP(w, r, true /* thriftEnums092 */)
})
mux.HandleFunc("/sampling", func(w http.ResponseWriter, r *http.Request) {
handler.serveSamplingHTTP(w, r, false /* thriftEnums092 */)
})
mux.HandleFunc("/baggageRestrictions", func(w http.ResponseWriter, r *http.Request) {
handler.serveBaggageHTTP(w, r)
})
return &http.Server{Addr: hostPort, Handler: mux}
// HTTPHandlerParams contains parameters that must be passed to NewHTTPHandler.
type HTTPHandlerParams struct {
ConfigManager configmanager.ClientConfigManager // required
MetricsFactory metrics.Factory // required
LegacySamplingEndpoint bool
}

func newHTTPHandler(manager configmanager.ClientConfigManager, mFactory metrics.Factory) *httpHandler {
handler := &httpHandler{manager: manager}
metrics.Init(&handler.metrics, mFactory, nil)
return handler
}

type httpHandler struct {
manager configmanager.ClientConfigManager
metrics struct {
// HTTPHandler implements endpoints for used by Jaeger clients to retrieve client configuration,
// such as sampling and baggage restrictions.
type HTTPHandler struct {
legacySamplingEndpoint bool
manager configmanager.ClientConfigManager
metrics struct {
// Number of good sampling requests
SamplingRequestSuccess metrics.Counter `metric:"http-server.requests" tags:"type=sampling"`

Expand All @@ -83,7 +71,35 @@ type httpHandler struct {
}
}

func (h *httpHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request) (string, error) {
// NewHTTPHandler creates new HTTPHandler.
func NewHTTPHandler(params HTTPHandlerParams) *HTTPHandler {
handler := &HTTPHandler{
manager: params.ConfigManager,
legacySamplingEndpoint: params.LegacySamplingEndpoint,
}
metrics.MustInit(&handler.metrics, params.MetricsFactory, nil)
return handler
}

// RegisterRoutes registers configuration handlers with Gorilla Router.
func (h *HTTPHandler) RegisterRoutes(router *mux.Router) {
if h.legacySamplingEndpoint {
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
h.serveSamplingHTTP(w, r, true /* thriftEnums092 */)
}).Methods(http.MethodGet)
}

router.HandleFunc("/sampling", func(w http.ResponseWriter, r *http.Request) {
h.serveSamplingHTTP(w, r, false /* thriftEnums092 */)
}).Methods(http.MethodGet)

router.HandleFunc("/baggageRestrictions", func(w http.ResponseWriter, r *http.Request) {
h.serveBaggageHTTP(w, r)
}).Methods(http.MethodGet)

}

func (h *HTTPHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request) (string, error) {
services := r.URL.Query()["service"]
if len(services) != 1 {
h.metrics.BadRequest.Inc(1)
Expand All @@ -93,7 +109,7 @@ func (h *httpHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request)
return services[0], nil
}

func (h *httpHandler) writeJSON(w http.ResponseWriter, json []byte) error {
func (h *HTTPHandler) writeJSON(w http.ResponseWriter, json []byte) error {
w.Header().Add("Content-Type", mimeTypeApplicationJSON)
if _, err := w.Write(json); err != nil {
h.metrics.WriteFailures.Inc(1)
Expand All @@ -102,7 +118,7 @@ func (h *httpHandler) writeJSON(w http.ResponseWriter, json []byte) error {
return nil
}

func (h *httpHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request, thriftEnums092 bool) {
func (h *HTTPHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request, thriftEnums092 bool) {
service, err := h.serviceFromRequest(w, r)
if err != nil {
return
Expand All @@ -116,7 +132,7 @@ func (h *httpHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request,
jsonBytes, err := json.Marshal(resp)
if err != nil {
h.metrics.BadThriftFailures.Inc(1)
http.Error(w, "Cannot marshall Thrift to JSON", http.StatusInternalServerError)
http.Error(w, "cannot marshall Thrift to JSON", http.StatusInternalServerError)
return
}
if thriftEnums092 {
Expand All @@ -132,7 +148,7 @@ func (h *httpHandler) serveSamplingHTTP(w http.ResponseWriter, r *http.Request,
}
}

func (h *httpHandler) serveBaggageHTTP(w http.ResponseWriter, r *http.Request) {
func (h *HTTPHandler) serveBaggageHTTP(w http.ResponseWriter, r *http.Request) {
service, err := h.serviceFromRequest(w, r)
if err != nil {
return
Expand Down Expand Up @@ -166,7 +182,7 @@ var samplingStrategyTypes = []tSampling.SamplingStrategyType{
//
// Thrift 0.9.3 classes generate this JSON:
// {"strategyType":"PROBABILISTIC","probabilisticSampling":{"samplingRate":0.5}}
func (h *httpHandler) encodeThriftEnums092(json []byte) []byte {
func (h *HTTPHandler) encodeThriftEnums092(json []byte) []byte {
str := string(json)
for _, strategyType := range samplingStrategyTypes {
str = strings.Replace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package httpserver
package clientcfghttp

import (
"encoding/json"
Expand All @@ -24,11 +24,12 @@ import (
"net/http/httptest"
"testing"

"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/metricstest"

tSampling092 "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver/thrift-0.9.2"
tSampling092 "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp/thrift-0.9.2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)
Expand All @@ -49,8 +50,14 @@ func withServer(
samplingResponse: mockSamplingResponse,
baggageResponse: mockBaggageResponse,
}
realServer := NewHTTPServer(":1", mgr, metricsFactory)
server := httptest.NewServer(realServer.Handler)
handler := NewHTTPHandler(HTTPHandlerParams{
ConfigManager: mgr,
MetricsFactory: metricsFactory,
LegacySamplingEndpoint: true,
})
r := mux.NewRouter()
handler.RegisterRoutes(r)
server := httptest.NewServer(r)
defer server.Close()
runTest(&testServer{
metricsFactory: metricsFactory,
Expand Down Expand Up @@ -167,7 +174,7 @@ func TestHTTPHandlerErrors(t *testing.T) {
mockSamplingResponse: probabilistic(math.NaN()),
url: "?service=Y",
statusCode: http.StatusInternalServerError,
body: "Cannot marshall Thrift to JSON\n",
body: "cannot marshall Thrift to JSON\n",
metrics: []metricstest.ExpectedMetric{
{Name: "http-server.errors", Tags: map[string]string{"source": "thrift", "status": "5xx"}, Value: 1},
},
Expand Down Expand Up @@ -195,7 +202,10 @@ func TestHTTPHandlerErrors(t *testing.T) {

t.Run("failure to write a response", func(t *testing.T) {
withServer(probabilistic(0.001), restrictions("luggage", 10), func(ts *testServer) {
handler := newHTTPHandler(ts.mgr, ts.metricsFactory)
handler := NewHTTPHandler(HTTPHandlerParams{
ConfigManager: ts.mgr,
MetricsFactory: ts.metricsFactory,
})

req := httptest.NewRequest("GET", "http://localhost:80/?service=X", nil)
w := &mockWriter{header: make(http.Header)}
Expand Down

0 comments on commit c6677ca

Please sign in to comment.