Using Azure Data Factory to orchestrate Kusto query-ingest


In this blog post, we’ll explore how Azure Data Factory (ADF) or Synapse Pipelines can be used to orchestrate large query ingestions. You will learn how to split one large query ingest into multiple partitions, orchestrated with ADF.

 

Common use-cases for this are:

  • Event tables requiring a certain query complexity (e.g. window functions that need serialized datasets, which are currently not supported by materialized views); see the sketch after this list.
  • Backfill scenarios, for example for newly created materialized views or derived event tables based on existing, very large datasets. (For materialized views there is built-in backfill functionality that you should check first; see Create materialized view - Azure Data Explorer | Microsoft Learn.)
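To make the first use case more concrete, here is a minimal sketch of such a query. It assumes a hypothetical deviceId column on the measurement table and uses the window function prev() on a serialized dataset, which a materialized view currently cannot do:

// Hypothetical derived event table: time elapsed since the previous event per device.
// prev() requires a serialized (ordered) dataset and is not supported in materialized views.
measurement
| order by deviceId asc, enqueuedTime asc
| extend prevDevice = prev(deviceId), prevTime = prev(enqueuedTime)
| extend timeSincePrev = iff(deviceId == prevDevice, enqueuedTime - prevTime, timespan(null))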

 

There are other ways of dealing with this; in this blog we will focus on orchestrating the query ingest with ADF.

From a conceptual point of view the approach is very simple: in a first step we identify time partitions in the source dataset. With this information we split the query ingest into these partitions, so instead of running one query ingest over the entire dataset, we run one query ingest per time-based partition.
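As a rough sketch of the idea (using the source table measurement and the target table measurement_copy that appear later in this post), a single ingest over the whole dataset is replaced by one ingest per day:

// One query ingest over the entire dataset (resource intensive for large tables):
.append measurement_copy <| measurement

// ...becomes one query ingest per time partition, e.g. for a single day:
.append measurement_copy <| measurement
| where enqueuedTime >= datetime(2022-12-01) and enqueuedTime < datetime(2022-12-02)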

This can be solved with three Azure Data Factory activities:

  • A Lookup activity, reading all source partitions
  • A ForEach activity, executing an Azure Data Explorer Command activity per partition
  • The Azure Data Explorer command activity, triggering the append command for each partition

 

The overall data pipeline consists of the Lookup activity, followed by the ForEach activity that contains the Azure Data Explorer Command activity.

 

Step-by-step guide:

To follow this guide you need, as prerequisites: an Azure Data Explorer cluster containing the source table (and a target table for the copied data), and an Azure Data Factory or Synapse workspace with an Azure Data Explorer linked service pointing to that cluster.

With this in place, you can start:

(1) Create a new pipeline.

(2) In the pipeline, create a Lookup activity. In the Settings tab, select the ADX dataset. If it does not exist yet, create a new integration dataset of type Azure Data Explorer, select the linked service you have created, and choose the table that will be used as the data source. Deselect the "First row only" checkbox and enter the query manually:

measurement
| summarize by bin(enqueuedTime, 1d)
| order by enqueuedTime asc

The query should return the time partitions of your source dataset. In our example, it returns every day present in the table measurement.
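If the source table is very large, or you only need to backfill a specific range, you can restrict the lookup query accordingly; a small sketch with an assumed backfill window:

measurement
| where enqueuedTime between (datetime(2022-01-01) .. datetime(2022-07-01))
| summarize by bin(enqueuedTime, 1d)
| order by enqueuedTime asc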


 

In the General tab, configure a retry of at least 3 to make the activity resilient.

 

(3) Add a ForEach activity and connect it to the Lookup activity (on success). In the Settings tab, under Items, add:

@activity('lkpDayPartition').output.value

This expression references the Lookup activity by its name (lkpDayPartition in our example) and returns the array of rows that the inner activity of the ForEach activity will iterate over.
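Each element of that array is one row from the Lookup query, so inside the ForEach the expression item().enqueuedTime resolves to the day of the current partition. With "First row only" disabled, the Lookup output has roughly the following shape (illustrative values):

{
  "count": 2,
  "value": [
    { "enqueuedTime": "2022-12-01T00:00:00Z" },
    { "enqueuedTime": "2022-12-02T00:00:00Z" }
  ]
}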

The batch count controls how many child activities are executed in parallel. As the query ingest can be very resource intensive, the recommendation is to start with 1 and increase it only gradually while testing.

 

(4) Inside the ForEach activity, add an Azure Data Explorer Command activity. In the General tab, make a few changes so the overall process is resilient:

  1. Configure a retry of, for example, 5 (e.g. in case your command gets throttled), and
  2. slightly increase the retry interval (e.g. to 120 seconds).

Under Connection, select the Azure Data Explorer linked service.

Now you have to add the command for the query ingest. We will configure two ingestion properties: distributed=true, which distributes the ingestion across the cluster nodes, and creationTime, which stamps the ingested data with the partition’s datetime value instead of the current time.

The final command also filters on the datetime column and looks as follows:

.append measurement_copy with (distributed=true, creationTime = "@{item().enqueuedTime}") <| measurement
| where enqueuedTime >= todatetime("@{item().enqueuedTime}") and enqueuedTime < todatetime("@{item().enqueuedTime}") + 1d

With this final step you are done, and you can trigger your new data pipeline.
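After the pipeline has finished, a quick sanity check is to compare daily row counts between source and target; a minimal sketch using the table names from this post:

// Days where the copied row count does not match the source row count.
measurement
| summarize sourceRows = count() by day = bin(enqueuedTime, 1d)
| join kind=fullouter (
    measurement_copy
    | summarize targetRows = count() by day = bin(enqueuedTime, 1d)
) on day
| where sourceRows != targetRows or isnull(sourceRows) or isnull(targetRows)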

 

Conclusion

In this blog post you have learned:

  • How to orchestrate an Azure Data Explorer query ingest with Azure Data Factory.
  • How to make the query ingest resilient by simply configuring retries on the Azure Data Factory activities.
  • How to pass parameters from Azure Data Factory activities to the ADX ingest command for flexible filtering and further settings.
