Streamlining Real-Time Data Processing with AWS Kinesis, Lambda, and Terraform

DSL
Feb 5, 2024 · 10 min read

I have been preparing for the AWS Solutions Architect exam and came across the following question.

A retail company with many stores and warehouses is implementing IoT sensors to gather monitoring data from devices in each location. The data will be sent to AWS in real time. A solutions architect must provide a solution to ensure events are received in order for each device and also ensure that data is saved for future processing.

I immediately associated the scenario with AWS Kinesis, Firehose, and S3. First, let’s understand what Kinesis and Firehose can do. Amazon Kinesis Data Streams collects and processes data in real time. A Kinesis data stream is a set of shards; each shard is a uniquely identified sequence of data records in a stream, and each data record has a sequence number assigned by Kinesis Data Streams. (see the figure above)
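
To make the per-device ordering concrete, here is a minimal sketch of my own (using the aws-sdk v2 Kinesis client that the demo app later in this article also uses): records that share a partition key are routed to the same shard, and Kinesis assigns them increasing sequence numbers on that shard, so one device’s events stay in order.

// Minimal sketch: records sharing a PartitionKey land on the same shard and keep their order.
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({ region: 'us-east-1' });

async function sendInOrder(deviceId, readings) {
  for (const reading of readings) {
    // Using the device ID as the partition key keeps this device's events on one shard.
    const result = await kinesis
      .putRecord({
        StreamName: 'ExampleDataStream', // the stream we create later with Terraform
        PartitionKey: deviceId,
        Data: JSON.stringify(reading),
      })
      .promise();
    console.log(deviceId, result.ShardId, result.SequenceNumber); // sequence numbers increase per shard
  }
}

sendInOrder('sensor-001', [{ temperature: 21.5 }, { temperature: 21.7 }]).catch(console.error);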

AWS Kinesis Data Firehose can then process the stream and deliver it to S3 or other destinations. When I checked the answer, this was indeed the correct setup. As always, I think it is important to actually build the project to fully understand these technologies.

Project Solution Components

The diagram above, created by CloudThat, gives a good overview of this workflow:
  • IoT Devices: Simulated by a Node.js app sending mock weather data to Kinesis; you can also investigate how to connect real IoT devices to AWS Kinesis from this guide.
  • AWS Kinesis: Facilitates real-time data streaming, allowing for concurrent data handling from thousands of sensors without data loss.
  • AWS Lambda: Processes the streamed data in real-time for tasks like filtering and anomaly detection, leveraging the serverless model for scalability and efficiency.
  • S3: Stores the delivered data for future processing.
  • Terraform: Manages and provisions the required AWS resources through code, streamlining infrastructure setup and ensuring reproducibility.

I recommend following along so you can learn this better. There are a few prerequisites before working on this project.

  • A basic understanding of Terraform, with Terraform installed and configured on your machine
  • An AWS account with credentials configured on your local machine, making sure you are using the us-east-1 region

Setting up S3 bucket and Kinesis stream

The following code sets up a bucket named example-bucket-for-iot-data (notice that versioning is enabled) and a Kinesis data stream named ExampleDataStream.

provider "aws" {
region = "us-east-1"
}

resource "aws_kinesis_stream" "example_stream" {
name = "ExampleDataStream"
shard_count = 1
retention_period = 24
}

resource "aws_s3_bucket" "example_bucket" {
bucket = "example-bucket-for-iot-data"
}

resource "aws_s3_bucket_versioning" "versioning_example" {
bucket = aws_s3_bucket.example_bucket.id
versioning_configuration {
status = "Enabled"
}
}
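
As a side note, if your IoT traffic is spiky or hard to predict, recent versions of the AWS provider also let you run the stream in on-demand capacity mode instead of fixing shard_count. This is just a hedged variation for illustration and is not used in this project; the resource and stream names below are hypothetical.

# Hedged variation (not used in this project): let Kinesis manage shard capacity.
resource "aws_kinesis_stream" "example_stream_on_demand" {
  name             = "ExampleDataStreamOnDemand" # hypothetical name, for illustration only
  retention_period = 24

  # In ON_DEMAND mode Kinesis scales shards for you, so shard_count is omitted.
  stream_mode_details {
    stream_mode = "ON_DEMAND"
  }
}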

Next, we need to set up a Firehose delivery stream that reads from the Kinesis stream.

resource "aws_kinesis_firehose_delivery_stream" "extended_s3_stream" {
name = "terraform-kinesis-firehose-extended-s3-test-stream"
destination = "extended_s3"
kinesis_source_configuration {
kinesis_stream_arn = aws_kinesis_stream.example_stream.arn
role_arn = aws_iam_role.firehose_delivery_role.arn
}
extended_s3_configuration {
role_arn = aws_iam_role.firehose_delivery_role.arn
bucket_arn = aws_s3_bucket.example_bucket.arn
cloudwatch_logging_options {
enabled = true
log_group_name = aws_cloudwatch_log_group.firehose_log_group.name
log_stream_name = "FirehoseDelivery"
}
processing_configuration {
enabled = "true"

processors {
type = "Lambda"

parameters {
parameter_name = "LambdaArn"
parameter_value = "${aws_lambda_function.lambda_processor.arn}:$LATEST"
}
}
}
}
}

Notice a few important items here. First is the destination, which is set to extended_s3; if you check the documentation, you will find quite a few other destinations, such as Elasticsearch, MongoDB, Splunk, etc.

Next is the processing_configuration section: it is enabled and the processor type is set to Lambda. That means we will set up a Lambda function to transform the data before it is delivered to S3. Also notice that parameter_value has :$LATEST appended at the end.

Finally, I want to bring to your attention that we need to set the stream source to Kinesis through the kinesis_source_configuration block. This is extremely important; I forgot to set it at first, and the default setting was used instead.

We intended the Kinesis data stream to push data through Firehose, but the default source is DIRECT_PUT, which only accepts data written to Firehose directly. As a result, all the data I streamed to Kinesis never reached Firehose. Next, we will set up the Lambda function.
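
If you ever want to confirm which source your delivery stream actually ended up with, one quick check (a hedged sketch with the aws-sdk v2 Firehose client, not part of the project code) is to describe the delivery stream and look at its DeliveryStreamType, which is either DirectPut or KinesisStreamAsSource.

const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

firehose.describeDeliveryStream(
  { DeliveryStreamName: 'terraform-kinesis-firehose-extended-s3-test-stream' },
  (err, resp) => {
    if (err) return console.error(err);
    // Expect 'KinesisStreamAsSource' when kinesis_source_configuration is set,
    // and 'DirectPut' when it is omitted.
    console.log(resp.DeliveryStreamDescription.DeliveryStreamType);
  }
);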

Lambda function setup

The following snippet sets up the Lambda function.

data "archive_file" "lambda_zip" {
type = "zip"
source_file = "${path.module}/lambdas/index.js"
output_path = "${path.module}/lambdas/lambda.zip"
}

resource "aws_lambda_function" "lambda_processor" {
function_name = "firehose_lambda_processor"
handler = "index.handler"
role = aws_iam_role.lambda_iam.arn
runtime = "nodejs16.x"
timeout = 60
filename = data.archive_file.lambda_zip.output_path
source_code_hash = data.archive_file.lambda_zip.output_base64sha256
}

The highlight here is the timeout, which is recommended to be at least 1 minute (or longer if necessary) so the function has enough time to process each batch of records. Next, we will use Node.js to write a simple transform function that converts the temperature from Celsius to Fahrenheit.

exports.handler = async (event, context) => {
  // Function to convert Celsius to Fahrenheit
  const celsiusToFahrenheit = (celsius) => (celsius * 9) / 5 + 32;

  // Filter and transform the incoming records
  const output = event.records.map((record) => {
    // Decode base64 encoded record data
    const payload = Buffer.from(record.data, 'base64').toString('ascii');
    let data = JSON.parse(payload);

    // Check and filter data based on humidity (for example, filter out humidity > 90%)
    if (data.humidity <= 90) {
      // Convert temperature to Fahrenheit and add processing timestamp
      data.temperature = celsiusToFahrenheit(data.temperature);
      data.processedAt = new Date().toISOString();

      // Re-encode the transformed data to base64
      const outputPayload = Buffer.from(JSON.stringify(data)).toString('base64');
      return {
        recordId: record.recordId,
        result: 'Ok',
        data: outputPayload,
      };
    } else {
      // Skip this record by marking it as 'Dropped'
      return {
        recordId: record.recordId,
        result: 'Dropped',
        data: record.data, // Original data in case it needs to be inspected later
      };
    }
  });

  console.log(`Processed ${output.length} records.`);
  return { records: output };
};

The highlight of this Lambda function is the use of the Buffer class to decode the record data from base64 and, once the processing steps are complete, to encode the transformed payload back to base64.
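
Before deploying, it can help to exercise the handler locally. The following is a minimal test sketch of my own, assuming the handler file lives at lambdas/index.js as in the archive_file block above: it builds a fake Firehose transformation event with one base64-encoded record and calls the handler directly.

// Local test harness for the transform Lambda (save it as e.g. test-handler.js and run with node).
const { handler } = require('./lambdas/index'); // path assumed from the archive_file block above

const sample = {
  city: 'New York',
  temperature: 21.5,
  humidity: 80.2,
  timestamp: new Date().toISOString(),
};

const event = {
  records: [
    {
      recordId: 'test-record-1',
      data: Buffer.from(JSON.stringify(sample)).toString('base64'),
    },
  ],
};

handler(event).then((result) => {
  const decoded = Buffer.from(result.records[0].data, 'base64').toString('utf8');
  console.log(result.records[0].result, decoded); // expect 'Ok' and a Fahrenheit temperature
});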

CloudWatch logs setup

Now we will set up log groups for both Firehose and the Lambda function, so we can debug if something goes wrong.

resource "aws_cloudwatch_log_group" "lambda_log_group" {
name = "/aws/lambda/firehose_lambda_processor"
retention_in_days = 14
}

resource "aws_cloudwatch_log_group" "firehose_log_group" {
name = "/aws/kinesisfirehose/terraform-kinesis-firehose-extended-s3-test-stream"
retention_in_days = 14
}

If you do not configure logging for Firehose, you will see a warning message on your AWS Firehose console later. (see the figure below)
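
Once the log groups exist, you can also pull recent Lambda log events from a script instead of the console. Here is a hedged sketch using the aws-sdk v2 CloudWatchLogs client (not part of the project code):

const AWS = require('aws-sdk');
const logs = new AWS.CloudWatchLogs({ region: 'us-east-1' });

logs.filterLogEvents(
  {
    logGroupName: '/aws/lambda/firehose_lambda_processor',
    startTime: Date.now() - 15 * 60 * 1000, // only the last 15 minutes
    limit: 20,
  },
  (err, data) => {
    if (err) return console.error(err);
    data.events.forEach((e) =>
      console.log(new Date(e.timestamp).toISOString(), e.message.trim())
    );
  }
);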

Roles and policies setup

I will not go over every single policy in detail here; you can visit the project repository or consult this AWS guide. However, there is one important thing about the Lambda invocation policy I need to point out.

resource "aws_iam_policy" "firehose_lambda_invocation_policy" {
name = "firehose_lambda_invocation_policy"
description = "Allow Firehose to invoke Lambda function"

policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"lambda:InvokeFunction",
"lambda:GetFunctionConfiguration",
]
Resource = "${aws_lambda_function.lambda_processor.arn}:$LATEST"
},
]
})
}

Notice the :$LATEST suffix on the Resource line above; it matches the Lambda version we referenced earlier in the Firehose processor configuration. If you do not specify the Lambda function version, you will get the following error: it complains that Firehose does not have permission to access the Lambda, even though the policy has already been set up.

Another highlight is that, besides the InvokeFunction action, you also need to include the GetFunctionConfiguration action.

Node.js app to simulate IoT devices

To complete this project, we need a data source. For demo purposes, I created a Node.js app that generates random weather data for four US locations.

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({ region: 'us-east-1' });

const streamName = 'ExampleDataStream';
let intervalInMilliseconds = 10000; // Default interval set to 10 seconds

// Function to generate random weather data
function generateWeatherData() {
  const cities = ['Philadelphia', 'New York', 'California', 'New Orleans'];
  const city = cities[Math.floor(Math.random() * cities.length)];
  const temperature = Number((Math.random() * 30).toFixed(2)); // Random temperature between 0 and 30 °C
  const humidity = Number((Math.random() * 100).toFixed(2)); // Random humidity percentage

  // Temperature and humidity are kept numeric so the Lambda transform can do math on them.
  return {
    city,
    temperature,
    humidity,
    timestamp: new Date().toISOString(),
  };
}

// Function to send data to Kinesis
function sendDataToKinesis() {
  const data = generateWeatherData();
  const params = {
    Data: JSON.stringify(data), // Data must be a string or buffer
    PartitionKey: data.city, // Using city as the partition key
    StreamName: streamName,
  };

  kinesis.putRecord(params, (err, result) => {
    if (err) {
      console.error('Error sending data to Kinesis:', err);
    } else {
      console.log(result); // Contains the ShardId and SequenceNumber of the record
      console.log('Successfully sent data to Kinesis:', params.Data);
    }
  });
}

// Configurable interval for sending data
function startDataGeneration(interval) {
  intervalInMilliseconds = interval || intervalInMilliseconds;
  setInterval(sendDataToKinesis, intervalInMilliseconds);
  console.log(`Starting data generation, sending data every ${intervalInMilliseconds / 1000} seconds...`);
}

// Start generating weather data
startDataGeneration(5000); // Configure the interval as needed, e.g., 5000 milliseconds for 5 seconds

I think it is important to mention the partition key here. In Kinesis, a partition key is used to group data by shard within a stream. Kinesis Data Streams segregates the data records belonging to a stream into multiple shards, and it uses the partition key associated with each data record to determine which shard a given record belongs to.

Here the app uses the city names as partition keys, so records are distributed across shards according to those keys, and records for the same city stay in order on the same shard.

Finally, the app uses the AWS SDK to set up the Kinesis client and calls the putRecord method to push data to the Kinesis stream.
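
If you lower the interval or simulate many devices at once, batching is cheaper than one putRecord call per reading. The following is a hedged sketch of a batch variant, not part of the demo app; it assumes it lives in the same file so it can reuse the kinesis client and streamName defined above.

// Sketch: send several readings in one API call with putRecords.
function sendBatchToKinesis(readings) {
  const params = {
    StreamName: streamName,
    Records: readings.map((reading) => ({
      Data: JSON.stringify(reading),
      PartitionKey: reading.city, // same per-city partitioning as putRecord above
    })),
  };

  kinesis.putRecords(params, (err, result) => {
    if (err) return console.error('Error sending batch to Kinesis:', err);
    // FailedRecordCount tells you how many records in the batch were rejected.
    console.log(`Sent ${readings.length} records, ${result.FailedRecordCount} failed`);
  });
}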

Deploy project and test run

Now it is time to deploy the project with Terraform and then run the Node.js app to test that everything works.

To deploy the project, we will first run

terraform init

Then we will run

terraform plan

If everything runs successfully, then we can run the final command.

terraform apply -auto-approve

When the apply completes successfully, we can log into the AWS console to check that everything looks good. If your deployment succeeded, you will see the following:

Kinesis console with ExampleDataStream data stream
Kinesis Firehose console with terraform-kinesis-firehose-extended-s3-test-stream delivery stream setup

You will also see an S3 bucket named example-bucket-for-iot-data; the 2024 directory is where all the delivered records are stored, and the process_failed directory stores records that failed Lambda processing. With this, we can read the delivered records back and decode them for downstream business logic.

example-bucket-for-iot-data, note: 2024 is where the data is stored
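
To read one of those delivered objects back programmatically, here is a hedged sketch using the aws-sdk v2 S3 client; the object key below is made up, so copy a real key from the 2024/ prefix in your bucket. Firehose concatenates the transformed records it receives from the Lambda into a single object, so the body is simply the JSON payloads back to back.

const AWS = require('aws-sdk');
const s3 = new AWS.S3({ region: 'us-east-1' });

async function readDeliveredObject(key) {
  const obj = await s3
    .getObject({ Bucket: 'example-bucket-for-iot-data', Key: key })
    .promise();
  // The body holds the decoded JSON payloads produced by the Lambda, concatenated together.
  console.log(obj.Body.toString('utf8'));
}

// Hypothetical key: replace it with a real object key from the 2024/ prefix.
readDeliveredObject('2024/02/05/12/example-object').catch(console.error);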

Finally, you will see a Lambda function named firehose_lambda_processor. Notice that although Kinesis Firehose is configured to invoke this Lambda, you will not see any trigger listed on the Lambda console; that is because Firehose invokes the function internally. You will, however, see the CloudWatch logs later when records are processed by this function.

Now let’s run the Node.js app and produce some IoT weather data. Before you run it, you need to install its dependencies. Note: make sure you run the following commands inside the weatherServer folder.

npm install

When it is done, run

node weatherServer.js

This will produce weather data every five seconds. Notice that the app prints the ShardId and SequenceNumber of every record; the ShardId can be used to trace the data within the stream on the Kinesis console. Now let’s examine the metrics on both the Kinesis and Firehose consoles to confirm the data is streaming successfully.

weather data being put to Kinesis

It is important to note that the stream has some delay, so it will take a little while before the data shows up in CloudWatch. If you still do not see any data after a long time, it is very likely that something is wrong with your setup or that the permissions in the policies need to be modified. To make sure all the permissions are set up correctly, I recommend cloning the project from my GitHub repository for all the details.

Kinesis monitor metrics
Firehose monitor metrics

Now, here is a trick I want to share: maximize the window of any individual metric you want to inspect and select the 1-minute interval so the specific data points show up; otherwise, you may miss them in the minimized hours-long chart due to UI glitches.

Conclusion and Key Learnings

In this project, we built a real-time data processing system using AWS Kinesis, Lambda, and Terraform, simulating IoT device data collection and processing. Here are the simplified conclusions and key learnings:

Simplified System Overview:

  • IoT Device Simulation: Used Node.js to generate and send weather data, mimicking IoT devices.
  • Data Streaming with AWS Kinesis: Managed real-time data streams efficiently.
  • Processing with AWS Lambda: Leveraged serverless functions for data processing tasks.
  • Storage in AWS S3: Safely stored processed data for future use.
  • Infrastructure Management with Terraform: Automated resource provisioning.

Key Learnings:

  • Partition Keys Importance: Learned their role in managing data streams effectively in Kinesis.
  • IAM Roles and Policies: Understood the critical setup for secure data flow between AWS services.
  • Serverless Processing Power: Experienced the benefits of using AWS Lambda for scalable, cost-effective data processing.
  • Terraform’s Efficiency: Saw firsthand how Terraform streamlines infrastructure setup and management.

If you like this article, please give me a clap and follow me on LinkedIn or Twitter.

Written by DSL

Sr. software engineer. I love Go, JavaScript, Python, and serverless AWS. Follow me on Twitter @terraformia for tech insights and experiences.
