Integrating application transactional data from Cosmos DB into your Lakehouse


This post was authored by William (Gui) Braccialli, a Solutions Architect at Databricks.

 

At Databricks, we want to help organizations integrate all of their data into the Lakehouse ecosystem with the lowest possible latency and cost. A common question our engineers receive is: what is the best approach to sync data from NoSQL databases, and more specifically Azure Cosmos DB, into your Azure Databricks Lakehouse?

 

In this blog post, we present two different approaches to reading data from Cosmos DB, compare their respective advantages, and provide a recommended solution.

 

The diagram below shows a reference architecture with Azure Databricks and Cosmos DB, in which Cosmos DB can be both a data source and a data destination:

Image 1. Reference architecture with Azure Databricks and Cosmos DB

 

  1. Application transactional data is stored in Cosmos DB.
  2. Azure Databricks reads the change data feed from Cosmos DB using the Spark connector and writes the data into Azure Data Lake Storage Gen2 in Delta Lake format.
  3. Data can be used for all types of analytics and business insights, machine learning, and BI.
  4. Data is transformed, including the possibility of training and scoring machine learning models. Enriched data is written into Azure Data Lake Storage Gen2 in Delta Lake format in the gold layer.
  5. Transformed and enriched data can be loaded back into Cosmos DB to be used by applications, such as mobile apps, or for other use cases.

The code examples in this post use the sample Cosmos DB database called "Families" from the Getting started with SQL queries page in the Azure documentation. Please follow the steps outlined on that page to create the database and import the initial data.

 

1. Reading Change Data Feed (recommended approach)

Using the change data feed (CDF) approach, we read/extract only records that were changed or inserted since the previous extraction (streaming or batch). The reads will have minimal impact on transactional workloads and will be very cost effective in terms of Cosmos DB Request Units (RUs).

 

Once change records are extracted from Cosmos DB into the Databricks environment, we will merge the changes into a Delta table that provides a high-performance and highly-scalable environment for any analytics use case.

 

Advantages:

  • The change data feed (CDF) approach supports streaming use cases as well as low-frequency updates (using micro-batches), compared to a delay of several minutes with the alternative approaches.
  • It has minimal impact on Cosmos DB transactional (OLTP) queries, since only a small number of records are read in each batch or micro-batch.
  • It is cost-effective, since you don't need to provision a Cosmos DB analytical store to perform the extraction.
  • It is easy to use and maintain, since Spark Structured Streaming controls checkpointing automatically.
  • It is open, since it relies only on open-source packages and components.
  • Data governance is simplified, since you keep all of your analytics data in the lakehouse, backed by Azure Data Lake Storage.

Follow the steps below to get the Cosmos DB connector running with the change data feed:

 

a. Install the new Cosmos DB connector for Spark 3.x

 

In the first half of 2021, Microsoft released a new open-source Cosmos DB Spark connector. The new connector has native support for the change data feed (CDF).

 

To use the Cosmos DB connector in Azure Databricks, you need to create a library and attach it to your Azure Databricks cluster.

 

i. Right-click on your workspace and select "Create | Library":

Image 2. Create library

 

ii. Select "Maven" as the Library Source and paste content the text below into the "Coordinates" field, then click "Create":
com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.3.1

Image 3. Selecting the library source

 

iii. Select your cluster and click "Install" to attach the Cosmos DB library:

Image 4. Install the Cosmos DB library
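
If you prefer to script the installation instead of using the UI, the same library can be attached through the Databricks Libraries API. Below is a minimal sketch, assuming the Libraries API 2.0 install endpoint; the workspace URL, personal access token secret, and cluster ID are placeholders you would supply:

import requests

# Placeholders: supply your own workspace URL, personal access token, and cluster ID.
workspace_url = "https://YOUR-WORKSPACE.azuredatabricks.net"
token = dbutils.secrets.get('YOUR-SECRET-SCOPE', 'YOUR-DATABRICKS-TOKEN')
cluster_id = "YOUR-CLUSTER-ID"

# Libraries API 2.0: install the Maven coordinate on the cluster
response = requests.post(
    f"{workspace_url}/api/2.0/libraries/install",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "cluster_id": cluster_id,
        "libraries": [
            {"maven": {"coordinates": "com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.3.1"}}
        ],
    },
)
response.raise_for_status()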

 

b. Run the initial extraction and save it as a Delta table (after the initial extraction, only changes will be extracted)

 

Example of the initial full extraction and save to Delta:

 

 

cosmos_endpoint = "https://YOUR-COSMOSDB-INSTANCE.documents.azure.com:443/"
cosmos_masterkey = dbutils.secrets.get('YOUR-SECRET-SCOPE', 'YOUR-SECRET-NAME')
cosmos_databasename = "Families"
cosmos_containername = "Families"

cosmos_read_config = {
    "spark.cosmos.accountEndpoint": cosmos_endpoint,
    "spark.cosmos.accountKey": cosmos_masterkey,
    "spark.cosmos.database": cosmos_databasename,
    "spark.cosmos.container": cosmos_containername,
    "spark.cosmos.read.inferSchema.enabled": "true",
    "spark.cosmos.write.strategy": "ItemOverwrite"
}

# initial full read of the container
cosmos_df = spark.read.format("cosmos.oltp").options(**cosmos_read_config).load()

# save the initial extraction as a Delta table
cosmos_df.write.mode("overwrite").saveAsTable("families_in_delta")

 

 

 

c. Add or change a record in Cosmos DB
Use the steps outlined in the Cosmos DB getting started guide to add or change a record in the Cosmos DB container.
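
Alternatively, you can make the change from a notebook using the same Spark connector, which also illustrates the write-back path from step 5 of the reference architecture. Below is a minimal sketch that reuses the cosmos_read_config from step (b); its "spark.cosmos.write.strategy": "ItemOverwrite" setting upserts by id, and the record values are just placeholders:

# Hypothetical record for illustration; ItemOverwrite replaces the whole document for a
# given id, so this sketch inserts a new id rather than partially updating an existing one.
changed_record = spark.createDataFrame(
    [("NewFamily.1", "NewFamily", False)],
    ["id", "lastName", "isRegistered"],
)

(changed_record
    .write
    .format("cosmos.oltp")
    .options(**cosmos_read_config)
    .mode("append")   # writes use append mode; the write strategy controls the upsert behavior
    .save())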

 

d. Merge the change feed into your Delta table (this can be scheduled hourly or daily, or run as a continuous stream)

 

 

import delta
import time
from datetime import timedelta

# use the table creation timestamp as the starting point of the change data feed;
# Spark Structured Streaming controls the checkpoint automatically and will only request
# changes since the last extraction
table_creation_timestamp = (
    delta.DeltaTable.forName(spark, "families_in_delta").history().collect()[0]["timestamp"]
    - timedelta(hours=1)
).isoformat() + "Z"

cosmos_change_feed_config = {
    "spark.cosmos.accountEndpoint": cosmos_endpoint,
    "spark.cosmos.accountKey": cosmos_masterkey,
    "spark.cosmos.database": cosmos_databasename,
    "spark.cosmos.container": cosmos_containername,
    "spark.cosmos.read.partitioning.strategy": "Default",
    "spark.cosmos.read.inferSchema.enabled": "true",
    "spark.cosmos.changeFeed.startFrom": table_creation_timestamp,
    "spark.cosmos.changeFeed.mode": "Incremental",
}

df_change_feed = (
    spark
        .readStream
        .format("cosmos.oltp.changeFeed")
        .options(**cosmos_change_feed_config)
        .load()
)

def merge_delta(incremental, target):
    incremental.createOrReplaceTempView("incremental")
    incremental._jdf.sparkSession().sql(f"""
        MERGE INTO {target} t
        USING incremental i
        ON i.id = t.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

streaming_output = (
    df_change_feed
        .writeStream
        .trigger(once=True)  # can be changed to run in streaming mode instead of batch
        .format("delta")
        .outputMode("update")
        .option("checkpointLocation", "/cosmos/checkpoint_changefeed_families_table")
        .foreachBatch(lambda i, b: merge_delta(i, "families_in_delta"))
        .start()
)

for s in spark.streams.active:
    while s.isActive:
        print("waiting for trigger once to finish")
        time.sleep(1)

if streaming_output.lastProgress:
    print(f"{streaming_output.lastProgress['numInputRows']} rows read")
else:
    print("no changes in cosmos since last execution")

 

 

Example of running the same merge in continuous streaming mode (sample output shown in the image below):

 

 

streaming_output = (
    df_change_feed
        .writeStream
        .format("delta")
        .outputMode("update")
        .option("checkpointLocation", "/cosmos/checkpoint_changefeed_families_table")
        .foreachBatch(lambda i, b: merge_delta(i, "families_in_delta"))
        .start()
)

 

 

 

Image 5. Sample output of streaming reads

 

e. Check Delta history to see the merge operation

 

 

%sql
describe history families_in_delta

 

 

 

Image 6. Delta history

 

f. Performance recommendations

i. Enable low shuffle merge for faster merge results:

 

 

set spark.databricks.delta.merge.enableLowShuffle = true;

 

 

ii. Schedule regular jobs to run optimize and vacuum commands on your Delta table.
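
For example, a scheduled maintenance notebook could run something along these lines (a sketch; VACUUM uses the default 7-day retention here):

# Sketch of a scheduled maintenance job for the Delta table
spark.sql("OPTIMIZE families_in_delta")   # compact the small files produced by frequent merges
spark.sql("VACUUM families_in_delta")     # remove files no longer referenced (default 7-day retention)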

 

iii. Use partitioning and Z-ordering to reduce the number of files affected by updated records.
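
As a sketch, Z-ordering by the merge key co-locates rows with similar id values into fewer files, so each merge rewrites less data (the choice of id is illustrative; pick the columns you merge and filter on):

# Sketch: Z-order by the merge key so each merge touches fewer files
spark.sql("OPTIMIZE families_in_delta ZORDER BY (id)")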

 

iv. Use append-only mode. If your application doesn't update records, you can remove the merge operation and just insert records into your Delta table.

 

 

streaming_output = (
    df_change_feed
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/cosmos/checkpoint_changefeed_families_table")
        .table("families_in_delta")
)

 

 

g. Other considerations

i. If you are using the trigger-once mode, we recommend running the extraction multiple times per day to reduce the number of records read in each execution. If you decide to read only once a day, you might want to temporarily increase the Cosmos DB RUs during the extraction and return them to the original value once the extraction finishes.
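
As an illustration of that second point, throughput can be adjusted programmatically around the extraction with the azure-cosmos Python SDK. This is only a sketch, assuming the azure-cosmos package is installed on the cluster and that manual throughput is provisioned at the container level; the RU values are placeholders:

from azure.cosmos import CosmosClient

# Reuses the connection variables defined earlier; RU values are placeholders.
client = CosmosClient(cosmos_endpoint, credential=cosmos_masterkey)
container = (client
    .get_database_client(cosmos_databasename)
    .get_container_client(cosmos_containername))

container.replace_throughput(4000)   # scale up before the daily extraction
# ... run the change feed extraction ...
container.replace_throughput(400)    # scale back down afterwards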

 

ii. The change data feed connector doesn't report delete operations. The recommendation from the Azure documentation is to use a "soft delete":

 

"Currently change feed doesn't log deletes. you can add a soft marker on the items that are being deleted. For example, you can add an attribute in the item called 'deleted' and set it to 'true' and set a TTL on the item, so that it can be automatically deleted."
Source: https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed#change-feed-and-different-operations
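
If you adopt this soft-delete pattern, the merge in step (d) can be extended to remove those rows from the Delta table as well. Below is a sketch, assuming every change record carries a boolean "deleted" attribute as in the quote above, and that the Delta table's schema includes that column (or schema evolution is enabled):

def merge_delta_with_soft_deletes(incremental, target):
    # Same merge as step (d), extended to honor an assumed soft-delete flag named "deleted".
    incremental.createOrReplaceTempView("incremental")
    incremental._jdf.sparkSession().sql(f"""
        MERGE INTO {target} t
        USING incremental i
        ON i.id = t.id
        WHEN MATCHED AND i.deleted = true THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND i.deleted = false THEN INSERT *
    """)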

 

h. Example notebook

See the code examples in this Databricks notebook.

 

2. Full Scan or Partial Scan (not recommended)

The alternative approach to using the change data feed (CDF) is to run a full extraction from Cosmos DB, or to manually create logic to run partial extractions (for example, reading the data from the past 2 days).
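
For reference, a manual partial extraction might look like the sketch below, which pushes a filter on the _ts system property down to Cosmos DB. It reuses the cosmos_read_config from section 1.b and assumes the connector's spark.cosmos.read.customQuery option is available in the installed connector version; the two-day window is illustrative:

import time

# Sketch of a manual partial scan (not recommended): read only documents changed in the last 2 days.
two_days_ago = int(time.time()) - 2 * 24 * 3600

partial_scan_config = {
    **cosmos_read_config,
    "spark.cosmos.read.customQuery": f"SELECT * FROM c WHERE c._ts >= {two_days_ago}",
}

partial_df = spark.read.format("cosmos.oltp").options(**partial_scan_config).load()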

 

A full or partial scan either runs against the Cosmos DB transactional store, leading to resource contention with your applications, or requires you to enable the Cosmos DB analytical store (HTAP).

 

In most cases, full scans or partial scans are not recommended. However, there are cases where this should be considered, such as referential lookups for Spark jobs.

 

Reasons why the full-extraction or manual partial-extraction approach is not recommended:

  • A full scan of the transactional store may impact the performance of transactional workloads and consume additional Cosmos DB RUs.
  • A partial scan requires custom logic to filter recent transactions, or requires you to configure the time to live (TTL) of the analytical store (HTAP) and keep it in sync with your extraction schedule. If you change the frequency of the extraction, you need to adjust the TTL as well.
  • For streaming use cases, the change data feed approach is required.
  • The analytical store creates an additional copy of your data in a separate column-oriented store.
  • HTAP replication of inserts, updates, and deletes can take up to 5 minutes to reach the analytical store.
  • The Cosmos DB HTAP connector only works with Azure Synapse components (via Azure Synapse Link) and is not compatible with open-source tools.

Get Started

Begin using Azure Databricks and Cosmos DB by signing up for a 14-day trial and importing this Databricks notebook to get started.

 
