How to insert data into Cloud SQL using Cloud Functions

Setup source data
Download Data from Mockaroo
- Go to https://www.mockaroo.com/
- Change the gender field type to Gender (Binary)
- Remove the ip_address field (it is not needed for the employee table)
- Set 1000 rows, choose the JSON format, tick "array" and untick "include null values" (a sample of the expected record shape is sketched below)
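For reference, each generated record should line up with the columns of the employee table we create later. The values below are purely hypothetical; only the field names matter:

# Hypothetical example of the shape of one exported Mockaroo record (shown as Python).
# The field names match the employee table columns; gender stays "Male"/"Female" here
# and is shortened to "M"/"F" later by the Cloud Function.
sample_records = [
    {
        "id": 1,
        "first_name": "Jane",
        "last_name": "Doe",
        "email": "jane.doe@example.com",
        "gender": "Female",
    },
]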

Cloud Storage Setup
Create a new bucket from the UI e.g. chenmingyong-cloud-functions
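If you prefer to create the bucket in code rather than the UI, a minimal sketch with the google-cloud-storage client looks like the snippet below (the bucket name matches the example above; the region is just an assumption, change it to whatever you use):

from google.cloud import storage

# Create the landing bucket programmatically (assumes you are already
# authenticated, e.g. via application default credentials).
client = storage.Client()
bucket = client.create_bucket("chenmingyong-cloud-functions", location="us-central1")
print(f"Created bucket {bucket.name}")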
NiFi Setup
Drag in a GenerateFlowFile processor, paste the Mockaroo JSON into it, and set the run schedule to 6000 sec or higher to prevent spamming the flow!

Add a PutGCSObject processor and make sure you set ${uuid} as the filename so every uploaded object is unique.

(Used for a later demo) Drag in a SplitJson processor to split the JSON array into individual records.

(Used for a later demo) Add a ReplaceText processor to add a square bracket before and after each split JSON record so it remains valid JSON, e.g. {"key":"value"} becomes [{"key":"value"}] (illustrated in the sketch below).
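To see what that ReplaceText step is doing, here is the same idea in plain Python rather than NiFi (the record content is a made-up example):

import json

# One flowfile coming out of SplitJson holds a single JSON object.
single_record = '{"first_name": "Jane", "gender": "Female"}'  # hypothetical content

# ReplaceText prepends "[" and appends "]" so the content parses as a JSON array,
# which is the shape the Cloud Function later expects (json.loads -> DataFrame).
wrapped = "[" + single_record + "]"
records = json.loads(wrapped)  # a list containing one dict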

Finally, test out PutGCSObject and check that it manages to write successfully into GCS.
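One quick way to double-check the upload outside the NiFi UI is to list the objects in the bucket, for example:

from google.cloud import storage

# Sanity check: list what landed in the bucket (name matches the example above).
client = storage.Client()
for blob in client.list_blobs("chenmingyong-cloud-functions"):
    print(blob.name, blob.size)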
Setup Cloud SQL
How to create MySQL Instance
Create a MySQL instance from the GCP Console and copy the password down somewhere; you will need it later to connect.
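To run the SQL in the next step from your own machine, one option (an assumption on my part, not the only way) is to run the Cloud SQL Auth Proxy locally and connect with PyMySQL, roughly like this:

import pymysql

# Minimal sketch, assuming the Cloud SQL Auth Proxy is forwarding the instance
# to 127.0.0.1:3306; the user and password below are placeholders.
connection = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="the-password-you-copied",
)
with connection.cursor() as cursor:
    cursor.execute("SHOW DATABASES;")
    print(cursor.fetchall())
connection.close()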


How to create MySQL Table

CREATE DATABASE my_company;
USE my_company;
DROP TABLE IF EXISTS `employee`;
CREATE TABLE `employee` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `first_name` varchar(50) COLLATE utf8_unicode_ci NOT NULL,
    `last_name` varchar(50) COLLATE utf8_unicode_ci NOT NULL,
    `email` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
    `gender` varchar(1) COLLATE utf8_unicode_ci NOT NULL,
    `updated` timestamp NOT NULL DEFAULT current_timestamp(),
    PRIMARY KEY (`id`),
    UNIQUE KEY `email` (`email`)
) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
DESCRIBE `employee`;
SELECT * FROM employee;
Setup Cloud Functions
Cloud Functions Sample Code
Useful reference docs: Connecting to Cloud SQL with Cloud Functions, and Downloading objects from Google Cloud Storage.
requirements.txt
SQLAlchemy==1.3.12
PyMySQL==0.9.3
google-cloud-storage==1.42.2
pandas==1.3.3
main.py
import json
import logging
import os
import pandas as pd
import sqlalchemy
from google.cloud import storage

def convert_gender_col_to_shortform(gender):
    if gender == 'Male':
        return 'M'
    elif gender == 'Female':
        return 'F'


def hello_gcs(event, context):
    # Variables
    drivername = "mysql+pymysql"
    username = os.environ.get("username")
    password = os.environ.get("password")
    database = os.environ.get("database")
    project_id = os.environ.get("project_id")
    instance_region = os.environ.get("instance_region")
    instance_name = os.environ.get("instance_name")
    query_string = dict({"unix_socket": f"/cloudsql/{project_id}:{instance_region}:{instance_name}"})
    table = os.environ.get("table")

    # Download the json file from Google Cloud Storage
    logging.info(f"Processing file: {event['name']}.")
    storage_client = storage.Client()
    bucket = storage_client.bucket(event['bucket'])
    blob = bucket.blob(event['name'])
    data = json.loads(blob.download_as_string())

    # Convert the json into dataframe
    # Apply transformation by converting "Female" to "F", "Male" to "M"
    df = pd.DataFrame.from_dict(data)
    df['gender'] = df['gender'].apply(convert_gender_col_to_shortform)

    # Create a sql pool connection
    pool = sqlalchemy.create_engine(
        sqlalchemy.engine.url.URL(
            drivername=drivername,
            username=username,
            password=password,
            database=database,
            query=query_string,
        ),
        pool_size=5,
        max_overflow=2,
        pool_timeout=30,
        pool_recycle=1800
    )

    # Connect to the database and append the rows into the target table
    try:
        db_connection = pool.connect()
        df.to_sql(table, db_connection, if_exists="append", index=False)
        db_connection.close()
        logging.info("Rows inserted into table successfully...")
    except Exception as e:
        return 'Error: {}'.format(str(e))
Google Cloud Platform Console


Finally, copy the main.py and requirements.txt above into the function, set the runtime environment variables the code reads (username, password, database, project_id, instance_region, instance_name, table), and just click on Deploy! 😎
Demo
Clear mysql screen
system clear
Truncate table
TRUNCATE TABLE employee;
Select the first 10 rows
SELECT *
FROM employee
LIMIT 10;
Find the min and max updated times
SELECT MIN(updated) AS min_updated, MAX(updated) AS max_updated FROM employee;
Delete all the files at the end
gsutil -m rm -r gs://chenmingyong-cloud-functions
This is part of my online course on how to kickstart data engineering workflows in Google Cloud Platform (GCP) for beginners; sign up here to watch the detailed video explanations! 🤗