Azure Synapse (Pipelines) for Social Media – YouTube example

This post has been republished via RSS; it originally appeared at: New blog articles in Microsoft Tech Community.

Introduction

This article shows how to use Azure Synapse pipelines (the same approach also works with Azure Data Factory) to build a pipeline that gets public data from YouTube using REST APIs and stores it in parquet files in ADLS Gen2 and in Azure Synapse Analytics. By the end, you should be able to apply the same pattern to other social media platforms that expose REST APIs.

 

In addition, we will have a look at different ways to analyze the data in Azure Synapse Analytics: using SQL Serverless, SQL Provisioned (SQL Pools) and Spark Clusters.

 

Considerations
The YouTube REST API calls used in this exercise use API Keys as credentials. OAuth 2.0 is not an option here because it requires a UI for user interaction and sign-in, and this pipeline runs unattended. Managing these credentials is not covered here; please refer to the official API documentation of each social media platform.
The pipeline stores data in both ADLS Gen2 parquet files and Azure Synapse Analytics SQL Pool tables. This is done only to demonstrate two of the many possible options and may or may not be a good fit for you; consider your own requirements.

 

What’s needed
Azure Key Vault – to store the API Key; optional but strongly advised.
Azure Synapse Analytics workspace – to build the pipeline (optionally Azure Data Factory can be used)
SQL Serverless – no need to provision.
SQL Pool – as a provisioned SQL engine; one table and one procedure will be created here.
Spark Cluster – as a provisioned Spark engine to run spark notebooks.
YouTube API Key – to access the YouTube REST API

 

 

1. Preparation

 

Azure key vault secret
Create a new Azure key vault, or use an existing one, and create a new secret using:
Name: YouTubeMicrosoftDemosProjectAPIKey
Value: <YOUR-YOUTUBE-API-KEY>

 

Screenshots of getting the API Key value and creating the new secret:

 

image.png

 

image.png

Note: you need to grant the Azure Synapse Analytics workspace Managed Identity access to your Azure Key vault. Learn more here.

 

image.png

In this example, only “Get” and “List” are selected on the “Secret Permissions” list.
Next, open the current version of the secret and store the “Secret Identifier” as it will be used later:

image.png
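The Secret Identifier is used later in this article as the URL of a Web activity, with an api-version query parameter appended. A minimal Python sketch of that composition follows; the identifier is a made-up value with the same shape as the example URL in this article, and the helper name is hypothetical.

```python
# Sketch: turn a Key Vault "Secret Identifier" into the URL a REST client calls.
API_VERSION = "7.0"  # taken from the example URL shown later in this article


def secret_url(secret_identifier: str, api_version: str = API_VERSION) -> str:
    """Append the api-version query parameter to a Secret Identifier."""
    return f"{secret_identifier.rstrip('/')}?api-version={api_version}"


# Hypothetical identifier; replace it with the one copied from your key vault
example_id = "https://mykeyvault.vault.azure.net/secrets/myKeyName/999cf304c27db"
print(secret_url(example_id))
```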

 

Linked service to the Storage Account
Because we will be using the “Primary” Azure Data Lake Storage Gen2 storage account for the Azure Synapse Analytics workspace, specified when creating the workspace, the linked service is already created, and its name should be in this format: “<WORKSPACE-NAME>-WorkspaceDefaultStorage”.
Note: You need to create a new linked service if you want to use a storage account other than the “Primary” or if you’re using Azure Data Factory.

 

Linked service to the SQL Pool
Again, there’s no need to create a linked service to a SQL Pool in the same workspace because we will be using an activity that is unique to Azure Synapse Integrate pipelines: the “SQL pool stored procedure”.
Note: If you are using Azure Data Factory, then you need to create a linked service to the SQL Pool where you want to store your data. Refer to the “ADF linked service to Azure Synapse Analytics” section in the Azure Synapse SQL Pools Auto DR article.

 

Create YouTube video statistics table and related stored procedure
Create a new table in the SQL Pool where you want to store the data. It will be used to store some fields from the REST API responses as well as the entire payload.

CREATE TABLE [dbo].[youtube_video_likes]
(
    ITEM_ID            VARCHAR(100) NOT NULL,
    VIDEO_ID           VARCHAR(100) NOT NULL,
    LIKES              INT          NOT NULL,
    FULL_JSON_RESPONSE VARCHAR(MAX) NOT NULL,
    TS_INSERT          DATETIME     NOT NULL
)
WITH (DISTRIBUTION = ROUND_ROBIN, HEAP)
GO

 

 

 

 

 

 

 

In addition, create a new stored procedure. This will be called to insert the data in the table.

CREATE PROC [dbo].[p_youtube_video_likes_insert]
    @p_item_id            [VARCHAR](100),
    @p_video_id           [VARCHAR](100),
    @p_likes              [INT],
    @p_full_json_response [VARCHAR](MAX)
AS
BEGIN
    -- Insert one row per call, stamped with the insert time
    INSERT INTO [dbo].[youtube_video_likes] (item_id, video_id, likes, full_json_response, ts_insert)
    SELECT @p_item_id, @p_video_id, @p_likes, @p_full_json_response, SYSDATETIME();
END
GO
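To try the table and insert logic locally before creating them in the SQL Pool, the sketch below uses Python's built-in sqlite3 module as a stand-in. This is an assumption for illustration only: SQLite types and the datetime('now') call differ from the dedicated SQL pool, and the sample payload is invented.

```python
import json
import sqlite3


def create_table(conn: sqlite3.Connection) -> None:
    """Local approximation of [dbo].[youtube_video_likes]."""
    conn.execute(
        """
        CREATE TABLE youtube_video_likes (
            item_id            TEXT    NOT NULL,
            video_id           TEXT    NOT NULL,
            likes              INTEGER NOT NULL,
            full_json_response TEXT    NOT NULL,
            ts_insert          TEXT    NOT NULL
        )
        """
    )


def insert_video_likes(conn, item_id, video_id, likes, full_json_response):
    """Local counterpart of p_youtube_video_likes_insert."""
    conn.execute(
        "INSERT INTO youtube_video_likes VALUES (?, ?, ?, ?, datetime('now'))",
        (item_id, video_id, likes, full_json_response),
    )


conn = sqlite3.connect(":memory:")
create_table(conn)

# Invented payload, only to exercise the insert
payload = json.dumps({"items": [{"id": "vid-1", "statistics": {"likeCount": "42"}}]})
insert_video_likes(conn, "item-1", "vid-1", 42, payload)

rows = conn.execute("SELECT item_id, video_id, likes FROM youtube_video_likes").fetchall()
print(rows)
```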

 

 

 

 

 

 

 

 

 

2. Create Datasets

 

We will use the Copy activity to store the REST API responses in parquet files, so we need to create two datasets: one for the source and one for the sink.

 

Dataset for source
Create a file and upload it to your storage account. This is only a supporting file, because the data we want to store comes from the REST API calls and not from a file. Its content is the simplest JSON snippet possible: an empty JSON object.

Filename: oneLinerEmpty.JSON
Content: {}


Then, create a new dataset for this file:

image.png

 

image.png

 

Luis_Soares_0-1628167918772.png

 

image.png

 

image.png

 

Data store: Azure Data Lake Storage Gen2
Format: JSON
Name: oneLinerEmptyJSON
Linked service: the linked service for your “Primary” storage account or another one
File path: use the browse button to locate and select your uploaded JSON file. In this example, “filesystem001” is the container name, “socialmedia/supportingfiles” is the folder, and “oneLinerEmpty.JSON” is the file name

 

Dataset for sink
Create a new dataset that will be used to store the data in parquet format. This dataset will be parameterized because we will use it to create different files. Follow the same steps as above but select the Parquet format.

image.png

 

Name: parameterOutputParquet
Linked service: the linked service for your “Primary” storage account or another one
File path: leave empty as this will be parameterized
Import schema: None

 

Now it’s time to configure this dataset. In the Parameters tab, create 2 parameters as in this image:

 

image.png

 

P_FILE_PATH: String, no default value
P_FILE_NAME: String, no default value

 

In the Connection tab, we will use these parameters:

 

image.png

 

“filesystem001” is the name of the container; replace it with your own.
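To make the effect of the two dataset parameters concrete, here is a small Python sketch of how the container, P_FILE_PATH and P_FILE_NAME combine into the location of the output file. The helper name and the sample file name are hypothetical; only “filesystem001” and the folder come from this article.

```python
import posixpath

CONTAINER = "filesystem001"  # container name used in this article; replace with yours


def sink_location(p_file_path: str, p_file_name: str, container: str = CONTAINER) -> str:
    """Combine container, folder and file name into a single path."""
    return posixpath.join(container, p_file_path.strip("/"), p_file_name)


print(sink_location("socialmedia/runs/YouTube", "getVideoStats-example.parquet"))
```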
Now we are ready to start developing the pipeline.

 

 

3. Pipelines

 

Create an auxiliary pipeline
The first pipeline is the one that uses the 2 datasets we created above and will only have a Copy activity. The objective of this pipeline is to receive a parameter and write that value in a parquet file. It will be called several times from the main pipeline, to store all the REST API responses.

 

Create a new pipeline, name it “Store Parameter in Parquet File” and create the following parameters, all with type String and no default value:

P_FILE_PATH
P_FILE_NAME
P_DATA

 

image.png

 

Add a Copy activity, name it “Store parameter P_DATA in Parquet” and configure the Source, Sink and Mapping tabs as shown in the following images.

 

image.png

 

Source dataset: oneLinerEmptyJSON (the JSON dataset created in step 2)
File path type: File path in dataset
Recursively: unchecked
Additional columns: create a new column “VALUE_COLUMN” and use the “Add dynamic content” option of the VALUE field to add a reference to the parameter P_DATA (@pipeline().parameters.P_DATA).

The goal of this additional column is to create a new column with the content of the received parameter, as if it were read from the input file. This is a way to inject whatever values we want in columns of the Copy activity.
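The effect of the additional column can be pictured with plain Python. This is only a conceptual sketch, not how the Copy activity is implemented: it reads the rows of the supporting file (a single empty object) and attaches the parameter value to each row under VALUE_COLUMN. The function name is hypothetical.

```python
import json


def copy_with_additional_column(source_text: str, p_data: str) -> list:
    """Read the source rows, then append VALUE_COLUMN with the P_DATA value."""
    record = json.loads(source_text)  # oneLinerEmpty.JSON contains just {}
    rows = record if isinstance(record, list) else [record]
    return [{**row, "VALUE_COLUMN": p_data} for row in rows]


# The empty source row ends up carrying only the injected column
rows = copy_with_additional_column("{}", '{"kind": "youtube#videoListResponse"}')
print(rows)
```

Because the source file has exactly one (empty) row, the sink receives exactly one row whose only column is the injected value.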

 

image.png

 

Sink dataset: parameterOutputParquet
P_FILE_PATH: @pipeline().parameters.P_FILE_PATH
P_FILE_NAME: @pipeline().parameters.P_FILE_NAME

 

image.png

 

Map complex values to string: checked

Click on “+ New mapping” to add a new mapping row. Rename the default column name “Column_1” to “VALUE_COLUMN” and map it to the same name on the right side of the mapping.

 

This pipeline is finished and should look like this:

 

image.png

 

 

Create the main pipeline

 

The main pipeline has these steps:

  1. Get YouTube API Key from the secret in Azure Key Vault
  2. Get the playlist ID for a given channel ID
  3. Get the list of videos for a given playlist ID
  4. For each video, get video statistics
  5. Store results in the SQL Pool

In addition, the responses of all 3 YouTube REST API calls will also be stored in parquet files. This will allow us later to look at the data using different analytical/processing engines.

 

The complete pipeline and parameters will look like this:

 

image.png

 

Start by creating a new pipeline, name it “YouTube REST API example with API Key” or whatever you want and add the following parameters and default values:

 

P_SOCIAL_MEDIA_PLATFORM: YouTube
P_YOUTUBE_CHANNEL_ID: <YOUR-YOUTUBE-CHANNEL-ID>
P_FILE_OUTPUT_PATH: socialmedia/runs/

 

Add 3 Web activities, link them as shown in the previous picture, and configure them with the following properties. Only non-default or otherwise noteworthy properties are listed.

 

Web activity 1

 

Name: Get YouTube API Key from Key Vault
Secure output: checked
URL: <YOUR-YOUTUBE-API-KEY-SECRET-URL> e.g. https://mykeyvault.vault.azure.net/secrets/myKeyName/999cf304c27db?api-version=7.0
Method: GET
Authentication: Managed Identity
Resource: https://vault.azure.net

 

The secure output option hides the results of the API call; in this case, it hides the value retrieved from the secret, as per best practices. When we try to look at the result, it looks like this:

 

image.png

 

To access the value of the secret, we can now use this expression: activity('Get YouTube API Key from Key Vault').output.value
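For readers less familiar with pipeline expressions, the sketch below shows the same lookup in Python. The response body is a trimmed, hypothetical example: only the "value" field is taken from the expression above, and the other field and both values are placeholders.

```python
import json

# Hypothetical, trimmed response of the Key Vault call; the secret sits in "value"
sample_output = json.loads(
    """
    {
      "value": "<YOUR-YOUTUBE-API-KEY>",
      "id": "https://mykeyvault.vault.azure.net/secrets/myKeyName/999cf304c27db"
    }
    """
)


def secret_value(activity_output: dict) -> str:
    """Python counterpart of activity('...').output.value."""
    return activity_output["value"]


api_key = secret_value(sample_output)  # deliberately not printed
```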

 

Note: any developer can turn off the secure output option and see the value of the secret. This can happen in a development environment, where the sensitivity is not the same as in a production environment. Make sure that no sensitive information can be seen in a production environment, by including revision rules that validate that no pipelines can be promoted if proper settings for security are not met.

 

Web activity 2

 

Name: Get playlistID
Secure input: checked
URL:
@concat(
    'https://youtube.googleapis.com/youtube/v3/channels?part=contentDetails&id=',
    pipeline().parameters.P_YOUTUBE_CHANNEL_ID,
    '&key=',
    activity('Get YouTube API Key from Key Vault').output.value
)
Method: GET

 

The secure input option hides the values sent to this activity; in this case, it hides the API Key used in the URL. If we try to look at the runtime input, all we see is this:

 

image.png

 

We now have the playlist ID for a channel, so we can request a list of videos for that playlist.
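As a rough Python equivalent of this step, the sketch below builds the same channels URL and then pulls the uploads playlist ID out of a response, following the path items[0].contentDetails.relatedPlaylists.uploads that the next Web activity relies on. The sample response is invented and trimmed to the fields involved; function names are hypothetical.

```python
from urllib.parse import urlencode

CHANNELS_ENDPOINT = "https://youtube.googleapis.com/youtube/v3/channels"


def channels_url(channel_id: str, api_key: str) -> str:
    """Build the URL used by the 'Get playlistID' Web activity."""
    query = urlencode({"part": "contentDetails", "id": channel_id, "key": api_key})
    return f"{CHANNELS_ENDPOINT}?{query}"


def uploads_playlist_id(response: dict) -> str:
    """Follow items[0].contentDetails.relatedPlaylists.uploads."""
    return response["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]


# Invented response, trimmed to the fields this step reads
sample_response = {
    "items": [{"contentDetails": {"relatedPlaylists": {"uploads": "UU-hypothetical"}}}]
}
print(uploads_playlist_id(sample_response))
```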

 

Web activity 3

 

Name: Get Playlist Videos
Secure input: checked
URL:
@concat(
    'https://youtube.googleapis.com/youtube/v3/playlistItems?part=snippet,contentDetails,status&maxResults=50&playlistId=',
    activity('Get playlistID').output.items[0].contentDetails.relatedPlaylists.uploads,
    '&key=',
    activity('Get YouTube API Key from Key Vault').output.value
)
Method: GET

 

The secure input option is also checked. Notice what we would see if this was not the case:

 

image.png
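Note that the request above asks for a single page of at most 50 items. The pipeline in this article does not page through results; when a playlist holds more videos, the API response carries a nextPageToken that can be sent back as pageToken. The Python sketch below illustrates that loop under the assumption of an injectable fetch function (hypothetical, standing in for the HTTP call), demonstrated here with a fake two-page response.

```python
from urllib.parse import urlencode

PLAYLIST_ITEMS_ENDPOINT = "https://youtube.googleapis.com/youtube/v3/playlistItems"


def playlist_items_url(playlist_id, api_key, page_token=None):
    """Build the playlistItems URL, optionally for a specific page."""
    params = {
        "part": "snippet,contentDetails,status",
        "maxResults": 50,
        "playlistId": playlist_id,
        "key": api_key,
    }
    if page_token:
        params["pageToken"] = page_token
    return f"{PLAYLIST_ITEMS_ENDPOINT}?{urlencode(params, safe=',')}"


def all_playlist_items(playlist_id, api_key, fetch):
    """Collect items from every page; fetch(url) must return the parsed JSON."""
    items, token = [], None
    while True:
        page = fetch(playlist_items_url(playlist_id, api_key, token))
        items.extend(page.get("items", []))
        token = page.get("nextPageToken")
        if not token:
            return items


def fake_fetch(url):
    """Stand-in for the HTTP call: two pages of invented data."""
    if "pageToken=p2" in url:
        return {"items": [{"id": "c"}]}
    return {"items": [{"id": "a"}, {"id": "b"}], "nextPageToken": "p2"}


print(len(all_playlist_items("PL-hypothetical", "KEY", fake_fetch)))
```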

 

We want to store the responses of these 2 YouTube REST API calls in parquet format. We already have a pipeline to do that, so we will now invoke it.

Add 2 Execute Pipeline activities to the canvas, link them as shown in the complete pipeline design and configure them as follows.

 

Execute Pipeline 1

 

Name: Store playlistID result in Parquet
Invoked pipeline: Store Parameter in Parquet File, or any other name you gave to the auxiliary pipeline
Wait on completion: checked
P_FILE_PATH:
@concat(
    pipeline().parameters.P_FILE_OUTPUT_PATH,
    pipeline().parameters.P_SOCIAL_MEDIA_PLATFORM
)
P_FILE_NAME:
@concat(
    'getPlaylistID-',
    utcnow(),
    '.parquet'
)
P_DATA: @string(activity('Get playlistID').output)

 

This activity will store the REST API response in a parquet file in the storage account. The file name will be similar to “getPlaylistID-2021-04-23T13:13:19.4426957Z.parquet”.
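The naming pattern (prefix, UTC timestamp, extension) can be reproduced outside the pipeline, for example when looking for the files of a given run. A minimal Python sketch follows; the function name is hypothetical, and Python formats 6 fractional digits where the pipeline example above shows 7, so the names are similar but not identical.

```python
from datetime import datetime, timezone
from typing import Optional


def parquet_file_name(prefix: str, now: Optional[datetime] = None) -> str:
    """Build '<prefix>-<UTC timestamp>.parquet', mirroring the concat expression."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z"
    return f"{prefix}-{stamp}.parquet"


# Fixed timestamp so the result is reproducible
fixed = datetime(2021, 4, 23, 13, 13, 19, 442695, tzinfo=timezone.utc)
print(parquet_file_name("getPlaylistID", fixed))
```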

 

Execute Pipeline 2

 

Name: Store Playlist Videos result in Parquet
Invoked pipeline: Store Parameter in Parquet File, or any other name you gave to the auxiliary pipeline
Wait on completion: checked
P_FILE_PATH:
@concat(
    pipeline().parameters.P_FILE_OUTPUT_PATH,
    pipeline().parameters.P_SOCIAL_MEDIA_PLATFORM
)
P_FILE_NAME:
@concat(
    'getPlaylistVideos-',
    utcnow(),
    '.parquet'
)
P_DATA: @string(activity('Get Playlist Videos').output)

 

This activity will store the REST API response in a parquet file in the storage account. The file name will be similar to “getPlaylistVideos-2021-04-23T13:13:22.3769063Z.parquet”.

 

Now that we have a list of videos, we need to get the statistics for each one of them. To accomplish that, we add a ForEach activity to the pipeline, connect it to the output of the “Get Playlist Videos” activity and configure as:

 

Name: For Each Video
Sequential: checked, but can also run in parallel
Items: @activity('Get Playlist Videos').output.items

 

Next, we add the remaining activities inside the loop; they will run for each video in the playlist. At the end it will look like this:

 

image.png

 

Web activity

 

Name: Get Video Stats
Secure input: checked
URL:
@concat(
    'https://youtube.googleapis.com/youtube/v3/videos?part=snippet,contentDetails,statistics&id=',
    item().snippet.resourceId.videoId,
    '&key=',
    activity('Get YouTube API Key from Key Vault').output.value
)
Method: GET

 

This activity will request some statistics for a given video. Statistics include the number of views, likes, comments, and so on.
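The loop itself can be pictured in Python as follows. This is a sketch under stated assumptions: get_stats is a hypothetical callable standing in for the “Get Video Stats” request, and a thread pool is just one way to imitate running the ForEach with Sequential unchecked.

```python
from concurrent.futures import ThreadPoolExecutor


def video_ids(playlist_response: dict) -> list:
    """Same path as item().snippet.resourceId.videoId, for every item."""
    return [item["snippet"]["resourceId"]["videoId"] for item in playlist_response["items"]]


def for_each_video(playlist_response, get_stats, sequential=True, max_workers=4):
    """Call get_stats once per video, one at a time or in parallel."""
    ids = video_ids(playlist_response)
    if sequential:
        return [get_stats(video_id) for video_id in ids]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(get_stats, ids))  # map keeps the input order


# Invented playlist response and a fake stats call, for demonstration only
playlist = {
    "items": [
        {"snippet": {"resourceId": {"videoId": "v1"}}},
        {"snippet": {"resourceId": {"videoId": "v2"}}},
    ]
}
fake_stats = lambda video_id: {"items": [{"id": video_id}]}

print(for_each_video(playlist, fake_stats, sequential=False))
```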

 

Execute pipeline activity

 

Name: Store Video Stats in Parquet
Invoked pipeline: Store Parameter in Parquet File, or any other name you gave to the auxiliary pipeline
Wait on completion: checked
P_FILE_PATH:
@concat(
    pipeline().parameters.P_FILE_OUTPUT_PATH,
    pipeline().parameters.P_SOCIAL_MEDIA_PLATFORM
)
P_FILE_NAME:
@concat(
    'getVideoStats-',
    utcnow(),
    '.parquet'
)
P_DATA: @string(activity('Get Video Stats').output)

 

This activity will store the REST API response in a parquet file in the storage account. The file name will be similar to “getVideoStats-2021-04-23T13:13:25.6924926Z.parquet”.

 

SQL Pool Stored Procedure

 

Name: SQL pool SP - Insert data
Azure Synapse dedicated SQL pool: select the SQL pool where you created the table and stored procedure
Stored procedure name: [dbo].[p_youtube_video_likes_insert], or whatever you called it

 

Stored procedure parameters (use the import button to list them):

 

p_item_id: @item().id
p_video_id: @item().snippet.resourceId.videoId
p_likes: @activity('Get Video Stats').output.items[0].statistics.likeCount
p_full_json_response: @string(activity('Get Video Stats').output)

 

The call to the stored procedure will make sure that some individual fields (item id, video id, count of likes) as well as the full response are stored in a table in a SQL pool. In a real scenario, we would need to create a model to hold the data, as per the requirements.
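The parameter mapping above can be read as the following Python sketch. The sample inputs are invented, the function name is hypothetical, and the sketch assumes likeCount arrives as a string in the response, so it is converted explicitly for the INT parameter.

```python
import json


def stored_procedure_parameters(item: dict, stats_output: dict) -> dict:
    """Map one playlist item and its stats response to the procedure parameters."""
    like_count = stats_output["items"][0]["statistics"]["likeCount"]
    return {
        "p_item_id": item["id"],
        "p_video_id": item["snippet"]["resourceId"]["videoId"],
        "p_likes": int(like_count),  # assumed to be a string in the response
        "p_full_json_response": json.dumps(stats_output),
    }


# Invented inputs, trimmed to the fields the mapping reads
item = {"id": "item-1", "snippet": {"resourceId": {"videoId": "vid-1"}}}
stats = {"items": [{"id": "vid-1", "statistics": {"likeCount": "42"}}]}

params = stored_procedure_parameters(item, stats)
print(params["p_item_id"], params["p_video_id"], params["p_likes"])
```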

 

Note: if you use Azure Data Factory, the same goal can be achieved using the “Stored procedure” activity, but you would need to use a linked service to the target SQL pool.

 

We can now run the full pipeline and validate that several files were created in the storage account and some data inserted in the SQL pool table.

 

 

4. Look at the data


Because we stored our data in parquet files and in tables in a SQL pool, we have several options for working with the data.
This table summarizes some of the options available in Azure Synapse Analytics:

 

  Data format and store
Compute option | Parquet files in ADLS Gen2 | Tables in SQL Pools
SQL Serverless | Y (*) | N
SQL Provisioned | Y | Y (*)
Spark Cluster | Y (*) | Y

 

Below we will look at short examples of the options marked with (*). The goal is not to go deep into technical details of JSON handling but rather have an idea of the possibilities to use.

 

 

a) SQL Serverless on Parquet files in ADLS Gen2

 

We can use the OPENROWSET function to read a parquet file and show its contents.

-- check file as-is
SELECT *
FROM OPENROWSET(
    BULK 'https://mydatalake.dfs.core.windows.net/filesystem001/socialmedia/runs/YouTube/getVideoStats-2021-04-23T12:37:12.2853807Z.parquet',
    FORMAT = 'PARQUET'
) AS [result]

 

 

 

 

 

 

 

image.png

 

Since VALUE_COLUMN holds a JSON string, we can take advantage of the OPENJSON function to parse the elements in its first level.

-- OPENJSON in action
SELECT [key], [value], [type]
FROM OPENROWSET(
    BULK 'https://mydatalake.dfs.core.windows.net/filesystem001/socialmedia/runs/YouTube/getVideoStats-2021-04-23T12:37:12.2853807Z.parquet',
    FORMAT = 'PARQUET'
) AS [result2]
CROSS APPLY OPENJSON(VALUE_COLUMN)

 

 

 

 

 

 

 

image.png
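To get a feel for the key, value and type columns that OPENJSON returns, here is an approximate emulation in plain Python. It is a sketch only: the type codes follow the documented OPENJSON values (0 null, 1 string, 2 number, 3 boolean, 4 array, 5 object), but nested values are re-serialized, so whitespace can differ from what SQL returns. The sample document is invented.

```python
import json


def openjson_type(value) -> int:
    """Map a parsed JSON value to the OPENJSON type code."""
    if value is None:
        return 0
    if isinstance(value, bool):  # check bool before int: True is also an int
        return 3
    if isinstance(value, str):
        return 1
    if isinstance(value, (int, float)):
        return 2
    if isinstance(value, list):
        return 4
    return 5


def openjson(json_text: str) -> list:
    """Return (key, value, type) rows for the first level of a JSON document."""
    doc = json.loads(json_text)
    pairs = doc.items() if isinstance(doc, dict) else enumerate(doc)
    rows = []
    for key, value in pairs:
        text = value if isinstance(value, str) or value is None else json.dumps(value)
        rows.append((str(key), text, openjson_type(value)))
    return rows


sample = '{"kind": "youtube#videoListResponse", "items": [], "pageInfo": {"totalResults": 1}}'
for row in openjson(sample):
    print(row)
```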

 

You can read more about OPENROWSET and OPENJSON here:

 

b) SQL Provisioned on Tables in SQL Pools


OPENJSON is also available on the SQL Provisioned engine.

-- check data as-is
SELECT *, ISJSON(full_json_response) AS 'IS_JSON'
FROM [dbo].[youtube_video_likes]
GO

 

 

 

 

 

 

 

image.png

 

 

 

 

 

 

 

 

SELECT *
FROM [dbo].[youtube_video_likes]
CROSS APPLY OPENJSON(full_json_response)
WHERE video_id = 'A212x5XXXXX'
GO

 

 

 

 

 

 

 

image.png

 

You can read more about OPENJSON for SQL Provisioned and JSON functions here:

 

c) Spark Cluster on Parquet files in ADLS Gen2

 

We can also use a Spark Cluster as the processing engine to look at the data.

 

For this task, we create a new notebook. By default, the selected language is Python (Spark) and we can leave it like that, although we can use different languages by using Cell Magic Commands like %%pyspark or %%sql. We also need to attach the notebook to an existing Apache Spark Pool.

 

First, we load the data into a dataframe:

%%pyspark
df = spark.read.load('abfss://filesystem001@mydatalake.dfs.core.windows.net/socialmedia/runs/YouTube/getVideoStats-2021-04-23T12:37:12.2853807Z.parquet', format='parquet')
display(df)

 

 

 

 

 

 

 

image.png

 

Since we have the data in a dataframe, we can use pyspark to manipulate the data:

%%pyspark
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Only the fields we want to extract are declared in the schema
statistics_schema = StructType([
    StructField('likeCount', StringType(), True)
])
items_schema = StructType([
    StructField('id', StringType(), True),
    StructField('statistics', statistics_schema, True)
])
schema = StructType([
    StructField('kind', StringType(), True),
    StructField('etag', StringType(), True),
    StructField('items', ArrayType(items_schema), True)
])

# Parse the JSON string once and reuse the parsed column expression
parsed = from_json(col('VALUE_COLUMN'), schema)

display(
    df.withColumn('kind', parsed.getItem('kind'))
      .withColumn('etag', parsed.getItem('etag'))
      .withColumn('video_id', parsed.getItem('items')[0].getItem('id'))
      .withColumn('likes', parsed.getItem('items')[0].getItem('statistics').getItem('likeCount'))
      .select('kind', 'etag', 'video_id', 'likes')
)

 

 

 

 

 

 

 

image.png

 

If SQL is the preferred way, we can create a table from the dataframe and use the get_json_object function to parse the JSON strings:

%%pyspark
df.write.mode("overwrite").saveAsTable("default.youtube_video_likes")

 

 

 

 

 

 

 

image.png

 

 

 

 

 

 

 

 

%%sql
SELECT
    get_json_object(VALUE_COLUMN, '$.kind') AS `kind`,
    get_json_object(VALUE_COLUMN, '$.etag') AS `etag`,
    get_json_object(VALUE_COLUMN, '$.items[0].id') AS `videoId`,
    get_json_object(VALUE_COLUMN, '$.items[0].statistics.likeCount') AS `likes`
FROM default.youtube_video_likes

 

 

 

 

 

 

 

image.png
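The path expressions passed to get_json_object, such as '$.items[0].statistics.likeCount', can be prototyped without a Spark pool. The sketch below is a simplified stand-in written for illustration: it supports only dotted names and numeric indexes, and it returns Python values rather than strings, so it is not a replacement for the Spark function. The sample document is invented.

```python
import json
import re

# One path step: either .name or [index]
_STEP = re.compile(r"\.([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]")


def get_json_value(json_text: str, path: str):
    """Walk a '$.a[0].b' style path; return None when a step does not exist."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    current = json.loads(json_text)
    pos = 1
    while pos < len(path):
        match = _STEP.match(path, pos)
        if match is None:
            raise ValueError(f"unsupported path syntax at position {pos}")
        name, index = match.group(1), match.group(2)
        if name is not None:
            if not isinstance(current, dict) or name not in current:
                return None
            current = current[name]
        else:
            i = int(index)
            if not isinstance(current, list) or i >= len(current):
                return None
            current = current[i]
        pos = match.end()
    return current


sample = '{"kind": "k", "items": [{"id": "vid-1", "statistics": {"likeCount": "42"}}]}'
print(get_json_value(sample, "$.items[0].statistics.likeCount"))
```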

 

 

Summary
It is easy to build an Azure Synapse Pipeline that gets information from social media.
In the example shown in this article, which gets some data from YouTube, we covered ingestion, storage and consumption using different techniques, and more can be used. There is no single “correct” way to do it, but rather a way that makes sense in your environment.

 

You can find all the code in the attachment file below, including the pipelines and datasets, SQL scripts, notebooks and supporting files, as well as on GitHub under the repository name SynapseIntegrateSocialMedia.

 

 

 
