Train your Model on Spark/Databricks, score it on ADX

This post has been republished via RSS; it originally appeared at: Azure Data Explorer Blog articles.

In many use cases Machine Learning models are built and applied over data that is stored and managed by Azure Data Explorer (ADX). Most ML models are built and deployed in two steps:

  • Offline training
  • Real time scoring

ML Training is a long and iterative process. Commonly, a model is developed by researchers/data scientists. They fetch the training data, clean it, engineer features, try different models and tune parameters, repeating this cycle until the ML model meets the required accuracy and robustness. To improve accuracy, they can:

  • Use a big data set, if available, that might contain hundreds of millions of records and plenty of features (dimensions and metrics)
  • Train very complex models, e.g., a DNN with dozens of layers and millions of free parameters
  • Perform a more exhaustive search for tuning the model’s hyper parameters

Once the model is ready, it can be deployed to production for scoring.

 

ML Scoring is the process of applying the model on new data to get predictions/regressions. Scoring usually needs to be done with minimal latency (near real time) for batches of streamed data.

 

Azure Data Explorer (ADX) supports running inline Python scripts that are embedded in the KQL query. The Python code runs on the existing compute nodes of ADX, in distributed manner near the data. It can handle Data Frames containing many millions of records, partitioned and processed on multiple nodes. This optimized architecture results in great performance and minimal latency.

Specifically, for ML workloads, ADX can be used for both training and scoring:

  • Scoring on ADX is the ultimate solution for data that is stored on ADX, as
    • Processing is done near the data, which guarantees the fastest performance
    • Embedding the scoring Python code in KQL query is simple, robust and cheap, relative to the usage of an external scoring service that requires management, networking, security, etc.

Scoring can be done using the predict_fl() library function

  • Training on ADX can be done in case the full training data set is stored in ADX, the training process takes up to few minutes and doesn’t require GPUs or other special hardware

Still in many scenarios training is done on Big Data systems, such as Spark/Databricks. Specifically, ML training on these systems is preferred in case that:

  • The training data is not stored in ADX, but in the data lake or other external storage/db
  • The training process is long (takes more than 5-10 minutes), usually done in batch/async mode
  • Training can be accelerated by using GPUs
  • ADX production workflows must not be compromised by lengthy, CPU intensive, training jobs

So we end up in a workflow that uses Spark/Databricks for training, and ADX for scoring. But the problem is that training on these Spark platforms is mostly done using the Spark ML framework, that is optimized for Spark architecture, but not supported by plain vanilla Python environment like ADX Python. So how can we still score in ADX?

We present a solution which is built from these steps:

  1. Fetch the training data from ADX to Azure Databricks using ADX Spark Connector
  2. Train an ML model in Azure Databricks
  3. Convert the model to ONNX
  4. Serialize and export the model to ADX using the same Spark connector
  5. Score in ADX using onnxruntime

Prerequisite

  • Enable Python plugin on your ADX cluster (see the Onboarding section of the python() plugin doc)
  • Create a workspace in Azure Databricks
  • Install the Spark connector in that workspace as explained here
  • Install onnxmltools in that workspace

In the following example we build a logistic regression model to predict room occupancy based on Occupancy Detection data, a public dataset from UCI Repository. This model is a binary classifier to predict occupied/empty room based on Temperature, Humidity, Light and CO2 sensors measurements. The example contains code snips from Databricks notebook showing for the full process of retrieving the data from ADX, building the model, convert it to ONNX and push it to ADX. Finally the KQL scoring query to be run using Kusto Explorer.

 

1. Load the data from ADX to Databricks

 

