Ingest streaming data using Azure Event Hub and Azure Databricks


Introduction

 

I recently worked with a customer to develop a demo showcasing an event-driven architecture built on multiple Azure technologies.  The use case: streaming point-of-sale data that needed to be transformed to support an API.  Additionally, complex transformations had to be supported, so Databricks was required.

 

One of the requirements was to compare multiple streaming and transformation approaches, including Azure Data Explorer (ADX).  I developed a two-path demo that streams data through an Event Hub both directly into ADX and into Databricks, with the end result being the same table layout for both approaches.

 

Data Set

 

To simulate inbound events, I created a simulator in Databricks that uses New York City taxi trip data as its source.  This data is readily available, scales to millions of events, partitions easily, and demonstrates well.  I used 2022 data for green and yellow taxis, available at TLC Trip Record Data - TLC (nyc.gov), and copied all the parquet files into a container named nytlc under the folder paths parquet/tripdata/yellow and parquet/tripdata/green.

[Screenshot: folder layout of the nytlc container]
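To illustrate (a minimal sketch, not the simulator's actual code; the storage account name mystorageacct is a placeholder), loading this layout in a Databricks notebook looks like this:

from pyspark.sql import functions as F

# "spark" is the ambient SparkSession in a Databricks notebook.
# Hypothetical storage account name; substitute your own.
base = "abfss://nytlc@mystorageacct.dfs.core.windows.net/parquet/tripdata"

yellow = spark.read.parquet(f"{base}/yellow").withColumn("TaxiType", F.lit("Yellow"))
green = spark.read.parquet(f"{base}/green").withColumn("TaxiType", F.lit("Green"))

# Yellow and green schemas differ (tpep_* vs. lpep_* timestamps), so align
# columns by name and keep the union of both schemas (Spark 3.1+).
trips = yellow.unionByName(green, allowMissingColumns=True)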

 

Architecture

 

At first glance, this architecture overlaps technologies: many of the transformations could be accomplished with either Azure Data Explorer or Databricks.  That is intentional.  I was tasked with demonstrating, through comparison, how events could be processed using both techniques, allowing the end user to determine which approach works best for their requirements; essentially, which skill sets they have and how complex the transformations are.

 

The simulator was developed in Databricks; it consumes New York City taxi trip data stored in Azure Data Lake and sends events to an Azure Event Hub for downstream processing.  In approach one, Azure Data Explorer consumes events directly from the Event Hub and stores them in a database, where ADX functions split the data into yellow and green tables.
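As a sketch of the simulator's sending pattern (using the azure-eventhub Python package; this is not the actual notebook code, and the connection string and hub name are placeholders):

import json
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    conn_str="<EVENT HUB NAMESPACE CONNECTION STRING>",
    eventhub_name="<EVENT HUB NAME>",
)

# Each event carries one taxi trip serialized as JSON.
batch = producer.create_batch()
batch.add(EventData(json.dumps({"VendorID": 1, "TaxiType": "Yellow", "trip_distance": 2.5})))
producer.send_batch(batch)
producer.close()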

Approach two leverages Databricks, which consumes the same simulator events on a different Event Hub consumer group and transforms them into multiple dataframes for persistence in Delta format on Azure Data Lake and also in Azure Data Explorer in three tables (AllTripData, YellowTripData, and GreenTripData).  This architecture is depicted below.

 

[Architecture diagram: simulator to Event Hub, then into ADX (approach one) and Databricks (approach two)]
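For the read side of approach two, a minimal sketch using the azure-eventhubs-spark connector (the connector library must be attached to the cluster, and the connection string is a placeholder):

from pyspark.sql import functions as F

conn = "<EVENT HUB CONNECTION STRING>"
eh_conf = {
    # The connector expects the connection string in encrypted form.
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn),
    "eventhubs.consumerGroup": "Databricks-reader",
}

raw = spark.readStream.format("eventhubs").options(**eh_conf).load()

# The event body arrives as binary; cast it to a string so the JSON can be
# parsed into the AllTripData columns downstream.
events = raw.select(F.col("body").cast("string").alias("json"))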

 

Configuration

 

This blog is not intended to dive into the specifics of setting up each piece of technology, but to showcase how the pieces interact with each other.  Links are provided as applicable.

 

Create an Azure Event Hub

 

Follow the steps in Azure Quickstart - Create an event hub using the Azure portal - Azure Event Hubs | Microsoft Learn to create an Event Hub.  I used the defaults for the namespace.  The Event Hub was created with 8 partitions and three Consumer Groups:

  • Databricks-reader
  • Databricks-simulator
  • ADX-reader

 

These consumer groups can be given any names; using a dedicated consumer group per consumer is a best practice that allows each consumer to have its own “view” of the event stream and process it at its own pace, as described at Consumer Groups.
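To illustrate the idea (a sketch with the azure-eventhub Python package; the connection string and hub name are placeholders), here is a consumer reading on the ADX-reader group:

from azure.eventhub import EventHubConsumerClient

client = EventHubConsumerClient.from_connection_string(
    conn_str="<EVENT HUB NAMESPACE CONNECTION STRING>",
    consumer_group="ADX-reader",  # each consumer reads through its own group
    eventhub_name="<EVENT HUB NAME>",
)

def on_event(partition_context, event):
    # Each consumer group tracks its own position on every partition.
    print(partition_context.partition_id, event.body_as_str())

with client:
    client.receive(on_event=on_event, starting_position="-1")  # read from the start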

 

Create a Databricks Environment

 

Follow the guidance in Get started: Free trial & setup - Azure Databricks | Microsoft Learn to set up a Databricks environment.  I created a single user cluster as shown below.

 

[Screenshot: single user cluster configuration]

 

Libraries required:

Library Links:

 

Notebooks:

  • nycExecSim
  • nycInboundEvents
  • nycUtilities

There is documentation inside each notebook where applicable.  You can import the BlogCode.dbc file; review each notebook and make changes as needed.  For example, in the nycUtilities notebook, add your credentials in cmd 3:

clientId = <ADD CLIENT ID>
clientSecret = <ADD CLIENT SECRET>
tenantId = <ADD TENANT ID>

Configure Azure Storage

 

Follow the guidance at Create a storage account - Azure Storage | Microsoft Learn to create a storage account, and see Azure Data Lake Storage Gen2 Introduction | Microsoft Learn for more information.

 

I used two different containers in the same storage account:

  • nytlc
  • streaming

 

However, it does not matter if you use a single container with multiple folders; just define the paths correctly in the notebooks.  I used the nytlc container for the NYC taxi parquet source data and the streaming container for all notebook output.

Note: make sure to give your service principal appropriate access, such as Storage Blob Data Contributor.
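As a sketch of how the nycUtilities credentials can be applied (this is the standard ADLS Gen2 OAuth configuration, not necessarily the notebook's exact code; mystorageacct is a placeholder account name):

# Uses the clientId, clientSecret, and tenantId variables set in cmd 3.
acct = "mystorageacct.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.auth.type.{acct}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{acct}",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{acct}", clientId)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{acct}", clientSecret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{acct}",
               f"https://login.microsoftonline.com/{tenantId}/oauth2/token")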

 

Create Azure Data Explorer

 

Follow the guidance at Quickstart: Create an Azure Data Explorer cluster and database | Microsoft Learn to create an Azure Data Explorer cluster.

I like to use https://dataexplorer.azure.com/ for the queries.

 

Create two databases:

  • TripDataDatabricksSource
  • TripDataEventHubSource

Both databases have an unlimited retention period and a cache period of 31 days, although these settings do not affect the demo.

 

Create the AllTripData table in both databases using the following command:

.create table AllTripData (
    VendorID: int, tpep_pickup_datetime: datetime, tpep_dropoff_datetime: datetime,
    passenger_count: int, trip_distance: real, RatecodeID: int, store_and_fwd_flag: string,
    PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: real,
    extra: real, mta_tax: real, tip_amount: real, tolls_amount: real,
    improvement_surcharge: real, total_amount: real, congestion_surcharge: real,
    airport_fee: int, TaxiType: string, lpep_pickup_datetime: datetime,
    lpep_dropoff_datetime: datetime, ehail_fee: int, trip_type: string)

Ingestion Policies

Modify the ingestion batching and streaming ingestion policies on the AllTripData table in both databases using the following commands:

.alter table AllTripData policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:02","MaximumNumberOfItems": "100","MaximumRawDataSizeMB": "300"}'

.alter table AllTripData policy streamingingestion '{"IsEnabled": true}'

TripDataEventHubSource

 

The next series of commands applies only to the TripDataEventHubSource database.

 

Create a data connection to ingest events from the Event Hub, as described in the Azure Data Explorer documentation.

 

[Screenshot: Event Hub data connection configuration]
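If you would rather script the connection than click through the portal, a sketch with the azure-mgmt-kusto Python package (all names and IDs are placeholders, and the exact model fields may vary by package version):

from azure.identity import DefaultAzureCredential
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection

client = KustoManagementClient(DefaultAzureCredential(), "<SUBSCRIPTION ID>")

connection = EventHubDataConnection(
    location="<REGION>",
    event_hub_resource_id="/subscriptions/<SUBSCRIPTION ID>/resourceGroups/<RG>"
                          "/providers/Microsoft.EventHub/namespaces/<NAMESPACE>"
                          "/eventhubs/<EVENT HUB NAME>",
    consumer_group="ADX-reader",
    table_name="AllTripData",
    data_format="MULTIJSON",
)

client.data_connections.begin_create_or_update(
    "<RG>", "<CLUSTER NAME>", "TripDataEventHubSource", "trip-events", connection
).result()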

 

 

Table Creation

 

Create the yellow and green trip data tables:

.create table YellowTripData (
    VendorID: int, tpep_pickup_datetime: datetime, tpep_dropoff_datetime: datetime,
    passenger_count: int, trip_distance: real, RatecodeID: int, store_and_fwd_flag: string,
    PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: real,
    extra: real, mta_tax: real, tip_amount: real, tolls_amount: real,
    improvement_surcharge: real, total_amount: real, congestion_surcharge: real,
    airport_fee: int, TaxiType: string, lpep_pickup_datetime: datetime,
    lpep_dropoff_datetime: datetime, ehail_fee: int, trip_type: string)

.create table GreenTripData (
    VendorID: int, tpep_pickup_datetime: datetime, tpep_dropoff_datetime: datetime,
    passenger_count: int, trip_distance: real, RatecodeID: int, store_and_fwd_flag: string,
    PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: real,
    extra: real, mta_tax: real, tip_amount: real, tolls_amount: real,
    improvement_surcharge: real, total_amount: real, congestion_surcharge: real,
    airport_fee: int, TaxiType: string, lpep_pickup_datetime: datetime,
    lpep_dropoff_datetime: datetime, ehail_fee: int, trip_type: string)

Function Creation

Create two functions that the update policies will use to route rows by taxi type:

.create-or-alter function with (docstring = 'Append Records where TaxiType = Yellow', folder = 'UpdatePolicyFunctions')
CopyYellowTaxiData() {
    AllTripData
    | where TaxiType == "Yellow"
}

.create-or-alter function with (docstring = 'Append Records where TaxiType = Green', folder = 'UpdatePolicyFunctions')
CopyGreenTaxiData() {
    AllTripData
    | where TaxiType == "Green"
}

Update policies

Alter the update policies for the yellow and green tables so that every ingestion into AllTripData also appends the matching rows to the corresponding table:

.alter table YellowTripData policy update @'[{ "IsEnabled": true, "Source": "AllTripData", "Query": "CopyYellowTaxiData()", "IsTransactional": false, "PropagateIngestionProperties": false}]'

.alter table GreenTripData policy update @'[{ "IsEnabled": true, "Source": "AllTripData", "Query": "CopyGreenTaxiData()", "IsTransactional": false, "PropagateIngestionProperties": false}]'

Demo Execution

 

Now that everything is configured and you have added your credential information to the notebooks, start the Databricks cluster and execute the nycExecSim notebook to begin sending events to the Event Hub.  You should begin to see messages in the overview of the Event Hub:

 

[Screenshot: Event Hub overview showing incoming messages]

 

Execute the nycInboundEvents notebook to begin processing events consumed from the Event Hub.
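For reference, a write from a Databricks dataframe to ADX with the Kusto Spark connector looks roughly like this (a sketch only, not the notebook's exact code; df is a dataframe of trip rows, the cluster URL is a placeholder, and the connector must be installed on the cluster):

# Append df to AllTripData in the TripDataDatabricksSource database,
# reusing the service principal credentials from nycUtilities.
(df.write
    .format("com.microsoft.kusto.spark.datasource")
    .option("kustoCluster", "https://<cluster>.<region>.kusto.windows.net")
    .option("kustoDatabase", "TripDataDatabricksSource")
    .option("kustoTable", "AllTripData")
    .option("kustoAadAppId", clientId)
    .option("kustoAadAppSecret", clientSecret)
    .option("kustoAadAuthorityID", tenantId)
    .mode("Append")
    .save())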

 

Head on over to the Azure Data Explorer UX at https://dataexplorer.azure.com/ and execute this command against the TripDataEventHubSource database:

AllTripData | count

You should begin to see the record count increase.
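If you prefer to poll the count from Python instead of the web UI, a minimal sketch with the azure-kusto-data package (cluster URL and credentials are placeholders):

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    "https://<cluster>.<region>.kusto.windows.net",
    "<CLIENT ID>", "<CLIENT SECRET>", "<TENANT ID>",
)
client = KustoClient(kcsb)

# Run the same KQL the web UI runs and print the single count value.
response = client.execute("TripDataEventHubSource", "AllTripData | count")
print(response.primary_results[0][0]["Count"])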

 

Execute these commands as well:

YellowTripData | count

GreenTripData | count

Again, you should see the record count increase.

 

To view the records, execute the following command (the ADX web UI accepts this T-SQL form; the KQL equivalent is YellowTripData | take 100):

select * from YellowTripData
