Skip to content

Commit

Permalink
NETOBSERV-1889: decoupling, use gopacket fork
Browse files Browse the repository at this point in the history
- Decouple some packages so that FLP has less to pull (currently it
  pulls all cilium & bpf etc.)
  -> new "model" package split from "flow"
  -> new "tracer" package split from "ebpf"
  -> isolate packet-related dependencies so that gopacket isn't a
transitive dependency in FLP

- replace google/gopacket with gopacket/gopacket ; it's a fork, the
  former being unmaintained.
  • Loading branch information
jotak committed Sep 20, 2024
1 parent d975af5 commit 9cacdb9
Show file tree
Hide file tree
Showing 171 changed files with 26,098 additions and 14,983 deletions.
12 changes: 6 additions & 6 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ flowchart TD
style E fill:#990
E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
RB --> |chan *flow.Record| ACC(flow.Accounter)
RB --> |chan *model.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
M --> |"chan []*flow.Record"| DD
ACC --> |"chan []*model.Record"| DD(flow.Deduper)
M --> |"chan []*model.Record"| DD
subgraph Optional
DD
end
DD --> |"chan []*flow.Record"| CL(flow.CapacityLimiter)
DD --> |"chan []*model.Record"| CL(flow.CapacityLimiter)
CL --> |"chan []*flow.Record"| DC(flow.Decorator)
CL --> |"chan []*model.Record"| DC(flow.Decorator)
DC --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
DC --> |"chan []*model.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
```
6 changes: 3 additions & 3 deletions examples/packetcapture-dump/client/packetcapture-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils/packets"

"github.com/google/gopacket/layers"
"github.com/gopacket/gopacket/layers"
)

var (
Expand Down Expand Up @@ -67,7 +67,7 @@ func main() {
os.Exit(1)
}
// write pcap file header
_, err = f.Write(utils.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
_, err = f.Write(packets.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
if err != nil {
fmt.Println("Write file header failed:", err.Error())
os.Exit(1)
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/cilium/ebpf v0.16.0
github.com/fsnotify/fsnotify v1.7.0
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
github.com/google/gopacket v1.1.19
github.com/gopacket/gopacket v1.2.0
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118
github.com/netobserv/flowlogs-pipeline v1.6.1-crc0
Expand Down Expand Up @@ -126,12 +126,12 @@ require (
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
Expand Down
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,6 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand All @@ -440,6 +438,8 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/gopacket/gopacket v1.2.0 h1:eXbzFad7f73P1n2EJHQlsKuvIMJjVXK5tXoSca78I3A=
github.com/gopacket/gopacket v1.2.0/go.mod h1:BrAKEy5EOGQ76LSqh7DMAr7z0NNPdczWm2GxCG7+I8M=
github.com/gophercloud/gophercloud v0.13.0/go.mod h1:VX0Ibx85B60B5XOrZr6kaNwrmPUzcmMpwxvQ1WQIIWM=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
Expand Down Expand Up @@ -1009,8 +1009,8 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -1100,8 +1100,8 @@ golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -1126,8 +1126,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1215,8 +1215,8 @@ golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA=
golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -1230,8 +1230,8 @@ golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
34 changes: 18 additions & 16 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
"github.com/netobserv/netobserv-ebpf-agent/pkg/kernel"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
promo "github.com/netobserv/netobserv-ebpf-agent/pkg/prometheus"
"github.com/netobserv/netobserv-ebpf-agent/pkg/tracer"

"github.com/cilium/ebpf/ringbuf"
"github.com/gavv/monotime"
Expand Down Expand Up @@ -116,8 +118,8 @@ type Flows struct {
rbTracer *flow.RingBufTracer
accounter *flow.Accounter
limiter *flow.CapacityLimiter
deduper node.MiddleFunc[[]*flow.Record, []*flow.Record]
exporter node.TerminalFunc[[]*flow.Record]
deduper node.MiddleFunc[[]*model.Record, []*model.Record]
exporter node.TerminalFunc[[]*model.Record]

// elements used to decorate flows with extra information
interfaceNamer flow.InterfaceNamer
Expand Down Expand Up @@ -198,7 +200,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
debug = true
}

ebpfConfig := &ebpf.FlowFetcherConfig{
ebpfConfig := &tracer.FlowFetcherConfig{
EnableIngress: ingress,
EnableEgress: egress,
Debug: debug,
Expand All @@ -211,20 +213,20 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
EnableNetworkEventsMonitoring: cfg.EnableNetworkEventsMonitoring,
NetworkEventsMonitoringGroupID: cfg.NetworkEventsMonitoringGroupID,
EnableFlowFilter: cfg.EnableFlowFilter,
FilterConfig: &ebpf.FilterConfig{
FilterConfig: &tracer.FilterConfig{
FilterAction: cfg.FilterAction,
FilterDirection: cfg.FilterDirection,
FilterIPCIDR: cfg.FilterIPCIDR,
FilterProtocol: cfg.FilterProtocol,
FilterPeerIP: cfg.FilterPeerIP,
FilterDestinationPort: ebpf.ConvertFilterPortsToInstr(cfg.FilterDestinationPort, cfg.FilterDestinationPortRange, cfg.FilterDestinationPorts),
FilterSourcePort: ebpf.ConvertFilterPortsToInstr(cfg.FilterSourcePort, cfg.FilterSourcePortRange, cfg.FilterSourcePorts),
FilterPort: ebpf.ConvertFilterPortsToInstr(cfg.FilterPort, cfg.FilterPortRange, cfg.FilterPorts),
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(cfg.FilterDestinationPort, cfg.FilterDestinationPortRange, cfg.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(cfg.FilterSourcePort, cfg.FilterSourcePortRange, cfg.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(cfg.FilterPort, cfg.FilterPortRange, cfg.FilterPorts),
FilterTCPFLags: cfg.FilterTCPFlags,
},
}

fetcher, err := ebpf.NewFlowFetcher(ebpfConfig)
fetcher, err := tracer.NewFlowFetcher(ebpfConfig)
if err != nil {
return nil, err
}
Expand All @@ -236,7 +238,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
func flowsAgent(cfg *Config, m *metrics.Metrics,
informer ifaces.Informer,
fetcher ebpfFlowFetcher,
exporter node.TerminalFunc[[]*flow.Record],
exporter node.TerminalFunc[[]*model.Record],
agentIP net.IP,
s *ovnobserv.SampleDecoder,
) (*Flows, error) {
Expand Down Expand Up @@ -284,7 +286,7 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
limiter := flow.NewCapacityLimiter(m)
var deduper node.MiddleFunc[[]*flow.Record, []*flow.Record]
var deduper node.MiddleFunc[[]*model.Record, []*model.Record]
if cfg.Deduper == DeduperFirstCome {
deduper = flow.Dedupe(cfg.DeduperFCExpiry, cfg.DeduperJustMark, cfg.DeduperMerge, interfaceNamer, m)
}
Expand Down Expand Up @@ -321,7 +323,7 @@ func flowDirections(cfg *Config) (ingress, egress bool) {
}
}

func buildFlowExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*flow.Record], error) {
func buildFlowExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
switch cfg.Export {
case "grpc":
return buildGRPCExporter(cfg, m, s)
Expand All @@ -338,7 +340,7 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecod
}
}

func buildGRPCExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*flow.Record], error) {
func buildGRPCExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
Expand All @@ -350,15 +352,15 @@ func buildGRPCExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecod
return grpcExporter.ExportFlows, nil
}

func buildFlowDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildFlowDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*model.Record], error) {
flpExporter, err := exporter.StartDirectFLP(cfg.FLPConfig, cfg.BuffersLength)
if err != nil {
return nil, err
}
return flpExporter.ExportFlows, nil
}

func buildKafkaExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*flow.Record], error) {
func buildKafkaExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDecoder) (node.TerminalFunc[[]*model.Record], error) {
if len(cfg.KafkaBrokers) == 0 {
return nil, errors.New("at least one Kafka broker is needed")
}
Expand Down Expand Up @@ -409,7 +411,7 @@ func buildKafkaExporter(cfg *Config, m *metrics.Metrics, s *ovnobserv.SampleDeco
}).ExportFlows, nil
}

func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*flow.Record], error) {
func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*model.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
Expand Down Expand Up @@ -480,7 +482,7 @@ func (f *Flows) interfacesManager(ctx context.Context) error {

// buildAndStartPipeline creates the ETL flow processing graph.
// For a more visual view, check the docs/architecture.md document.
func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*flow.Record], error) {
func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*model.Record], error) {

alog.Debug("registering interfaces' listener in background")
err := f.interfacesManager(ctx)
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/gavv/monotime"
test2 "github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
"github.com/netobserv/netobserv-ebpf-agent/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) {

receivedKeys := map[ebpf.BpfFlowId]struct{}{}

var key1Flows []*flow.Record
var key1Flows []*model.Record
for _, f := range exported {
require.NotContains(t, receivedKeys, f.Id)
receivedKeys[f.Id] = struct{}{}
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
assert.Len(t, exported, 3)
receivedKeys := map[ebpf.BpfFlowId]struct{}{}

var key1Flows []*flow.Record
var key1Flows []*model.Record
for _, f := range exported {
require.NotContains(t, receivedKeys, f.Id)
receivedKeys[f.Id] = struct{}{}
Expand Down
Loading

0 comments on commit 9cacdb9

Please sign in to comment.