
Create Dataproc Cluster to Run PySpark using Cloud Functions

Set up Cloud Functions to create a temporary Dataproc cluster that executes a PySpark script, then insert the results into Bigtable using Cloud Functions.

Setup Source Data

Download text file from Project Gutenberg

Download any text file that you like from https://www.gutenberg.org/browse/scores/top

Download the plain text format (e.g. https://www.gutenberg.org/files/11/11-0.txt) → right click → Save link as
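
If you are working from a terminal or Cloud Shell instead, the same file can be fetched with wget (the URL below is the example link above; swap in the book you chose):

wget https://www.gutenberg.org/files/11/11-0.txt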

How to download text file from Project Gutenberg

Setup Cloud Storage bucket

How to create Cloud Storage bucket
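
If you prefer the command line over the console, a bucket can also be created from Cloud Shell with gsutil. The bucket name and region below follow the examples used later in this post; bucket names are globally unique, so pick your own:

gsutil mb -l us-central1 gs://chenmingyong-cloud-dataproc-input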

Setup Cloud Functions to create temporary Dataproc cluster and execute PySpark script

Cloud Storage Setup

  1. Create a new bucket in Google Cloud Storage to store the script (e.g. chenmingyong-cloud-dataproc-script)
  2. Create a new bucket in Google Cloud Storage to store the output (e.g. chenmingyong-cloud-dataproc-output)
  3. Upload the main.py below into the bucket created in step 1 and note down the file path; you will need it later (a gsutil upload example is shown after the script)
import sys

from pyspark.sql import SparkSession


def lower_clean_str(x):
    punc = '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~“”’‘—'
    lowercased_str = x.lower()
    for ch in punc:
        lowercased_str = lowercased_str.replace(ch, '')
    return lowercased_str

if len(sys.argv) != 3:
    raise Exception("Please provide 2 inputs: <gcs_input_file> <gcs_output_directory>")
input_file = sys.argv[1]
output_dir = sys.argv[2]

print(f"Getting source data from {input_file}")
sc = SparkSession.builder.appName('wordcount').getOrCreate()
text_file = sc.sparkContext.textFile(input_file)  # Read the text file
text_file_remove_punct_lower_case = text_file.map(lower_clean_str)  # Lowercase all letters and remove punctuation
lines = text_file_remove_punct_lower_case.flatMap(lambda line: line.split())  # Split each line on whitespace
pairs = lines.map(lambda s: (s, 1))  # Pair each word with a count of 1 (e.g. ('hello', 1))
counts = pairs.reduceByKey(lambda accumulation, current: accumulation + current)  # Sum the occurrences of each unique word
df = counts.toDF(["word", "count"])  # Convert RDD to DataFrame

print(f"Writing to {output_dir}")
# Setting mapreduce.fileoutputcommitter.marksuccessfuljobs to False suppresses the _SUCCESS file
df.coalesce(1).write.mode("overwrite") \
    .option("header", True) \
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", False) \
    .csv(output_dir)
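
If you prefer the command line, the script can be uploaded with gsutil (adjust the bucket name to the one you created in step 1):

gsutil cp main.py gs://chenmingyong-cloud-dataproc-script/main.py
gsutil ls gs://chenmingyong-cloud-dataproc-script/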

Pub/Sub Topic Setup

Create a new Pub/Sub topic that will carry the path of the output file (e.g. dataproc-demo-file-write-completed)
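
This can be done in the console, or from Cloud Shell with gcloud:

gcloud pubsub topics create dataproc-demo-file-write-completed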

Cloud Functions Setup

Go to the Cloud Functions UI and perform the actions below

Cloud Functions Setup Part 1
Cloud Functions Setup Part 2

main_python_file_uri : gs://chenmingyong-cloud-dataproc-script/main.py (adjust based on your bucket name)

output_dir : gs://chenmingyong-cloud-dataproc-output/output (adjust based on your bucket name)

topic_id : dataproc-demo-file-write-completed

bucket_name : chenmingyong-cloud-dataproc-output ← this is the GCS bucket that will hold the output file

main.py: make sure you pick the Python 3.7 runtime (anything above Python 3.7 should work, but I will stick with 3.7 for now), and make sure you change the function entry point as well

Select Python Runtime and Update entry point of Cloud Functions
# Local development
# sudo apt update
# sudo apt install python3 python3-dev python3-venv
# sudo apt install wget
# wget https://bootstrap.pypa.io/get-pip.py
# sudo python3 get-pip.py
# python3 -m venv env
# source env/bin/activate
# pip install --upgrade pip
# pip install --upgrade setuptools
# pip install -r requirements.txt
# You will need to download the authorised key from your project
# export GOOGLE_APPLICATION_CREDENTIALS="/home/chenming_share/credentials.json"


from google.cloud import dataproc_v1 as dataproc
from google.cloud import pubsub_v1
from google.cloud import storage
import os
import logging
import json


def instantiate_inline_workflow_template(event, context):
    # Initialise clients
    storage_client = storage.Client()
    publisher = pubsub_v1.PublisherClient()

    # Get variables
    project_id = os.environ.get("GCP_PROJECT")  # e.g. udemy-xxxxx
    region = os.environ.get("FUNCTION_REGION")  # e.g. us-central1
    main_python_file_uri = os.environ.get(
        "main_python_file_uri")  # e.g. gs://chenmingyong-cloud-dataproc-script/main.py
    input_file = f"gs://{event['bucket']}/{event['name']}"  # e.g. gs://chenmingyong-cloud-dataproc-input/input.txt
    output_dir = os.environ.get("output_dir")  # e.g. gs://chenmingyong-cloud-dataproc-output/output
    topic_id = os.environ.get("topic_id")  # e.g. dataproc-demo-file-write-completed
    bucket_name = os.environ.get("bucket_name")  # e.g. chenmingyong-cloud-dataproc-output

    # Setup target bucket and topic
    bucket = storage_client.bucket(bucket_name)
    topic_path = publisher.topic_path(project_id, topic_id)

    # Create a client with the endpoint set to the desired region.
    workflow_template_client = dataproc.WorkflowTemplateServiceClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    parent = f"projects/{project_id}/regions/{region}"

    template = {
        "jobs": [
            {
                "pyspark_job": {
                    "main_python_file_uri": main_python_file_uri,
                    "args": [input_file, output_dir],
                },
                "step_id": "wordcount",
            },
        ],
        "placement": {
            "managed_cluster": {
                "cluster_name": "test-cluster",
                "config": {
                    "gce_cluster_config": {"zone_uri": ""},
                    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
                    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
                    "software_config": {"image_version": "2.0-debian10"}
                },
            }
        },
    }

    logging.info(
        f"Creating temporary dataproc cluster to run pyspark job on {input_file} and extract result to {output_dir}")

    operation = workflow_template_client.instantiate_inline_workflow_template(
        request={"parent": parent, "template": template}
    )
    operation.result()
    logging.info("Workflow ran successfully.")

    # Get the name of the newly created file from Google Cloud Storage
    all_blobs = list(storage_client.list_blobs(bucket, prefix="output/part"))
    file_name = all_blobs[0].name  # There is only 1 output file, so we can choose the first element in the list
    message = f"Result is saved to gs://{bucket_name}/{file_name}"
    logging.info(message)

    # Publish the relevant context to Pub/Sub
    data = {"message": message, "bucket_name": bucket_name, "file_name": file_name}
    data = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data)
    logging.info(future.result())
    logging.info(f"Published messages to {topic_path}")

requirements.txt

google-cloud-dataproc==3.0.0
google-cloud-pubsub==2.8.0
google-cloud-storage==1.42.2
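
If you would rather deploy from the command line than the UI, a rough gcloud equivalent of the screenshots above could look like the sketch below (run from the folder containing main.py and requirements.txt; the trigger bucket, script bucket, and topic names follow the examples used in this post, so adjust them to yours):

gcloud functions deploy instantiate_inline_workflow_template \
  --region us-central1 \
  --runtime python37 \
  --entry-point instantiate_inline_workflow_template \
  --trigger-resource chenmingyong-cloud-dataproc-input \
  --trigger-event google.storage.object.finalize \
  --timeout 540s \
  --set-env-vars main_python_file_uri=gs://chenmingyong-cloud-dataproc-script/main.py,output_dir=gs://chenmingyong-cloud-dataproc-output/output,topic_id=dataproc-demo-file-write-completed,bucket_name=chenmingyong-cloud-dataproc-output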

Setup Bigtable

Resources

Quickstart using HBase shell

HBase Shell Commands Cheat Sheet

Setup Bigtable instance

Create the instance; 1 node should do since the cost is quite high 😖 Make sure you delete it once you are done with the demo! Name it bigtable-demo

How to setup Bigtable using Google Cloud Platform console
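
If you prefer the command line, a single-node instance can also be created with gcloud. The cluster id, zone, and storage type below are my own assumptions, and newer gcloud releases may expect a --cluster-config flag instead of the individual --cluster-* flags:

gcloud bigtable instances create bigtable-demo \
  --display-name="bigtable-demo" \
  --cluster=bigtable-demo-c1 \
  --cluster-zone=us-central1-b \
  --cluster-num-nodes=1 \
  --cluster-storage-type=SSD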

Open Cloud Shell and run the following commands; read the official quickstart guide for more details

sudo apt-get update
sudo apt-get install openjdk-8-jdk-headless
export JAVA_HOME=$(update-alternatives --list java | tail -1 | sed -E 's/\/bin\/java//')

git clone https://github.com/GoogleCloudPlatform/cloud-bigtable-examples.git

cd cloud-bigtable-examples/quickstart

./quickstart.sh

HBase Commands

# Do a list to check if there are any tables, it should be empty
list 

# Create a wordcount table with cf as the column family
create 'wordcount-table', 'cf'

# Write a sample row into the table with rowkey:chenmingyong, column qualifier:count under column family:cf
put 'wordcount-table', 'chenmingyong', 'cf:count', '9999'

# Scan the newly created table 
scan 'wordcount-table'

# Get based on rowkey
get 'wordcount-table','chenmingyong'
# Get based on rowkey + column family:column qualifier
get 'wordcount-table','chenmingyong', {COLUMN => 'cf:count'}

# Drop the table (don't run this yet!!! We still need it for the demo)
disable 'wordcount-table'
drop 'wordcount-table'

# Truncate table
truncate 'wordcount-table'

To exit → press Ctrl+D

Delete Bigtable instance

Remember to delete the instance when you are not using it, since you can't stop it at the moment!

How to delete Bigtable instance using Google Cloud Platform console

Setup Cloud Functions to Insert into Bigtable

Go to the Cloud Functions UI and perform the following actions

Cloud Functions setup Part 1
Cloud Functions setup Part 2

instance_name : bigtable-demo

table_name : wordcount-table

main.py

Select Python 3.7 runtime and entry point of Cloud Functions
from google.cloud import bigtable
from google.cloud import storage
import os
import csv
import logging
import base64
import json


def insert_big_table(event, context):
    project_id = os.environ.get("GCP_PROJECT")  # auto populate
    instance_name = os.environ.get("instance_name")  # e.g. bigtable-demo
    table_name = os.environ.get("table_name")  # e.g. wordcount-table

    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    message = json.loads(pubsub_message)

    storage_client = storage.Client()
    bucket = storage_client.bucket(message['bucket_name'])

    blob = bucket.blob(message['file_name'])
    logging.info(f"Processing file: gs://{message['bucket_name']}/{message['file_name']}")
    blob.download_to_filename("/tmp/input.csv")

    client = bigtable.Client(project=project_id, admin=True)
    instance = client.instance(instance_name)
    table = instance.table(table_name)
    table_rows = []

    with open("/tmp/input.csv") as input_file:
        input_csv = csv.DictReader(input_file, delimiter=',')
        for row in input_csv:
            row = dict(row)
            # Encode as UTF-8 in case the word contains special characters that we did not handle in PySpark
            row_key = row['word'].encode('utf-8')
            table_row = table.row(row_key)
            table_row.set_cell('cf', 'count', row['count'])
            table_rows.append(table_row)
    logging.info(f"Inserting rows into Bigtable instance: {instance_name}, table: {table_name}")
    table.mutate_rows(table_rows) # Mutates multiple rows in bulk

requirements.txt

google-cloud-bigtable==2.4.0
google-cloud-storage==1.42.2
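
As before, a rough command-line alternative to the UI setup could look like this (a sketch only; the topic, instance, and table names follow the examples in this post):

gcloud functions deploy insert_big_table \
  --region us-central1 \
  --runtime python37 \
  --entry-point insert_big_table \
  --trigger-topic dataproc-demo-file-write-completed \
  --set-env-vars instance_name=bigtable-demo,table_name=wordcount-table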

Demo

Just drag and drop any file you downloaded from Project Gutenberg in step 1 into your Google Cloud Storage input bucket. It should take about 3-5 minutes for the whole end-to-end process to finish.
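
If you prefer the command line, copying the file with gsutil kicks off the same flow (the input bucket name follows the example used in the code comments; adjust it to yours):

gsutil cp 11-0.txt gs://chenmingyong-cloud-dataproc-input/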

Drag and drop file into Cloud Storage

You can check from the Dataproc UI that a cluster is being provisioned to run the PySpark job (it might be gone if you wait too long, since it will be deleted after the job completes).

Provisioning of Dataproc cluster

You can check that the job is being run; it took about 45 seconds to complete. Ignore the other rows as I was doing some testing 😂

Jobs status in Dataproc cluster

(Optional) You can also check the logs of both Cloud Functions

Logging of Cloud Functions for Dataproc cluster creation and PySpark script execution
Logging of Cloud Functions for inserting into Bigtable

You can also go to the output folder to see what the output file looks like.

Output of PySpark script

Finally, log in to the HBase shell following the previous steps.

Wait for 3-5 minutes before performing the checks below! ☕️

Sample HBase Command
scan 'wordcount-table'

get 'wordcount-table', 'the'

get 'wordcount-table','you'

get 'wordcount-table','he'

get 'wordcount-table', 'she'

get 'wordcount-table', 'alice'

Overall performance

Overall performance based on execution time
Although latency is not a dealbreaker for a batch solution, the demo above shows how powerful cloud data engineering is! We managed to spawn a Dataproc cluster, run a PySpark job, tear down the Dataproc cluster, and do the subsequent processing within (09:06:25-09:02:00) ~ 5 minutes! The whole process is automated, and you can define variables without hardcoding anything into the script. The performance is fairly consistent, and we do not have to maintain any long-running infrastructure despite using Spark in our solution 😎 This approach works pretty well for existing Spark batch workflows that run once or twice a day. Of course, for Spark streaming we would definitely need to maintain the Spark infrastructure; Google would suggest using Dataflow instead, if the migration is not too painful 😂

Disclaimer/Warning

The components used in this demo serve as a tutorial to teach several components at one time, but this is not the best solution for a lot of use cases for the following reasons; please modify it based on your needs.

  • Cloud Functions have a 9-minute maximum runtime. If spawning the cluster plus running your Spark job takes more than 9 minutes, you will start seeing timeouts in the script. Theoretically, you can fire and forget by not waiting for the workflow to complete (i.e. by skipping the operation.result() call). It means you will not know whether the Spark cluster was spawned successfully or whether the Spark job ran as expected, but it will bring your Cloud Function duration down to a few seconds.
  • The next question is: why do we bother to wait then? That is due to how Spark writes files; I need to wait for the file to finish writing before triggering the next Cloud Function. When Spark performs a write, different executors write different files into Google Cloud Storage, so if we chose a GCS trigger instead of a Pub/Sub trigger, we would see a lot of "false" triggers while Spark writes its temporary files. Since my script does a coalesce, Spark first writes part files and temporary files, and only then merges them into one single file; I have also removed _SUCCESS to make the solution a bit cleaner since we do not need that file. In order to receive only ONE trigger when the file is done writing, I have to wait for the PySpark job to complete and only then send a Pub/Sub message indicating that everything has finished (PySpark job done, part files merged into one single file). If your downstream is not doing what I show in the demo, you can just use the fire-and-forget approach from the previous point, but you will need to think about how to trigger the next job if there are subsequent jobs. Perhaps Airflow/Cloud Composer would be a good choice, but I am not covering it in this course at the moment.

This is part of my online course on how to kickstart data engineering workflows in Google Cloud Platform (GCP) for beginners; sign up here to watch the detailed video explanation! 🤗