Skip to content

Commit

Permalink
Open Application Signals ports on CWAgent only when enabled (#230)
Browse files Browse the repository at this point in the history
  • Loading branch information
Paramadon committed Sep 13, 2024
1 parent f51816b commit a32bbc0
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 109 deletions.
49 changes: 34 additions & 15 deletions internal/manifests/collector/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ import (
)

const (
StatsD = "statsd"
CollectD = "collectd"
XrayProxy = "aws-proxy"
XrayTraces = "aws-traces"
OtlpGrpc = "otlp-grpc"
OtlpHttp = "otlp-http"
AppSignalsGrpc = "appsignals-grpc"
AppSignalsHttp = "appsignals-http"
AppSignalsProxy = "appsignals-xray"
EMF = "emf"
EMFTcp = "emf-tcp"
EMFUdp = "emf-udp"
CWA = "cwa-"
StatsD = "statsd"
CollectD = "collectd"
XrayProxy = "aws-proxy"
XrayTraces = "aws-traces"
OtlpGrpc = "otlp-grpc"
OtlpHttp = "otlp-http"
AppSignalsGrpc = "appsig-grpc"
AppSignalsHttp = "appsig-http"
AppSignalsProxy = "appsig-xray"
AppSignalsGrpcSA = ":4315"
AppSignalsHttpSA = ":4316"
AppSignalsProxySA = ":2000"
EMF = "emf"
EMFTcp = "emf-tcp"
EMFUdp = "emf-udp"
CWA = "cwa-"
)

var receiverDefaultPortsMap = map[string]int32{
Expand Down Expand Up @@ -79,7 +82,6 @@ func getContainerPorts(logger logr.Logger, cfg string, specPorts []corev1.Servic
config, err := adapters.ConfigStructFromJSONString(cfg)
if err != nil {
logger.Error(err, "error parsing cw agent config")
servicePorts = PortMapToServicePortList(AppSignalsPortToServicePortMap)
} else {
servicePorts = getServicePortsFromCWAgentConfig(logger, config)
}
Expand Down Expand Up @@ -115,13 +117,20 @@ func getContainerPorts(logger logr.Logger, cfg string, specPorts []corev1.Servic
}

func getServicePortsFromCWAgentConfig(logger logr.Logger, config *adapters.CwaConfig) []corev1.ServicePort {
servicePortsMap := getAppSignalsServicePortsMap()
servicePortsMap := make(map[int32][]corev1.ServicePort)

getApplicationSignalsReceiversServicePorts(logger, config, servicePortsMap)
getMetricsReceiversServicePorts(logger, config, servicePortsMap)
getLogsReceiversServicePorts(logger, config, servicePortsMap)
getTracesReceiversServicePorts(logger, config, servicePortsMap)

return PortMapToServicePortList(servicePortsMap)
}

func isAppSignalEnabled(config *adapters.CwaConfig) bool {
return config.GetApplicationSignalsConfig() != nil
}

func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
if config.Metrics == nil || config.Metrics.MetricsCollected == nil {
return
Expand Down Expand Up @@ -221,6 +230,16 @@ func getAppSignalsServicePortsMap() map[int32][]corev1.ServicePort {
return servicePortMap
}

func getApplicationSignalsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
if !isAppSignalEnabled(config) {
return
}

getReceiverServicePort(logger, AppSignalsGrpcSA, AppSignalsGrpc, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, AppSignalsHttpSA, AppSignalsHttp, corev1.ProtocolTCP, servicePortsMap)
getReceiverServicePort(logger, AppSignalsProxySA, AppSignalsProxy, corev1.ProtocolTCP, servicePortsMap)
}

func portFromEndpoint(endpoint string) (int32, error) {
var err error
var port int64
Expand Down
131 changes: 38 additions & 93 deletions internal/manifests/collector/ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@ import (
func TestStatsDGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/statsDAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 1, len(containerPorts))
assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort)
assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol)
Expand All @@ -31,13 +25,7 @@ func TestStatsDGetContainerPorts(t *testing.T) {
func TestDefaultStatsDGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/statsDDefaultAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 1, len(containerPorts))
assert.Equal(t, int32(8125), containerPorts[StatsD].ContainerPort)
assert.Equal(t, StatsD, containerPorts[StatsD].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[StatsD].Protocol)
Expand All @@ -46,42 +34,36 @@ func TestDefaultStatsDGetContainerPorts(t *testing.T) {
func TestCollectDGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/collectDAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 1, len(containerPorts))
assert.Equal(t, int32(25936), containerPorts[CWA+CollectD].ContainerPort)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+CollectD].Protocol)
}

func TestDefaultCollectDGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/collectDDefaultAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 1, len(containerPorts))
assert.Equal(t, int32(25826), containerPorts[CollectD].ContainerPort)
assert.Equal(t, CollectD, containerPorts[CollectD].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[CollectD].Protocol)
}