from pyspark.sql import SparkSession pyKusto = SparkSession.builder.appName("kustoPySpark").getOrCreate() cluster = 'https://demo11.westus.kusto.windows.net' db = 'ML' query = 'OccupancyDetection' AppId = '***** Your App Id *****' AppSecret = '***** Your App Secret *****' AuthorityId = '***** Your Authority Id *****' # Read the data from the kusto table with default reading mode s_df = pyKusto.read. \ format("com.microsoft.kusto.spark.datasource"). \ option("kustoCluster", cluster). \ option("kustoDatabase", db). \ option("kustoQuery", query). \ option("kustoAadAppId", AppId). \ option("kustoAadAppSecret", AppSecret). \ option("kustoAadAuthorityID", AuthorityId). \ load() s_df.take(4)Out[37]: [Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 51), Temperature=23.18, Humidity=27.272, Light=426.0, CO2=721.25, HumidityRatio=0.004792988, Occupancy=True, Test=False),  Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 51), Temperature=23.15, Humidity=27.2675, Light=429.5, CO2=714.0, HumidityRatio=0.004783441, Occupancy=True, Test=False),  Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 53), Temperature=23.15, Humidity=27.245, Light=426.0, CO2=713.5, HumidityRatio=0.004779464, Occupancy=True, Test=False),  Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 54), Temperature=23.15, Humidity=27.2, Light=426.0, CO2=708.25, HumidityRatio=0.004771509, Occupancy=True, Test=False)]

 

2. Train the ML model in Azure Databricks

 

s_df.groupBy('Test', 'Occupancy').count().show()+-----+---------+-----+ | Test|Occupancy|count| +-----+---------+-----+ | true| false| 9396| | true| true| 3021| |false| false| 6414| |false| true| 1729| +-----+---------+-----+ # Prepare the input for the model # Spark Logistic Regression estimator requires integer label so create it from the boolean Occupancy column s_df = s_df.withColumn('Label', s_df['Occupancy'].cast('int')) # Split to train & test sets s_train = s_df.filter(s_df.Test == False) s_test = s_df.filter(s_df.Test == True) # Create the Logistic Regression model from pyspark.ml.feature import VectorAssembler from pyspark.ml.classification import LogisticRegression # The Logistic Regression estimator expects the features in a single column so create it using VectorAssembler features = ('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio') assembler = VectorAssembler(inputCols=features,outputCol='Features') s_train_features = assembler.transform(s_train) s_train_features.take(4) lr = LogisticRegression(labelCol='Label', featuresCol='Features',maxIter=10) s_clf = lr.fit(s_train_features) # Predict the training set s_predict_train = s_clf.transform(s_train_features) # Predict the testing set s_test_features = assembler.transform(s_test) s_predict_test = s_clf.transform(s_test_features) s_predict_test.select(['Timestamp', 'Features', 'Label', 'prediction']).show(10)+-------------------+--------------------+-----+----------+ | Timestamp| Features|Label|prediction| +-------------------+--------------------+-----+----------+ |2015-02-02 14:19:00|[23.7,26.272,585....| 1| 1.0| |2015-02-02 14:19:00|[23.718,26.29,578...| 1| 1.0| |2015-02-02 14:21:00|[23.73,26.23,572....| 1| 1.0| |2015-02-02 14:22:00|[23.7225,26.125,4...| 1| 1.0| |2015-02-02 14:23:00|[23.754,26.2,488....| 1| 1.0| |2015-02-02 14:23:00|[23.76,26.26,568....| 1| 1.0| |2015-02-02 14:25:00|[23.73,26.29,536....| 1| 1.0| |2015-02-02 14:25:00|[23.754,26.29,509...| 1| 1.0| |2015-02-02 14:26:00|[23.754,26.35,476...| 1| 1.0| |2015-02-02 14:28:00|[23.736,26.39,510...| 1| 1.0| +-------------------+--------------------+-----+----------+ only showing top 10 rows# Calculate accuracy on the testing set import pyspark.sql.functions as F check = s_predict_test.withColumn('correct', F.when(F.col('Label') == F.col('prediction'), 1).otherwise(0)) check.groupby('correct').count().show() accuracy = check.filter(check['correct'] == 1).count()/check.count()*100 print(f'Accuracy: {accuracy}')+-------+-----+ |correct|count| +-------+-----+ | 1|12271| | 0| 146| +-------+-----+ Accuracy: 98.8241926391238

 

3. Convert the model to ONNX

 

