Train your Model in AML, score it in ADX

This post has been republished via RSS; it originally appeared at: Azure Data Explorer Blog articles.

Introduction

In many use cases, Machine Learning (ML) models are built and applied over data that is stored and managed by Azure Data Explorer (ADX). Typical tasks include fraud detection, identifying malicious attacks, predicting device failures, predicting capacity usage, shopping or entertainment recommendations, medical diagnosis, and many more. Most ML models are built and deployed in two steps:

  • Offline training
  • Real time scoring

ML Training is a long and iterative process. Developing a model usually starts with researchers and data scientists: they fetch the training data, clean it, engineer features, try different models, and tune parameters, repeating this cycle until the model meets the required accuracy and robustness. Once this phase is done, software engineers take the ML algorithm, implement it in production code, and deploy it. Azure Machine Learning (AML) is a great solution for managing and authoring the end-to-end process of ML model development, deployment, and monitoring, also known as MLOps.

ML Scoring is the process of applying the model to new data to derive insights and drive decisions. Scoring usually needs to be done at scale, with minimal latency, over large sets of new records. For ADX users, the best solution is to score the data directly in ADX. ADX scoring runs on its compute nodes, in a distributed manner close to the data, thus achieving the best performance with minimal latency.

 

How to use ADX for scoring AML models

ADX supports running Python code embedded in Kusto Query Language (KQL) using the python() plugin. The Python code runs in multiple sandboxes on ADX's existing compute nodes. The Python image is based on the Anaconda distribution and contains the most common ML frameworks, including Scikit-learn, TensorFlow, Keras and PyTorch. To score AML models in ADX, follow these steps:

  1. Develop your ML model in AML using Python. Make sure to save your final model in pickle format.
  2. Export the model to an Azure blob container.
  3. Score new data in ADX using the inline python() plugin.
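Step 1 boils down to serializing the trained estimator with pickle. A minimal sketch of the save/load round trip, using a hand-rolled stand-in model rather than a real AML-trained estimator (the class, its threshold, and the sample CO2 values are all hypothetical):

```python
import pickle

# Stand-in for a trained model: any picklable object exposing predict()
# fits the scoring pattern used later in the post (hypothetical example).
class ThresholdModel:
    def __init__(self, threshold):
        self.threshold = threshold

    def predict(self, co2_readings):
        # A room is predicted occupied when CO2 exceeds the threshold
        return [co2 > self.threshold for co2 in co2_readings]

model = ThresholdModel(threshold=1000.0)

# Step 1: save the final model in pickle format
with open("model.pkl", "wb") as f:
    pickle.dump(model, f)

# ADX later loads the very same bytes inside its Python sandbox
with open("model.pkl", "rb") as f:
    restored = pickle.loads(f.read())

print(restored.predict([1514.5, 423.5]))  # [True, False]
```

Any framework whose models pickle cleanly (e.g. Scikit-learn) works the same way, as long as the sandbox image contains the framework needed to unpickle it.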

Example

We build a model to predict room occupancy based on Occupancy Detection data, a public dataset from the UCI Repository. This model is a binary classifier that predicts whether a room is occupied or empty based on Temperature, Humidity, Light and CO2 sensor measurements. The complete process can be found in this Jupyter notebook; here we embed a few snippets just to present the main concepts.
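The training step itself is standard Scikit-learn. The sketch below uses synthetic sensor readings and a made-up labeling rule purely for illustration; the real notebook trains on the actual UCI dataset, and the feature distributions here are assumptions:

```python
# Sketch of the training step on synthetic sensor data (the actual
# notebook trains on the UCI Occupancy Detection dataset).
import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)
n = 500
# Synthetic Temperature, Humidity, Light, CO2, HumidityRatio readings
X = np.column_stack([
    rng.normal(21, 1, n),          # Temperature (C)
    rng.normal(27, 3, n),          # Humidity (%)
    rng.uniform(0, 500, n),        # Light (lux)
    rng.uniform(400, 2000, n),     # CO2 (ppm)
    rng.normal(0.004, 0.0005, n),  # HumidityRatio
])
# Hypothetical ground truth: bright rooms with high CO2 are occupied
y = ((X[:, 2] > 250) & (X[:, 3] > 1000)).astype(int)

clf = LogisticRegression(max_iter=1000).fit(X, y)
print("train accuracy:", clf.score(X, y))
```

The fitted `clf` is exactly the kind of object that gets pickled and exported to blob storage in the following steps.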

 

Prerequisite

  • Enable the Python plugin on your ADX cluster (see the Onboarding section of the python() plugin doc)
  • Whitelist a blob container to be accessible by the ADX Python sandbox (see the Appendix section of that doc)
  • Create a Python environment (conda or virtual env) that reflects the Python sandbox image
  • Install the AML SDK in that environment
  • Install the Azure Blob Storage SDK in that environment

 

Set up your AML workspace, experiment and compute target

 

from azureml.core import Workspace, Experiment
from azureml.core.compute import AmlCompute, ComputeTarget

ws = Workspace.create("AML-ADX", "your subscription", "your RG", location="westus", exist_ok=True)
exp = Experiment(ws, name="Prediction-Occupancy")

compute_name = "cpu-cluster"
vm_sku = "STANDARD_D2_V2"
if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print("found compute target: " + compute_name)
else:
    print("creating new compute target...")
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_sku, min_nodes=1, max_nodes=2)
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=10)

found compute target: cpu-cluster

 

 

Download and explore the data

 

import pandas as pd
from azure.storage.blob import BlockBlobService  # azure-storage-blob v2.1

block_blob_service = BlockBlobService(account_name=aml_storage_account, sas_token=aml_sas_token)
block_blob_service.get_blob_to_path(aml_container_name, data_blob_name, 'data.csv')
df = pd.read_csv('data.csv')
df[-4:]

 

 

 

|       | Timestamp                   | Temperature | Humidity | Light  | CO2    | HumidityRatio | Occupancy | Test |
|-------|-----------------------------|-------------|----------|--------|--------|---------------|-----------|------|
| 20556 | 2015-02-18 09:16:00.0000000 | 20.865      | 27.7450  | 423.50 | 1514.5 | 0.004230      | True      | True |
| 20557 | 2015-02-18 09:16:00.0000000 | 20.890      | 27.7450  | 423.50 | 1521.5 | 0.004237      | True      | True |
| 20558 | 2015-02-18 09:17:00.0000000 | 20.890      | 28.0225  | 418.75 | 1632.0 | 0.004279      | True      | True |
| 20559 | 2015-02-18 09:19:00.0000000 | 21.000      | 28.1000  | 409.00 | 1864.0 | 0.004321      | True      | True |

 

Submit the job to the remote cluster and view the results log

 

run = exp.submit(config=est)
run

 

 

| Experiment           | Id                                       | Type              | Status   |
|----------------------|------------------------------------------|-------------------|----------|
| Prediction-Occupancy | Prediction-Occupancy_1587550546_0dd38412 | azureml.scriptrun | Starting |

 

 

run.wait_for_completion(show_output=True)

 

 

RunId: Prediction-Occupancy_1587550546_0dd38412

Streaming azureml-logs/70_driver_log.txt

========================================

Trimmed...

Accuracy: 0.8571 (+/- 0.1219) [Decision Tree]

Accuracy: 0.9887 (+/- 0.0071) [Logistic Regression]

Accuracy: 0.9656 (+/- 0.0224) [K Nearest Neighbour]

Accuracy: 0.8893 (+/- 0.1265) [Naive Bayes]

The experiment completed successfully. Finalizing run...
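Log lines such as "Accuracy: 0.9887 (+/- 0.0071)" typically report the mean and spread of per-fold cross-validation accuracies. A stdlib-only sketch with hypothetical fold scores; the exact "+/-" convention (two standard deviations here, a common Scikit-learn idiom) is an assumption, not taken from the original post:

```python
from statistics import mean, stdev

# Hypothetical per-fold cross-validation accuracies for one model
fold_accuracies = [0.984, 0.991, 0.989, 0.992, 0.987]

m = mean(fold_accuracies)
s = stdev(fold_accuracies)
# Assume the "+/-" in the log is ~2 standard deviations across folds
print(f"Accuracy: {m:.4f} (+/- {2 * s:.4f}) [Logistic Regression]")
```

In the run above, logistic regression wins this comparison, so it is the model exported for scoring in ADX.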

 

Scoring in ADX

Download the model to a local file and copy it to a blob in a storage container located in the same region as the ADX cluster

 

model_path = model.download(exist_ok=True)
adx_storage_account = "artifcatswestus"
adx_container_name = "kusto/AML"
model_blob_name = model_name + '.pkl'
adx_sas_token = "?********"
block_blob_service = BlockBlobService(account_name=adx_storage_account, sas_token=adx_sas_token)
block_blob_service.create_blob_from_path(adx_container_name, model_blob_name, model_path)
uri = f'https://{adx_storage_account}.blob.core.windows.net/{adx_container_name}/{model_blob_name}{adx_sas_token}'

 

 

Score in ADX from Jupyter notebook using KqlMagic

 

 

scoring_from_blob_query = r'''
let classify_sf=(samples:(*), model_sas:string, features_cols:dynamic, pred_col:string)
{
    let kwargs = pack('model_sas', model_sas, 'features_cols', features_cols, 'pred_col', pred_col);
    let code =
    'import pickle\n'
    'model_sas = kargs["model_sas"]\n'
    'features_cols = kargs["features_cols"]\n'
    'pred_col = kargs["pred_col"]\n'
    'with open("/Temp/model.pkl", "rb") as f:\n'
    '    bmodel = f.read()\n'
    'clf1 = pickle.loads(bmodel)\n'
    'df1 = df[features_cols]\n'
    'predictions = clf1.predict(df1)\n'
    'result = df\n'
    'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])';
    samples
    | evaluate python(typeof(*), code, kwargs, external_artifacts=pack('model.pkl', model_sas))
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke classify_sf('$model_uri$',
                    pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'),
                    'pred_Occupancy')
| summarize n=count() by Occupancy, pred_Occupancy    // confusion matrix
'''
scoring_from_blob_query = scoring_from_blob_query.replace('$model_uri$', uri)

%kql res << -query scoring_from_blob_query
df = res.to_dataframe()
print('Confusion Matrix')

 

 

 

|   | Occupancy | pred_Occupancy | n    |
|---|-----------|----------------|------|
| 0 | True      | True           | 3006 |
| 1 | False     | True           | 112  |
| 2 | True      | False          | 15   |
| 3 | False     | False          | 9284 |
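From the confusion matrix above, the overall quality of the classifier on the test set follows by simple arithmetic. A stdlib-only sketch using the counts from the table (the metric definitions are standard, not stated in the original post):

```python
# Confusion-matrix counts from the query result above
tp = 3006  # Occupancy=True,  pred_Occupancy=True
fp = 112   # Occupancy=False, pred_Occupancy=True
fn = 15    # Occupancy=True,  pred_Occupancy=False
tn = 9284  # Occupancy=False, pred_Occupancy=False

total = tp + fp + fn + tn
accuracy = (tp + tn) / total      # fraction of correct predictions
precision = tp / (tp + fp)        # how often "occupied" predictions are right
recall = tp / (tp + fn)           # how many occupied rooms are caught

print(f"accuracy={accuracy:.4f} precision={precision:.4f} recall={recall:.4f}")
# accuracy=0.9898 precision=0.9641 recall=0.9950
```

So the model scored in ADX correctly classifies roughly 99% of the held-out test records, in line with the cross-validation accuracy reported during training.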

 
