Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GitHub-issue#1994 : Implementation Of Cloudwatch metrics source plugin c… #3128

Conversation

venkataraopasyavula
Copy link
Contributor

Description

Implementation of Cloudwatch metrics source plugin configuration

Issues Resolved

GitHub-issue #1994

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…onfiguration.

Signed-off-by: venkataraopasyavula <venkataraopasyavula@gmail.com>
venkataraopasyavula and others added 5 commits August 10, 2023 20:06
…n configuration.

Signed-off-by: venkataraopasyavula <venkataraopasyavula@gmail.com>
…n configuration Junit test cases and source coordinator.

Signed-off-by: venkataraopasyavula <venkataraopasyavula@gmail.com>
.thenReturn(bufferAccumulator);
cloudwatchMetricsSource.start(buffer);
}
assertThat(buffer.isEmpty(),equalTo(true));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the test is asserting empty buffer, that means nothing is being pushed to the buffer, right? Please send N number of records to the source and assert that buffer has N records.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review comment Krishna.
Basically this test case is used to test with empty buffer. So we have renamed the method name to avoid the confusion. Testing the buffer with the records test cases is present in CloudwatchMetricsWorkerTest.java file

…n configuration Junit test cases and source coordinator.

Signed-off-by: venkataraopasyavula <venkataraopasyavula@gmail.com>
try {
Thread.sleep(waitTimeMillis);
} catch (InterruptedException e) {
e.printStackTrace();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not use e.printStackTrace(). You can use the following instead.

LOG.error("Thread interrupted", e);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed.

throw new IllegalStateException("Buffer provided is null");
}

BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, cloudwatchMetricsSourceConfig.getBatchSize(), BUFFER_TIMEOUT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use a BufferAccumulator for this source? This was originally added for the S3 source to avoid lots of small calls to write() on the Buffer which is synchronized.

For this source, each call to GetMetricData can perform a single writeAll on the buffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed.

private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonProperty("batch_size")
private Integer batchSize = 1000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the BufferAccumulator. I think we can remove this configuration option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed.

@JsonProperty("namespaces")
@NotNull
@Valid
private List<NamespacesListConfig> namespacesListConfig;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can simplify this YAML.

namespaces:
- name: "AWS/S3"
   start_time:
   end_time:
- name: "AWS/EC2"
   start_time:
   end_time:

Make this line: private List<NamespaceConfig> namespaceConfigs;`

Then delete the NamespacesListConfig class altogether.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed.

private String endTime;
@JsonProperty("metricDataQueries")
@NotNull
private List<MetricDataQueriesConfig> metricDataQueriesConfig;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to my other comment about YAML simplification. We can have:

metricDataQueries:
  - name:                   
     id: "q1"
     period: 
  - name: 
     id: "q2"
     period: 
     stat: "Average"

To resolve:

  1. Change this line to: private List<MetricsConfig> metricsConfigs;
  2. Change the getter and all corresponding usages.
  3. Delete MetricDataQuerieisConfig entirely.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed.


private static final Logger LOG = LoggerFactory.getLogger(CloudwatchMetricsSource.class);

private final Collection<Dimension> dimensionCollection;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be a field. It should be a local variable where you use it. Otherwise, this is not thread-safe and may not work with all metrics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed.

this.sourceCoordinator = mock(SourceCoordinator.class);
this.cloudWatchClient = mock(CloudWatchClient.class);
this.dimensionCollection = new ArrayList<>();
this.dimensionCollection.add(Dimension.builder().name("StorageType").value("StandardStorage").build());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are trying to force dimensionCollection to be a field to help with testing. This may be an indication that you should split some of the logic into another class. That other class could be easier to test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have changed some implementation of CloudWatchMetricWorker constructor so no need of separate logic.


for (DimensionsListConfig dimensionsListConfig : metricDataQueriesConfig.getMetricsConfig().getDimensionsListConfigs()) {

dimensionCollection.add(Dimension.builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could build this object on start-up and then keep it in a map for future re-use. But, you'd need to not modify it.

private Map<String, Collection<Dimension>> metricNameToDimensions;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed.

}
});
try {
bufferAccumulator.flush();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are flushing the buffer accumulator here. As I noted above, you don't even need to BufferAccumulator.

Create a List<Record<Event>> metricEvents.

Then call buffer.writeAll(metricEvents).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed.

final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) {
metricsData.forEach(message -> {
final Record<Event> eventRecord = new Record<Event>(JacksonEvent.fromMessage(message.toString()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are creating a string from the metrics. This is not very useful for pipeline authors. You need to convert the CloudWatch metrics into a org.opensearch.dataprepper.model.metric.Metric instance. This is an Event type already, so you can wrap it in a Record and save it.

This is a very important aspect of the source and it won't be very useful with it.

Also, we'd need unit tests for this. So, you should do all that translation in a new class that is dedicated to the translation of a CloudWatch metric to a Data Prepper Metric class.

Copy link
Contributor

@ashoktelukuntla ashoktelukuntla Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable - Cloudwatch metrics are in format https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_GetMetricData.html i.e. sample reponse
{ "MetricDataResults": [ { "Id": "m1", "StatusCode": "Complete", "Label": "CPUUtilization, peak of 31.5 was at 1-22 13:05", "Timestamps": [ 1518868032, 1518867732, 1518867432 ], "Values": [ 15000, 14000, 16000 ] }, }
which does not fit in to 'org.opensearch.dataprepper.model.metric.Metric' instance object structure. Should Event object be enhanced or a new Object to be created for CloudWatch in Metric ?

dlvenable
dlvenable previously approved these changes Jan 16, 2024
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm approving this as code that we can pull in to help us get started with an implementation. It still requires corrections to the actual model.

kkondaka
kkondaka previously approved these changes Jan 16, 2024
Copy link
Collaborator

@kkondaka kkondaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving to merge the code for now.

Signed-off-by: David Venable <dlv@amazon.com>
@dlvenable dlvenable dismissed stale reviews from kkondaka and themself via 24b3669 January 16, 2024 15:55
@dlvenable
Copy link
Member

I resolved the merge conflicts by accepting the settings.gradle exactly as it is in main. Thus, this project is not going into the Data Prepper build. Merging it in will bring in the code for future development.

@dlvenable dlvenable merged commit 224518f into opensearch-project:main Jan 16, 2024
42 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants