diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index f16b53592ef..5e2eb48c2b8 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -546,6 +546,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added "detect_mime_type" processor for detecting mime types {pull}22940[22940]
- Improve event normalization performance {pull}22974[22974]
- Add tini as init system in docker images {pull}22137[22137]
+- Added "add_network_direction" processor for determining perimeter-based network direction. {pull}23076[23076]
*Auditbeat*
diff --git a/libbeat/processors/actions/add_network_direction.go b/libbeat/processors/actions/add_network_direction.go
new file mode 100644
index 00000000000..1b9c80dbef3
--- /dev/null
+++ b/libbeat/processors/actions/add_network_direction.go
@@ -0,0 +1,122 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package actions
+
+import (
+ "fmt"
+ "net"
+
+ "github.com/pkg/errors"
+
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/common"
+ "github.com/elastic/beats/v7/libbeat/conditions"
+ "github.com/elastic/beats/v7/libbeat/processors"
+ "github.com/elastic/beats/v7/libbeat/processors/checks"
+)
+
+func init() {
+ processors.RegisterPlugin("add_network_direction",
+ checks.ConfigChecked(NewAddNetworkDirection,
+ checks.RequireFields("source", "destination", "target", "internal_networks"),
+ checks.AllowedFields("source", "destination", "target", "internal_networks")))
+}
+
+const (
+ directionInternal = "internal"
+ directionExternal = "external"
+ directionOutbound = "outbound"
+ directionInbound = "inbound"
+)
+
+type networkDirectionProcessor struct {
+ Source string `config:"source"`
+ Destination string `config:"destination"`
+ Target string `config:"target"`
+ InternalNetworks []string `config:"internal_networks"`
+}
+
+// NewAddNetworkDirection constructs a new network direction processor.
+func NewAddNetworkDirection(cfg *common.Config) (processors.Processor, error) {
+ networkDirection := &networkDirectionProcessor{}
+ if err := cfg.Unpack(networkDirection); err != nil {
+ return nil, errors.Wrapf(err, "fail to unpack the add_network_direction configuration")
+ }
+
+ return networkDirection, nil
+}
+
+func (m *networkDirectionProcessor) Run(event *beat.Event) (*beat.Event, error) {
+ sourceI, err := event.GetValue(m.Source)
+ if err != nil {
+ // doesn't have the required field value to analyze
+ return event, nil
+ }
+ source, _ := sourceI.(string)
+ if source == "" {
+ // wrong type or not set
+ return event, nil
+ }
+ destinationI, err := event.GetValue(m.Destination)
+ if err != nil {
+ // doesn't have the required field value to analyze
+ return event, nil
+ }
+ destination, _ := destinationI.(string)
+ if destination == "" {
+ // wrong type or not set
+ return event, nil
+ }
+ sourceIP := net.ParseIP(source)
+ destinationIP := net.ParseIP(destination)
+ if sourceIP == nil || destinationIP == nil {
+ // bad ip address
+ return event, nil
+ }
+
+ internalSource, err := conditions.NetworkContains(sourceIP, m.InternalNetworks...)
+ if err != nil {
+ return event, err
+ }
+ internalDestination, err := conditions.NetworkContains(destinationIP, m.InternalNetworks...)
+ if err != nil {
+ return event, err
+ }
+
+ event.Fields.DeepUpdate(common.MapStr{
+ m.Target: networkDirection(internalSource, internalDestination),
+ })
+ return event, nil
+}
+
+func networkDirection(internalSource, internalDestination bool) string {
+ if internalSource && internalDestination {
+ return directionInternal
+ }
+ if internalSource {
+ return directionOutbound
+ }
+ if internalDestination {
+ return directionInbound
+ }
+ return directionExternal
+}
+
+func (m *networkDirectionProcessor) String() string {
+ return fmt.Sprintf("networkDirection=%+v|%+v->%+v", m.Source, m.Destination, m.Target)
+}
diff --git a/libbeat/processors/actions/add_network_direction_test.go b/libbeat/processors/actions/add_network_direction_test.go
new file mode 100644
index 00000000000..54fb170426b
--- /dev/null
+++ b/libbeat/processors/actions/add_network_direction_test.go
@@ -0,0 +1,80 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package actions
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/common"
+)
+
+func TestNetworkDirection(t *testing.T) {
+ tests := []struct {
+ Source string
+ Destination string
+ InternalNetworks []string
+ Direction string
+ Error bool
+ }{
+ {"1.1.1.1", "8.8.8.8", []string{"private"}, "external", false},
+ {"1.1.1.1", "192.168.1.218", []string{"private"}, "inbound", false},
+ {"192.168.1.218", "8.8.8.8", []string{"private"}, "outbound", false},
+ {"192.168.1.218", "192.168.1.219", []string{"private"}, "internal", false},
+ // early return
+ {"1.1.1.1", "8.8.8.8", []string{"foo"}, "", true},
+ {"", "192.168.1.219", []string{"private"}, "", false},
+ {"foo", "192.168.1.219", []string{"private"}, "", false},
+ {"192.168.1.218", "foo", []string{"private"}, "", false},
+ {"192.168.1.218", "", []string{"private"}, "", false},
+ }
+
+ for _, tt := range tests {
+ t.Run(fmt.Sprintf("%v -> %v : %v", tt.Source, tt.Destination, tt.Direction), func(t *testing.T) {
+ evt := beat.Event{
+ Fields: common.MapStr{
+ "source": tt.Source,
+ "destination": tt.Destination,
+ },
+ }
+ p, err := NewAddNetworkDirection(common.MustNewConfigFrom(map[string]interface{}{
+ "source": "source",
+ "destination": "destination",
+ "target": "direction",
+ "internal_networks": tt.InternalNetworks,
+ }))
+ require.NoError(t, err)
+ observed, err := p.Run(&evt)
+ if tt.Error {
+ require.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ }
+ enriched, err := observed.Fields.GetValue("direction")
+ if tt.Direction == "" {
+ require.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ require.Equal(t, tt.Direction, enriched)
+ }
+ })
+ }
+}
diff --git a/libbeat/processors/actions/docs/add_network_direction.asciidoc b/libbeat/processors/actions/docs/add_network_direction.asciidoc
new file mode 100644
index 00000000000..75fdd862b2b
--- /dev/null
+++ b/libbeat/processors/actions/docs/add_network_direction.asciidoc
@@ -0,0 +1,22 @@
+[[add-network-direction]]
+=== Add network direction
+
+++++
+add_network_direction
+++++
+
+The `add_network_direction` processor attempts to compute the perimeter-based network direction
+given an a source and destination ip address and list of internal networks. The key `internal_networks`
+can contain either CIDR blocks or a list of special values enumerated in the network section of <>.
+
+[source,yaml]
+-------
+processors:
+ - add_network_direction:
+ source: source.ip
+ destination: destination.ip
+ target: network.direction
+ internal_networks: [ private ]
+-------
+
+See <> for a list of supported conditions.