
How to insert data into Cloud SQL using Cloud Functions

How to set up dummy data from Mockaroo, set up a Google Cloud SQL instance and table, and write and deploy Cloud Functions using Python.

Setup source data

Download Data from Mockaroo

  1. Go to https://www.mockaroo.com/
  2. Change the gender field type to Binary so it only generates Male/Female
  3. Remove the ip_address field
  4. Set 1000 rows, format JSON, tick array and untick null values
How to generate dummy data using Mockaroo

Cloud Storage Setup

Create a new bucket from the UI e.g. chenmingyong-cloud-functions
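
If you prefer to create the bucket in code, here is a minimal sketch using the same google-cloud-storage client the Cloud Function uses later (the bucket name and location are assumptions, substitute your own):

from google.cloud import storage

# Sketch: create the landing bucket programmatically
# (bucket name and location are assumptions)
client = storage.Client()
bucket = client.create_bucket("chenmingyong-cloud-functions", location="US")
print(f"Created bucket {bucket.name}")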


NiFi Setup

Drag a GenerateFlowFile processor, paste the downloaded JSON into its Custom Text property, and set the run schedule to 6000 sec or higher to prevent spam!

GenerateFlowFile settings

Add a PutGCSObject processor, and make sure you set ${uuid} as the filename so that every object written is unique.

PutGCSObject settings

(Used for a later demo) Drag a SplitJson block

SplitJson settings

(Used for a later demo) Put a ReplaceText block to add square brackets before and after each split JSON record so it stays valid JSON, e.g. {"key":"value"} becomes [{"key":"value"}]

ReplaceText settings
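
One common way to configure ReplaceText for this (my assumption, follow the screenshot settings if they differ) is:

Evaluation Mode: Entire text
Replacement Strategy: Regex Replace
Search Value: (?s)(^.*$)
Replacement Value: [$1]

This wraps the whole flowfile content in square brackets in one pass.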

Finally, test out PutGCSObject and check that it manages to write successfully into GCS.
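
If you'd rather verify from code than the console, a quick sketch that lists the newest objects in the bucket (bucket name is the example one above):

from google.cloud import storage

# Sketch: list a few objects to confirm NiFi is writing into the bucket
client = storage.Client()
for blob in client.list_blobs("chenmingyong-cloud-functions", max_results=10):
    print(blob.name, blob.size)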


Setup Cloud SQL

How to create MySQL Instance

Create a MySQL instance from the GCP Console and copy down the password somewhere; you will need it later to connect.

How to create MySQL in Google Cloud SQL

How to create MySQL Table

Sample dummy json
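
An illustrative record from the Mockaroo output (the values here are made up, but the fields match the table below):

[
  {
    "id": 1,
    "first_name": "Jane",
    "last_name": "Doe",
    "email": "jane.doe@example.com",
    "gender": "Female"
  }
]
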
CREATE DATABASE my_company;
USE my_company;

DROP TABLE IF EXISTS `employee`;

CREATE TABLE `employee` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `first_name` varchar(50) COLLATE utf8_unicode_ci NOT NULL,
  `last_name` varchar(50) COLLATE utf8_unicode_ci NOT NULL,
  `email` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
  `gender` varchar(1) COLLATE utf8_unicode_ci NOT NULL,
  `updated` timestamp NOT NULL DEFAULT current_timestamp(),
  PRIMARY KEY (`id`),
  UNIQUE KEY `email` (`email`)
) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

DESCRIBE `employee`;

SELECT * FROM employee;
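
To sanity-check the table from your own machine, here is a minimal sketch using PyMySQL (it assumes you have authorized your client IP on the instance and filled in its public IP and the password you copied earlier):

import pymysql

# Sketch: verify the employee table exists
# (host and password are placeholders, use your own)
conn = pymysql.connect(host="INSTANCE_PUBLIC_IP", user="root",
                       password="YOUR_PASSWORD", database="my_company")
with conn.cursor() as cur:
    cur.execute("DESCRIBE employee")
    for row in cur.fetchall():
        print(row)
conn.close()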

Setup Cloud Functions

Cloud Functions Sample Code

Connecting to Cloud SQL with Cloud Functions

Downloading objects from Google Cloud Storage


requirements.txt

SQLAlchemy==1.3.12
PyMySQL==0.9.3
google-cloud-storage==1.42.2
pandas==1.3.3

main.py

import json
import logging
import os
import pandas as pd
import sqlalchemy
from google.cloud import storage


def convert_gender_col_to_shortform(gender):
    # Mockaroo's Binary gender type only produces "Male" or "Female",
    # which we shorten to fit the varchar(1) gender column
    if gender == 'Male':
        return 'M'
    elif gender == 'Female':
        return 'F'


def hello_gcs(event, context):
    # Variables
    drivername = "mysql+pymysql"
    username = os.environ.get("username")
    password = os.environ.get("password")
    database = os.environ.get("database")
    project_id = os.environ.get("project_id")
    instance_region = os.environ.get("instance_region")
    instance_name = os.environ.get("instance_name")
    query_string = {"unix_socket": f"/cloudsql/{project_id}:{instance_region}:{instance_name}"}
    table = os.environ.get("table")

    # Download the json file from Google Cloud Storage
    logging.info(f"Processing file: {event['name']}.")
    storage_client = storage.Client()
    bucket = storage_client.bucket(event['bucket'])
    blob = bucket.blob(event['name'])
    data = json.loads(blob.download_as_string())

    # Convert the json into dataframe
    # Apply transformation by converting "Female" to "F", "Male" to "M"
    df = pd.DataFrame.from_dict(data)
    df['gender'] = df['gender'].apply(convert_gender_col_to_shortform)

    # Create a sql pool connection
    pool = sqlalchemy.create_engine(
        sqlalchemy.engine.url.URL(
            drivername=drivername,
            username=username,
            password=password,
            database=database,
            query=query_string,
        ),
        pool_size=5,
        max_overflow=2,
        pool_timeout=30,
        pool_recycle=1800
    )

    # Connect to the database and append the rows into the target table
    try:
        db_connection = pool.connect()
        df.to_sql(table, db_connection, if_exists="append", index=False)
        db_connection.close()
        logging.info("Rows inserted into table successfully...")
    except Exception as e:
        return 'Error: {}'.format(str(e))
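
To smoke-test the function before deploying, you can call it with a fake event, sketched below. Note the /cloudsql unix socket only exists inside the deployed runtime; locally you would need the Cloud SQL Auth Proxy to create it, and the environment variables above must be set.

# Sketch: local smoke test with a fake Cloud Storage event
# (assumes the env vars are exported and a /cloudsql socket is available,
# e.g. via the Cloud SQL Auth Proxy)
if __name__ == "__main__":
    fake_event = {"bucket": "chenmingyong-cloud-functions",
                  "name": "replace-with-an-object-name.json"}
    print(hello_gcs(fake_event, None))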

Google Cloud Platform Console

Create Cloud Functions using Console Part 1
Create Cloud Functions using Console Part 2

Finally, copy the main.py and requirements.txt above into the inline editor, set the runtime environment variables the code reads (username, password, database, project_id, instance_region, instance_name, table), and just click on deploy! 😎


Demo

Clear mysql screen

system clear

Truncate table

TRUNCATE TABLE employee;

Select the first 10 rows

SELECT *
FROM employee
LIMIT 10;

Find the min and max updated time

SELECT MIN(updated) AS min_updated, MAX(updated) AS max_updated FROM employee;

Delete all the files at the end

gsutil -m rm -r gs://chenmingyong-cloud-functions

This is part of my online course on how to kickstart data engineering workflows in Google Cloud Platform (GCP) for beginners. Sign up here to watch the detailed video explanations! 🤗