Turn AML compute cluster into Spark

This post has been republished via RSS; it originally appeared at: New blog articles in Microsoft Tech Community.

Following the blog post on Turning AML compute into Ray and Dask , we've added a new exciting capability to run Spark within AML compute where Spark shares the same context with your ML code. The Spark version is 3.2.1 with support for Delta Lake and Synapse SQL read/write. This enables users of AML to perform powerful data transformation and even Spark ML within AML interactive notebook or in a job run. 
Traditionally, Azure ML integrates with Spark Synapse or external compute services via a pipeline step or better via magic command like %synapse, but the computing context is separate from your AML logic so you still need to run Spark in a separate step and persist the output to some storage and load it in your AML script.
With this approach, Spark is available right within your AML code whether it's AML notebook, python script or pipeline step. It shares the common computing context and most of the cases you can just directly convert the Spark Dataframe to Pandas and Dask Dataframe without persisting first to an intermediary storage.
Here are examples of how Spark inside AML can benefit users:
1. Easy Spark setting up with credentials to access Azure Storage
 from ray_on_aml.core import Ray_On_AML
ray_on_aml =Ray_On_AML(ws=ws, compute_cluster ="d12-v2-ssh", maxnode=5)
ray = ray_on_aml.getRay()
storage_account_name ="adlsdatalakegen6"
storage_account_key=ws.get_default_keyvault().get_secret("adlsdatalakegen6")
additional_spark_configs ={f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net":f"{storage_account_key}"}
spark = ray_on_aml.getSpark(executor_cores =3,num_executors =3 ,executor_memory='10GB', additional_spark_configs=additional_spark_configs)
#Number of nodes (including head node) can be set as number of executor.
 
2. Fast loading high volume data from delta lake in ADLS or Synapse SQL Pool
 #Access delta lake data
adls_data = spark.read.format("delta").load("abfss://mltraining@adlsdatalakegen6.dfs.core.windows.net/ISDWeatherDelta")server_name = "jdbc:sqlserver://sy2qwhqqkv7eacsws1.sql.azuresynapse.net:1433"
database_name = "sy2qwhqqkv7eacsws1p1"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "ISDWeatherDelta"
username = "azureuser"
password = ""
jdbcDF = spark.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", table_name) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.option("user", username) \
.option("password", password).load()

jdbcDF.show()
 
3. Using Koalas (pyspark Pandas) API for EDA and data transformation on big dataset.
Data scientists loves pandas but not everyone is comfortable with Pyspark so Koalas (Pandas on pyspark) is the answer. You can easily load dataset that is way bigger than the memory of a machine and use Pandas like APIs to manipulate and visualize big dataset.
 #Reading 73M rows of data into a Pandas like dataframe
from pyspark.pandas import read_delta
koalas_df= read_delta("abfss://mltraining@adlsdatalakegen6.dfs.core.windows.net/ISDWeatherDelta")
koalas_df.describe()
 

and plotting with big dataset with plotly backend

4. Loading big dataset, transform with Spark then train XGBoost within a single ML step.
(credit: The below example is taken from raydp project)
 data = spark.read.format("csv").option("header", "true") \
.option("inferSchema", "true") \
.load(NYC_TRAIN_CSV)
# Set spark timezone for processing datetime
spark.conf.set("spark.sql.session.timeZone", "UTC")
# Transform the dataset
data = nyc_taxi_preprocess(data)
# Split data into train_dataset and test_dataset
train_df, test_df = random_split(data, [0.9, 0.1], 0)
# Convert spark dataframe into ray dataset
train_dataset = ray.data.from_spark(train_df)
test_dataset = ray.data.from_spark(test_df)
# Then convert them into DMatrix used by xgboost
dtrain = RayDMatrix(train_dataset, label='fare_amount')
dtest = RayDMatrix(test_dataset, label='fare_amount')
# Configure the XGBoost model
config = {
"tree_method": "hist",
"eval_metric": ["logloss", "error"],
}
evals_result = {}
# Train the model
bst = train(
config,
dtrain,
evals=[(dtest, "eval")],
evals_result=evals_result,
ray_params=RayParams(max_actor_restarts=2, num_actors=2, cpus_per_actor=2),
num_boost_round=10)
# print evaluation stats
print("Final validation error: {:.4f}".format(
evals_result["eval"]["error"][-1]))
 
This capability is based on the work from raydp which develops a framework to run Spark on ray
For installation, this capability is part of ray-on-aml library, so you just need to upgrade the library to the latest version to use the Spark capability.
4. Use in a remote AML job
You can easily turn your remote AML cluster into Spark in the same way you would do with ray in ray-on-aml.
In your python script, do something like thisif __name__ == "__main__":
run = Run.get_context()
ws = run.experiment.workspace
ray_on_aml =Ray_On_AML()
ray = ray_on_aml.getRay()
if ray: #in the headnode
spark = ray_on_aml.getSpark(executor_cores =3,num_executors =3 ,executor_memory='10GB')

Checkout Spark example notebook from ray-on-aml repo to learn more!
 

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.