How to stream Google Pub/Sub into BigQuery using Dataflow

Contents
Setup Source Data
Setup Google Pub/Sub Topic
Make sure you create a Pub/Sub topic in GCP. I called mine dataflow-demo, but you can use any topic name you want. Take note of the subscription name (e.g. projects/udemy-325708/subscriptions/dataflow-demo-sub).
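If you prefer to create these from code instead of the console, here is a minimal sketch using the google-cloud-pubsub client library (the project, topic, and subscription names below are just the ones used in this demo, so swap in your own):
from google.cloud import pubsub_v1

project_id = "udemy-325708"          # replace with your own project
topic_id = "dataflow-demo"
subscription_id = "dataflow-demo-sub"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Create the topic, then a pull subscription attached to it
publisher.create_topic(request={"name": topic_path})
subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})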

GenerateFlowFile
This generates 10 triggers every second to simulate a streaming use case 😎

RetryFlowFile
This introduces a 10s delay to every batch of files, much like a sleep function in Python. The purpose is to show some time difference between the entry date and the current datetime. You can just tick the failure relationship since we know it's not going to fail!


ReplaceText
Generate dummy JSON based on NiFi's built-in variables. The whole purpose of this is to simulate some sort of "sensor data" or real-time messages you would see in a production environment. I wanted to use a real API, but the setup would have been too troublesome for everyone to follow along, and the API might change in the near future, so this method is more reliable and easier to replicate for tutorial purposes.


{
  "filename": "${uuid}",
  "file_size_byte": ${fileSize},
  "entry_date_epoch": ${entryDate},
  "current_datetime": "${now():format('yyyy:MM:dd HH/mm/ss')}"
}
PublishGCPubSub
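This processor publishes the generated JSON messages to the dataflow-demo topic. If you don't want to set up NiFi, a rough Python equivalent of the whole flow above could look like the following sketch (it assumes the google-cloud-pubsub client library and mirrors the ReplaceText template):
import json
import time
import uuid
from datetime import datetime

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("udemy-325708", "dataflow-demo")  # replace with your own project

while True:
    message = {
        "filename": str(uuid.uuid4()),
        "file_size_byte": 1024,                       # dummy file size
        "entry_date_epoch": int(time.time() * 1000),  # milliseconds, like NiFi's entryDate
        "current_datetime": datetime.now().strftime("%Y:%m:%d %H/%M/%S"),
    }
    publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    time.sleep(0.1)  # roughly 10 messages per second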

Overall Architecture

Setup BigQuery
Go to BigQuery UI → Click on your project → Create dataset

Then open the newly created test dataset

Click on Create table

Input the table name (dataflow_demo) and the schema fields (the same ones used in the pipeline schema below), and yay you are done! 😍
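If you prefer doing this programmatically, here is a sketch with the google-cloud-bigquery client library that creates the same dataset and table (the schema matches the pipeline schema defined later):
from google.cloud import bigquery

client = bigquery.Client(project="udemy-325708")  # replace with your own project
client.create_dataset("test", exists_ok=True)

schema = [
    bigquery.SchemaField("filename", "STRING"),
    bigquery.SchemaField("file_size_byte", "INTEGER"),
    bigquery.SchemaField("entry_date_epoch", "INTEGER"),
    bigquery.SchemaField("entry_datetime", "DATETIME"),
    bigquery.SchemaField("current_datetime", "STRING"),
    bigquery.SchemaField("current_datetime_new", "DATETIME"),
]
client.create_table(bigquery.Table("udemy-325708.test.dataflow_demo", schema=schema), exists_ok=True)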

Deploy Dataflow
Prerequisite
You need to create a GCS bucket first for the Dataflow deployment (it is used for the staging and temp locations).
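You can do this from the console, or with a quick sketch using the google-cloud-storage client library (the bucket name below is just the one used in this demo; bucket names are globally unique, so pick your own):
from google.cloud import storage

client = storage.Client(project="udemy-325708")  # replace with your own project
client.create_bucket("chenmingyong-cloud-dataflow", location="us-central1")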
Define Pipeline
import logging
import json
import time
import traceback
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.runners import DataflowRunner


def correct_datetime_format(data):
    import datetime
    # Parse the string with the odd 'yyyy:MM:dd HH/mm/ss' pattern into a datetime object
    temp = datetime.datetime.strptime(data['current_datetime'], "%Y:%m:%d %H/%M/%S")
    # Convert the datetime object back into a BigQuery-friendly string
    data['current_datetime_new'] = temp.strftime("%Y-%m-%d %H:%M:%S")
    return data


def convert_epoch_to_datetime(data):
    import datetime
    # Divide the epoch time by 1000 to change it from milliseconds to seconds,
    # then convert it into a datetime object
    temp = datetime.datetime.fromtimestamp(data['entry_date_epoch'] // 1000)
    data['entry_datetime'] = temp.strftime("%Y-%m-%d %H:%M:%S")  # Convert datetime object into string
    return data


def streaming_pipeline(project, bucket, dataset, table, subscription, region="us-central1"):
    subscription = f"projects/{project}/subscriptions/{subscription}"
    table = f"{project}:{dataset}.{table}"
    schema = "filename:string,file_size_byte:integer,entry_date_epoch:integer,entry_datetime:datetime,current_datetime:string,current_datetime_new:datetime"
    bucket = f"gs://{bucket}"

    options = PipelineOptions(
        streaming=True,
        project=project,
        region=region,
        staging_location=f"{bucket}/staging",
        temp_location=f"{bucket}/temp"
    )

    p = beam.Pipeline(DataflowRunner(), options=options)

    (p | "Read Topic" >> ReadFromPubSub(subscription=subscription)
       | "To Dict" >> beam.Map(json.loads)
       | "Convert Entry Date Epoch To Datetime Format" >> beam.Map(convert_epoch_to_datetime)
       | "Correct Current Datetime Format" >> beam.Map(correct_datetime_format)
       | "WriteToBigQuery" >> WriteToBigQuery(table=table, schema=schema, write_disposition=BigQueryDisposition.WRITE_APPEND))

    return p.run()
Deploy Pipeline
Replace the project, subscription, dataset, table, and bucket values with your own.
project= "udemy-325708"
subscription="dataflow-demo-sub"
dataset="test"
table="dataflow_demo"
bucket="chenmingyong-cloud-dataflow"
try:
pipeline = streaming_pipeline(project=project,bucket=bucket,dataset=dataset,table=table,subscription=subscription)
print("\n PIPELINE RUNNING \n")
except (KeyboardInterrupt, SystemExit):
raise
except:
print("\n PIPELINE FAILED")
traceback.print_exc()
Stop Pipeline
pipeline.cancel()
Optional/Bonus
Other ways to deploy a Dataflow job: https://cloud.google.com/sdk/gcloud/reference/dataflow/jobs/run
Demo
Make sure you enable the Dataflow API

Start your NiFi pipeline

Check from BigQuery; you should see a very low latency between now and current_datetime_new!
SELECT *, CURRENT_DATETIME() as now FROM `udemy-325708.test.dataflow_demo` ORDER BY entry_date_epoch DESC LIMIT 1000
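If you want the latency as an explicit number of seconds, a small sketch using the google-cloud-bigquery client (assuming the same project, dataset, and table names as above) could look like this:
from google.cloud import bigquery

client = bigquery.Client(project="udemy-325708")  # replace with your own project
query = """
    SELECT current_datetime_new,
           DATETIME_DIFF(CURRENT_DATETIME(), current_datetime_new, SECOND) AS latency_seconds
    FROM `udemy-325708.test.dataflow_demo`
    ORDER BY entry_date_epoch DESC
    LIMIT 10
"""
for row in client.query(query).result():
    print(row.current_datetime_new, row.latency_seconds)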
Remember to stop your Dataflow pipeline and NiFi pipeline after you are done. You may delete the BigQuery dataset and table as well.
This is part of my online course on how to kickstart data engineering workflows in Google Cloud Platform (GCP) for beginners. Sign up here to watch the detailed video explanations! 🤗