Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.8] Allow delete and index actions with a document ID (#12606) #18309

Merged
merged 1 commit into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ func (e *Event) SetID(id string) {
e.Meta["_id"] = id
}

func (e *Event) GetMetaStringValue(key string) (string, error) {
tmp, err := e.Meta.GetValue(key)
if err != nil {
return "", err
}

if s, ok := tmp.(string); ok {
return s, nil
}

return "", nil
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == "@timestamp" {
return e.Timestamp, nil
Expand Down
4 changes: 4 additions & 0 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type BulkCreateAction struct {
Create BulkMeta `json:"create" struct:"create"`
}

type BulkDeleteAction struct {
Delete BulkMeta `json:"delete" struct:"delete"`
}

type BulkMeta struct {
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
Expand Down
37 changes: 26 additions & 11 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,16 @@ type bulkResultStats struct {

const (
defaultEventType = "doc"
opTypeCreate = "create"
opTypeDelete = "delete"
opTypeIndex = "index"
)

// opTypeKey defines the metadata key name for event operation type.
// The key's value can be an empty string, `create`, `index`, or `delete`. If empty, it will assume
// either `create` or `index`. See `createEventBulkMeta`. If in doubt, set explicitly.
const opTypeKey = "op_type"

// NewClient instantiates a new client.
func NewClient(
s ClientSettings,
Expand Down Expand Up @@ -281,7 +289,12 @@ func bulkEncodePublishRequest(
log.Errorf("Failed to encode event meta data: %+v", err)
continue
}
bulkItems = append(bulkItems, meta, event)
if opType, err := event.GetMetaStringValue(opTypeKey); err == nil && opType == opTypeDelete {
// We don't include the event source in a bulk DELETE
bulkItems = append(bulkItems, meta)
} else {
bulkItems = append(bulkItems, meta, event)
}
okEvents = append(okEvents, data[i])
}
return okEvents, bulkItems
Expand Down Expand Up @@ -311,16 +324,8 @@ func createEventBulkMeta(
return nil, err
}

var id string
if m := event.Meta; m != nil {
if tmp := m["_id"]; tmp != nil {
if s, ok := tmp.(string); ok {
id = s
} else {
log.Errorf("Event ID '%v' is no string value", id)
}
}
}
id, _ := event.GetMetaStringValue("_id")
opType, _ := event.GetMetaStringValue(opTypeKey)

meta := eslegclient.BulkMeta{
Index: index,
Expand All @@ -329,7 +334,17 @@ func createEventBulkMeta(
ID: id,
}

if opType == opTypeDelete {
if id != "" {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
} else {
return nil, fmt.Errorf("%s %s requires _id", opTypeKey, opTypeDelete)
}
}
if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
if opType == opTypeIndex {
return eslegclient.BulkIndexAction{Index: meta}, nil
}
return eslegclient.BulkCreateAction{Create: meta}, nil
}
return eslegclient.BulkIndexAction{Index: meta}, nil
Expand Down
62 changes: 62 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,68 @@ func TestBulkEncodeEvents(t *testing.T) {
}
}

func TestBulkEncodeEventsWithOpType(t *testing.T) {
cases := []common.MapStr{
{"_id": "111", "op_type": "index", "message": "test 1", "bulkIndex": 0},
{"_id": "112", "op_type": "", "message": "test 2", "bulkIndex": 2},
{"_id": "", "op_type": "delete", "message": "test 6", "bulkIndex": -1}, // this won't get encoded due to missing _id
{"_id": "", "op_type": "", "message": "test 3", "bulkIndex": 4},
{"_id": "114", "op_type": "delete", "message": "test 4", "bulkIndex": 6},
{"_id": "115", "op_type": "index", "message": "test 5", "bulkIndex": 7},
}

cfg := common.MustNewConfigFrom(common.MapStr{})
info := beat.Info{
IndexPrefix: "test",
Version: version.GetDefaultVersion(),
}

im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig())
require.NoError(t, err)

index, pipeline, err := buildSelectors(im, info, cfg)
require.NoError(t, err)

events := make([]publisher.Event, len(cases))
for i, fields := range cases {
events[i] = publisher.Event{
Content: beat.Event{
Meta: common.MapStr{
"_id": fields["_id"],
"op_type": fields["op_type"],
},
Fields: common.MapStr{
"message": fields["message"],
},
}}
}

encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(version.GetDefaultVersion()), index, pipeline, events)
require.Equal(t, len(events)-1, len(encoded), "all events should have been encoded")
require.Equal(t, 9, len(bulkItems), "incomplete bulk")

for i := 0; i < len(cases); i++ {
bulkEventIndex, _ := cases[i]["bulkIndex"].(int)
if bulkEventIndex == -1 {
continue
}
caseOpType, _ := cases[i]["op_type"].(string)
caseMessage, _ := cases[i]["message"].(string)
switch bulkItems[bulkEventIndex].(type) {
case eslegclient.BulkCreateAction:
validOpTypes := []string{opTypeCreate, ""}
require.Contains(t, validOpTypes, caseOpType, caseMessage)
case eslegclient.BulkIndexAction:
require.Equal(t, opTypeIndex, caseOpType, caseMessage)
case eslegclient.BulkDeleteAction:
require.Equal(t, opTypeDelete, caseOpType, caseMessage)
default:
require.FailNow(t, "unknown type")
}
}

}

func TestClientWithAPIKey(t *testing.T) {
var headers http.Header

Expand Down