Library to turn Azure ML Compute into Ray and Dask cluster

This post has been republished via RSS; it originally appeared at: Microsoft Tech Community - Latest Blogs.

Ray and Dask are two of the most popular frameworks for parallelizing and scaling Python computation. They help speed up data processing, hyperparameter tuning, reinforcement learning, model serving, and many other scenarios.

On an Azure ML compute instance, we can easily install Ray and Dask to take advantage of parallel computing across all cores within the node. However, there is not yet an easy way in Azure Machine Learning to extend this to a multi-node cluster when the computing or ML problem requires the power of more than one node. One would need to set up a separate environment using VMs or K8s outside Azure ML to run multi-node Ray/Dask, which would mean losing the capabilities of Azure ML.

To address this gap, we have developed a library that can easily turn an Azure ML compute instance and compute cluster into a Ray and Dask cluster. The library does all the complex wiring and setup of a Ray cluster with Dask behind the scenes while exposing a simple Ray context object for users to perform parallel Python computing tasks. In addition, it ships with high-performance APIs based on PyArrow to access Azure storage and a simple interface for users to install additional libraries.

The library also supports both interactive mode and job mode. Data scientists can do fast interactive work with the cluster during the exploratory phase, then turn the code into job mode with minimal changes.

Check out the library repo at james-tn/ray-on-aml: Turning AML compute into Ray cluster (github.com) for details.

In this post, we'll walk through the steps to set up and use the library.

 


 

Installation of the library

  1. Prepare compute environment 

For interactive use from your compute instance, create a compute cluster in the same virtual network (vnet) as the compute instance.

Checklist:

[ ] Azure Machine Learning Workspace

[ ] Virtual network/Subnet

[ ] Create Compute Instance in the Virtual Network

[ ] Create Compute Cluster in the same Virtual Network

Use the azureml_py38 conda environment from a (Jupyter) notebook in Azure Machine Learning Studio.

   

  2. Install library

 

pip install --upgrade ray-on-aml

 

Installing this library also installs ray[default]==1.9.1, pyarrow>=5.0.0, dask[complete]==2021.12.0, adlfs==2021.10.0 and fsspec==2021.10.1.
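Because the library pins specific versions, it can be useful to confirm what actually ended up in the notebook kernel. The following is a minimal sketch using only the standard library (Python 3.8 or later); the helper name installed_versions is hypothetical and not part of ray-on-aml.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Distribution names taken from the dependency list above.
    pinned = ["ray", "pyarrow", "dask", "adlfs", "fsspec"]
    for name, version in installed_versions(pinned).items():
        print(f"{name}: {version or 'not installed'}")
```

Comparing the printed versions against the pinned ones is a quick way to spot a kernel that is not using the expected conda environment.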

 

  3. Use cluster in interactive mode

Run in interactive mode from a notebook on the compute instance. Note the ci_is_head option, which makes your current compute instance (CI) the head node.

 

from azureml.core import Workspace
from ray_on_aml.core import Ray_On_AML

ws = Workspace.from_config()
# NAME_OF_COMPUTE_CLUSTER is a placeholder: set it to the name of your AML compute cluster.
ray_on_aml = Ray_On_AML(ws=ws, compute_cluster=NAME_OF_COMPUTE_CLUSTER,
                        additional_pip_packages=['torch==1.10.0', 'torchvision', 'scikit-learn'], maxnode=4)
# ci_is_head=True uses the current compute instance as the head node and all nodes
# in the remote compute cluster as workers.
# The default, ci_is_head=False, uses one node of the remote AML compute cluster
# as the head node and the remaining nodes as workers.
ray = ray_on_aml.getRay(ci_is_head=True)

 

At this point, you have the Ray client object, which you can use to perform various parallel computing tasks through the Ray API.
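As a small illustration of what a parallel task looks like once you hold that object, the sketch below fans a function out as Ray tasks and gathers the results. It assumes the handle behaves like the standard ray module (ray.remote and ray.get); the helper name parallel_squares is hypothetical and not part of the library.

```python
def parallel_squares(ray, values):
    """Square each value in its own Ray task and return the results in input order."""

    def square(x):
        return x * x

    # Wrap the plain function as a Ray remote function.
    square_task = ray.remote(square)
    # Schedule one task per value; each call returns an object reference immediately.
    refs = [square_task.remote(v) for v in values]
    # Block until every task has finished and fetch the results.
    return ray.get(refs)
```

With the ray object obtained from getRay(), a call such as parallel_squares(ray, range(8)) would spread the eight tasks across whatever workers the cluster has available.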

Ray_On_AML() accepts two arguments at initialization that specify the base configuration for the library, with the following default values:

 

base_conda_dep =['adlfs==2021.10.0','pip'], 
base_pip_dep = ['ray[tune]==1.9.1', 'xgboost_ray==0.1.5', 'dask==2021.12.0','pyarrow >= 5.0.0','fsspec==2021.10.1']

 

If you need to add more libraries to the cluster, pass them as a list, for example through additional_pip_packages:

 

ray_on_aml = Ray_On_AML(ws=ws, compute_cluster="dask-vnet-ct", additional_pip_packages=['torch', 'torchvision'])

 

Although it's possible, you should not change the default values of base_conda_dep and base_pip_dep, as doing so may break the package. Only do so when you need to customize the cluster's default configuration, such as the Ray version.

 

  4. Use the cluster in job mode

For use in an AML job, include ray-on-aml as a pip dependency, then get the Ray handle inside your script as follows:

 

from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()

if ray:  # on the head node
    # Put the logic that uses Ray here: distributed ML training, tuning,
    # or distributed data transformation with Dask.
    pass
else:
    print("in worker node")
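The branching in the job script can be wrapped in a small helper so that the driver logic lives in one function. This is only a sketch of the pattern shown in the script (a truthy handle on the head node, a falsy value on workers); run_on_head and head_work are hypothetical names, not library APIs.

```python
def run_on_head(ray_handle, head_work):
    """Call head_work(ray_handle) on the head node; do nothing on worker nodes."""
    if not ray_handle:
        # Worker nodes get a falsy handle and only need to stay in the cluster.
        print("in worker node")
        return None
    return head_work(ray_handle)
```

In the job script this would be used as run_on_head(ray_on_aml.getRay(), main), where main is your own function containing the training or data processing logic.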

 

Example scenarios

  1. Perform big data analysis with Dask on Ray

 

from adlfs import AzureBlobFileSystem

abfs = AzureBlobFileSystem(account_name="azureopendatastorage",  container_name="isdweatherdatacontainer")
data = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2012/"], filesystem=abfs)
data1 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2015/"], filesystem=abfs)
data2 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2010/"], filesystem=abfs)
data3 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2009/"], filesystem=abfs)
data4 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2011/"], filesystem=abfs)
data5 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2013/"], filesystem=abfs)
data6 = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2014/"], filesystem=abfs)
all_data = data.union(data1).union(data2).union(data3).union(data4).union(data5).union(data6)
print(all_data.count())
# Summary statistics over the combined dataset, computed as a Dask DataFrame
all_data_dask = all_data.to_dask().describe().compute()
print(all_data_dask)
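The seven near-identical read calls can also be generated from a list of years. The sketch below mirrors the listing (one read_parquet call per year, then a union) and takes the ray object and the AzureBlobFileSystem as parameters; the helper names year_paths and read_years are hypothetical.

```python
def year_paths(years, container="isdweatherdatacontainer", prefix="ISDWeather"):
    """Build one az:// partition path per year, in the order given."""
    return [f"az://{container}/{prefix}/year={year}/" for year in years]


def read_years(ray, filesystem, years):
    """Read each yearly partition as a Ray dataset and union them into one."""
    paths = year_paths(years)
    if not paths:
        raise ValueError("at least one year is required")
    # One dataset per year, read the same way as in the listing above.
    datasets = [ray.data.read_parquet([path], filesystem=filesystem) for path in paths]
    first, rest = datasets[0], datasets[1:]
    return first.union(*rest) if rest else first
```

For example, read_years(ray, abfs, range(2009, 2016)) would cover the same years as the listing.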

 

  2. Distributed hyperparameter tuning with ray.tune

 

import sklearn.datasets
import sklearn.metrics
from sklearn.model_selection import train_test_split
import xgboost as xgb

from ray import tune


def train_breast_cancer(config):
    # Load dataset
    data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=0.25)
    # Build input matrices for XGBoost
    train_set = xgb.DMatrix(train_x, label=train_y)
    test_set = xgb.DMatrix(test_x, label=test_y)
    # Train the classifier
    results = {}
    xgb.train(
        config,
        train_set,
        evals=[(test_set, "eval")],
        evals_result=results,
        verbose_eval=False)
    # Return prediction accuracy
    accuracy = 1. - results["eval"]["error"][-1]
    tune.report(mean_accuracy=accuracy, done=True)


config = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "max_depth": tune.randint(1, 9),
    "min_child_weight": tune.choice([1, 2, 3]),
    "subsample": tune.uniform(0.5, 1.0),
    "eta": tune.loguniform(1e-4, 1e-1)
}
analysis = tune.run(
    train_breast_cancer,
    resources_per_trial={"cpu": 1},
    config=config,
    num_samples=10)
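The search space above mixes several sampling primitives. As a rough illustration of what each one draws, here is a plain-Python stand-in using only the standard library. It is not Ray Tune's implementation, and the names sample_loguniform and sample_config are hypothetical.

```python
import math
import random


def sample_loguniform(low, high, rng=random):
    """Draw a value whose logarithm is uniform on [log(low), log(high)]."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


def sample_config(rng=random):
    """Draw one hyperparameter set shaped like the tuning config above."""
    return {
        # Like tune.randint(1, 9): lower bound inclusive, upper bound exclusive.
        "max_depth": rng.randrange(1, 9),
        # Like tune.choice([1, 2, 3]): one element picked uniformly.
        "min_child_weight": rng.choice([1, 2, 3]),
        # Like tune.uniform(0.5, 1.0).
        "subsample": rng.uniform(0.5, 1.0),
        # Like tune.loguniform(1e-4, 1e-1): small learning rates are as likely as large ones.
        "eta": sample_loguniform(1e-4, 1e-1, rng),
    }
```

With num_samples=10, the tuning run draws ten such configurations and trains one model per draw, each trial reserving one CPU.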

 

  3. Distributed XGBoost

 

from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=0.25, random_state=42
)

clf = RayXGBClassifier(
    n_jobs=4,  # In XGBoost-Ray, n_jobs sets the number of actors
    random_state=seed
)

# scikit-learn API will automatically convert the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.

clf.fit(X_train, y_train)

pred_ray = clf.predict(X_test)
print(pred_ray.shape)

pred_proba_ray = clf.predict_proba(X_test)
print(pred_proba_ray.shape)

# It is also possible to pass a RayParams object
# to fit/predict/predict_proba methods - will override
# n_jobs set during initialization

clf.fit(X_train, y_train, ray_params=RayParams(num_actors=4))

pred_ray = clf.predict(X_test, ray_params=RayParams(num_actors=4))
print(pred_ray.shape)

 

  4. Use in job mode with an AML job

 

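from azureml.core import Environment, Experiment, ScriptRunConfig, Workspace
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import PyTorchConfiguration

ws = Workspace.from_config()

compute_cluster = 'worker-cpu-v3'
maxnode = 5
vm_size = 'STANDARD_DS3_V2'
vnet = 'rayvnet'
subnet = 'default'
exp = 'ray_on_aml_job'
ws_detail = ws.get_details()
# The resource group name is the fifth segment of the workspace resource ID
ws_rg = ws_detail['id'].split("/")[4]
vnet_rg = None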
try:
    ray_cluster = ComputeTarget(workspace=ws, name=compute_cluster)

    print('Found existing cluster, use it.')
except ComputeTargetException:
    if vnet_rg is None:
        vnet_rg = ws_rg
    compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                        min_nodes=0, max_nodes=maxnode,
                                                        vnet_resourcegroup_name=vnet_rg,
                                                        vnet_name=vnet,
                                                        subnet_name=subnet)
    ray_cluster = ComputeTarget.create(ws, compute_cluster, compute_config)

    ray_cluster.wait_for_completion(show_output=True)


rayEnv = Environment.from_conda_specification(name = "rayEnv",
                                             file_path = "../examples/conda_env.yml")

# rayEnv = Environment.get(ws, "rayEnv", version=19)


src=ScriptRunConfig(source_directory='../examples/job',
                script='aml_job.py',
                environment=rayEnv,
                compute_target=ray_cluster,
                distributed_job_config=PyTorchConfiguration(node_count=maxnode),
                    # arguments = ["--master_ip",master_ip]
                )
run = Experiment(ws, exp).submit(src)
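The submission code derives the workspace resource group with ws_detail['id'].split("/")[4]. The sketch below shows why index 4 is the right segment and adds a basic sanity check. The helper name resource_group_from_id and the sample ID are hypothetical, and the layout assumed is the usual /subscriptions/<id>/resourceGroups/<name>/... form of an Azure resource ID.

```python
def resource_group_from_id(resource_id):
    """Extract the resource group name from an Azure resource ID string."""
    parts = resource_id.split("/")
    # Expected layout: ['', 'subscriptions', <subscription-id>, 'resourceGroups', <name>, ...]
    # The leading empty string comes from the ID starting with '/'.
    if len(parts) < 5 or parts[3].lower() != "resourcegroups":
        raise ValueError(f"unexpected resource ID format: {resource_id!r}")
    return parts[4]


if __name__ == "__main__":
    sample_id = (
        "/subscriptions/00000000-0000-0000-0000-000000000000"
        "/resourceGroups/my-rg/providers/Microsoft.MachineLearningServices"
        "/workspaces/my-ws"
    )
    print(resource_group_from_id(sample_id))
```

The explicit check turns a malformed ID into a clear error instead of silently passing the wrong value as vnet_resourcegroup_name.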

 

This is the code inside aml_job.py with details omitted for brevity 

 


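from azureml.core import Run
from ray import tune
from ray_on_aml.core import Ray_On_AML
from torchvision import datasets

# train_mnist, search_space and get_data_count are defined elsewhere in the full script (omitted here).
if __name__ == "__main__":
    run = Run.get_context()
    ws = run.experiment.workspace
    account_key = ws.get_default_keyvault().get_secret("adls7-account-key")
    ray_on_aml = Ray_On_AML()
    ray = ray_on_aml.getRay()

    if ray:  # on the head node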
        print("head node detected")

        datasets.MNIST("~/data", train=True, download=True)

        analysis = tune.run(train_mnist, config=search_space)
        print(ray.cluster_resources())
        print("data count result", get_data_count(account_key))

    else:
        print("in worker node")

 

 

See more examples at ray-on-aml/quick_use_cases.ipynb at master · james-tn/ray-on-aml (github.com).

In partnership with Hyun Suk Shin.
