Aggregating Data with PySpark on GCP

Jonathan Duran
6 min read · Aug 28, 2021

👋🏼 Hello all!

This article is about gaining familiarity with a few of data engineering’s cornerstone tools. We’ll manually fetch an IMDB review dataset from Kaggle, upload it to a Hadoop cluster, aggregate the data using PySpark, and load the results into a Hive table.

This exercise is just as much about learning the data lifecycle as getting comfortable with tooling.

For those who aren’t familiar with the technologies above, here’s a brief description:

Hadoop: A framework that lets us store and process (in parallel) large datasets across a cluster of machines. The processing framework is called MapReduce, and the storage layer is the Hadoop Distributed File System (HDFS). Instead of setting up our own cluster from scratch, we will use GCP’s Dataproc, a managed Hadoop service.

Spark: Much like Hadoop, Spark is a framework for distributed data processing (minus the storage). It keeps intermediate data in memory where it can, which is typically much faster than MapReduce’s approach of writing intermediate results to disk between steps. Spark offers several APIs; among them is the Python interface, PySpark.

Hive: A distributed data warehouse system built on top of Hadoop. We will use it to query the results of our PySpark job.

🚨 Warning!!! GCP is not free and you will pay a couple of cents in order to finish this project (unless you have a credit).

Dataproc

Let’s begin by heading over to the Google Cloud Platform console and creating a new project named imbd-agg.

Before creating the cluster, you must enable the Dataproc API (you may have to enable billing first). So hang tight; this step will take a few minutes.

After enabling the Dataproc API, go ahead and create the cluster, leaving most of the default values intact except for the worker machines, where we change the machine type from 4 vCPUs to 2.

So what in the world did we just create?

Well, Hadoop’s architecture is made up of a network of master nodes and worker nodes. At a very high level, the master node (specifically the NameNode) is responsible for coordinating the HDFS functions performed by the workers. Our cluster is made up of one master and two worker machines.

IMDB Review Dataset

Let’s take a quick break from GCP in order to download our dataset.

Head over to Kaggle.com and download the movie reviews to your local machine. Expect two columns — review and sentiment — and 50K rows in this dataset.

The goal here is to use Spark for a simple aggregation. We will group the data by sentiment to find the counts of positive and negative reviews. The rows are evenly balanced between negative and positive reviews, so we expect the counts to be 25K apiece. I want this article to favor the data lifecycle over any specific tool, but feel free to look over Spark’s documentation for more complicated transformations.
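Before bringing Spark in, it may help to see what the aggregation computes. The snippet below is a minimal plain-Python sketch on a tiny made-up sample (the three rows are invented for illustration, not taken from the Kaggle file); it assumes only the two column names, review and sentiment.

```python
import csv
import io
from collections import Counter

# Hypothetical three-row sample with the same two columns as the Kaggle file.
SAMPLE_CSV = """review,sentiment
Loved every minute,positive
Dull and far too long,negative
A classic for good reason,positive
"""


def count_by_sentiment(csv_text):
    """Group rows by the sentiment column and count the rows in each group."""
    rows = csv.DictReader(io.StringIO(csv_text))
    return Counter(row['sentiment'] for row in rows)


counts = count_by_sentiment(SAMPLE_CSV)
print(dict(counts))  # {'positive': 2, 'negative': 1}
```

The PySpark job does the same thing conceptually, except the grouping and counting are spread across the cluster.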

PySpark Script

""" main.py pyspark job to group and count positive and negative
reviews """
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('sentiment').getOrCreate()

# Read the reviews csv from HDFS; the escape option handles quotes inside reviews
reviews = spark \
    .read \
    .option('escape', '"') \
    .csv('/user/jduran9987/data/imbd_dataset.csv', header=True)

# Count the number of reviews for each sentiment value
sentiment_breakdown = reviews \
    .groupBy('sentiment') \
    .count()

# Write the counts as csv to a sentiment_breakdown directory
sentiment_breakdown.write.format('csv').save('sentiment_breakdown')

🚨 FYI All our work will be done within the master node of the Dataproc cluster, so it isn’t necessary to come up with a folder structure for this project. When the time comes, the above script will be uploaded to the master node and executed from there.

Starting at the top of the script, we import the SparkSession object. This is the entry point to our Spark application and is used (in our case) to create an abstracted representation of our data called a dataframe.

There are many ways to create a dataframe, but for this simple project we only need to read a csv from our filesystem (we will create this path in HDFS a bit later). We also set the escape character to a double quote because some of our reviews contain commas and embedded quotes, which would otherwise throw off how Spark parses the csv file.
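To see why quoting matters, here is a small sketch using Python’s built-in csv module rather than Spark (the sample line is invented). A naive split on commas breaks the review into pieces, while a quote-aware parser keeps it as one field and turns the doubled quotes back into single quote characters. Setting Spark’s escape option to a double quote is meant to make its reader treat embedded quotes in a similar way.

```python
import csv
import io

# Invented sample line: the review contains a comma and doubled ("") quotes.
line = '"A ""must-see"" film, start to finish.",positive\n'

# Naive approach: splitting on commas cuts the review in two.
naive_fields = line.strip().split(',')

# Quote-aware approach: the csv module respects the surrounding quotes.
parsed_fields = next(csv.reader(io.StringIO(line)))

print(len(naive_fields))   # 3 pieces instead of 2 columns
print(parsed_fields)       # ['A "must-see" film, start to finish.', 'positive']
```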

A new dataframe, sentiment_breakdown, is created by grouping the reviews dataframe by sentiment and counting the rows in each group. Spark offers two main types of operations: transformations and actions. Grouping the data is a transformation, and so is the count that follows a groupBy, since it simply returns another dataframe. Every time we transform our data (say, one dataframe that groups the data and another that filters the result), a new immutable dataframe is created. No work is actually done at that point; instead, Spark builds a logical plan for how best to execute your transformations. Only when you call an action, such as writing the results out or calling count directly on a dataframe, does Spark execute your code and produce results.
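The split between transformations and actions can be modeled in a few lines of plain Python. This is a toy sketch, not how Spark is implemented; LazyFrame, group_count, explain, and collect are hypothetical names invented for illustration.

```python
class LazyFrame:
    """Toy stand-in for a dataframe: records steps, runs them only on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of (name, function) pairs, not yet executed

    def filter(self, predicate):
        # Transformation: returns a NEW LazyFrame with a longer plan.
        step = ('filter', lambda rows: [r for r in rows if predicate(r)])
        return LazyFrame(self._rows, self._plan + (step,))

    def group_count(self, key):
        # Transformation: this also only extends the plan.
        def run(rows):
            counts = {}
            for r in rows:
                counts[r[key]] = counts.get(r[key], 0) + 1
            return [{key: k, 'count': v} for k, v in counts.items()]
        return LazyFrame(self._rows, self._plan + (('group_count', run),))

    def explain(self):
        # Show the recorded steps without running them.
        return [name for name, _ in self._plan]

    def collect(self):
        # Action: only now is the recorded plan executed.
        rows = self._rows
        for _, fn in self._plan:
            rows = fn(rows)
        return rows


reviews = LazyFrame([
    {'sentiment': 'positive'},
    {'sentiment': 'negative'},
    {'sentiment': 'positive'},
])
breakdown = reviews.group_count('sentiment')  # nothing has run yet
print(breakdown.explain())  # ['group_count']
print(breakdown.collect())
```

Note that reviews itself is untouched by group_count; each transformation hands back a new object, which mirrors the immutability described above.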

Finally, the results are written back to our filesystem.
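Note that the script saves to a relative path. HDFS resolves relative paths against the current user’s home directory, /user/<username>, so the output ends up next to the data directory. A small sketch of that rule in plain Python (hdfs_home and resolve are hypothetical helpers written for this article, not part of any Hadoop library):

```python
import posixpath


def hdfs_home(username):
    """Return the conventional HDFS home directory for a user."""
    return posixpath.join('/user', username)


def resolve(path, username):
    """Mimic how a relative HDFS path is resolved against the home directory."""
    if posixpath.isabs(path):
        return path  # absolute paths are used as-is
    return posixpath.join(hdfs_home(username), path)


print(resolve('sentiment_breakdown', 'jduran9987'))
print(resolve('/user/jduran9987/data/imbd_dataset.csv', 'jduran9987'))
```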

Back to Dataproc

On the cluster details page, navigate to the VM Instances tab and click SSH to enter the master machine.

A terminal should open up with a gear menu at the top right corner containing the option to upload a file. Go ahead and upload the main.py PySpark job along with the IMDB reviews file to the instance.

Once our files are on the machine, we can get started by creating a user directory on HDFS with the following command.

$ hadoop fs -mkdir /user/<your-gcp-project-username>

In my case the path is /user/jduran9987; however, you must use the username displayed on your terminal before the @ symbol.

Perfect! We now create a data directory, then use the put command to add the reviews dataset to HDFS.

$ hadoop fs -mkdir /user/<your-gcp-project-username>/data
$ hadoop fs -put imbd_dataset.csv /user/<your-gcp-project-username>/data/

Now that the data is in HDFS, it’s time to run our PySpark job.

$ spark-submit main.py

You can verify the results with the following command.

$ hadoop fs -ls /user/<your-gcp-project-username>/sentiment_breakdown
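The listing should typically show one or more part files plus a _SUCCESS marker. If you want to double-check the numbers outside of Hive, one option is to copy the directory to the master node’s local disk (for example with hadoop fs -get) and total the part files with a few lines of Python. The sketch below assumes that local copy sits in a folder named sentiment_breakdown and that each line holds a sentiment and a count with no header row; read_breakdown is a hypothetical helper, not something Spark provides.

```python
import csv
import glob
import os


def read_breakdown(directory):
    """Sum the counts found in every part file under `directory`."""
    totals = {}
    for path in sorted(glob.glob(os.path.join(directory, 'part-*'))):
        with open(path, newline='') as handle:
            for row in csv.reader(handle):
                if len(row) != 2:
                    continue  # skip blank or malformed lines
                sentiment, count = row
                totals[sentiment] = totals.get(sentiment, 0) + int(count)
    return totals


# Assumes the output was copied locally first; prints {} if the folder is absent.
print(read_breakdown('sentiment_breakdown'))
```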

Success!

Hive

Time to create a Hive table to query our results. We can access Hive in our terminal by simply typing in hive.

$ hive

You should now see the hive> prompt.

Let’s finally create a database, table, and query the results of our PySpark job with the following commands.

hive> create database if not exists sentiment;
hive> use sentiment;
hive> create table sentiment_breakdown
    > (sentiment string, review_count int)
    > row format delimited fields terminated by ','
    > location '/user/<your-gcp-project-username>/sentiment_breakdown';
hive> select * from sentiment_breakdown;

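This shows the tight relationship between Hive and HDFS. Unlike warehouses that require a separate load step, we did not have to copy any data into the table; the location clause simply points Hive at the files our Spark job already wrote.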

Wrap Up

  • Make sure you delete your Dataproc cluster to avoid a big bill. The resources for this project cost next to nothing but you don’t want to leave your cluster running.
  • This setup isn’t meant for production. The goal was to gain familiarity with Hadoop, Spark, GCP, and Hive. In the real world, use something like Airflow (which has a Dataproc operator) to provision a cluster, submit a Spark job, and delete the cluster on a schedule.
  • If you are an aspiring data engineer and feel overwhelmed by these tools, don’t worry. Try to master these concepts at a very high level and learn the details on the job. I’ve been in the industry for some time now and still had to look up the PySpark group by transformation command 🤷🏼‍♂️. I have no interest in memorizing commands and neither should you.

Thanks for reading!
