Step-by-Step Guide: Building and Integrating a Custom Package in ADF Workflow Orchestration Manager


Introduction

 

The Workflow Orchestration Manager in Azure Data Factory streamlines setting up and managing Apache Airflow environments, enhancing your ability to execute scalable data pipelines efficiently. Apache Airflow, a robust open-source platform, allows for the programming, scheduling, and monitoring of intricate workflows by organizing tasks into data pipelines. This capability is highly valued in data engineering and data science for its adaptability and user-friendliness.

 

In this guide, I will walk you through a demonstration where we extract insights from GitHub data using the GitHub public API, and run custom operators in a private package within the Workflow Orchestration Manager in Azure Data Factory.

 


 


Prerequisites
- Tools and Technologies Needed:

  • Azure Data Factory account
  • Working knowledge of Apache Airflow
  • Working knowledge of Python

- Initial Setup

  • ADF: create a Workflow Orchestration Manager instance
  • Airflow (optional): In this blog, I'm primarily focusing on running custom operators in Airflow. However, if you also want to trigger Azure Data Factory (ADF) pipelines directly from Airflow, you'll need to create a connection in the Airflow UI first; see the Workflow Orchestration Manager documentation linked at the end of this post for details.
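
    If you do set up that connection, a task along these lines can trigger an ADF pipeline from a DAG. This is only a minimal sketch: the connection ID, pipeline, resource group, and factory names are placeholders to replace with your own.

    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator

    # Sketch only: triggers an existing ADF pipeline from Airflow (declare it inside a DAG).
    # "azure_data_factory_default" must match the connection created in the Airflow UI;
    # the pipeline, resource group, and factory names are placeholders.
    run_adf_pipeline = AzureDataFactoryRunPipelineOperator(
        task_id="run_adf_pipeline",
        azure_data_factory_conn_id="azure_data_factory_default",
        pipeline_name="my_adf_pipeline",
        resource_group_name="my-resource-group",
        factory_name="my-data-factory",
    )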


Table of Contents:

  • Step 1: Designing Your Custom Package
  • Step 2: Create the Custom Package
  • Step 3: Building the Airflow DAG
  • Step 4: Run the DAG in ADF Workflow Orchestration Manager
  • Step 5: Logs and Monitoring


    Step 1: Designing Your Custom Package
    In this tutorial, I am utilizing the GitHub API and have written two Python operators: `GitHubAPIReaderOperator` and `CountLanguagesOperator`. These operators are designed to fetch data from GitHub repositories and count the programming languages used, respectively.

     

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults
    import requests
    import logging
    import re


    class GitHubAPIReaderOperator(BaseOperator):
        """Fetches paginated repository data from the GitHub API."""

        @apply_defaults
        def __init__(self, api_url, max_pages=20, token=None, *args, **kwargs):
            super(GitHubAPIReaderOperator, self).__init__(*args, **kwargs)
            self.api_url = api_url
            self.max_pages = max_pages
            self.token = token

        def execute(self, context):
            headers = {"Accept": "application/vnd.github.v3+json"}
            if self.token:
                headers["Authorization"] = f"Bearer {self.token}"
            session = requests.Session()
            session.headers.update(headers)
            next_url = self.api_url
            all_data = []
            page_count = 0
            # Follow the "next" links in the Link header until max_pages is reached
            while next_url and page_count < self.max_pages:
                response = session.get(next_url)
                response.raise_for_status()
                data = response.json()
                all_data.extend(data)
                next_url = self.get_next_link(response.headers.get('Link'))
                page_count += 1
            # The returned list is pushed to XCom for downstream tasks
            return all_data

        def get_next_link(self, link_header):
            if link_header:
                links = link_header.split(',')
                next_link = [link for link in links if 'rel="next"' in link]
                if next_link:
                    match = re.search(r'<(.*)>', next_link[0])
                    if match:
                        return match.group(1)
            return None


    class CountLanguagesOperator(BaseOperator):
        """Counts how many of the fetched repositories use each programming language."""

        @apply_defaults
        def __init__(self, api_url, token=None, *args, **kwargs):
            super(CountLanguagesOperator, self).__init__(*args, **kwargs)
            self.api_url = api_url
            self.token = token

        def execute(self, context):
            # Pull the repository list produced by the fetch_github_data task
            repos = context['task_instance'].xcom_pull(task_ids='fetch_github_data')
            headers = {"Accept": "application/vnd.github.v3+json"}
            if self.token:
                headers["Authorization"] = f"Bearer {self.token}"
            session = requests.Session()
            session.headers.update(headers)
            language_counts = {}
            for repo in repos:
                languages_url = repo.get('languages_url')
                if not languages_url:
                    continue  # Skip repos without a languages URL
                try:
                    response = session.get(languages_url)
                    response.raise_for_status()
                    languages_data = response.json()
                    for language in languages_data.keys():
                        if language in language_counts:
                            language_counts[language] += 1
                        else:
                            language_counts[language] = 1
                except requests.exceptions.HTTPError as error:
                    if error.response.status_code == 403:
                        logging.warning(f"Skipping repository due to HTTP 403 Forbidden: {languages_url}")
                        continue
                    else:
                        raise
            # Output the results
            for lang, count in language_counts.items():
                logging.info(f"{lang} repositories count: {count}")
            return language_counts
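
    Before packaging, you can sanity-check the reader operator outside Airflow. This is a hypothetical smoke test, not part of the package: it assumes apache-airflow and requests are installed locally, and you would append it to the bottom of the operators file and run that file directly with python.

    # Hypothetical local smoke test for GitHubAPIReaderOperator (run outside Airflow).
    if __name__ == "__main__":
        op = GitHubAPIReaderOperator(
            task_id="smoke_test",
            api_url="https://api.github.com/repositories",
            max_pages=1,
        )
        # execute() does not use the Airflow context here, so an empty dict is enough
        repos = op.execute(context={})
        print(f"Fetched {len(repos)} repositories")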

     

    Please check the GitHub API documentation for rate limits and usage restrictions before running the operators at scale.
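
    In particular, unauthenticated requests have a much lower hourly quota than token-authenticated ones. Here is a quick, standalone snippet (independent of the operators above) that checks your remaining quota against GitHub's public rate_limit endpoint:

    import requests

    # Check the current GitHub API rate-limit status; a token is optional but raises the limit.
    token = None  # e.g., a personal access token string
    headers = {"Accept": "application/vnd.github.v3+json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    resp = requests.get("https://api.github.com/rate_limit", headers=headers)
    resp.raise_for_status()
    core = resp.json()["resources"]["core"]
    print(f"Remaining requests: {core['remaining']} of {core['limit']}")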

     

     

     

    Step 2: Create the Custom Package
     Follow the steps below to create a wheel package in Python:

    • Create the following folder hierarchy (a plain-text sketch of this layout is shown after these steps):

       [Screenshot: folder hierarchy for the custom package]

       

    • In setup.py, reference the package folder like so:

      from setuptools import setup, find_packages

      setup(
          name="custom_operators",
          version="0.1.0",
          package_dir={"": "src"},
          packages=find_packages(where="src"),
          install_requires=[
              # List your dependencies here, e.g., 'numpy', 'pandas'
          ],
          classifiers=[
              "Programming Language :: Python :: 3",
              "License :: OSI Approved :: MIT License",
              "Operating System :: OS Independent",
          ],
          python_requires='>=3.6',
      )
    • In CMD, run these commands to create the wheel package:

      pip install setuptools wheel
      python setup.py sdist bdist_wheel

      These commands create a source distribution and a wheel for your package. The wheel file (.whl) will be stored in a newly created dist/ directory under the custom_operators folder.
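
    For reference, the layout implied by the setup.py above (package_dir pointing at src) and by the DAG import in Step 3 (custom_operators.github_operators) looks roughly like this; treat it as a sketch rather than an exact copy of the screenshot:

      custom_operators/
      ├── setup.py
      └── src/
          └── custom_operators/
              ├── __init__.py
              └── github_operators.py   # the two operators from Step 1

    Optionally, before uploading anything, you can verify that the wheel installs and imports cleanly on your machine (assuming Airflow is installed locally):

      pip install dist/custom_operators-0.1.0-py3-none-any.whl
      python -c "from custom_operators.github_operators import GitHubAPIReaderOperator"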

     

     

    Step 3: Building the Airflow DAG
    Now that we have built our custom operators and created the wheel package, we need to create a DAG that triggers them.
    For that, I created two tasks, fetch_github_data and count_languages, each calling one of the operators above.

     

    from airflow import DAG
    from datetime import datetime, timedelta
    from custom_operators.github_operators import GitHubAPIReaderOperator, CountLanguagesOperator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2023, 1, 1),
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        'github_language_analysis',
        default_args=default_args,
        description='Analyze GitHub repos for language usage',
        schedule_interval=timedelta(days=1),
    )

    fetch_github_data = GitHubAPIReaderOperator(
        task_id='fetch_github_data',
        api_url='https://api.github.com/repositories',
        max_pages=10,
        token='<YOUR_GITHUB_TOKEN>',  # Replace with your actual token
        dag=dag
    )

    count_languages = CountLanguagesOperator(
        task_id='count_languages',
        api_url='https://api.github.com',
        token='<YOUR_GITHUB_TOKEN>',  # Replace with your actual token
        dag=dag
    )

    fetch_github_data >> count_languages
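
    Before uploading, it can help to confirm the DAG file imports cleanly against the installed wheel. This is a common pattern, sketched here under the assumption that Airflow and the custom_operators wheel are installed locally and that your DAG files sit in a local dags/ folder:

    from airflow.models import DagBag

    # Parse the local dags/ folder and fail loudly on import errors
    # (e.g., a missing custom_operators wheel or a typo in the import path).
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert not dag_bag.import_errors, dag_bag.import_errors
    print(list(dag_bag.dags))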

     

     

     

    Step 4: Run the DAG in ADF Workflow Orchestration Manager

    Now that we have built our DAG and our custom package, here is how to run it in ADF.

    1. Create a Workflow Orchestration Manager (managed Airflow) instance in ADF, following the Microsoft docs.

    2. In your ADLS storage account, create the folder hierarchy shown below:

    [Screenshot: folder hierarchy in the ADLS storage account]


    In the requirements file, include the path to the custom package stored in your ADLS storage account as follows:

    /opt/airflow/dags/custom_operators-0.1.0-py3-none-any.whl


    3. In the ADF workspace, click on "Import files." Navigate to your ADLS storage account, locate the "Airflow" folder, and check the "Import requirements" checkbox.

     

    [Screenshot: Import files pane in the ADF Workflow Orchestration Manager]


    It will take a few minutes for the Workflow Orchestration Manager to pick up the updated code and the custom package.

     

     


    Step 5: Logs and Monitoring 

    After importing the files, click on the "Monitor" button in the Data Orchestration Manager to view task execution and export Airflow logs. This will open the Airflow UI.

     

    DAG

     

    [Screenshot: github_language_analysis DAG in the Airflow UI]

     

    Logs from the count_languages task:

     

    [Screenshot: count_languages task logs showing the language counts]

     

    P.S.: To make this more dynamic, you can save the language counts as a JSON file and store it in your storage account.
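
    One way to do that, sketched below under the assumption that a Blob Storage/ADLS connection already exists in Airflow: add a small follow-up task that pulls the result from XCom and writes it with the Azure provider's WasbHook. The connection ID, container, and blob names are placeholders.

    import json
    from airflow.operators.python import PythonOperator
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    def save_language_counts(**context):
        # Pull the dict returned by CountLanguagesOperator and store it as JSON.
        counts = context["task_instance"].xcom_pull(task_ids="count_languages")
        hook = WasbHook(wasb_conn_id="wasb_default")  # connection configured in the Airflow UI
        hook.load_string(
            string_data=json.dumps(counts, indent=2),
            container_name="airflow-output",          # placeholder container
            blob_name="github/language_counts.json",  # placeholder blob path
        )

    save_counts = PythonOperator(
        task_id="save_language_counts",
        python_callable=save_language_counts,
        dag=dag,
    )

    count_languages >> save_counts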

     

     

     

    Links:

    Install a Private package - Azure Data Factory | Microsoft Learn
    How does Workflow Orchestration Manager work? - Azure Data Factory | Microsoft Learn

    airflow.operators.python — Airflow Documentation (apache.org) 
    airflow.providers.microsoft.azure — apache-airflow-providers-microsoft-azure Documentation 

     

     

    Call to Action:
    - Make sure to establish all required connections before starting to work with managed Airflow.
    - Check the Microsoft documentation on Workflow Orchestration Manager.
    - Please help us improve by sharing your valuable Workflow Orchestration Manager Preview feedback by emailing us at ManagedAirflow@microsoft.com.
    - Follow me on LinkedIn: Sally Dabbah | LinkedIn

     
