How to stream logs in AWS from CloudWatch to ElasticSearch


This article guides readers through streaming logs from a CloudWatch log group to an AWS Elasticsearch cluster. It first walks through creating the Lambda policy and execution role, then shows how to create an Elasticsearch subscription filter, and finally how to modify the Lambda function so that multiple log groups can be streamed to the cluster.

Prerequisites

Before performing the steps in this article, the reader should have:

  • An AWS account.
  • A user authorized to create resources on the AWS account.
  • An Elasticsearch cluster created on the AWS account, accessible through a VPC or internet endpoint.

Create Lambda execution role

We will use a Lambda function to stream logs to Elasticsearch. On the AWS IAM console, click Policies, then select Create Policy.

Create policy

In the window that opens, select the JSON tab.

Create JSON policy

On the JSON tab, paste the following policy. Make sure to replace the Resource value with the ARN of your own Elasticsearch cluster.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "es:*"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:es:eu-west-1:****************:domain/test-es-cluster/*"
        }
    ]
}
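
The es:* action grants the role full access to the domain. If you prefer least privilege, a narrower variant (a sketch, assuming the function only needs bulk HTTP writes; action names per the AWS IAM reference) looks like this:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "es:ESHttpPost",
                "es:ESHttpPut"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:es:eu-west-1:****************:domain/test-es-cluster/*"
        }
    ]
}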

Click Review policy, then fill in the name and description of the policy and click Create policy. When finished, return to Roles on the IAM console and click Create role.

Creating a Role

Select AWS service as the type of trusted entity, and select Lambda as the use case.

Select services and use cases

Then click Permissions and select the policy you created earlier.

Attach policy to role

Click Tags and add any tags you want for the role. Then click Review, enter the name and description of the role, and click Create role. The Lambda role is now ready.
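
If you prefer to script these console steps, here is a minimal sketch using the AWS SDK for JavaScript (v2); the role name and policy ARN are placeholders, not values from this guide:

// Sketch: create the Lambda execution role and attach the policy.
// RoleName and PolicyArn are placeholders -- substitute your own.
var AWS = require('aws-sdk');
var iam = new AWS.IAM();

var assumeRolePolicy = JSON.stringify({
    Version: '2012-10-17',
    Statement: [{
        Effect: 'Allow',
        Principal: { Service: 'lambda.amazonaws.com' },
        Action: 'sts:AssumeRole'
    }]
});

iam.createRole({
    RoleName: 'lambda-es-streaming-role',
    AssumeRolePolicyDocument: assumeRolePolicy,
    Description: 'Allows Lambda to stream CloudWatch logs to Elasticsearch'
}, function(err, data) {
    if (err) { console.error(err); return; }
    iam.attachRolePolicy({
        RoleName: 'lambda-es-streaming-role',
        PolicyArn: 'arn:aws:iam::123456789012:policy/lambda-es-policy'
    }, function(err2) {
        if (err2) console.error(err2);
        else console.log('Role ready:', data.Role.Arn);
    });
});

Note that createRole sets the trust policy up front, which the next section does through the console.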

Edit the trust relationship of the Lambda role

On the AWS IAM console, select the Lambda role we created above. Then select the Trust relationships tab and click Edit trust relationship.

Edit the trust relationship of the Lambda role

In the window that opens, delete everything in the policy document, and then paste the following code. Then click Update Trust Policy.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

Your Lambda role can now be used to stream logs to Elasticsearch.
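
If you scripted the role instead, the same trust policy can be applied with a single SDK call; a sketch (the role name is a placeholder):

// Sketch: apply the trust policy above programmatically.
var AWS = require('aws-sdk');
var iam = new AWS.IAM();

iam.updateAssumeRolePolicy({
    RoleName: 'lambda-es-streaming-role',   // placeholder
    PolicyDocument: JSON.stringify({
        Version: '2012-10-17',
        Statement: [{
            Effect: 'Allow',
            Principal: { Service: 'lambda.amazonaws.com' },
            Action: 'sts:AssumeRole'
        }]
    })
}, function(err) {
    if (err) console.error(err);
    else console.log('Trust policy updated');
});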

Create an Elasticsearch subscription for your log group

On the CloudWatch console, select Log groups and choose the log group for which you want to create an Elasticsearch subscription. In the log group window, select Actions, then select Create Elasticsearch subscription filter from the drop-down menu.

Create ElasticSearch subscription filter

In the window that opens, select the account in which the ES cluster was created. In our case it is the same account as the CloudWatch log group, so we select This account and then the Amazon ES cluster to which we want to stream the logs. Then select the Lambda execution role you created earlier from the drop-down list under Lambda Function.

Select Elasticsearch cluster and Lambda execution role

Scroll down and you will be asked to configure the log format and filters, as shown below.

Configure log format and filters

Choose your preferred log format. You can also test the filter pattern to preview the results. If satisfied, click Start Streaming. You should now see the logs as an index in Elasticsearch Kibana.
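
The same subscription can also be created through the CloudWatch Logs API. The sketch below assumes the wizard already produced a streaming Lambda function; the function name, log group, and ARNs are placeholders. An empty filter pattern matches every log event:

// Sketch: create the subscription filter programmatically.
var AWS = require('aws-sdk');
var logs = new AWS.CloudWatchLogs();
var lambda = new AWS.Lambda();

// Allow CloudWatch Logs to invoke the streaming function.
lambda.addPermission({
    FunctionName: 'LogsToElasticsearch_test-es-cluster',      // placeholder
    StatementId: 'cloudwatch-logs-invoke',
    Action: 'lambda:InvokeFunction',
    Principal: 'logs.amazonaws.com',
    SourceArn: 'arn:aws:logs:eu-west-1:123456789012:log-group:/aws/lambda/my-app:*'
}, function(err) {
    if (err && err.code !== 'ResourceConflictException') { console.error(err); return; }

    logs.putSubscriptionFilter({
        logGroupName: '/aws/lambda/my-app',                   // placeholder
        filterName: 'es-stream',
        filterPattern: '',   // empty pattern matches all log events
        destinationArn: 'arn:aws:lambda:eu-west-1:123456789012:function:LogsToElasticsearch_test-es-cluster'
    }, function(err2) {
        if (err2) console.error(err2);
        else console.log('Subscription filter created');
    });
});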

Modify the Lambda function to stream logs from multiple log groups

To stream logs from multiple CloudWatch log groups to the Elasticsearch cluster, we must modify the code of the original Lambda function created above. Replace the Lambda function code with the following code. The only line you need to change is the var endpoint declaration near the top of the snippet; make sure to replace its value with your Elasticsearch cluster endpoint. When finished, click Save.

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

var endpoint="search-test-es-cluster-**************************.eu-west-1.es.amazonaws.com";

// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;

exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = Buffer.from(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }

        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));

        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);

        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }

        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));

            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};

function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        // original index name format: cwl-YYYY.MM.DD
        //var indexName = [
        //    'cwl-' + timestamp.getUTCFullYear(),              // year
        //    ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
        //    ('0' + timestamp.getUTCDate()).slice(-2)          // day
        //].join('.');

        // modified index name format: cwl-<log group>-YYYY.MM.DD
        var indexName = [
            'cwl-' + payload.logGroup.toLowerCase().split('/').join('-') + '-' + timestamp.getUTCFullYear(), // log group + year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');
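        // Example: the log group '/aws/lambda/my-app' on 2020-08-14 (UTC) yields
        // the index name 'cwl--aws-lambda-my-app-2020.08.14'.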
        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}

function buildSource(message, extractedFields) {
    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                var jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    var jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}

function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}

function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}

function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}

function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });

        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;

            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}

function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^.]+)\.?([^.]*)\.?([^.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ].join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}

function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}

function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}

function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));

        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

You can now stream logs from multiple log groups to your Elasticsearch cluster and view them in Kibana.
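
Each additional log group just needs its own subscription filter pointing at the same function (plus an invoke permission for that group, as in the addPermission call shown earlier). A sketch, with placeholder names:

// Sketch: subscribe several log groups to the one streaming function.
var AWS = require('aws-sdk');
var logs = new AWS.CloudWatchLogs();
var destinationArn = 'arn:aws:lambda:eu-west-1:123456789012:function:LogsToElasticsearch_test-es-cluster'; // placeholder

['/aws/lambda/app-one', '/aws/lambda/app-two'].forEach(function(logGroupName) {
    logs.putSubscriptionFilter({
        logGroupName: logGroupName,
        filterName: 'es-stream',
        filterPattern: '',   // match all log events
        destinationArn: destinationArn
    }, function(err) {
        if (err) console.error(logGroupName, err);
        else console.log('Subscribed', logGroupName);
    });
});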

Related articles:

  • Set up Elasticsearch Cluster with Kibana on AWS
  • Implement new features in Git without affecting the master branch
  • Configure AWS VPC Flow logs to CloudWatch Log group
