Skip to content

Commit

Permalink
Add helper functions for metric conversion [awsecscontainermetricsrec…
Browse files Browse the repository at this point in the history
…eiver] (#1089)

This change adds helper functions for converting ECS resources metrics to OT metrics. 

**Link to tracking Issue:**
#457 

**Testing:**
Unit test added.

**Documentation:** 
README.md
  • Loading branch information
hossain-rayhan authored Sep 25, 2020
1 parent 3b94b17 commit ca0feff
Show file tree
Hide file tree
Showing 13 changed files with 889 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awsecscontainermetrics

import (
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opentelemetry.io/collector/consumer/consumerdata"
)

// metricDataAccumulator defines the accumulator
type metricDataAccumulator struct {
md []*consumerdata.MetricsData
}

// getMetricsData generates OT Metrics data from task metadata and docker stats
func (acc *metricDataAccumulator) getMetricsData(containerStatsMap map[string]ContainerStats, metadata TaskMetadata) {

taskMetrics := ECSMetrics{}
timestamp := timestampProto(time.Now())
taskResource := taskResource(metadata)

for _, containerMetadata := range metadata.Containers {
stats := containerStatsMap[containerMetadata.DockerID]
containerMetrics := getContainerMetrics(stats)
containerMetrics.MemoryReserved = *containerMetadata.Limits.Memory
containerMetrics.CPUReserved = *containerMetadata.Limits.CPU

containerResource := containerResource(containerMetadata)
for k, v := range taskResource.Labels {
containerResource.Labels[k] = v
}

acc.accumulate(
containerResource,
convertToOCMetrics(ContainerPrefix, containerMetrics, nil, nil, timestamp),
)

aggregateTaskMetrics(&taskMetrics, containerMetrics)
}

// Overwrite Memory limit with task level limit
if metadata.Limits.Memory != nil {
taskMetrics.MemoryReserved = *metadata.Limits.Memory
}

taskMetrics.CPUReserved = taskMetrics.CPUReserved / CPUsInVCpu

// Overwrite CPU limit with task level limit
if metadata.Limits.CPU != nil {
taskMetrics.CPUReserved = *metadata.Limits.CPU
}

acc.accumulate(
taskResource,
convertToOCMetrics(TaskPrefix, taskMetrics, nil, nil, timestamp),
)
}

func (acc *metricDataAccumulator) accumulate(
r *resourcepb.Resource,
m ...[]*metricspb.Metric,
) {
var resourceMetrics []*metricspb.Metric
for _, metrics := range m {
for _, metric := range metrics {
if metric != nil {
resourceMetrics = append(resourceMetrics, metric)
}
}
}

acc.md = append(acc.md, &consumerdata.MetricsData{
Metrics: resourceMetrics,
Resource: r,
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awsecscontainermetrics

import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumerdata"
)

func TestGetMetricsData(t *testing.T) {
v := uint64(1)
f := float64(1.0)

memStats := make(map[string]uint64)
memStats["cache"] = v

mem := MemoryStats{
Usage: &v,
MaxUsage: &v,
Limit: &v,
MemoryReserved: &v,
MemoryUtilized: &v,
Stats: memStats,
}

disk := DiskStats{
IoServiceBytesRecursives: []IoServiceBytesRecursive{
{Op: "Read", Value: &v},
{Op: "Write", Value: &v},
{Op: "Total", Value: &v},
},
}

net := make(map[string]NetworkStats)
net["eth0"] = NetworkStats{
RxBytes: &v,
RxPackets: &v,
RxErrors: &v,
RxDropped: &v,
TxBytes: &v,
TxPackets: &v,
TxErrors: &v,
TxDropped: &v,
}

netRate := NetworkRateStats{
RxBytesPerSecond: &f,
TxBytesPerSecond: &f,
}

percpu := []*uint64{&v, &v}
cpuUsage := CPUUsage{
TotalUsage: &v,
UsageInKernelmode: &v,
UsageInUserMode: &v,
PerCPUUsage: percpu,
}

cpuStats := CPUStats{
CPUUsage: cpuUsage,
OnlineCpus: &v,
SystemCPUUsage: &v,
CPUUtilized: &v,
CPUReserved: &v,
}
containerStats := ContainerStats{
Name: "test",
ID: "001",
Memory: mem,
Disk: disk,
Network: net,
NetworkRate: netRate,
CPU: cpuStats,
}

tm := TaskMetadata{
Cluster: "cluster-1",
TaskARN: "arn:aws:some-value/001",
Family: "task-def-family-1",
Revision: "task-def-version",
Containers: []ContainerMetadata{
{ContainerName: "container-1", DockerID: "001", DockerName: "docker-container-1", Limits: Limit{CPU: &f, Memory: &v}},
},
Limits: Limit{CPU: &f, Memory: &v},
}

cstats := make(map[string]ContainerStats)
cstats["001"] = containerStats

var mds []*consumerdata.MetricsData
acc := metricDataAccumulator{
md: mds,
}

acc.getMetricsData(cstats, tm)
require.Less(t, 0, len(acc.md))
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,47 @@ const (
AttributeECSTaskRevesion = "ecs.task-definition-version"
AttributeECSServiceName = "ecs.service"

ContainerMetricsLabelLen = 3
TaskMetricsLabelLen = 6
CPUsInVCpu = 1024
BytesInMiB = 1024 * 1024

TaskPrefix = "ecs.task."
ContainerPrefix = "container."
MetricResourceType = "aoc.ecs"

AttributeMemoryUsage = "memory.usage"
AttributeMemoryMaxUsage = "memory.usage.max"
AttributeMemoryLimit = "memory.usage.limit"
AttributeMemoryReserved = "memory.reserved"
AttributeMemoryUtilized = "memory.utilized"

AttributeCPUTotalUsage = "cpu.usage.total"
AttributeCPUKernelModeUsage = "cpu.usage.kernelmode"
AttributeCPUUserModeUsage = "cpu.usage.usermode"
AttributeCPUSystemUsage = "cpu.usage.system"
AttributeCPUCores = "cpu.cores"
AttributeCPUOnlines = "cpu.onlines"
AttributeCPUReserved = "cpu.reserved"
AttributeCPUUtilized = "cpu.utilized"

AttributeNetworkRateRx = "network.rate.rx"
AttributeNetworkRateTx = "network.rate.tx"

AttributeNetworkRxBytes = "network.io.usage.rx_bytes"
AttributeNetworkRxPackets = "network.io.usage.rx_packets"
AttributeNetworkRxErrors = "network.io.usage.rx_errors"
AttributeNetworkRxDropped = "network.io.usage.rx_dropped"
AttributeNetworkTxBytes = "network.io.usage.tx_bytes"
AttributeNetworkTxPackets = "network.io.usage.tx_packets"
AttributeNetworkTxErrors = "network.io.usage.tx_errors"
AttributeNetworkTxDropped = "network.io.usage.tx_dropped"

AttributeStorageRead = "storage.read_bytes"
AttributeStorageWrite = "storage.write_bytes"

UnitBytes = "Bytes"
UnitMegaBytes = "MB"
UnitNanoSecond = "NS"
UnitBytesPerSec = "Bytes/Sec"
UnitCount = "Count"
UnitVCpu = "vCPU"
)

This file was deleted.

Loading

0 comments on commit ca0feff

Please sign in to comment.