from onnxmltools import convert_sparkml from onnxmltools.convert.sparkml.utils import FloatTensorType initial_types = [('Features', FloatTensorType([None, 5]))] onnx_model = convert_sparkml(s_clf, 'Occupancy detection Pyspark Logistic Regression model', initial_types, spark_session = pyKusto) onnx_model{'classlabels_ints': [0, 1], 'coefficients': [0.2995554662269534, 0.08678036676466962, -0.01768699375517248, -0.005589950773872156, 19.092004694715197, -0.2995554662269534, -0.08678036676466962, 0.01768699375517248, 0.005589950773872156, -19.092004694715197], 'intercepts': [1.396631045353889, -1.396631045353889], 'multi_class': 1, 'name': 'LinearClassifier', 'post_transform': 'LOGISTIC'} (full print trimmed here)

 

4. Export the model to ADX

 

import datetime import pandas as pd smodel = onnx_model.SerializeToString().hex() models_tbl = 'Databricks_Models' model_name = 'Occupancy_Detection_LR' # Create a DataFrame containing a single row with model name, training time and # the serialized model, to be appended to the models table now = datetime.datetime.now() dfm = pd.DataFrame({'name':[model_name], 'timestamp':[now], 'model':[smodel]}) sdfm = spark.createDataFrame(dfm) sdfm.show()+--------------------+--------------------+--------------------+ | name| timestamp| model| +--------------------+--------------------+--------------------+ |Occupancy_Detecti...|2021-01-26 19:02:...|0807120b4f6e6e784...| +--------------------+--------------------+--------------------+

 

 

# Write the model to Kusto sdfm.write.format("com.microsoft.kusto.spark.datasource"). \ option("kustoCluster", cluster). \ option("kustoDatabase", db). \ option("kustoAadAppId", AppId). \ option("kustoAadAppSecret", AppSecret). \ option("kustoAadAuthorityID", AuthorityId). \ option("kustoTable", models_tbl). \ mode("Append"). \ save()​

 

5. Score in ADX

Is done by calling predict_onnx_fl() You can either install this function in your database, or call it in ad-hoc manner:

 

let predict_onnx_fl=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string) { let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model); let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col); let code = '\n' 'import binascii\n' '\n' 'smodel = kargs["smodel"]\n' 'features_cols = kargs["features_cols"]\n' 'pred_col = kargs["pred_col"]\n' 'bmodel = binascii.unhexlify(smodel)\n' '\n' 'features_cols = kargs["features_cols"]\n' 'pred_col = kargs["pred_col"]\n' '\n' 'import onnxruntime as rt\n' 'sess = rt.InferenceSession(bmodel)\n' 'input_name = sess.get_inputs()[0].name\n' 'label_name = sess.get_outputs()[0].name\n' 'df1 = df[features_cols]\n' 'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n' '\n' 'result = df\n' 'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])' '\n' ; samples | evaluate python(typeof(*), code, kwargs) }; // OccupancyDetection | where Test == 1 | extend pred_Occupancy=int(null) | invoke predict_onnx_fl(Databricks_Models, 'Occupancy_Detection_LR', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy') | summarize correct = countif(Occupancy == pred_Occupancy), incorrect = countif(Occupancy != pred_Occupancy), total = count() | extend accuracy = 100.0*correct/totalcorrect incorrect total accuracy 12271 146 12417 98.8241926391238

 

Summary

In this blog we presented how to train your ML model in Azure Databricks, and use it for scoring in ADX. This can be done by converting the trained model from Spark ML to ONNX, a common ML model exchange format, enabling it to be consumed for scoring by ADX python() plugin.

This workflow is common for ADX customers that are building Machine Learning algorithms by batch training using Spark/Databricks models on big data stored in the data lake. This new option to use this model for scoring directly on ADX is very appealing as it's fast, simple and free.

REMEMBER: these articles are REPUBLISHED. Your best bet to get a reply is to follow the link at the top of the post to the ORIGINAL post! BUT you're more than welcome to start discussions here:

This site uses Akismet to reduce spam. Learn how your comment data is processed.