Use Logic Apps to trigger a purge command on ADX

This post has been republished via RSS; it originally appeared at: Azure Data Explorer articles.

Learn how to use Logic Apps to trigger a purge command on Azure Data Explorer, both from a high-level view and step by step with reusable code.

 

Technologies
Azure Data Explorer is a fast, fully managed data analytics service for real-time analysis on large volumes of data streaming from applications, websites, IoT devices, and more.
Azure Logic Apps is a cloud service that helps you schedule, automate, and orchestrate tasks, business processes, and workflows when you need to integrate apps, data, systems, and services across enterprises or organizations.


Challenge
When using a purge command in Azure Data Explorer, make sure you know exactly what you are doing and why. See the official documentation on limitations and considerations.
One limitation is: “The predicate can’t reference tables other than the table being purged (TableName). The predicate can only include the selection statement (where). It can’t project specific columns from the table (output schema when running ‘table | Predicate’ must match table schema).”


The purge command structure should look like the following:

// Connect to the Data Management service
#connect "https://ingest-[YourClusterName].[region].kusto.windows.net" 

.purge table [TableName] records in database [DatabaseName] with (noregrets='true') <| [Predicate]

 

An example of purge based on a where clause condition will look like this:

.purge table MyTable records in database MyDatabase <| where RelevantColumn in ('X', 'Y')

 

The list ('X', 'Y') must be explicit and cannot come from another table within the query (for example, through a subquery or a join). You can work around this by using two Logic Apps actions:
1. “Run query and list results”, connected directly to the cluster
2. “Run control command and visualize results”, connected to “https://ingest-[YourClusterName].[Region].kusto.windows.net”
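
The two-step workaround can also be sketched as plain string handling outside Logic Apps. The Python below is a minimal illustration, not part of the original solution: `kql_string_literal` and `build_purge_command` are hypothetical helper names, and the backslash escaping assumes the values are emitted as single-quoted KQL string literals. It only builds the command text and does not connect to a cluster.

```python
from typing import Sequence


def kql_string_literal(value: str) -> str:
    """Return value as a single-quoted KQL string literal (hypothetical helper)."""
    # Escape backslashes first, then single quotes.
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def build_purge_command(table: str, database: str, column: str,
                        values: Sequence[str]) -> str:
    """Build a purge command whose predicate lists the values explicitly."""
    if not values:
        # Assumption: with nothing to purge, it is safer not to build a command.
        raise ValueError("no values to purge")
    explicit_list = ", ".join(kql_string_literal(v) for v in values)
    return (
        f".purge table {table} records in database {database} "
        f"with (noregrets='true') <| where {column} in ({explicit_list})"
    )


# Step 1 (not shown): a query against the cluster returns the values.
values_from_query = ["X", "Y"]
# Step 2: the values are written into the predicate as an explicit list.
command = build_purge_command("MyTable", "MyDatabase", "RelevantColumn",
                              values_from_query)
print(command)
```

The point of the sketch is that the purge predicate never references a second table: the values are resolved first and then pasted into the command as literals.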

 

Scenario
The example below reads a list of file names (stored in the FileDir column of a table named ToDelete) and purges the matching data from table <TABLENAME> in database <DATABASENAME> every 24 hours at 10 PM.
Use these instructions to reproduce the Logic App in the Azure portal with the Logic App source code provided at the end of the article. Adapt the names of tables, database, and connection strings as necessary.

 

The Logic App Designer View

Here is how it looks in the Logic App Designer:

1.png
 

Tasks

  1. Trigger: in this case, a schedule with a 24-hour recurrence
  2. String variable initialization: the variable is set to an empty KQL string literal ('')
  3. KQL query: retrieves the list of file names to purge
  4. Filling the string variable: appends the list of file names to the local string variable
  5. Run purge command: builds and runs the purge command, inserting the variable into the predicate

Trigger

 
2.png
This step schedules the Logic App (it can be replaced with another trigger according to the business need).
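
To see which run times this kind of schedule produces, here is a small Python sketch. It assumes the values used in the definition at the end of the article (a start time of 2020-06-25T22:00:00Z and a 24-hour interval) and that runs fall on the start time plus whole multiples of the interval. `next_run` is a hypothetical helper, not a Logic Apps API.

```python
from datetime import datetime, timedelta, timezone

# Values taken from the recurrence in the Logic App definition ("Z" means UTC).
START_TIME = datetime(2020, 6, 25, 22, 0, tzinfo=timezone.utc)
INTERVAL = timedelta(hours=24)


def next_run(now: datetime) -> datetime:
    """Return the first scheduled run strictly after `now`."""
    if now < START_TIME:
        return START_TIME
    # Number of whole intervals that have already elapsed since the start time.
    elapsed_intervals = (now - START_TIME) // INTERVAL
    return START_TIME + (elapsed_intervals + 1) * INTERVAL


print(next_run(datetime(2020, 7, 1, 9, 30, tzinfo=timezone.utc)))
# 2020-07-01 22:00:00+00:00
```

Because the start time is expressed in UTC, "10 PM" here means 22:00 UTC; a different local time would need a different start time.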
 

Variable initialization

 

3.png

 
This step initializes the string variable with an empty KQL string literal (two single quotes, ''). Don't leave the value blank: this empty string becomes the first element of the list of file names, so every name appended later can be preceded by a comma.
 

KQL Query and storing results within variable

 

4.png

 

Note: the “append KQL query result to local string variable” step is a For each loop because every name in the list must be read: each name has to be surrounded with single quotes and preceded by a comma to produce the final list of strings for the KQL purge predicate. A Logic Apps array variable is not used because it would produce an array delimited by brackets, “[” and “]”, whereas the KQL command requires a list delimited by parentheses, “(” and “)”.

The string variable has been initialized with a '' string, then for every FileDir a new element has been added. After the third iteration, the string will look like:
'','elem1','elem2','elem3'
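
The loop can be simulated in a few lines of Python to check the resulting string. This is only a sketch of the string handling: `simulate_append_loop` is a hypothetical name, and the sample values stand in for the FileDir values returned by the query.

```python
def simulate_append_loop(file_dirs):
    """Mimic the initialize, for-each, and append steps on a plain string."""
    # Initialize variable step: the empty KQL string literal.
    elements_to_delete = "''"
    # For each step: one iteration per row returned by the query.
    for file_dir in file_dirs:
        # Append step: a comma, then the value in single quotes.
        elements_to_delete += ",'" + file_dir + "'"
    return elements_to_delete


result = simulate_append_loop(["elem1", "elem2", "elem3"])
print(result)  # '','elem1','elem2','elem3'
```

One consequence of starting from '' is that the list stays syntactically valid even when the query returns no rows: the predicate then compares only against the empty string.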

 

Purge command using list of elements previously retrieved

At this stage, the final step is to insert the previously produced list into the purge command and surround it with parentheses, as shown below.

 

LucaVallarelli_0-1596023966631.png

 

Done!
Every night the Logic App will purge data from the indicated table. You can either keep the recurrence as explained in this example, or call the Logic App via API and manage it from external code by replacing the first step with a callable endpoint.
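
If the recurrence is replaced with a callable endpoint, external code could start the run with an HTTP POST. The Python sketch below only builds the request and leaves the actual call commented out. `CALLBACK_URL` is a made-up placeholder (the real URL would come from the Logic App's HTTP trigger), and the empty JSON payload is an assumption, since this workflow takes no input.

```python
import json
import urllib.request

# Hypothetical placeholder; the real callback URL is generated by the trigger.
CALLBACK_URL = "https://example.invalid/logic-app-callback"


def build_trigger_request(url: str, payload: dict) -> urllib.request.Request:
    """Build a POST request with a JSON body for the callable endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_trigger_request(CALLBACK_URL, {})
# Sending is left commented out so the sketch has no side effects:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
print(request.get_method(), request.full_url)
```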


The Logic App Code

Here is the Logic App code, which can be copied and pasted into the Code view of the Logic App:

 

{
    "definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "Initialize_string_variable_with_\"ElementsToDelete\"": {
                "inputs": {
                    "variables": [
                        {
                            "name": "ElementsToDelete",
                            "type": "string",
                            "value": "''"
                        }
                    ]
                },
                "runAfter": {},
                "type": "InitializeVariable"
            },
            "Recover_list_of_data_to_delete_based_on_custom_logic_with_KQL_query": {
                "inputs": {
                    "body": {
                        "cluster": "https://<CLUSTER>.<REGION>.kusto.windows.net",
                        "csl": "ToDelete | limit 10 | project FileDir",
                        "db": "<DATABASENAME>"
                    },
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['kusto']['connectionId']"
                        }
                    },
                    "method": "post",
                    "path": "/ListKustoResults/false"
                },
                "runAfter": {
                    "Initialize_string_variable_with_\"ElementsToDelete\"": [
                        "Succeeded"
                    ]
                },
                "type": "ApiConnection"
            },
            "Run_control_command_using_the_string_variables_in_the_\"in\"_clause_in_KQL": {
                "inputs": {
                    "body": {
                        "chartType": "Html Table",
                        "cluster": "https://ingest-<CLUSTERNAME>.<REGION>.kusto.windows.net",
                        "csl": ".purge table <TABLENAME> records in database <DATABASENAME> with (noregrets='true') <| where FileDir in (@{variables('ElementsToDelete')})",
                        "db": "<DATABASENAME>"
                    },
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['kusto']['connectionId']"
                        }
                    },
                    "method": "post",
                    "path": "/RunKustoAndVisualizeResults/true"
                },
                "runAfter": {
                    "append_KQL_query_result_to_local_array_variable": [
                        "Succeeded"
                    ]
                },
                "type": "ApiConnection"
            },
            "append_KQL_query_result_to_local_array_variable": {
                "actions": {
                    "Append_to_string_variable": {
                        "inputs": {
                            "name": "ElementsToDelete",
                            "value": ",'@{items('append_KQL_query_result_to_local_array_variable')?['FileDir']}'"
                        },
                        "runAfter": {},
                        "type": "AppendToStringVariable"
                    }
                },
                "foreach": "@body('Recover_list_of_data_to_delete_based_on_custom_logic_with_KQL_query')?['value']",
                "runAfter": {
                    "Recover_list_of_data_to_delete_based_on_custom_logic_with_KQL_query": [
                        "Succeeded"
                    ]
                },
                "type": "Foreach"
            }
        },
        "contentVersion": "1.0.0.0",
        "outputs": {},
        "parameters": {
            "$connections": {
                "defaultValue": {},
                "type": "Object"
            }
        },
        "triggers": {
            "Trigger": {
                "recurrence": {
                    "frequency": "Hour",
                    "interval": 24,
                    "startTime": "2020-06-25T22:00:00Z"
                },
                "type": "Recurrence"
            }
        }
    },
    "parameters": {
        "$connections": {
            "value": {
                "kusto": {
                    "connectionId": "/subscriptions/<SUBSCRIPTIONGUID>/resourceGroups/<RESOURCEGROUPNAME>/providers/Microsoft.Web/connections/kusto",
                    "connectionName": "kusto",
                    "id": "/subscriptions/<SUBSCRIPTIONGUID>/providers/Microsoft.Web/locations/<REGION>/managedApis/kusto"
                }
            }
        }
    }
}

Note: The web designer makes it straightforward to implement the same solution without coding. If you choose to reuse the code instead, copy and paste the code above and then insert the appropriate values for subscription, resource group, cluster, database, and tables.
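
When reusing the code, it can help to check that no placeholder was missed before pasting the definition. The following Python sketch shows one way to do that on the definition text. `fill_placeholders` and `remaining_placeholders` are hypothetical helpers, and the sample values ("mycluster", "westeurope") are invented for illustration.

```python
import re

# Placeholders in the definition are upper-case names in angle brackets.
PLACEHOLDER = re.compile(r"<[A-Z]+>")


def fill_placeholders(definition_text: str, values: dict) -> str:
    """Replace each <NAME> placeholder that has an entry in `values`."""
    for name, value in values.items():
        definition_text = definition_text.replace("<" + name + ">", value)
    return definition_text


def remaining_placeholders(definition_text: str) -> list:
    """Return the distinct placeholders still present, sorted by name."""
    return sorted(set(PLACEHOLDER.findall(definition_text)))


snippet = ('"cluster": "https://<CLUSTER>.<REGION>.kusto.windows.net", '
           '"db": "<DATABASENAME>"')
filled = fill_placeholders(snippet, {"CLUSTER": "mycluster",
                                     "REGION": "westeurope"})
print(remaining_placeholders(filled))  # ['<DATABASENAME>']
```

Any name that is not written in angle brackets is not detected by this check and still has to be reviewed by hand.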
