ADF Mapping Data Flows for Databricks Notebook Developers


Here are three examples of how to migrate hand-coded Databricks Notebook ETL to an automated, visually designed ETL process in ADF using Mapping Data Flows. In each of the examples outlined below, it takes just a few minutes to rebuild the coded ETL routine in ADF with Mapping Data Flows, without writing any code.

 

1) Convert Databricks Python ETL from Notebooks to ADF using Mapping Data Flows with Soccer statistics

 

spark.read.csv("/data/eu-soccer-events/input/events.csv", ...)

When you see Databricks ETL code that reads files, represent that action as a Source transformation. The location of the data is defined in an ADF dataset, while the schema and data types are defined in the Source projection, a Select transformation, and Derived Column transformations. The credentials for your file systems and databases are stored in the Linked Service.
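
For reference, the options and schema carried by the notebook read are the same pieces of information that ADF splits between the dataset and the Source projection. A minimal PySpark sketch of the full read (the header and inferSchema options here are assumptions added for illustration, standing in for an explicit schema):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# File location -> ADF dataset; column names and data types -> Source projection.
# header/inferSchema are assumed options used only to keep this sketch short.
eventsDf = (spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv("/data/eu-soccer-events/input/events.csv"))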

 

eventsDf = eventsDf.na.fill({'player': 'NA', 'event_team': 'NA', 'opponent': 'NA', 
                             'event_type': 99, 'event_type2': 99, 'shot_place': 99, 
                             'shot_outcome': 99, 'location': 99, 'bodypart': 99, 
                             'assist_method': 99, 'situation': 99})

A common operation in data lake file processing for ETL is mapping codes to values. There are several ways to build these enumerations in Data Flows. In the sample at the link for #1, I show you how to build this as a case statement inside a Derived Column transformation, or how to use an external lookup file with a Join or Lookup transformation. The lookup file is essentially just a mapping of keys to values.
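
As a rough PySpark illustration of the lookup-file option (the file path and column names below are hypothetical), the lookup is just key/value pairs joined onto the main data, which is what the ADF Join or Lookup transformation does visually:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical lookup file with two columns: event_type, event_type_str
eventTypeLookup = (spark.read
                   .option("header", "true")
                   .csv("/data/eu-soccer-events/lookups/event_type.csv"))

# Equivalent of an ADF Join or Lookup transformation keyed on event_type
eventsDf = eventsDf.join(eventTypeLookup, on="event_type", how="left")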

 

# mapKeyToVal (a helper defined elsewhere in the notebook) returns a UDF that
# translates the numeric codes into readable labels using a Python dict.
eventsDf = (
             eventsDf.
             withColumn("event_type_str", mapKeyToVal(evtTypeMap)("event_type")).
             withColumn("event_type2_str", mapKeyToVal(evtTyp2Map)("event_type2")).
             withColumn("side_str", mapKeyToVal(sideMap)("side")). ... )

2) Convert Databricks Loans Risk Analysis ETL to ADF using Mapping Data Flows

 

display(loan_stats)
display(loan_stats.groupBy("addr_state").agg((count(col("annual_inc"))).alias("ratio")))

When you see any operation in Notebooks that displays stats or row values, you will accomplish this same task using the Debug session switch in Data Flows. Go into the Data Preview tab to interact with the data and see your live transformation results there. You can also click on each column to see column property statistics in charts and descriptive form.

 

loan_stats = (
    loan_stats
    .filter(loan_stats.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))
    .withColumn("bad_loan",
                (~(loan_stats.loan_status == "Fully Paid")).cast("string"))
)

 

For grouping, use the Aggregate transformation, and for filtering, use the Filter transformation. You can filter and clean values using common equality functions or regular expressions in ADF:

 

(loan_status == 'Default' || loan_status == 'Charged Off' || loan_status == 'Fully Paid')

&&

regexMatch(addr_state, '^(?:(A[KLRZ]|C[AOT]|D[CE]|FL|GA|HI|I[ADLN]|K[SY]|LA|M[ADEINOST]|N[CDEHJMVY]|O[HKR]|P[AR]|RI|S[CD]|T[NX]|UT|V[AIT]|W[AIVY]))$')
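
For comparison only, the same filter expressed in PySpark (an illustrative equivalent of the ADF expression above, not code from the original notebook):

from pyspark.sql.functions import col

# US state abbreviations, matching the regexMatch expression above
state_regex = (
    "^(?:(A[KLRZ]|C[AOT]|D[CE]|FL|GA|HI|I[ADLN]|K[SY]|LA|M[ADEINOST]"
    "|N[CDEHJMVY]|O[HKR]|P[AR]|RI|S[CD]|T[NX]|UT|V[AIT]|W[AIVY]))$"
)

loan_stats = loan_stats.filter(
    col("loan_status").isin("Default", "Charged Off", "Fully Paid")
    & col("addr_state").rlike(state_regex)
)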

3) Convert Scala Notebook ETL to ADF using Mapping Data Flows

 

val df = spark.read.json("abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/small_radio_json.json")

In ADF, use the Source transformation and its dataset to define the file location and projection. Use a Select transformation to refine the field names:

val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
specificColumnsDf.show()

Use datasets and the Sink transformation to set the sink properties:

//SQL Data Warehouse related settings
val dwDatabase = "<database-name>"
val dwServer = "<database-server-name>"
val dwUser = "<user-name>"
val dwPass = "<password>"
val dwJdbcPort = "1433"
val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass + ";" + dwJdbcExtraOptions
val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass
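
These connection settings map to an ADF Linked Service and dataset rather than to code. Purely as an illustration, a rough PySpark equivalent of the write that the Sink transformation replaces might look like the sketch below; the connection string, table name, and DataFrame name are placeholders, and plain JDBC is used only to keep the example generic:

# Placeholder connection values; in ADF these belong in the Linked Service.
sql_dw_url = ("jdbc:sqlserver://<database-server-name>:1433;"
              "database=<database-name>;user=<user-name>;password=<password>")

# specificColumnsDf is assumed to be the refined DataFrame from the Select step;
# "dbo.SampleTable" is a hypothetical target table set on the Sink dataset in ADF.
(specificColumnsDf.write
    .format("jdbc")
    .option("url", sql_dw_url)
    .option("dbtable", "dbo.SampleTable")
    .mode("append")
    .save())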

 
