Detect Anomalies in Equipment with Anomaly Detector in Azure Databricks


Industry 5.0, regarded as the next industrial evolution, aims to combine the creativity of human experts with efficient, intelligent, and accurate machines to achieve manufacturing solutions that are more resource-efficient and user-preferred than those of Industry 4.0. Many companies struggle with the realities of AI implementation, and predictive maintenance helps determine the condition of equipment and predict when maintenance should be performed. Needless to say, implementing ML-based solutions can lead to major cost savings, reduced human resource costs, increased availability of systems, and the avoidance of potential catastrophes.

 

With Azure Anomaly Detector, especially its Multivariate Anomaly Detector feature, you can detect anomalies in real time on your equipment and systems. The service is easy to use through simple APIs and SDKs, and it gives detailed interpretation at anomalous timestamps to help you better understand what’s happening behind the scenes and immediately know which sensor(s) to fix among streaming data from hundreds of sensors.

 

In February this year, we announced an integration between Multivariate Anomaly Detector and SynapseML, which gives developers and customers a way to do multivariate anomaly detection with the SynapseML library; check the announcement blog here. This new capability allows you to detect anomalies in very large datasets quickly, easily, and with simpler operations.

 

In this blog, we will show you how to detect anomalies on your equipment in Azure Databricks, which is a fast, easy, and collaborative Apache Spark-based big data analytics service designed for data science and data engineering.

 

Introduction to Multivariate Anomaly Detector

Multivariate Anomaly Detector (MVAD) is an AI service in Cognitive Services that provides APIs for detecting anomalies from groups of sensor data, without requiring machine learning knowledge or labeled data. This service helps you proactively protect complex systems such as software applications, servers, factory machines, spacecraft, or even your business, from failures.

 

Referring to the workflow, first you should prepare data of good quality and quantity. The training dataset should include as much normal-pattern data as possible, and you should do some pre-processing work on the data, such as resampling, splitting out downtime, and rounding up timestamps; you can also check the detailed best practices for MVAD. After preparing the data, you can call the MVAD API directly, use the SDK, or use the SynapseML library in Synapse Analytics or Azure Databricks. Once the model is trained, you can either trigger batch inference for validation purposes or trigger streaming inference for real-time monitoring. The MVAD workflow is shown below.

[Figure: MVAD workflow, from data preparation through training to batch and streaming inference]
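To make the pre-processing step concrete, here is a minimal pandas sketch (our illustration, with hypothetical file and column names) that rounds timestamps, resamples onto a regular grid, and drops a known downtime window:

import pandas as pd

# Hypothetical raw export; adjust the path and column names to your data.
raw = pd.read_csv("raw-sensor-data.csv", parse_dates=["timestamp"])

# Round timestamps so readings from different sensors align on the same grid.
raw["timestamp"] = raw["timestamp"].dt.round("1min")

# Resample to a regular 1-minute cadence and interpolate small gaps only.
clean = (raw.set_index("timestamp")
            .resample("1min")
            .mean()
            .interpolate(limit=5))

# Drop a known downtime window so training sees normal patterns only.
clean = clean.drop(clean.loc["2021-01-01 12:00":"2021-01-01 13:00"].index)

clean.reset_index().to_csv("clean-sensor-data.csv", index=False)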

 

Getting started is simple!

Create resources in Azure Portal

  1. Create an Anomaly Detector resource to get access to the Multivariate Anomaly Detector capability; you’ll use the key of this resource later.
  2. Create a Storage account resource to upload your data for model training and anomaly detection; you’ll use the connection string of this resource later.
  3. Create an Azure Databricks resource to train a multivariate anomaly detection model and run inference in the Azure Databricks workspace.
    [Screenshot: the resources created in the Azure Portal]

Create cluster and notebook in Azure Databricks Workspace

  1. After you’ve created an Azure Databricks resource, select ‘Launch Workspace’ to log in to Azure Databricks.
    [Screenshot: the ‘Launch Workspace’ button on the Azure Databricks resource page]
  2. Create a cluster. You can create a cluster with either the Spark 3.2 or Spark 3.1 runtime version. In this blog, I’ll use 10.4 LTS (Scala 2.12, Spark 3.2.1).
    [Screenshot: cluster creation settings in the Azure Databricks workspace]
  3. For Spark 3.2, install the SynapseML library on the cluster you created using the following settings (a quick import check to verify the installation follows this list).

    Coordinates: com.microsoft.azure:synapseml_2.12:0.9.5-103-4975dda5-SNAPSHOT

    Repository: https://mmlspark.azureedge.net/maven

    For Spark 3.1, use the following settings to install the library.

    Coordinates: com.microsoft.azure:synapseml_2.12:0.9.5-13-d1b51517-SNAPSHOT

    Repository: https://mmlspark.azureedge.net/maven

    [Screenshot: library installation settings with the Maven coordinates and repository]
  4. Create a notebook and attach it to the cluster you’ve created.
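Once the cluster is running, a minimal sanity check (our suggestion, not part of the original walkthrough) is to run the import below in a notebook cell; if the library was installed correctly, it completes without error.

from synapse.ml.cognitive import FitMultivariateAnomaly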

 

Start coding

(1/6) Import required modules

 

import os
import numpy as np
import pandas as pd
import datetime

import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt

import synapse.ml
from synapse.ml.cognitive import *
import pyspark.sql.functions as F

 

 

(2/6) Load your data

Load your data for training and inference; this could be a CSV file stored in a blob or elsewhere. You can use our sample data (download here). The timestamp column should be in ISO 8601 format; the feature columns are read in as strings, so cast them to a numeric type as shown below.

In this demo, we are loading data from 3 sensors in a piece of equipment.

[Screenshot: preview of the sample data with timestamp, sensor_1, sensor_2, and sensor_3 columns]

 

# Load the sample CSV from blob storage; all columns are read as strings.
df = spark.read.format("csv").option("header", True).load("wasbs://mvadcsvdata@sparkdemostorage.blob.core.windows.net/spark-demo-data.csv")

# Cast the three sensor columns to double so they can be used as features.
df = df.withColumn("sensor_1", col("sensor_1").cast(DoubleType())) \
    .withColumn("sensor_2", col("sensor_2").cast(DoubleType())) \
    .withColumn("sensor_3", col("sensor_3").cast(DoubleType()))

df.show(10)

 

 

(3/6) Training a model

Use the FitMultivariateAnomaly function to train an MVAD model.

 

# Specify information about your data.
startTime = "2021-01-01T00:00:00Z"
endTime = "2021-01-02T09:18:00Z"
timestampColumn = "timestamp"
inputColumns = ["sensor_1", "sensor_2", "sensor_3"]
# Specify the container you created in your Storage account. You can also give a new name here, and SynapseML will create that container for you automatically.
containerName = "mvadtest"
# Set a folder name in the Storage account to store the intermediate data.
intermediateSaveDir = "intermediateData"

estimator = (FitMultivariateAnomaly()
    .setSubscriptionKey("Key of your Anomaly Detector")
    # In .setLocation, use lowercase letters, e.g. "eastus".
    .setLocation("Region of your Anomaly Detector")
    .setStartTime(startTime)
    .setEndTime(endTime)
    .setContainerName(containerName)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
    .setConnectionString("Connection String of your Storage Account"))

model = estimator.fit(df)
type(model)

 

 

(4/6) Inference

Use the code block below to run inference on an additional part of the data.

 

# Specify the time range for inference task.
startInferenceTime = "2021-01-02T09:19:00Z"
endInferenceTime = "2021-01-03T01:59:00Z"
result = (model
      .setStartTime(startInferenceTime)
      .setEndTime(endInferenceTime)
      .setOutputCol("results")
      .setErrorCol("errors")
      .setTimestampCol(timestampColumn)
      .setInputCols(inputColumns)
      .transform(df))

rdf = (result.select("timestamp",*inputColumns, "results.contributors", "results.isAnomaly", "results.severity")
    .orderBy('timestamp', ascending=True)
    .filter(col('timestamp') >= lit(startInferenceTime))
    .toPandas())

def parse(x):
    # Each contributor is a (contributionScore, series name) pair; flip it so
    # the dict maps series name -> contribution score.
    if type(x) is list:
        return dict([item[::-1] for item in x])
    else:
        # Normal timestamps have no contributors; fill the scores with zeros.
        return {'series_0': 0, 'series_1': 0, 'series_2': 0}

rdf['contributors'] = rdf['contributors'].apply(parse)
rdf = pd.concat([rdf.drop(['contributors'], axis=1), pd.json_normalize(rdf['contributors'])], axis=1)
rdf

 

 

As a result, you will get a dataframe containing the inference timestamps and the anomaly detection results. If a timestamp is anomalous, its severity will be a number above 0 and below 1. The last three columns indicate the contribution score of each sensor; the larger the contribution score, the more anomalous that sensor is.

[Screenshot: the inference result dataframe with isAnomaly, severity, and contribution-score columns]
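To triage these results quickly, a small follow-up sketch (ours, building on the rdf dataframe above; per the plots below, series_0/1/2 correspond to sensor_1/2/3) keeps only the anomalous rows and reads off the most responsible sensor:

# Keep anomalous timestamps only, most severe first.
anomalies = rdf[rdf["isAnomaly"] == True].sort_values("severity", ascending=False)

# The column with the highest contribution score names the likeliest culprit.
contrib_cols = ["series_0", "series_1", "series_2"]
anomalies["top_contributor"] = anomalies[contrib_cols].idxmax(axis=1)

print(anomalies[["timestamp", "severity", "top_contributor"]].head(10))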

 

(5/6) Visualizations

You can draw plots to see the results of anomaly detection using the code below.

 

minSeverity = 0.1

####### Main Figure #######
plt.figure(figsize=(23,8))
plt.plot(rdf['timestamp'], rdf['sensor_1'], color='tab:orange', linestyle='solid', linewidth=2, label='sensor_1')
plt.plot(rdf['timestamp'], rdf['sensor_2'], color='tab:green', linestyle='solid', linewidth=2, label='sensor_2')
plt.plot(rdf['timestamp'], rdf['sensor_3'], color='tab:blue', linestyle='solid', linewidth=2, label='sensor_3')
plt.grid(axis='y')
plt.tick_params(axis='x', which='both', bottom=False, labelbottom=False)

# Mark the timestamps whose severity is at or above the threshold.
anoms = list(rdf["severity"] >= minSeverity)
_, _, ymin, ymax = plt.axis()
plt.vlines(np.where(anoms)[0], ymin=ymin, ymax=ymax, color='r', alpha=0.8)

plt.legend()
plt.title('A plot of the values from the three sensors with the detected anomalies highlighted in red.')
plt.show()

####### Severity Figure #######
plt.figure(figsize=(23,1))
plt.tick_params(axis='x', which='both', bottom=False, labelbottom=False)
plt.plot(rdf['timestamp'], rdf['severity'], color='black', linestyle='solid', linewidth=2, label='Severity score')
plt.plot(rdf['timestamp'], [minSeverity]*len(rdf['severity']), color='red', linestyle='dotted', linewidth=1, label='minSeverity')
plt.grid(axis='y')
plt.legend()
plt.ylim([0,1])
plt.title("Severity of the detected anomalies")
plt.show()

####### Contributors Figure #######
plt.figure(figsize=(23,1))
plt.tick_params(axis='x', which='both', bottom=False, labelbottom=False)
plt.bar(rdf['timestamp'], rdf['series_0'], width=2, color='tab:orange', label='sensor_1')
plt.bar(rdf['timestamp'], rdf['series_1'], width=2, color='tab:green', label='sensor_2', bottom=rdf['series_0'])
plt.bar(rdf['timestamp'], rdf['series_2'], width=2, color='tab:blue', label='sensor_3', bottom=rdf['series_0']+rdf['series_1'])
plt.grid(axis='y')
plt.legend()
plt.ylim([0,1])
plt.title("The contribution of each sensor to the detected anomaly")
plt.show()

 

 

[Figure: the three plots above rendered together, with sensor values, severity scores, and per-sensor contributions]

 

(6/6) Real-time inference

For real-time inference, you can split the inference code out into a separate notebook and set up a scheduler like the one below, which will trigger the inference job automatically at the cadence you set.

[Screenshot: Azure Databricks job scheduler settings for the inference notebook]
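As a minimal sketch of what that scheduled inference notebook could contain (our illustration, assuming the trained model, df, timestampColumn, and inputColumns from the steps above are available there), the inference window can be derived from the current run time instead of being hard-coded:

import datetime

# Score the most recent hour of data on every scheduled run.
now = datetime.datetime.utcnow().replace(second=0, microsecond=0)
startInferenceTime = (now - datetime.timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
endInferenceTime = now.strftime("%Y-%m-%dT%H:%M:%SZ")

result = (model
    .setStartTime(startInferenceTime)
    .setEndTime(endInferenceTime)
    .setOutputCol("results")
    .setErrorCol("errors")
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .transform(df))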

 