func TestApplicationSignals(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/application_signals.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[CWA+AppSignalsGrpc].ContainerPort)
assert.Equal(t, CWA+AppSignalsGrpc, containerPorts[CWA+AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[CWA+AppSignalsHttp].ContainerPort)
assert.Equal(t, CWA+AppSignalsHttp, containerPorts[CWA+AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[CWA+AppSignalsProxy].ContainerPort)
assert.Equal(t, CWA+AppSignalsProxy, containerPorts[CWA+AppSignalsProxy].Name)
}

func TestEMFGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/emfAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 5, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 2, len(containerPorts))
assert.Equal(t, int32(25888), containerPorts[EMFTcp].ContainerPort)
assert.Equal(t, EMFTcp, containerPorts[EMFTcp].Name)
assert.Equal(t, int32(25888), containerPorts[EMFUdp].ContainerPort)
Expand All @@ -92,13 +74,9 @@ func TestEMFGetContainerPorts(t *testing.T) {
func TestXrayAndOTLPGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/xrayAndOTLPAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 5, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, int32(2000), containerPorts[CWA+XrayTraces].ContainerPort)
assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name)
assert.Equal(t, int32(4327), containerPorts[CWA+OtlpGrpc].ContainerPort)
assert.Equal(t, CWA+OtlpGrpc, containerPorts[CWA+OtlpGrpc].Name)
assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+OtlpGrpc].Protocol)
Expand All @@ -110,13 +88,9 @@ func TestXrayAndOTLPGetContainerPorts(t *testing.T) {
func TestDefaultXRayAndOTLPGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/xrayAndOTLPDefaultAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 5, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, int32(2000), containerPorts[XrayTraces].ContainerPort)
assert.Equal(t, XrayTraces, containerPorts[XrayTraces].Name)
assert.Equal(t, int32(4317), containerPorts[OtlpGrpc].ContainerPort)
assert.Equal(t, OtlpGrpc, containerPorts[OtlpGrpc].Name)
assert.Equal(t, corev1.ProtocolTCP, containerPorts[OtlpGrpc].Protocol)
Expand All @@ -128,13 +102,7 @@ func TestDefaultXRayAndOTLPGetContainerPorts(t *testing.T) {
func TestXRayGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 5, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 2, len(containerPorts))
assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort)
assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol)
Expand All @@ -147,13 +115,9 @@ func TestXRayWithBindAddressDefaultGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json")
strings.Replace(cfg, "2800", "2000", 1)
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 5, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 2, len(containerPorts))
assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort)
assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name)
assert.Equal(t, int32(2900), containerPorts[CWA+XrayProxy].ContainerPort)
assert.Equal(t, CWA+XrayProxy, containerPorts[CWA+XrayProxy].Name)
assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+XrayProxy].Protocol)
Expand All @@ -163,13 +127,7 @@ func TestXRayWithTCPProxyBindAddressDefaultGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json")
strings.Replace(cfg, "2900", "2000", 1)
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 5, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 2, len(containerPorts))
assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort)
assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol)
Expand All @@ -178,26 +136,20 @@ func TestXRayWithTCPProxyBindAddressDefaultGetContainerPorts(t *testing.T) {
func TestNilMetricsGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/nilMetrics.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 0, len(containerPorts))
}

func TestMultipleReceiversGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/multipleReceiversAgentConfig.json")
strings.Replace(cfg, "2900", "2000", 1)
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 11, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, int32(4315), containerPorts[CWA+AppSignalsGrpc].ContainerPort)
assert.Equal(t, CWA+AppSignalsGrpc, containerPorts[CWA+AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[CWA+AppSignalsHttp].ContainerPort)
assert.Equal(t, CWA+AppSignalsHttp, containerPorts[CWA+AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[CWA+AppSignalsProxy].ContainerPort)
assert.Equal(t, CWA+AppSignalsProxy, containerPorts[CWA+AppSignalsProxy].Name)
assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort)
assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol)
Expand Down Expand Up @@ -234,11 +186,9 @@ func TestSpecPortsOverrideGetContainerPorts(t *testing.T) {
},
}
containerPorts := getContainerPorts(logger, cfg, specPorts)
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, int32(12345), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(12346), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort)
Expand All @@ -250,13 +200,8 @@ func TestInvalidConfigGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/nilMetrics.json")
cfg = cfg + ","
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 3, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, 0, len(containerPorts))

}

func getJSONStringFromFile(path string) string {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"logs": {
"metrics_collected": {
"application_signals": {}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
},
"logs": {
"metrics_collected": {
"emf": {}
"emf": {},
"application_signals": {}
}
},
"traces": {
Expand Down

0 comments on commit a32bbc0

Please sign in to comment.