GitHub-issue#1994 : Implementation Of Cloudwatch metrics source plugin c… #3128
Conversation
…onfiguration. Signed-off-by: venkataraopasyavula <venkataraopasyavula@gmail.com>
…n configuration. Signed-off-by: venkataraopasyavula <venkataraopasyavula@gmail.com>
…n configuration Junit test cases and source coordinator. Signed-off-by: venkataraopasyavula <venkataraopasyavula@gmail.com>
.thenReturn(bufferAccumulator);
cloudwatchMetricsSource.start(buffer);
}
assertThat(buffer.isEmpty(), equalTo(true));
If the test asserts an empty buffer, that means nothing is being pushed to the buffer, right? Please send N records to the source and assert that the buffer has N records.
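A minimal sketch of the kind of assertion being asked for. `InMemoryBuffer`, `FakeMetricsSource`, and `BufferCountCheck` are hypothetical stand-ins invented for illustration; they are not the Data Prepper `Buffer` or the plugin's source class. The real test would use the project's own buffer and mocks.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical in-memory stand-in for the pipeline buffer; not the Data Prepper Buffer API. */
final class InMemoryBuffer<T> {
    private final List<T> records = new ArrayList<>();

    synchronized void writeAll(final Collection<T> batch) {
        records.addAll(batch);
    }

    synchronized int size() {
        return records.size();
    }
}

/** Hypothetical stand-in for the source under test: forwards what it receives to the buffer. */
final class FakeMetricsSource {
    static void push(final List<String> metrics, final InMemoryBuffer<String> buffer) {
        buffer.writeAll(metrics);
    }
}

final class BufferCountCheck {
    /** Sends n records through the source and returns how many reached the buffer. */
    static int sendAndCount(final int n) {
        final InMemoryBuffer<String> buffer = new InMemoryBuffer<>();
        final List<String> metrics = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            metrics.add("metric-" + i);
        }
        FakeMetricsSource.push(metrics, buffer);
        return buffer.size();
    }
}
```

The point of the shape is that the test fails if the source silently drops records, which an empty-buffer assertion cannot detect.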
Thanks for your review comment, Krishna.
This test case is meant to cover the empty-buffer scenario, so we have renamed the method to avoid confusion. The test cases that check the buffer with records are in CloudwatchMetricsWorkerTest.java.
…n configuration Junit test cases and source coordinator. Signed-off-by: venkataraopasyavula <venkataraopasyavula@gmail.com>
try {
    Thread.sleep(waitTimeMillis);
} catch (InterruptedException e) {
    e.printStackTrace();
Please do not use e.printStackTrace(). You can use the following instead:
LOG.error("Thread interrupted", e);
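A hedged sketch of the suggested handling. It uses `java.util.logging` only so the example has no dependencies; the plugin itself uses an SLF4J `Logger`, where the call would be `LOG.error("Thread interrupted", e)`. The class name `InterruptAwareSleeper` is hypothetical, and restoring the interrupt flag is an assumption that goes beyond the review comment.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class InterruptAwareSleeper {
    // java.util.logging keeps this sketch dependency-free; the plugin uses SLF4J.
    private static final Logger LOG = Logger.getLogger(InterruptAwareSleeper.class.getName());

    /** Sleeps for the given time; returns false if the thread was interrupted while waiting. */
    static boolean sleepQuietly(final long waitTimeMillis) {
        try {
            Thread.sleep(waitTimeMillis);
            return true;
        } catch (final InterruptedException e) {
            // Log through the logger instead of e.printStackTrace().
            LOG.log(Level.SEVERE, "Thread interrupted", e);
            // Assumption beyond the review comment: restore the flag so callers can react.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```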
this has been fixed.
throw new IllegalStateException("Buffer provided is null");
}

BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, cloudwatchMetricsSourceConfig.getBatchSize(), BUFFER_TIMEOUT);
Why use a BufferAccumulator for this source? This was originally added for the S3 source to avoid lots of small calls to write() on the Buffer, which is synchronized.
For this source, each call to GetMetricData can perform a single writeAll on the buffer.
this has been fixed.
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonProperty("batch_size")
private Integer batchSize = 1000;
This is for the BufferAccumulator. I think we can remove this configuration option.
this has been fixed.
@JsonProperty("namespaces")
@NotNull
@Valid
private List<NamespacesListConfig> namespacesListConfig;
We can simplify this YAML.
namespaces:
  - name: "AWS/S3"
    start_time:
    end_time:
  - name: "AWS/EC2"
    start_time:
    end_time:
Make this line: private List<NamespaceConfig> namespaceConfigs;
Then delete the NamespacesListConfig class altogether.
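A rough sketch of how the flattened configuration could look as plain Java classes. The Jackson and validation annotations are shown only as comments so the example compiles without dependencies. `SourceConfigSketch` and the exact field and getter names are assumptions made for illustration, not the plugin's actual classes.

```java
import java.util.List;

/** One entry of the `namespaces` list; fields mirror the YAML keys name, start_time, end_time. */
final class NamespaceConfig {
    private final String name;       // would carry @JsonProperty("name")
    private final String startTime;  // would carry @JsonProperty("start_time")
    private final String endTime;    // would carry @JsonProperty("end_time")

    NamespaceConfig(final String name, final String startTime, final String endTime) {
        this.name = name;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    String getName() { return name; }
    String getStartTime() { return startTime; }
    String getEndTime() { return endTime; }
}

/** Hypothetical holder showing the list field without an intermediate wrapper class. */
final class SourceConfigSketch {
    // would carry @JsonProperty("namespaces"), @NotNull and @Valid
    private final List<NamespaceConfig> namespaceConfigs;

    SourceConfigSketch(final List<NamespaceConfig> namespaceConfigs) {
        this.namespaceConfigs = List.copyOf(namespaceConfigs);
    }

    List<NamespaceConfig> getNamespaceConfigs() { return namespaceConfigs; }
}
```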
this has been fixed.
private String endTime;
@JsonProperty("metricDataQueries")
@NotNull
private List<MetricDataQueriesConfig> metricDataQueriesConfig;
Similar to my other comment about YAML simplification. We can have:
metricDataQueries:
  - name:
    id: "q1"
    period:
  - name:
    id: "q2"
    period:
    stat: "Average"
To resolve:
- Change this line to: private List<MetricsConfig> metricsConfigs;
- Change the getter and all corresponding usages.
- Delete MetricDataQueriesConfig entirely.
this has been fixed.
private static final Logger LOG = LoggerFactory.getLogger(CloudwatchMetricsSource.class);

private final Collection<Dimension> dimensionCollection;
I don't think this needs to be a field. It should be a local variable where you use it. Otherwise, this is not thread-safe and may not work with all metrics.
this has been fixed.
this.sourceCoordinator = mock(SourceCoordinator.class);
this.cloudWatchClient = mock(CloudWatchClient.class);
this.dimensionCollection = new ArrayList<>();
this.dimensionCollection.add(Dimension.builder().name("StorageType").value("StandardStorage").build());
I think you are trying to force dimensionCollection to be a field to help with testing. This may be an indication that you should split some of the logic into another class. That other class could be easier to test.
We have changed the implementation of the CloudWatchMetricWorker constructor, so separate logic is no longer needed.
for (DimensionsListConfig dimensionsListConfig : metricDataQueriesConfig.getMetricsConfig().getDimensionsListConfigs()) {

    dimensionCollection.add(Dimension.builder()
You could build this object on start-up and then keep it in a map for future re-use. But, you'd need to not modify it.
private Map<String, Collection<Dimension>> metricNameToDimensions;
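One possible shape for that start-up map, sketched under assumptions. `DimensionSketch` is a stand-in for the AWS SDK `Dimension` type reduced to two fields, and `DimensionCache` is a hypothetical class name. The collections are copied into unmodifiable ones so that sharing them across threads cannot lead to accidental modification.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for the AWS SDK Dimension type, reduced to the two fields used here. */
final class DimensionSketch {
    final String name;
    final String value;

    DimensionSketch(final String name, final String value) {
        this.name = name;
        this.value = value;
    }
}

/** Hypothetical holder that builds the per-metric dimensions once and never mutates them. */
final class DimensionCache {
    private final Map<String, Collection<DimensionSketch>> metricNameToDimensions;

    DimensionCache(final Map<String, List<DimensionSketch>> configured) {
        final Map<String, Collection<DimensionSketch>> copy = new HashMap<>();
        // List.copyOf returns an unmodifiable list, so callers cannot change shared state.
        configured.forEach((metricName, dimensions) -> copy.put(metricName, List.copyOf(dimensions)));
        this.metricNameToDimensions = Map.copyOf(copy);
    }

    /** Returns the cached dimensions for a metric, or an empty collection if none are configured. */
    Collection<DimensionSketch> dimensionsFor(final String metricName) {
        return metricNameToDimensions.getOrDefault(metricName, List.of());
    }
}
```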
this has been fixed.
}
});
try {
    bufferAccumulator.flush();
You are flushing the buffer accumulator here. As I noted above, you don't even need the BufferAccumulator.
Create a List<Record<Event>> metricEvents.
Then call buffer.writeAll(metricEvents).
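A minimal sketch of that single-write approach. `BufferSketch` is a hypothetical, reduced stand-in for the pipeline buffer (the real `Buffer` interface differs, for example in its exception handling), `MetricBatchWriter` is an invented name, and plain strings stand in for `Record<Event>`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical, reduced stand-in for the pipeline buffer; the real Buffer interface differs. */
interface BufferSketch<T> {
    void writeAll(Collection<T> records, int timeoutMillis);
}

final class MetricBatchWriter {
    /** Collects every metric from one response and writes them to the buffer in a single call. */
    static int writeBatch(final List<String> metricsData,
                          final BufferSketch<String> buffer,
                          final int timeoutMillis) {
        final List<String> metricEvents = new ArrayList<>(metricsData.size());
        for (final String metric : metricsData) {
            // The plugin would wrap each metric in a Record<Event> at this point.
            metricEvents.add(metric);
        }
        if (!metricEvents.isEmpty()) {
            // One synchronized write per response instead of one per record.
            buffer.writeAll(metricEvents, timeoutMillis);
        }
        return metricEvents.size();
    }
}
```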
this has been fixed.
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) {
metricsData.forEach(message -> {
    final Record<Event> eventRecord = new Record<Event>(JacksonEvent.fromMessage(message.toString()));
You are creating a string from the metrics. This is not very useful for pipeline authors. You need to convert the CloudWatch metrics into an org.opensearch.dataprepper.model.metric.Metric instance. This is an Event type already, so you can wrap it in a Record and save it.
This is a very important aspect of the source, and the source won't be very useful without it.
Also, we'd need unit tests for this. So, you should do all that translation in a new class that is dedicated to translating a CloudWatch metric into a Data Prepper Metric class.
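To make the idea of a dedicated translation class concrete, here is a rough sketch under stated assumptions. `MetricPoint` is a hypothetical per-datapoint shape and is NOT the Data Prepper `Metric` model; `CloudWatchMetricTranslator` is an invented name. The sketch only shows pairing each timestamp with the value at the same index of one GetMetricData result. How those points should map onto the real `Metric` types is left open.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-datapoint shape; NOT the Data Prepper Metric model. */
final class MetricPoint {
    final String name;
    final Instant time;
    final double value;

    MetricPoint(final String name, final Instant time, final double value) {
        this.name = name;
        this.time = time;
        this.value = value;
    }
}

/** Hypothetical translator: one result with parallel arrays becomes one point per datapoint. */
final class CloudWatchMetricTranslator {
    static List<MetricPoint> translate(final String label,
                                       final List<Instant> timestamps,
                                       final List<Double> values) {
        if (timestamps.size() != values.size()) {
            throw new IllegalArgumentException("Timestamps and values must have the same length");
        }
        final List<MetricPoint> points = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            // The value at index i belongs to the timestamp at index i.
            points.add(new MetricPoint(label, timestamps.get(i), values.get(i)));
        }
        return points;
    }
}
```

Keeping the translation in its own class means it can be unit tested with plain inputs, without mocking the CloudWatch client.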
@dlvenable - CloudWatch metrics are returned in the GetMetricData format (https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_GetMetricData.html), for example this sample response:
{ "MetricDataResults": [ { "Id": "m1", "StatusCode": "Complete", "Label": "CPUUtilization, peak of 31.5 was at 1-22 13:05", "Timestamps": [ 1518868032, 1518867732, 1518867432 ], "Values": [ 15000, 14000, 16000 ] } ] }
This does not fit the structure of an 'org.opensearch.dataprepper.model.metric.Metric' instance. Should the Event object be enhanced, or should a new object be created for CloudWatch in Metric?
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
I'm approving this as code that we can pull in to help us get started with an implementation. It still requires corrections to the actual model.
Approving to merge the code for now.
Signed-off-by: David Venable <dlv@amazon.com>
I resolved the merge conflicts by accepting the |
Description
Implementation of Cloudwatch metrics source plugin configuration
Issues Resolved
GitHub-issue #1994
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.