Skip to content

Commit

Permalink
Metricbeat/HTTP: Support array in http/json metricset
Browse files Browse the repository at this point in the history
Currently (before this commit) the http/json metricset in Metricbeat
only can query information from http endpoints which expose
map[string]interface{}. For endpoints which expose an array on the
root level, the json metricset does not work.

A config option is added `response.isarray | bool`. If
someone configures array but a non array json object
is returned, an error is logged.

Event response is unified to []map[string]interface{} even
if the response is map[string]interface{}.

Signed-off-by: Jaipradeesh <jaipradeesh@gmail.com>
  • Loading branch information
dolftax committed Mar 12, 2018
1 parent bb6d3d8 commit d89f6eb
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Refactor prometheus endpoint parsing to look similar to upstream prometheus {pull}6332[6332]
- Update prometheus dependencies to latest {pull}6333[6333]
- Making the http/json metricset GA. {pull}6471[6471]
- Add support for array in http/json metricset. {pull}6480[6480]

*Packetbeat*

Expand Down
1 change: 1 addition & 0 deletions metricbeat/docs/modules/http.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ metricbeat.modules:
#method: "GET"
#request.enabled: false
#response.enabled: false
#json.is_array: false
#dedot.enabled: false
- module: http
Expand Down
1 change: 1 addition & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ metricbeat.modules:
#method: "GET"
#request.enabled: false
#response.enabled: false
#json.is_array: false
#dedot.enabled: false

- module: http
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/http/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#method: "GET"
#request.enabled: false
#response.enabled: false
#json.is_array: false
#dedot.enabled: false

- module: http
Expand Down
11 changes: 9 additions & 2 deletions metricbeat/module/http/_meta/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ import (
)

func main() {
http.HandleFunc("/", serve)
http.HandleFunc("/jsonarr", serveJSONArr)
http.HandleFunc("/jsonobj", serveJSONObj)
http.HandleFunc("/", serveJSONObj)

err := http.ListenAndServe(":8080", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}

func serve(w http.ResponseWriter, r *http.Request) {
func serveJSONArr(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, `[{"hello1":"world1"}, {"hello2": "world2"}]`)
}

func serveJSONObj(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, `{"hello":"world"}`)
}
5 changes: 2 additions & 3 deletions metricbeat/module/http/json/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
"name": "host.example.com"
},
"http": {
"testnamespace": {
"json": {
"hello": "world"
}
},
"metricset": {
"host": "http:8080",
"host": "127.0.0.1:8080",
"module": "http",
"name": "json",
"namespace": "testnamespace",
"rtt": 115
}
}
1 change: 1 addition & 0 deletions metricbeat/module/http/json/_meta/test/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ metricbeat.modules:
headers:
Accept: application/json
request.enabled: true
json.is_array: false
response.enabled: true

#================================ Outputs =====================================
Expand Down
76 changes: 51 additions & 25 deletions metricbeat/module/http/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type MetricSet struct {
body string
requestEnabled bool
responseEnabled bool
jsonIsArray bool
deDotEnabled bool
}

Expand All @@ -63,12 +64,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Body string `config:"body"`
RequestEnabled bool `config:"request.enabled"`
ResponseEnabled bool `config:"response.enabled"`
JSONIsArray bool `config:"json.is_array"`
DeDotEnabled bool `config:"dedot.enabled"`
}{
Method: "GET",
Body: "",
RequestEnabled: false,
ResponseEnabled: false,
JSONIsArray: false,
DeDotEnabled: false,
}

Expand All @@ -91,37 +94,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
http: http,
requestEnabled: config.RequestEnabled,
responseEnabled: config.ResponseEnabled,
jsonIsArray: config.JSONIsArray,
deDotEnabled: config.DeDotEnabled,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch() (common.MapStr, error) {
response, err := m.http.FetchResponse()
if err != nil {
return nil, err
}
defer response.Body.Close()

var jsonBody map[string]interface{}
var event map[string]interface{}

body, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}

err = json.Unmarshal(body, &jsonBody)
if err != nil {
return nil, err
}
func (m *MetricSet) processBody(response *http.Response, jsonBody interface{}) common.MapStr {
var event common.MapStr

if m.deDotEnabled {
event = common.DeDotJSON(jsonBody).(map[string]interface{})
event = common.DeDotJSON(jsonBody).(common.MapStr)
} else {
event = jsonBody
event = jsonBody.(common.MapStr)
}

if m.requestEnabled {
Expand All @@ -148,7 +132,49 @@ func (m *MetricSet) Fetch() (common.MapStr, error) {
// Set dynamic namespace
event["_namespace"] = m.namespace

return event, nil
return event
}

// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
response, err := m.http.FetchResponse()
if err != nil {
return nil, err
}
defer response.Body.Close()

var jsonBody common.MapStr
var jsonBodyArr []common.MapStr
var events []common.MapStr

body, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}

if m.jsonIsArray {
err = json.Unmarshal(body, &jsonBodyArr)
if err != nil {
return nil, err
}

for _, obj := range jsonBodyArr {
event := m.processBody(response, obj)
events = append(events, event)
}
} else {
err = json.Unmarshal(body, &jsonBody)
if err != nil {
return nil, err
}

event := m.processBody(response, jsonBody)
events = append(events, event)
}

return events, nil
}

func (m *MetricSet) getHeaders(header http.Header) map[string]string {
Expand Down
44 changes: 34 additions & 10 deletions metricbeat/module/http/json/json_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
)

func TestFetch(t *testing.T) {
func TestFetchObject(t *testing.T) {
compose.EnsureUp(t, "http")

f := mbtest.NewEventFetcher(t, getConfig())
f := mbtest.NewEventsFetcher(t, getConfig("object"))
event, err := f.Fetch()
if !assert.NoError(t, err) {
t.FailNow()
Expand All @@ -24,23 +24,47 @@ func TestFetch(t *testing.T) {
t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)
}

func TestFetchArray(t *testing.T) {
compose.EnsureUp(t, "http")

f := mbtest.NewEventsFetcher(t, getConfig("array"))
event, err := f.Fetch()
if !assert.NoError(t, err) {
t.FailNow()
}

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)
}
func TestData(t *testing.T) {
compose.EnsureUp(t, "http")

f := mbtest.NewEventFetcher(t, getConfig())
err := mbtest.WriteEvent(f, t)
f := mbtest.NewEventsFetcher(t, getConfig("object"))
err := mbtest.WriteEvents(f, t)
if err != nil {
t.Fatal("write", err)
}

}

func getConfig() map[string]interface{} {
func getConfig(jsonType string) map[string]interface{} {
var path string
var responseIsArray bool
switch jsonType {
case "object":
path = "/jsonobj"
responseIsArray = false
case "array":
path = "/jsonarr"
responseIsArray = true
}

return map[string]interface{}{
"module": "http",
"metricsets": []string{"json"},
"hosts": []string{getEnvHost() + ":" + getEnvPort()},
"path": "/",
"namespace": "testnamespace",
"module": "http",
"metricsets": []string{"json"},
"hosts": []string{getEnvHost() + ":" + getEnvPort()},
"path": path,
"namespace": "testnamespace",
"json.is_array": responseIsArray,
}
}

Expand Down
1 change: 1 addition & 0 deletions metricbeat/modules.d/http.yml.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#method: "GET"
#request.enabled: false
#response.enabled: false
#json.is_array: false
#dedot.enabled: false

- module: http
Expand Down

0 comments on commit d89f6eb

Please sign in to comment.