Validate data using an Azure Function and Great Expectations


I recently got the question whether it is possible to run Great Expectations validations in an Azure Function, and my first thought was "Yes, of course you can," so I dove in and got it working!

 

Great Expectations (Great Expectations Home Page • Great Expectations) is a popular Python-based OSS tool for validating the data that comes into your data estate. For me, validating incoming data is best done file by file, as the files arrive, and on Azure there is no better platform for that than Azure Functions. I love the serverless nature of Functions, and with the triggers available for arriving blobs, as well as HTTP requests, Event Grid events, queues and others, there are some great patterns that let you build event-driven data architectures. We also now have the Python v2 programming model for Azure Functions available, which makes the developer experience even better. So, let's go through how to get it running.

 

Great Expectations

The basics of Great Expectations revolve around creating an Expectation Suite, which specifies what the data should look like. This can be anything from which columns are present, to what values are in those columns, to more complex checks like calculating the standard deviation of the column values and checking whether it is in the right range. You can also create custom expectations, but at the time of writing there are 309 expectations available out of the box, so have a look at those first (see Great Expectations Gallery of Expectations and Packages • Great Expectations)! To give you an idea, this is what an Expectation Suite definition looks like that checks for a table row count between 8,000 and 12,000 and for the values of a particular column to be between 0 and 6 (shortened slightly):

 

 

{ "expectation_suite_name": "taxi", "expectations": [ { "expectation_type": "expect_table_row_count_to_be_between", "kwargs": { "max_value": 12000, "min_value": 8000 }, }, { "expectation_type": "expect_column_values_to_be_between", "kwargs": { "column": "passenger_count", "max_value": 6, "min_value": 0 } } ] }

 

 

Next, this Expectation Suite is referenced by a Checkpoint, which runs it against a batch of data to create a validation result. A Checkpoint can also run other actions, and I will come back to that later. For instance, a Checkpoint (in YAML) that uses the above Expectation Suite looks like this:

 

 

name: taxi_checkpoint
module_name: great_expectations.checkpoint
run_name_template: '%Y%m%d-%H%M%S-$ENVIRONMENT-$FILE_NAME'
validations:
  - batch_request:
      datasource_name: all_data
      data_connector_name: default_runtime_data_connector_name
      data_asset_name: all_data
    expectation_suite_name: taxi
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction

 

 

These are the basic entities that you need to get familiar with to use Great Expectations. Of course, there are many more advanced workflows available, but for that I refer you to their own documentation.

 

Azure Functions

Next, turning to Azure Functions, we need two things before we can do the validation: the data itself and the Checkpoint to run against it. Let's start with the data. Great Expectations currently has two engines it can use for this, Pandas and Spark, and in this case we will use the in-memory Pandas approach. This means loading the content of the trigger file, parsing it into a Pandas dataframe and passing that to the Great Expectations engine. This is what the function looks like; be aware that it uses the Python v2 programming model, so the syntax will be slightly different with the v1 model:

 

 

@app.function_name(name="gx_validate_blob") @app.blob_trigger( arg_name="data", path="data/{name}", connection="DATA_STORAGE", ) @app.blob_output( arg_name="output", path="output/{name}.json", connection="DATA_STORAGE", ) async def gx_validate_blob(data: func.InputStream, output: func.Out[str]): """Process a file.""" assert data.name # set context context = get_context(data.name) # get checkpoint name checkpoint_name = get_checkpoint_from_filename(data.name) # get data data_frame = await get_data_frame(data.read(), file_format=FileFormat.CSV) # do the actual validation result, docs_url = run_checkpoint(context, data_frame, checkpoint_name) # setup outputs if result and docs_url: output.set(str(result.to_json_dict()))

 

 

You might notice I am not explicitly specifying the Expectation Suite and Checkpoint. While these can be created on the fly, in this case I chose to read them from a config folder that is mounted to the Functions environment. I also added a small function called `get_checkpoint_from_filename` that maps the filename to a Checkpoint name. There are other approaches to this kind of mapping, such as using a queue trigger whose message carries the info for the file (including the name of the Checkpoint to use), or creating the Checkpoint and Expectation Suite on the fly in the code itself, but I encourage you to find the pattern that works for you.
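Neither `get_checkpoint_from_filename` nor `run_checkpoint` is shown above, so here is a hedged sketch of what they could look like. The filename-to-checkpoint mapping rule, the `batch_identifiers` key (which has to match the runtime data connector configuration) and the way the docs deeplink is derived are all assumptions for illustration, not the exact code from the sample:

from great_expectations.core.batch import RuntimeBatchRequest


def get_checkpoint_from_filename(blob_name: str) -> str:
    """Map an incoming blob name to a Checkpoint name, e.g. data/taxi_2023-01.csv -> taxi_checkpoint."""
    stem = blob_name.split("/")[-1].rsplit(".", 1)[0]
    return f"{stem.split('_')[0]}_checkpoint"


def run_checkpoint(context, data_frame, checkpoint_name):
    """Run the named Checkpoint against the in-memory DataFrame and return the result plus a docs deeplink."""
    batch_request = RuntimeBatchRequest(
        datasource_name="all_data",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="all_data",
        runtime_parameters={"batch_data": data_frame},
        # the key must match the batch_identifiers configured on the runtime data connector
        batch_identifiers={"default_identifier_name": checkpoint_name},
    )
    result = context.run_checkpoint(
        checkpoint_name=checkpoint_name,
        validations=[{"batch_request": batch_request}],
    )
    # derive a deeplink to the data docs page for this validation (the URL rewrite comes later in this post)
    validation_id = result.list_validation_result_identifiers()[0]
    sites = context.get_docs_sites_urls(resource_identifier=validation_id)
    docs_url = sites[0]["site_url"] if sites else None
    return result, docs_url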

 

Data Docs

Finally, the keen-eyed reader might have seen that the Checkpoint includes an action called "update_data_docs." Data Docs are very useful for giving your users a portal with detailed results of the validations. Luckily, Great Expectations already supports hosting those docs on an Azure Storage static website, and I could very easily make that work through my function as well.
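For reference, hosting the data docs on an Azure Storage static website is configured in great_expectations.yml as a data docs site backed by the TupleAzureBlobStoreBackend. Something along these lines should do it (the AZURE_STORAGE_CONNECTION_STRING setting name is just an example; use whatever app setting you expose for the connection string):

data_docs_sites:
  az_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleAzureBlobStoreBackend
      container: \$web
      connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
    site_index_builder:
      class_name: DefaultSiteIndexBuilder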

 

Wiring this up in the Function app consists of two parts. The first is an additional function that does nothing more than build (or rebuild) the data docs based on the configuration:

 

 

@app.function_name(name="gx_build_docs") @app.route( trigger_arg_name="req", methods=[func.HttpMethod.GET], route="build_docs", ) async def gx_build_docs(req: func.HttpRequest) -> func.HttpResponse: """Rebuild docs.""" logging.info("(Re)building docs") context = get_context() context.build_data_docs() site = get_docs_site_urls(context) return func.HttpResponse(f"Docs (re)built: {site}")

 

 

Since the docs site configuration uses an environment variable for the storage connection string, this works out of the box in the Function environment by simply adding that string to the app settings. Building the docs creates the data docs website and also uploads any validations that are present in the uncommitted folder of the config file share. The function is triggered by an HTTP request and responds with the message "Docs (re)built: " followed by the URL of the data docs site.
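The `get_docs_site_urls` helper is not shown either; it is presumably a thin wrapper around the data context. A sketch, assuming it simply collects the site URLs that Great Expectations reports for the configured data docs sites:

def get_docs_site_urls(context) -> list[str]:
    """Return the URL(s) of the configured data docs site(s)."""
    # get_docs_sites_urls returns a list of {"site_name": ..., "site_url": ...} dicts
    return [site["site_url"] for site in context.get_docs_sites_urls()]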

 

Next, one of the additional actions in the Checkpoint I created updates the data docs. Once that is done, you can load a deeplink to the page in the data docs that shows the result of the validation that was just completed. There was one complication to solve here: Great Expectations returns the `blob.core.windows.net/$web/index.html` address for the files it has just created instead of the `zXX.web.core.windows.net/index.html` one. The former is not accessible if the $web container doesn't have public access enabled, so we need to rewrite it to the latter, for which I created a small function in the utils.py file.

 

 

import os

URL = os.getenv("STATIC_WEB_URL_PREFIX")


def rewrite_blob_url(url: str) -> str:
    """Rewrite blob web url."""
    return url.replace("blob", URL).replace("$web/", "")
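For example, with STATIC_WEB_URL_PREFIX set to the `zXX.web` prefix of your static website endpoint, a URL like `https://<account>.blob.core.windows.net/$web/index.html` is rewritten to `https://<account>.zXX.web.core.windows.net/index.html`.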

 

 

 

Outputs

In this sample, I write the validation result to another blob storage location as JSON, but I also created a simple function that outputs a dict with the filename, the result and the deeplink to the data docs, like this:

 

 

def create_event_grid_event(
    data_name: str, result: CheckpointResult, docs_url: str
) -> dict[str, str]:
    """Create an event grid event from the validation result."""
    return {
        "file": data_name,
        "success": result["success"],
        "url": docs_url,
    }

 

 

The output of this can then be pushed to Event Grid or another messaging service as an additional output (or instead of the JSON document output). You could even have two different event outputs, one for failed and one for successful validations, letting the next steps in the process also start based on events, like a true event-driven architecture.
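For completeness, here is a sketch of how that could be wired up with the Event Grid output binding of the Python v2 model. The function name, the event type and the EVENT_GRID_TOPIC_URI / EVENT_GRID_TOPIC_KEY app setting names are placeholders I made up for illustration; the sample repo may do this differently:

import uuid
from datetime import datetime, timezone

import azure.functions as func


@app.function_name(name="gx_validate_blob_with_event")
@app.blob_trigger(arg_name="data", path="data/{name}", connection="DATA_STORAGE")
@app.event_grid_output(
    arg_name="event",
    topic_endpoint_uri="EVENT_GRID_TOPIC_URI",
    topic_key_setting="EVENT_GRID_TOPIC_KEY",
)
async def gx_validate_blob_with_event(
    data: func.InputStream, event: func.Out[func.EventGridOutputEvent]
):
    """Validate the file and emit the result as an Event Grid event."""
    assert data.name
    context = get_context(data.name)
    checkpoint_name = get_checkpoint_from_filename(data.name)
    data_frame = await get_data_frame(data.read(), file_format=FileFormat.CSV)
    result, docs_url = run_checkpoint(context, data_frame, checkpoint_name)
    event.set(
        func.EventGridOutputEvent(
            id=str(uuid.uuid4()),
            subject=data.name,
            event_type="gx.validation.completed",
            event_time=datetime.now(timezone.utc),
            data_version="1.0",
            data=create_event_grid_event(data.name, result, docs_url),
        )
    )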

 

Conclusion

Great Expectations is a great tool (as the name implies!). Running it serverless, directly when new data arrives, with plenty of options to act on those files right as they come in, is a great pattern, and one that I think should be the standard for many cases! Using a Function also makes this very cost-effective: whether you have peaks on some days with hundreds of files coming in and nothing on others, or a continuous but low-frequency stream of files, the validation happens as the files arrive regardless. And if a file fails validation, you can act immediately, rather than waiting for a batch of files to arrive, parsing those and then taking action (and potentially having to fix things in your downstream data pipeline as well as delaying delivery of your data products). The built-in data docs on an Azure Storage static website complement this nicely, because you can easily share the validation results with any user, internal or external, to communicate what went wrong.

 

The exact deployment details are available in the GitHub repo, including a Bicep deployment script that creates the storage for the data files, the storage for the config and the Function app itself!
