CloudWatch Metric Streams to Amazon OpenSearch (formerly Amazon Elasticsearch)
Using Kinesis Firehose + Lambda Transformation
By Ajinkya Shinde in AWS, OpenSearch, Kinesis Firehose, Lambda, CloudWatch Metric Streams
February 5, 2022
Summary
In this article, we will stream CloudWatch (CW) metrics to OpenSearch using the following workflow: CW Metric Stream -> Firehose -> Lambda Transformation -> OpenSearch
Why do we need Lambda transformation?
The reason we need a Lambda transformation is that the CW Metric Stream forwards records to Firehose as a payload in which the individual records are separated by "\n" (newline characters). Once Firehose receives this multi-record payload as base64-encoded data, it generates an OpenSearch Service bulk request to index the records into your OpenSearch Service cluster. If we base64-decode an individual payload received by Firehose, it looks like:
{Record1 JSON}\n{Record2 JSON}\n{Record-(N-1) JSON}\n{Record-N JSON}\n
Firehose will now generate a bulk request:
POST _bulk
{Record1 JSON}\n
{Record2 JSON}\n
...
{Record-(N-1) JSON}\n
{Record-N JSON}\n
However, these requests need to be in the following format:
POST _bulk
{ "index" : { "_index" : "<index>"} }
{ Record-1 JSON }
{ "index" : { "_index" : "<index>"} }
{ Record-2 JSON }
...
{ "index" : { "_index" : "<index>"} }
{ Record-N JSON }
Since the request is not in the correct format, we will see errors like the ones below:
{
"errorCode": "ES.MalformedData",
"message": "One or more records are malformed. Please ensure that each record is single valid JSON object and that it does not contain newlines.",
"deliveryStreamVersionId": 22,
"destination": "arn:aws:es:us-east-1:012345678910:domain/fgac-es",
"deliveryStreamARN": "arn:aws:firehose:us-east-1:012345678910:deliverystream/stream-metrics-to-es"
}
followed by
{
"deliveryStreamARN": "arn:aws:firehose:us-east-1:012345678910:deliverystream/stream-metrics-to-es",
"destination": "arn:aws:es:us-east-1:0123456788910:domain/fgac-es",
"deliveryStreamVersionId": 22,
"message": "Error received from Elasticsearch cluster. {\"error\":{\"root_cause\":[{\"type\":\"illegal_argument_exception\",\"reason\":\"Malformed action/metadata line [3], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]\"}],\"type\":\"illegal_argument_exception\",\"reason\":\"Malformed action/metadata line [3], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]\"},\"status\":400}",
"errorCode": "ES.ServiceException"
}
Setting up Lambda transformation
We use the Lambda blueprint "kinesis-firehose-process-record-python", which reads the records, decodes them, and encodes them back before returning them to Firehose for delivery to OpenSearch. To this function, we need to add logic that inserts action metadata to separate the multi-record data. The resulting code for adding the metadata to the multi-record payload received from the CW Metric Stream is:
action_metadata = '{ "index" : { "_index" : "<index-name>" } }'
new_payload = ""
for each in payload:
    if new_payload == "":
        # The first document relies on the action line Firehose itself
        # generates for this record, so no metadata is added here.
        new_payload += each + "\n"
    else:
        # Every subsequent document needs its own bulk action line.
        new_payload += action_metadata + "\n" + each + "\n"
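For reference, here is a minimal sketch of how this logic might slot into the blueprint's handler. It follows the standard Firehose record-transformation contract (event['records'], recordId, result, data); details beyond that, such as stripping empty lines, are assumptions rather than the blueprint's exact code.

import base64

# Bulk action line inserted between documents. The index name must match
# the "Index" configured on the Firehose destination (see Step 2 below).
ACTION_METADATA = '{ "index" : { "_index" : "<index-name>" } }'

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # Each Firehose record holds several newline-delimited JSON metric documents.
        decoded = base64.b64decode(record['data']).decode('utf-8')
        documents = [line for line in decoded.split('\n') if line.strip()]

        new_payload = ""
        for each in documents:
            if new_payload == "":
                # The first document relies on the action line Firehose itself
                # generates for this record.
                new_payload += each + "\n"
            else:
                new_payload += ACTION_METADATA + "\n" + each + "\n"

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(new_payload.encode('utf-8')).decode('utf-8')
        })

    return {'records': output}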
Final Setup
Step 1. Create the Lambda function from the blueprint "kinesis-firehose-process-record-python". Replace the code with the attached lambda-code.py. Ensure you replace "<index-name>" in the action metadata line with your actual index name.
Step 2. Create the Firehose delivery stream by following this link[1], with the considerations below (a scripted boto3 sketch follows this list):
For the source, select "Direct PUT"; for the destination, select "Amazon OpenSearch Service".
Under "Transform records", set data transformation to "Enabled" and select the Lambda function from Step 1, choosing the version that contains the modified code.
Under "Destination settings", configure OpenSearch as per this link[2]. Note that the value in the "Index" field must match the index name in the Lambda code.
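If you prefer to script this step, a rough boto3 equivalent is sketched below. The ARNs, bucket, and function name are placeholders, S3 backup of failed documents is assumed, and the parameter shapes should be checked against the current Firehose API before use.

import boto3

firehose = boto3.client('firehose')

# All ARNs below are placeholders for illustration only.
firehose.create_delivery_stream(
    DeliveryStreamName='stream-metrics-to-es',
    DeliveryStreamType='DirectPut',
    AmazonopensearchserviceDestinationConfiguration={
        'RoleARN': 'arn:aws:iam::012345678910:role/firehose-delivery-role',
        'DomainARN': 'arn:aws:es:us-east-1:012345678910:domain/fgac-es',
        'IndexName': '<index-name>',  # must match the index in the Lambda code
        'S3BackupMode': 'FailedDocumentsOnly',
        'S3Configuration': {
            'RoleARN': 'arn:aws:iam::012345678910:role/firehose-delivery-role',
            'BucketARN': 'arn:aws:s3:::my-firehose-backup-bucket',
        },
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    # Use a version- or alias-qualified ARN of the function from Step 1.
                    'ParameterValue': 'arn:aws:lambda:us-east-1:012345678910:function:cw-metrics-transform:1',
                }],
            }],
        },
    },
)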
Step 3. Set up the CW Metric Stream by either following this link[3] or the steps below (a boto3 sketch follows these steps):
a. Create the CW Metric Stream by going to Services > CloudWatch > Metrics > "Streams". You can either select "All metrics" to stream all namespaces or "Selected namespaces" to send only some of them. For instance, to send EC2 metrics, select "AWS/EC2".
b. Under Configuration, select "Select an existing Firehose owned by your account" and choose the Firehose delivery stream created in Step 2.
c. For "Select existing service role", choose the service role that CloudWatch needs in order to put records into Firehose.
d. Under "Add additional statistics", you can select specific metrics and their corresponding statistics. For instance, to send only CPU metrics from EC2, select "CPUUtilization" under "AWS/EC2" and choose the required statistics for that metric.
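As a scripted alternative to the console steps above, the metric stream can also be created with boto3. The stream name and ARNs are placeholders, and the optional statistics configuration is only needed if you want statistics beyond the defaults.

import boto3

cloudwatch = boto3.client('cloudwatch')

# Placeholder name and ARNs; the role must allow CloudWatch to put records
# into the Firehose delivery stream from Step 2.
cloudwatch.put_metric_stream(
    Name='ec2-metrics-to-opensearch',
    FirehoseArn='arn:aws:firehose:us-east-1:012345678910:deliverystream/stream-metrics-to-es',
    RoleArn='arn:aws:iam::012345678910:role/cw-metric-stream-role',
    OutputFormat='json',
    # Stream only the AWS/EC2 namespace instead of all metrics.
    IncludeFilters=[{'Namespace': 'AWS/EC2'}],
    # Optional: request extra statistics (e.g. p95) for specific metrics.
    StatisticsConfigurations=[{
        'IncludeMetrics': [{'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization'}],
        'AdditionalStatistics': ['p95'],
    }],
)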
Step 4. Finally, to see whether records are being delivered from the CW Metric Stream, check the CloudWatch metrics MetricUpdate, TotalMetricUpdate, or PublishErrorRate. You can check these under the "Dashboard" section of the selected CW Metric Stream or under CloudWatch metrics in the AWS/CloudWatch/MetricStreams namespace (a boto3 sketch of this check follows below).
To see whether delivery from Firehose to the Lambda function and/or OpenSearch has failed, select the "Destination error logs" tab and review the errors. Alternatively, check this link[4] for troubleshooting and handling data delivery failures from Firehose to OpenSearch.
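The same delivery check can be done programmatically along these lines; the MetricStreamName dimension name is an assumption, so verify it against the dimensions shown in the CloudWatch console.

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')

# Sum of metric updates pushed by the stream over the last hour.
# 'MetricStreamName' is an assumed dimension name; confirm it in the console.
response = cloudwatch.get_metric_statistics(
    Namespace='AWS/CloudWatch/MetricStreams',
    MetricName='MetricUpdate',
    Dimensions=[{'Name': 'MetricStreamName', 'Value': 'ec2-metrics-to-opensearch'}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Sum'],
)
print(response['Datapoints'])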
References:
[1] Creating an Amazon Kinesis Data Firehose Delivery Stream - https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html
[2] Destination Settings - Choose OpenSearch Service for Your Destination - https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-elasticsearch
[3] Setting up a metric stream to an AWS service (data lake scenario) - CloudWatch console - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-setup-datalake.html#CloudWatch-metric-streams-setup-datalake-console
[4] Amazon Kinesis Data Firehose Data Delivery - Data Delivery Failure Handling - https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry