Ingesting Clickstream Data with Python, Kinesis, and Terraform

Jonathan Duran
18 min readJan 28, 2022
Photo by mauro mora on Unsplash

Introduction and motivation.

My goal is to write a few articles, each focusing on a popular data source found at any company, and then tie them all together for a big picture view of a fully-fledged data platform. This article would be the first in such a series. The focus, for now, will be on clickstream data, while future writings will focus on other familiar sources such as OLTP databases, third-party APIs, and possibly web server logs.

Prerequisites.

  • Python3 installed. You should be familiar with programming.
  • Terraform installed. You can follow along with my code if you’ve never used Terraform.
  • AWS Account with credentials (valid access and secret key). It will help if you have the AWS CLI configured on your local machines with a credentials file in the ~/.aws/ directory.

Project requirements.

To keep things as “real-world” as possible, let’s create a common scenario one would find on a data team.

We are data engineers at a fictional tech company. Users visit our company’s app to perform various activities and consequently generate data points.

Our task is to capture this data and make it easily available to other members within the company for further analysis. Furthermore, we should also think about monitoring, scalability, extensibility, and reproducibility as we build out a solution.

Let’s reframe the task in a more modular way — we need to ingest real-time clickstream data into a data lake for downstream consumption by different stakeholders (analytics engineers, data analysts, data scientists, etc.).

After restating the requirement, it’s clear that this task can be broken down into three ordered subtasks.

  1. Ingestion
  2. Storage
  3. Consumption

What is clickstream data?

Imagine owning a website or a web app. Users find their way to your site and perform various actions. Visiting a page, clicking a button, submitting information, and purchasing a product are examples of such activities. But these user data points aren’t helpful unless you create a process to log them.

So how do we capture this stream of untapped data? Nowadays, you can use many services to collect and store this data. These services include Google Analytics, Segment, Amplitude, and MixPanel. Using Segment as an example, your website is added as a data source, and Segment then provides the required JavaScript code to start tracking user events and sending them to Segment.

I hope that the need to capture this rich data is evident to everyone. For example, analyzing a time-ordered list of events for every user (which we call a user journey) allows you to understand user motivation and behavior. This in turn helps make better-informed decisions that optimize for user experience and conversion.

Back to the perspective of the data engineer

🚨 Real World Alert 🚨

As a data engineer, typically, you wouldn’t see much action in setting up the tracking and tagging of clickstream data. Instead, this process would be done by a combination of members from the front end, product management, and analytics teams. However, in some smaller companies, data engineering is a role within the analytics team (instead of rolling up to engineering) and may have some input in the event-tagging process.

How do we get our hands on clickstream data?

If you’ve been following my previous articles, you’d know that I am a fan of generating my own data. This article will be no different.

This project assumes that we already have a service, like Segment, set up, and with tagged user events. Data exports from Segment are semi-structured and nested. Here is an example of an event from Segment’s documentation.

This particular payload represents an Identify call, which matches a known user to their traits. This call can be generated when users register, log in or update their information. Notice how the trait field itself is a semi-structured object making the event nested. Segment also offers other calls such as track (what is the user doing?), page (which page is the user visiting?), and more.

We will model our generated data based on this Identify call. However, let’s not concern ourselves with the actual contents of the payload. Instead, it is more important to imitate the payload structure by making our exports semi-structured and nested.

Previewing an event.

Take a look at an example of an event that we’ll later generate.

In this payload, we have a known iOS user having an ID equal to 90 who visited the www.fakesite.com/books/ path and clicked on a review. The properties field defines another JSON object.

The choice of structure for this example event is driven by the following requirements.

  • The payload should be semi-structured.
  • At least one of our fields should be nested.
  • To simulate actual clickstream data, our script must produce data continuously at some predefined rate.
  • We need to pass arguments that let us 1) control the time between generated events and 2) specify the total number of events we want to generate.

Creating the script.

Let’s kick off this project by creating a directory for your project called clickstream-kinesis .

Create a python file named clickstream_generator.py and paste the following code.

Let’s cover each library in detail.

  • JSON — The events generated by this script will be constructed as Python dictionaries. The JSON library has a dumps method that allows converting a python object to the equivalent JSON object.
  • Random — This library will add variance to the time between generated events. Additionally, values for event description, user Id, page, and operating system will be randomly chosen for each generated event.
  • Datetime —The datetime library will be used to capture the timestamp at which events are created.
  • Sys — This library contains a method that allows for the handling of arguments. As mentioned in the requirements section, you will need to supply a time gap and record count to the script.
  • Time — To make this script unpredictable, this library will allow a pause between creating events. The length of this pause will be randomized with the help of the random module.
  • Hashlib — Every event needs a unique Id. A quick way to accomplish this goal is to hash the current timestamp. How do we know that timestamps between events are unique? Event timestamps are recorded down to the millisecond, while the pause between events is one second at a minimum.

🚨 Real World Alert 🚨

In stream processing, each event typically contains three timestamps. This level of complexity isn’t needed for this project still lets take a closer look at these timestamps.

1. The event-time — the timestamp at which an event is created

2. The ingestion-time — the timestamp at which an event is captured by the streaming process

3. The wall-clock — the timestamp at which an event is processed by the stream processor.

As for the rest of the script, each field in the event has a function that determines its value. A python dictionary is instantiated and populated iteratively and then converted to a JSON during each iteration to construct the event.

Let’s give the script a test run. To run the script for 10 records with a max delay between events of 2 seconds, use the command python3 clickstream_generator.py 10 2.

The next step is setting up kinesis to receive these event messages.

Infrastructure

Below is an illustration of the entire infrastructure that will support the clickstream data pipeline.

Let’s return to our fictional scenario and recall our project requirements. Ingestion was the first subtask to deal with. We choose Kinesis Firehose because the goal is to simply move data from source to target without any transformation. Firehose is an obvious choice for such a simple task because it doesn’t add any technical overhead. There isn’t anything to manage. All resources (network, compute, memory, etc.) needed to load data are handled by AWS. This provides the scalability we desired.

Additionally, we can assume that our users do not need data in real-time. The analytics and data science teams may have a few dashboards and analyses that need to be refreshed in near real-time, but the latency introduced by Firehose is more than reasonable given their needs (Firehose can send data to S3 within 60 seconds of receiving it).

Turning our focus from ingestion to storage, we choose to send the events to a data lake (a collection of S3 buckets). We want to avoid creating data systems that are limited to a few use-cases. Instead, sending the event data to a lake allows our colleagues to self-serve without any limitations (keeping in mind that for security reasons, not all data will make it through to our stakeholders). The data lake is made available for anyone to use, in the way they need to use it with minimal intervention from the data engineering team.

Finally, keeping in mind that data is messy and ever-changing, we need monitoring. Errors can and will occur. It is up to us to get ahead of these errors by setting up proper logging. CloudWatch is an easy choice as it can receive any error logs from Kinesis Firehose.

Terraform — Infrastructure as Code

By far, the coolest thing I’ve ever done is build and destroy entire infrastructures on AWS with a few terminal commands. So, if you are a data engineer that works with the cloud, please take some time to learn Terraform.

You won’t need any prior experience with Terraform. All of the code I use to build our pipeline will be explained to the point that you should be able to follow along, but I will not get into the details of Terraform in this article.

To make this work, you will need an active AWS account and valid access and secret keys for your IAM user. You will also need to have Terraform installed.

🚨 Real World Alert 🚨

Data engineering roles differ vastly across companies. Not all data engineers focus on infrastructure. It isn’t uncommon for the data platform decisions to be made by members of the back end team. For this reason, you probably shouldn’t focus too much on Terraform. Instead, having a high-level grasp of the cloud and focusing more on the details of data processing and transformation might be a better use of your time.

Project Structure

Within your project directory, create another directory and call it infrastructure/. In this directory, create five empty terraform configuration files - provider.tf, cloudwatch.tf, iam.tf, kinesis.tf, and s3.tf.

The project directory tree should look like the following.

|- clickstream-kinesis/
|- clickstream_generator.py
|- infrastructure/
|- provider.tf
|- s3.tf
|- kinesis.tf
|- iam.tf
|- cloudwatch.tf

Let’s take a closer look at each configuration file.

provider.tf

We will assume that our fictional company uses AWS for its cloud needs.

This configuration is used to specify AWS as the cloud provider. We also need to pass in the region and credentials associated with our IAM user. This action lets Terraform know which set of resources to make available for our project.

I hope that passing your credentials into this file as a plain text string causes some level of concern. With that being said, this configuration is fine as long as the code never leaves your local machine.

There are a few alternatives for passing in your AWS credentials to Terraform. I will include links to some useful Terraform documentation that you should follow if you plan on making your code public (e.g. adding it to a Github repository).

** If using a credentials file to authenticate, please follow these steps to configure your AWS CLI.

s3.tf

For this article, we will limit our data lake to just two S3 buckets.

Notice the two types of configuration blocks within the script — data and resource.

In this context, we use the data block to get information specific to our account. Since bucket names are required to be unique across AWS accounts, we call the aws_caller_identity data block to retrieve our account ID which we add to the names of the two buckets. The two aws_s3_bucket resource blocks create private S3 buckets, one meant to store raw data and the other for transformed data.

Take note of how we refer to our account ID within the bucket name parameter. The aws_caller_identity data block contains a number of attributes available to us. We use the DATA.<DATA_RESOURCE_NAME>.<ATTRIBUTE> syntax to access the account_id attribute.

kinesis.tf

