
How to set up Apache Beam notebooks for development in GCP

Create a Jupyter notebook with an Apache Beam environment in Google Cloud Platform. Run an interactive runner pipeline with sample Python code.

Steps

In the Google Cloud console, click Dataflow → Notebooks → New Instance

Create Apache Beam Jupyter Notebook in Google Cloud Platform

You can leave everything at the defaults, or change the machine type to a lower spec to save some money if you intend to keep the instance running for a long time.

Notebook instance setup

Here is the code, or you can simply upload the Jupyter notebook above to your notebook instance, depending on your preference.

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
import google.auth
import json
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner


def correct_datetime_format(data):
    import datetime
    temp = datetime.datetime.strptime(data['current_datetime'], "%Y:%m:%d %H/%M/%S") # Convert string object to datetime object 
    data['current_datetime_new'] = temp.strftime("%Y-%m-%d %H:%M:%S") # Convert datetime object back to string object
    return data

def convert_epoch_to_datetime(data):
    import datetime
    # Divide epoch time by 1000 to change it to seconds instead of milliseconds
    # Convert epoch time into datetime format 
    temp = datetime.datetime.fromtimestamp(data['entry_date_epoch'] // 1000)
    data['entry_datetime'] = temp.strftime("%Y-%m-%d %H:%M:%S") # Convert datetime object into string object
    return data
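
# For example, a hypothetical message (values made up for illustration) like
#   {"current_datetime": "2023:01:31 13/45/00", "entry_date_epoch": 1675172700000}
# would, after passing through both helpers, gain
#   current_datetime_new = "2023-01-31 13:45:00"
# and an entry_datetime derived from the epoch (the exact value depends on the
# notebook's local timezone, since datetime.fromtimestamp is used)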

# Use the project from the notebook's default credentials
project = google.auth.default()[1]
subscription = f"projects/{project}/subscriptions/dataflow-demo-sub"
options = PipelineOptions(
    streaming=True,
    project=project
)


p = beam.Pipeline(InteractiveRunner(), options=options)
# Note: ib.show below already displays the elements, so there is no need for a final
# Map(print) step (it would replace every element with None)
test = (p | "Read Topic" >> ReadFromPubSub(subscription=subscription)
   | "To Dict" >> beam.Map(json.loads) # Parse each message payload as JSON
   | 'Correct Current Datetime Format' >> beam.Map(correct_datetime_format)
   | 'Convert Entry Date Epoch To Datetime Format' >> beam.Map(convert_epoch_to_datetime))


# Record the streaming source for up to 1 minute, then display the recorded elements
ib.options.recording_duration = '1m'
ib.show(test)
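
# Optionally, materialize the recorded elements into a pandas DataFrame for further
# inspection (a minimal sketch; 'test' is the PCollection defined above)
df = ib.collect(test)
df.head()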


# If you want to test the insert into BigQuery as well, you can uncomment below
# table = f"{project}:test.dataflow_demo"
# schema = "filename:string,file_size_byte:integer,entry_date_epoch:integer,entry_datetime:datetime,current_datetime:string,current_datetime_new:datetime"
# test = (p | "Read Topic" >> ReadFromPubSub(subscription=subscription)
#    | "To Dict" >> beam.Map(json.loads)
#    | 'Convert Entry Date Epoch To Datetime Format' >> beam.Map(convert_epoch_to_datetime)
#    | 'Correct Current Datetime Format' >> beam.Map(correct_datetime_format)
#    | 'WriteToBigQuery' >> WriteToBigQuery(table=table, schema=schema,
#                               write_disposition=BigQueryDisposition.WRITE_APPEND))
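
To generate some test data for the pipeline, you can publish a few messages to the Pub/Sub topic backing the subscription. Below is a minimal sketch that assumes a topic named dataflow-demo and makes up payload values matching the field names used by the transforms above; adjust both to your own setup.

from google.cloud import pubsub_v1
import google.auth
import json
import time

project = google.auth.default()[1]
publisher = pubsub_v1.PublisherClient()
# Assumed topic name backing the dataflow-demo-sub subscription
topic_path = publisher.topic_path(project, "dataflow-demo")

# Hypothetical payload with the fields the pipeline expects
message = {
    "filename": "sample_file.csv",
    "file_size_byte": 2048,
    "entry_date_epoch": int(time.time() * 1000),              # epoch in milliseconds
    "current_datetime": time.strftime("%Y:%m:%d %H/%M/%S"),   # format parsed by correct_datetime_format
}
publisher.publish(topic_path, json.dumps(message).encode("utf-8")).result()

While the recording is running, the parsed and reformatted dictionaries should then show up in the ib.show output.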

Stop the pipeline gracefully

# Stop and clear the recordings for the pipeline
ib.recordings.stop(p)
ib.recordings.clear(p)
# To start recording the sources again later, call ib.recordings.record(p)

This is part of my online course on how to kickstart data engineering workflows in Google Cloud Platform (GCP) for beginners. Sign up here to watch the detailed video explanations! 🤗