Structured streaming in Synapse Spark


This post has been republished via RSS; it originally appeared at Microsoft Tech Community - Latest Blogs.


Author: Ryan Adams is a Program Manager on the Azure Synapse Customer Success Engineering (CSE) team.

 

In this post we are going to look at an example of streaming IoT temperature data in Synapse Spark. I have an IoT device that streams temperature data from two sensors to IoT Hub. We’ll use Synapse Spark to process the data and then write the output to persistent storage. Here is what our architecture will look like:

 

[Figure: architecture diagram]

 

 

Prerequisites 

This post focuses on streaming the data in Synapse Spark, so a device that sends data and a configured IoT Hub are prerequisites. Their setup is not covered in this article.

 

Step 1 – Create A New Notebook 

The first thing you need to do is create a new notebook in Synapse. Go to the Develop Hub and click the ellipsis that appears when you hover your cursor over the Notebooks heading. That will display a drop-down menu where you can select “New notebook”. The first cell of the notebook will initialize the variables and do some setup; that code is provided in Step 2.

 

[Figure: the Notebooks drop-down menu in the Develop Hub with the “New notebook” option]

 

Step 2 – Setup and configuration 

The code below has comments to help you fill in the names and keys of your own services. You need to set up your source by providing the name of your IoT Hub or Event Hub along with the connection string and consumer group (if you have one). Your destination will be Azure Data Lake Storage (ADLS), so you need to supply the container and folder path where you want to land the streaming data.

 

There are two pieces of sensitive information that you do not want to expose in plain text, so you’ll store the key for the storage account and the Event Hub connection string in Azure Key Vault.  You can easily call these using mssparkutils once you create a linked service to your Key Vault. 

 

 

 

 

 

# Initialize Variables 
storage_account = 'YourStorageAccountName' # This is the storage account where we will write out our data stream 
event_hub = 'YourEventHubName' # This is the event hub where we will grab our stream from 
consumer_group = 'YourCGName' # This is our event hub consumer group in case we add more consumers later 
key_vault = 'YourKeyVault' # This is the name of our key vault where we store our event hub connection string and storage account secret 

# Setup access to storage account using Key Vault Secret. Last parameter is the secret name 
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", mssparkutils.credentials.getSecret(key_vault,"YourKVSecretName")) 

# Setup storage locations for all data 
ROOT_PATH = f"abfss://YourContainerName@{storage_account}.dfs.core.windows.net/" # Put your container before the @ sign 
BRONZE_PATH = ROOT_PATH + "bronze/" #Folder where we land our raw data stream 
CHECKPOINT_PATH = ROOT_PATH + "checkpoints/" #Folder to store our checkpoints in case the stream gets broken 

# Get Event Hub Connection String from Key Vault 
IOT_CS = mssparkutils.credentials.getSecret(key_vault,'YourHubConnectionStringSecretName') # IoT Hub connection string (Event Hub Compatible) 
#print(IOT_CS) 

# Setup Event Hub Dictionary with Config Settings 
ehConf = {  
  'eventhubs.connectionString':sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(IOT_CS), 
  'ehName':event_hub, 
  'eventhubs.consumerGroup':consumer_group 
} 
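Before passing the connection string to the Event Hub configuration, it can help to check that the secret really is an Event Hub-compatible connection string. The following is a minimal sketch in plain Python, not part of the original notebook. The helper names `parse_connection_string` and `missing_keys` are hypothetical, and the check assumes the usual `Endpoint`, `SharedAccessKeyName`, `SharedAccessKey`, and `EntityPath` layout. The sample value is made up.

```python
def parse_connection_string(cs):
    """Split 'Key=Value;Key=Value' into a dict.

    Each pair is split on the first '=' only, because key values
    themselves often end in '=' padding.
    """
    parts = {}
    for pair in cs.strip().split(";"):
        if not pair:
            continue  # skip empty segments such as a trailing ';'
        key, _, value = pair.partition("=")
        parts[key.strip()] = value.strip()
    return parts


def missing_keys(cs, required=("Endpoint", "SharedAccessKeyName",
                               "SharedAccessKey", "EntityPath")):
    """Return the required keys that are absent or empty in the string."""
    parts = parse_connection_string(cs)
    return [key for key in required if not parts.get(key)]


# Made-up example value, not a real secret
sample = ("Endpoint=sb://example.servicebus.windows.net/;"
          "SharedAccessKeyName=service;"
          "SharedAccessKey=abc123==;"
          "EntityPath=my-hub")

print(missing_keys(sample))  # prints []
```

In the notebook you could call `missing_keys(IOT_CS)` right after reading the secret, which reports on the structure of the string without printing the secret itself.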

 

 

 

 

 

 

Step 3 - Start fresh and wipe out ADLS 

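If you want to run this notebook multiple times, you can delete everything in your ADLS container for a fresh start. Be aware that ROOT_PATH points at the root of the container, so this removes all data in the container, not just the streamed output.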

 

 

 

 

 

# Make sure root path is empty 
mssparkutils.fs.rm(ROOT_PATH, True)
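Because the delete is recursive, a small guard can reduce the risk of wiping the wrong location. This is a minimal sketch, not part of the original notebook. The function name `wipe_if_expected` is hypothetical, and the delete function is passed in as a parameter so the logic can be tried outside Synapse.

```python
def wipe_if_expected(path, expected_container, rm):
    """Recursively delete `path` only if it points at the expected container.

    `rm` is the delete function to call. In the notebook this would be
    mssparkutils.fs.rm, which takes the path and a recurse flag.
    """
    prefix = f"abfss://{expected_container}@"
    if not path.startswith(prefix):
        raise ValueError(
            f"Refusing to delete {path!r}: not in container {expected_container!r}"
        )
    return rm(path, True)


# In the notebook (assumes ROOT_PATH from Step 2):
# wipe_if_expected(ROOT_PATH, "YourContainerName", mssparkutils.fs.rm)
```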

 

 

 

 

 

Step 4 - Start streaming the data 

Now that you have everything set up and configured, it’s time to read the data stream and then write it out to ADLS. First you create a data frame that uses the Event Hub format and the Event Hub configuration created in Step 2.

 

The next part gets a little tricky because the entire JSON payload of the stream is stored in a single column called “body”. To handle this, we define a schema for the payload, use it to parse the body into columns, and land the result in a final data frame. Finally, we write the data out to ADLS in parquet format.
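Before the Spark code, it may help to see what happens to a single event. The following plain Python sketch mimics the two steps on one message: decoding the binary body to a string, then pulling the expected fields out of the JSON. The payload is a made-up example using the temp1 and temp2 field names from this post, and `parse_body` is a hypothetical helper, not a Spark function.

```python
import json


def parse_body(body):
    """Decode one event body and extract the two temperature fields.

    Fields missing from the payload come back as None, similar to how
    a schema-based parse yields nulls for absent fields.
    """
    text = body.decode("utf-8")   # in Spark: df.body.cast('string')
    record = json.loads(text)     # in Spark: from_json(body, schema)
    return {"temp1": record.get("temp1"), "temp2": record.get("temp2")}


# Made-up example payload
sample_body = b'{"temp1": "21.5", "temp2": "22.1"}'
print(parse_body(sample_body))  # prints {'temp1': '21.5', 'temp2': '22.1'}
```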

 

 

 

 

 

from pyspark.sql.functions import * 
from pyspark.sql.types import * 

# Create a DF and load it from the EH stream 
df = spark.readStream.format("eventhubs").options(**ehConf).load() 

# The stream stores the JSON payload in a single "body" column. 
# We need a schema to match and extract the JSON 
Schema1 = StructType([StructField("temp1", StringType(), True), 
                      StructField("temp2", StringType(), True) 
                    ]) 

# Extract the JSON and enqueued time 
dfRaw = df.select(df.body.cast('string'), df.enqueuedTime.alias('ArrivalTime')) 
dfJson = dfRaw.select(from_json(dfRaw.body, schema=Schema1).alias("json"), dfRaw.ArrivalTime) 
dfFinal = dfJson.select("json.*", dfJson.ArrivalTime) 

# Write the stream to ADLS 
# We have also enabled checkpoints so if the stream fails we can pick up right where we left off 
dfFinal.writeStream.format("parquet").outputMode("append").option("checkpointLocation", CHECKPOINT_PATH + "temperature_raw").start(BRONZE_PATH + "temperature_raw") 
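The stream started above keeps running until it is stopped or the session ends. As a minimal sketch, not part of the original notebook, the helper below stops every active streaming query on a session. The name `stop_active_streams` is hypothetical; it relies only on the `streams.active` list of the Spark session and the `id` and `stop()` members of each streaming query.

```python
def stop_active_streams(spark_session):
    """Stop every active streaming query on the session.

    Returns the ids of the queries that were stopped.
    """
    stopped = []
    for query in spark_session.streams.active:
        stopped.append(str(query.id))
        query.stop()
    return stopped


# In the notebook, once you are done streaming:
# stop_active_streams(spark)
```

Because the write uses a checkpoint location, restarting the same write after a stop should resume from the checkpoint rather than start over.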

 

 

 

 

 

 

Step 5 - Read the data streamed to ADLS with Spark Pool 

Now that we have the data streaming to ADLS in parquet format, we are going to want to read it and validate the output.  You could use the Azure portal or Azure Storage Explorer, but it would be much easier to do it right here using Spark in the same notebook. 

 

This part is super easy! We simply configure a few variables for connecting to our ADLS account, read the data into a data frame, and then display the data frame. 

 

 

 

 

 

from pyspark.sql import SparkSession 
from pyspark.sql.types import * 
 
# Primary storage info 
account_name = 'YourStorageAccountName' # fill in your primary account name 
container_name = 'YourContainerName' # fill in your container name 
relative_path = 'bronze/temperature_raw/'  # fill in your relative folder path 

adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path) 
print('Primary storage account path: ' + adls_path) 

df_parquet = spark.read.parquet(adls_path) 
display(df_parquet)

 

 

 

 

 

 

Conclusion 

Spark Pools in Azure Synapse support Spark structured streaming so you can stream data right in your Synapse workspace where you can also handle all your other data streams.  This makes managing your data estate much easier.  You also have the option of four different analytics engines to suit various use-cases or user personas. 

 

Our team publishes blogs regularly, and you can find them all here: https://aka.ms/synapsecseblog

For a deeper understanding of Synapse implementation best practices, check out our Success by Design (SBD) site at https://aka.ms/Synapse-Success-By-Design.

 