There are quite a few parameters to take care of with Kinesis, however, most aren’t essential to the minimum requirements of this project — streaming data into a data lake. In any event, I will cover the important bits and you should feel comfortable copying and pasting the rest (and also look further into these advanced parameters on your own time).

Before breaking down the above code, we should cover how resources can reference other resources in Terraform.

Recall the two aws_s3_bucket resources that we named datalake_raw and datalake_staging in the s3.tf configuration (Notice the distinction between resource name and bucket name). We can access a number of properties specific to created resources using the following syntax <RESOURCE>.<RESOURCE_NAME>.<PROPERTY> . For example, if we wanted to access the bucket name of the raw bucket, <ACCOUNT_ID>-datalake-raw , then the correct syntax to use would be aws_s3_bucket.datalake_raw.bucket . This value could also be used within a string by encapsulating it in a template literal "${aws_s3_bucket.datalake_raw.bucket}" .

Shifting focus back to the Kinesis configuration. The first few parameters name our firehose stream and specify a destination. The S3 configuration block tells Kinesis where to send data, which permissions to use, and how to handle error logs.

* There are a few references to CloudWatch resources that will be created in the next configuration file.

Two important parameters to note are buffer_size and buffer_interval . Kinesis Firehose holds on to incoming streams of data until one of these two parameters is satisfied. Given our values, S3 would receive data when the size of our streams reaches 64MB, or after 60 seconds, whichever comes first.

Additionally, Kinesis offers dynamic partitioning on the data sent to S3. This means that we control how data is distributed across S3 by choosing a key/field from our event to group data. In this configuration, the user id is chosen as the partition key, meaning that a user with ID 45 will have their own “file” path in S3 — s3://datalake_raw/clickstream/userId=45/... . With partitioning in place, we avoid scanning a large number of files to retrieve data for user ID 45, instead, all the data needed for this user lives in a single object. This concept not only can be extended beyond a single user but also to additional partition keys.

Finally, the processing configuration allows us to do some light transformation on the stream. The MetaDataExtraction helps with parsing the user ID for partitioning, while RecordDeAggregation distinguishes between incoming records.

The AppendDelimiterRecord is much more important to understand compared to the prior two parameters. For some reason, events added to the Kinesis stream are joined as one continuous line of data. I don’t think I have to explain why querying one continuous line of data is a problem. This is where the AppendDelimiterRecord processor comes in to help. It adds a newline character between event payloads, making the distinction between events a lot clearer.

cloudwatch.tf

To capture error logs, we create a log group and log stream in Cloudwatch.

Errors can occur if for some reason events are sent to Kinesis without the required partitioning key. You can simulate this by commenting out the user ID field within the clickstream_generator.py script once everything is set up.

iam.tf

Kinesis will be interacting with S3 and Cloudwatch. For S3, Kinesis needs to be able to list the required buckets, as well as load objects to those buckets. For Cloudwatch, Kinesis needs access to add events to the log group and stream created in the previous step.

To accomplish this, we create a role that Kinesis can assume. Let’s name this datalake_role .

For S3, we need policies that give access to the data lake bucket and objects to this role.

** Referencing the S3 bucket resources within the aws_iam_policy resource tells Terraform that it needs to create the S3 buckets before creating this policy. This is always the case when you reference resources. Super smart Terraform!

For Cloudwatch, we create the policy to add events to the log group and stream.

Make sure to add your AWS region to the resource parameter within the policy definition (line 80). The format should look like arn:aws:logs:<REGION>:<ACCOUNT_ID>:log-group:${aws_cloudwatch_log_group.clickstream_firehose.name}:* .

Finally, we attach all the policies to the datalake_role and use this role within the Kinesis configuration.

Congratulations! you now have the power to create an entire AWS infrastructure with a few commands. But be patient, we have to make one last change to our script before moving forward with Terraform.

Adding Boto3 to the script.

Let’s go back to our script and add a few lines of code to communicate with Kinesis.

Begin with importing the boto3 library.

This library needs to be installed, which can be done with a simple pip3 install boto3 .

Now, make a few changes to the loop that generates the events.

Instantiate the boto3.Session() object and pass in the required parameters for authentication. Again, this code is fine for local work but consider using environment variables or shared credentials for any public projects. From the session object, we create a client to access our Kinesis firehose resource. Before sending data to Kinesis, it must first be encoded, which can easily be done with the encode method available in JSON dumps. Finally, we create a response object that sends the data to the clickstream delivery stream and returns an HTTPStatusCode .

Save the file and head back over to the directory containing your Terraform files.

Terraform commands.

Terraform Init

The first step in any Terraform project is running the terraform init command within the directory containing your configuration files. If you’ve been following along with the previous steps, this directory is clickstream-kinesis/infrastructure/ .

