How to stream Google Pub/Sub into BigQuery using Dataflow

Set up Google Pub/Sub as the data source and a BigQuery table as the sink, then define and deploy a Dataflow pipeline using Python for an end-to-end streaming data pipeline.

Set Up Source Data

Set Up a Google Pub/Sub Topic

Make sure you create a Pub/Sub topic in GCP. I called mine dataflow-demo, but you can use any topic name you want. Take note of the subscription name (e.g. projects/udemy-325708/subscriptions/dataflow-demo-sub).

How to Create a Pub/Sub topic using Google Cloud Platform console
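
If you prefer to script this step instead of clicking through the console, here is a minimal sketch using the google-cloud-pubsub client library (v2+); the project ID and names are the ones used in this tutorial, so swap in your own:

from google.cloud import pubsub_v1

project = "udemy-325708"  # replace with your own project ID
topic_id = "dataflow-demo"
subscription_id = "dataflow-demo-sub"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project, topic_id)
subscription_path = subscriber.subscription_path(project, subscription_id)

# Create the topic, then attach a pull subscription for Dataflow to read from
publisher.create_topic(name=topic_path)
subscriber.create_subscription(name=subscription_path, topic=topic_path)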

GenerateFlowFile

This generates 10 FlowFiles every second to simulate a streaming scenario 😎

GenerateFlowFile Setup in NiFi

RetryFlowFile

This introduces a 10-second delay to every batch of FlowFiles; think of it like a sleep function in Python. The purpose is to create a visible time difference between the entry date and the current datetime. You can just tick the failure relationship (auto-terminate it), since we know it's not going to fail!

RetryFlowFile setup in NiFi Part 1
RetryFlowFile setup in NiFi Part 2

ReplaceText

This generates a dummy JSON payload from NiFi's built-in variables. The whole purpose is to mimic some sort of "sensor data" or the real-time messages you would get in a production environment. I wanted to use a real API, but the setup would have been too troublesome for everyone to follow along with, and the API might change in the near future, so this method is more reliable and easier to replicate for tutorial purposes.

Detailed explanation of each key in mock json
ReplaceText setup in NiFi
{
  "filename": "${uuid}",
  "file_size_byte": ${fileSize},
  "entry_date_epoch": ${entryDate},
  "current_datetime": "${now():format('yyyy:MM:dd HH/mm/ss')}"
}

PublishGCPubSub

PublishGCPubSub setup in NiFi
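
If you want to sanity-check the rest of the pipeline without running NiFi, you can publish a hand-crafted message of the same shape straight to the topic. Here is a minimal sketch using the google-cloud-pubsub client; the field values are made up to mirror the mock JSON produced by the ReplaceText processor:

import json
import time
import uuid
from google.cloud import pubsub_v1

project = "udemy-325708"  # replace with your own project ID
topic_id = "dataflow-demo"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_id)

# Hypothetical payload mimicking what ReplaceText generates in NiFi
message = {
    "filename": str(uuid.uuid4()),
    "file_size_byte": 1024,
    "entry_date_epoch": int(time.time() * 1000),  # milliseconds, like NiFi's ${entryDate}
    "current_datetime": time.strftime("%Y:%m:%d %H/%M/%S"),
}

future = publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
print(future.result())  # prints the Pub/Sub message ID once the publish succeeds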

Overall Architecture

Overall architecture of processors setup in NiFi

Set Up BigQuery

Go to the BigQuery UI → Click on your project → Create dataset

Create dataset using BigQuery UI

Then open the newly created test dataset

How to open newly created test BigQuery dataset

Click on Create table

How to create table in BigQuery dataset

Input the following, and yay, you are done! 😍

BigQuery table schema
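
If you would rather script the dataset and table creation than use the UI, here is a sketch using the google-cloud-bigquery client; the field names and types mirror the schema string used in the Dataflow pipeline below:

from google.cloud import bigquery

project = "udemy-325708"  # replace with your own project ID
client = bigquery.Client(project=project)

# Create the dataset, then a table with the same schema the pipeline writes to
client.create_dataset(f"{project}.test", exists_ok=True)

schema = [
    bigquery.SchemaField("filename", "STRING"),
    bigquery.SchemaField("file_size_byte", "INTEGER"),
    bigquery.SchemaField("entry_date_epoch", "INTEGER"),
    bigquery.SchemaField("entry_datetime", "DATETIME"),
    bigquery.SchemaField("current_datetime", "STRING"),
    bigquery.SchemaField("current_datetime_new", "DATETIME"),
]
table = bigquery.Table(f"{project}.test.dataflow_demo", schema=schema)
client.create_table(table, exists_ok=True)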

Deploy Dataflow

Prerequisite

You need to create a GCS bucket first for the Dataflow deployment; it will hold the pipeline's staging and temporary files.
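
If you do not want to create the bucket through the console, here is a sketch using the google-cloud-storage client (bucket names are globally unique, so pick your own; keeping the bucket in the same region as the Dataflow job avoids cross-region traffic):

from google.cloud import storage

client = storage.Client(project="udemy-325708")  # replace with your own project ID

# Bucket used for the pipeline's staging and temp files
client.create_bucket("chenmingyong-cloud-dataflow", location="us-central1")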

Define Pipeline

import logging
import json
import time
import traceback
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.runners import DataflowRunner

def correct_datetime_format(data):
    import datetime
    temp = datetime.datetime.strptime(data['current_datetime'], "%Y:%m:%d %H/%M/%S") # Convert string object to datetime object 
    data['current_datetime_new'] = temp.strftime("%Y-%m-%d %H:%M:%S") # Convert datetime object back to string object
    return data

def convert_epoch_to_datetime(data):
    import datetime
    # Divide epoch time by 1000 to change it to seconds instead of milliseconds
    # Convert epoch time into datetime format 
    temp=datetime.datetime.fromtimestamp(data['entry_date_epoch']//1000) 
    data['entry_datetime'] = temp.strftime("%Y-%m-%d %H:%M:%S") # Convert datetime object into string object
    return data


def streaming_pipeline(project,bucket,dataset,table,subscription,region="us-central1"):
    subscription= f"projects/{project}/subscriptions/{subscription}"
    table = f"{project}:{dataset}.{table}"
    schema = "filename:string,file_size_byte:integer,entry_date_epoch:integer,entry_datetime:datetime,current_datetime:string,current_datetime_new:datetime"
    bucket = f"gs://{bucket}"
    
    options = PipelineOptions(
        streaming=True,
        project=project,
        region=region,
        staging_location=f"{bucket}/staging",
        temp_location=f"{bucket}/temp"
    )

    p = beam.Pipeline(DataflowRunner(), options=options)

    (p | "Read Topic" >> ReadFromPubSub(subscription=subscription)
       | "To Dict" >> beam.Map(json.loads)
       | 'Convert Entry Date Epoch To Datetime Format' >> beam.Map(convert_epoch_to_datetime)
       | 'Correct Current Datetime Format' >> beam.Map(correct_datetime_format)
       | 'WriteToBigQuery' >> WriteToBigQuery(table=table, schema=schema, write_disposition=BigQueryDisposition.WRITE_APPEND))

    return p.run()
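
Before deploying, you can smoke-test the two transform functions locally with the DirectRunner, without touching Dataflow at all. This sketch reuses the imports and functions defined above; the sample values are made up:

# Local smoke test of the transforms (DirectRunner, no Dataflow job)
sample = {
    "filename": "test-file",
    "file_size_byte": 1024,
    "entry_date_epoch": 1627776000000,  # hypothetical epoch in milliseconds
    "current_datetime": "2021:08:01 12/30/45",
}

with beam.Pipeline() as p_test:
    (p_test | beam.Create([sample])
            | beam.Map(convert_epoch_to_datetime)
            | beam.Map(correct_datetime_format)
            | beam.Map(print))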

Deploy Pipeline

Replace the project, subscription, dataset, table, and bucket with your own values.

project= "udemy-325708"
subscription="dataflow-demo-sub"
dataset="test"
table="dataflow_demo"
bucket="chenmingyong-cloud-dataflow"

try:
    pipeline = streaming_pipeline(project=project,bucket=bucket,dataset=dataset,table=table,subscription=subscription)
    print("\n PIPELINE RUNNING \n")
except (KeyboardInterrupt, SystemExit):
    raise
except:
    print("\n PIPELINE FAILED")
    traceback.print_exc()

Stop Pipeline

pipeline.cancel()
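
If you want to be a bit more defensive, check the job state first; pipeline here is the DataflowPipelineResult returned by streaming_pipeline above:

from apache_beam.runners.runner import PipelineState

# Only cancel the streaming job if it is still running
if pipeline.state == PipelineState.RUNNING:
    pipeline.cancel()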

Optional/Bonus

Other ways to deploy a Dataflow job: https://cloud.google.com/sdk/gcloud/reference/dataflow/jobs/run


Demo

Make sure you enable the Dataflow API.

Enable Dataflow API

Start your NiFi pipeline.

Start source pipeline in NiFi

Check from BigQuery; you should see a very low latency between now and current_datetime_new!

SELECT *, CURRENT_DATETIME() as now FROM `udemy-325708.test.dataflow_demo` ORDER BY entry_date_epoch DESC LIMIT 1000

Remember to stop your Dataflow pipeline and NiFi pipeline after you are done. You may delete the BigQuery dataset and table as well.
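
If you prefer to clean up programmatically, here is a sketch that removes the BigQuery dataset (with its tables) and the Pub/Sub subscription and topic created for this tutorial:

from google.cloud import bigquery, pubsub_v1

project = "udemy-325708"  # replace with your own project ID

# Delete the dataset and everything inside it
bigquery.Client(project=project).delete_dataset(f"{project}.test", delete_contents=True, not_found_ok=True)

# Delete the subscription first, then the topic
subscriber = pubsub_v1.SubscriberClient()
subscriber.delete_subscription(subscription=subscriber.subscription_path(project, "dataflow-demo-sub"))
publisher = pubsub_v1.PublisherClient()
publisher.delete_topic(topic=publisher.topic_path(project, "dataflow-demo"))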


This is part of my online course on how to kickstart data engineering workflows in Google Cloud Platform (GCP) for beginners. Sign up here to watch the detailed video explanations! 🤗