Turning AML compute cluster into Spark


Following the blog post on Turning AML compute into Ray and Dask, we've added an exciting new capability to run Spark within AML compute, where Spark shares the same context with your ML code. The Spark version is 3.2.1, with support for Delta Lake and Synapse SQL read/write. This enables AML users to perform powerful data transformation, and even Spark ML, within an AML interactive notebook or in a job run.

Traditionally, Azure ML integrates with Synapse Spark or external compute services via a pipeline step or, better, via a magic command like %synapse. However, the computing context is separate from your AML logic, so you still need to run Spark in a separate step, persist the output to some storage, and load it back in your AML script.

With this approach, Spark is available right within your AML code, whether it's an AML notebook, a Python script, or a pipeline step. It shares the same computing context, and in most cases you can directly convert the Spark DataFrame to a Pandas or Dask DataFrame without first persisting it to intermediate storage.
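
As a rough sketch of what that in-context conversion looks like (the toy DataFrame below is illustrative, not from the original post):

# A minimal sketch: the Spark session lives in the same process as your ML code,
# so a Spark DataFrame can be pulled straight into Pandas without a storage round-trip
sdf = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
pdf = sdf.toPandas()
print(pdf.head())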

Here are examples of how Spark inside AML can benefit users:

1. Easy Spark setup with credentials to access Azure Storage

 

from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML(ws=ws, compute_cluster="d12-v2-ssh", maxnode=5)
ray = ray_on_aml.getRay()

storage_account_name = "adlsdatalakegen6"
storage_account_key = ws.get_default_keyvault().get_secret("adlsdatalakegen6")
additional_spark_configs = {f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net": f"{storage_account_key}"}

# Number of nodes (including the head node) can be set as the number of executors.
spark = ray_on_aml.getSpark(executor_cores=3, num_executors=3, executor_memory='10GB', additional_spark_configs=additional_spark_configs)
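
Once getSpark() returns, it can be worth a quick sanity check before pointing the session at real data; a minimal sketch:

# Quick sanity check on the Spark session returned by getSpark()
df = spark.range(0, 1000)
print(df.count())                              # expect 1000
print(spark.sparkContext.defaultParallelism)   # rough view of the parallelism available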

 

2. Fast loading of high-volume data from Delta Lake in ADLS or a Synapse SQL Pool

 

# Access Delta Lake data
adls_data = spark.read.format("delta").load("abfss://mltraining@adlsdatalakegen6.dfs.core.windows.net/ISDWeatherDelta")

# Access Synapse SQL Pool data via JDBC
server_name = "jdbc:sqlserver://sy2qwhqqkv7eacsws1.sql.azuresynapse.net:1433"
database_name = "sy2qwhqqkv7eacsws1p1"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "ISDWeatherDelta"
username = "azureuser"
password = ""

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("user", username) \
    .option("password", password) \
    .load()
jdbcDF.show()

 

3. Using the Koalas (pyspark.pandas) API for EDA and data transformation on big datasets

Data scientists love Pandas, but not everyone is comfortable with PySpark, so Koalas (Pandas on PySpark) is the answer. You can easily load a dataset far bigger than the memory of a single machine and use Pandas-like APIs to manipulate and visualize it.

 

# Reading 73M rows of data into a Pandas-like dataframe
from pyspark.pandas import read_delta

koalas_df = read_delta("abfss://mltraining@adlsdatalakegen6.dfs.core.windows.net/ISDWeatherDelta")
koalas_df.describe()

 

[Figure: summary statistics output of koalas_df.describe()]

and plotting the big dataset with the Plotly backend:

[Figure: Plotly chart produced from the Koalas dataframe]
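
As a rough sketch of how such a plot might be produced (the column name "temperature" is an assumption about the ISD Weather schema, so adjust it to your dataset):

# Hedged sketch: pyspark.pandas plotting with the Plotly backend
import pyspark.pandas as ps

ps.options.plotting.backend = "plotly"                 # Plotly is the default backend in pyspark.pandas
fig = koalas_df["temperature"].plot.hist(bins=50)      # histogram aggregated distributedly by Spark
fig.show()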

4. Loading a big dataset, transforming it with Spark, then training XGBoost within a single ML step

(Credit: the example below is taken from the RayDP project.)

 

data = spark.read.format("csv").option("header", "true") \
    .option("inferSchema", "true") \
    .load(NYC_TRAIN_CSV)

# Set Spark timezone for processing datetime
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Transform the dataset
data = nyc_taxi_preprocess(data)

# Split data into train_dataset and test_dataset
train_df, test_df = random_split(data, [0.9, 0.1], 0)

# Convert Spark dataframes into Ray datasets
train_dataset = ray.data.from_spark(train_df)
test_dataset = ray.data.from_spark(test_df)

# Then convert them into the DMatrix used by XGBoost
dtrain = RayDMatrix(train_dataset, label='fare_amount')
dtest = RayDMatrix(test_dataset, label='fare_amount')

# Configure the XGBoost model
config = {
    "tree_method": "hist",
    "eval_metric": ["logloss", "error"],
}
evals_result = {}

# Train the model
bst = train(
    config,
    dtrain,
    evals=[(dtest, "eval")],
    evals_result=evals_result,
    ray_params=RayParams(max_actor_restarts=2, num_actors=2, cpus_per_actor=2),
    num_boost_round=10)

# Print evaluation stats
print("Final validation error: {:.4f}".format(
    evals_result["eval"]["error"][-1]))

 

This capability is based on the work from RayDP, which develops a framework to run Spark on Ray.

For installation, this capability is part of the ray-on-aml library, so you just need to upgrade the library to the latest version to use Spark.
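
With pip, the upgrade would typically look something like:

pip install --upgrade ray-on-aml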

5. Use in a remote AML job

You can easily turn your remote AML cluster into Spark in the same way you would with Ray in ray-on-aml.

In your Python script, do something like this:

from azureml.core import Run
from ray_on_aml.core import Ray_On_AML

if __name__ == "__main__":
    run = Run.get_context()
    ws = run.experiment.workspace
    ray_on_aml = Ray_On_AML()
    ray = ray_on_aml.getRay()
    if ray:  # in the head node
        spark = ray_on_aml.getSpark(executor_cores=3, num_executors=3, executor_memory='10GB')
        # ... your Spark logic goes here
    else:
        print("in worker node")
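
To launch this, you submit the script to your AML cluster like any other job. The sketch below uses the v1 azureml-core SDK; the environment name, script name, and node count are assumptions rather than part of the original post, and the job typically needs a multi-node distributed configuration so the cluster has worker nodes.

# Hedged sketch (azureml-core v1 SDK); "train_spark.py" and "ray-on-aml-env" are illustrative names
from azureml.core import Workspace, Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import MpiConfiguration

ws = Workspace.from_config()
env = Environment.get(ws, "ray-on-aml-env")              # an environment with ray-on-aml installed (assumed)
src = ScriptRunConfig(
    source_directory=".",
    script="train_spark.py",                             # the script shown above
    compute_target="d12-v2-ssh",
    environment=env,
    distributed_job_config=MpiConfiguration(node_count=3),  # head node + worker nodes
)
run = Experiment(ws, "spark-on-aml").submit(src)
run.wait_for_completion(show_output=True)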

Check out the Spark example notebook in the ray-on-aml repo to learn more!

 