You can run this command multiple times without worrying about breaking anything. A few new files are created after initializing Terraform. The only one you need to worry about is terraform.tfstate which stores metadata about the resources in your Terraform infrastructure. This is how Terraform knows which resources to manage.

** The Terraform state file should not be made public. Consider adding the state file to your .gitignore or storing it in S3 as described here. There may be sensitive information stored as plain text in this file that should not be exposed publicly.

Terraform Plan

Using the terraform plan outputs the changes Terraform plans to make to your infrastructure.

Inspect this output and you will see a plan to create all the resources we’ve worked on throughout this article.

Terraform Apply

Run terraform apply to execute the proposed plan. You will be prompted to confirm the creation of resources. Enter “yes” and wait a few minutes while Terraform sets everything up.

Navigate to the AWS console to check on the resources.

Both S3 buckets were created and named as expected.

The Kinesis stream correctly points to the raw data lake destination.

Generating Events.

Now that the infrastructure is ready, it’s time to test out the pipeline. Let’s generate 100 events with a max of 1 second between events.

Take note of the 200 HTTP response code in the terminal output indicating that the records were successfully delivered to Kinesis.

Let’s jump back into the console and take a look at the delivery stream metrics in Kinesis.

Check out the activity by hovering over the blue data points. The partition count shows you how the 100 events were distributed. In my case, 65 partitions were created, meaning that my 100 records were made up of 65 unique users and the S3 objects were created accordingly.

Moving over to S3, we can confirm that objects were partitioned on user ID.

Once again, partitioning the data this way will avoid scanning a large number of files if any of our downstream users need to query data for a specific (or subset) user ID.

Lastly, download one of these objects to verify that each record is separated by a newline.

As expected, each line is a JSON object that represents a distinct record. This format is easily parsed by Spark, Pandas, or a modern data warehouse like Snowflake.

Great! our streaming pipeline is up and running. We can generate as many events from our script and populate our data lake in near real-time as required.

Generating erroneous events.

I mentioned earlier that if for some reason, events were missing the user ID field used for partitioning, a data delivery error would occur and those events would never make it to our data lake. To address this, we set up CloudWatch logging to capture these errors. Let’s simulate an erroneous event and check out how our system handles it.

Head back to the clickstream_generator.py script and comment out the user_id attribute in the event object.

Rerun the script but this time creating a single event (or more if you wish). You should still see a 200 response because the delivery to Kinesis will work just fine, however, the error occurs once the data is sent to S3.

Go back to the AWS console and head over to CloudWatch. Search for the aws/kinesisfirehose/clickstream log group and click on the ClickStreamLogStream log stream you created via Terraform. After about 5 minutes, an error should be logged and visible in CloudWatch.

The error is as expected — “Partition key’s value should not be null or empty”.

We can check S3 to confirm that the error logging configuration we included in Terraform is working.

Perfect! another object — clickstream-errors/... — was created within our raw data lake to capture these erroneous records.

Terraform ... DESTROY!

This is my favorite part of the project.

Before we nuke our entire infrastructure with a single command, we have to empty our buckets. S3 has this thing where it produces an error when trying to delete a non-empty bucket. Go to the <ACCOUNT_ID>-datalake-raw bucket and delete the clickstream/ and clickstream-errors/ object. Leave everything else alone and head back to your terminal.

Navigate to the infrastructure/ directory and enter terraform destroy . Enter “yes” when asked for confirmation.

In the AWS console, you should not be able to find a trace of the pipeline. No IAM roles or policies, no S3 buckets, no error logs, and no Kinesis.

Wrap Up

We separated our requirements into three distinct pieces — ingestion, storage, and consumption. This provides a framework that extends beyond this clickstream pipeline. Engineers who focus on these areas now have a pattern to follow for every additional project that needs to be incorporated into the data platform. The more we standardize, the more we scale.

Despite the length of this article, there is still plenty left to be done.

If you’ve followed along, I suggest you save your code. In a future article, we will make the following additions.

  • Use AWS Lake Formation to implement access controls on the raw and staging buckets. In the real world, the raw bucket would contain sensitive data that would not be accessible to everyone (e.g. SSN, bank information, etc.)
  • Transform our raw events and send the output to the staging bucket. We can accomplish this using Spark via AWS Glue or EMR.
  • Set up Kinesis Data Streams (different from Kinesis Firehose) for use-cases that require real-time data.
  • Extend our data lake to receive data from different sources such as OLTP and third-party APIs.

As a reminder, the goal of these next few articles is to build an entire data platform. As such, they will be longer-than-usual reads but, hopefully, rich in content.

Check out the Github repository for this project. Keep in mind that this repo will evolve over time as I add more sources and pipelines.

If anyone has any issues with this project, my guess is that it has to do with AWS authentication and I’d be happy to help out. The best way to reach out to me is through Twitter.

Thanks for reading and lookout for the next article.

--

--