How To: APIM Asynch to Synch Pattern

This post has been republished via RSS; it originally appeared at: Microsoft Developer Community Blog articles.

In this How To post, I will use APIM to turn an asynchronous messaging into a synchronous messaging by publishing a message to Azure Service Bus and retrieving the response using Azure Blob Storage.

 

The following illustrates the pattern:

JeffreyChilberto_0-1702000805564.png

Just because you can do something, does not mean you should...

Most developers I know would solve this by introducing a Function or Logic App to handle the async to sync messaging. I did find some good blog posts on using a Function App or Logic App, but I wanted to discover if this was possible and how well it worked.

 

Even ChatGpt thought I was doomed for failure:

JeffreyChilberto_1-1702001044729.png

...

JeffreyChilberto_2-1702001075830.png

Again, this is working, but I would hesitate to push to production until you have confidence in the implemention.

Setup

For those of you who want to follow along, you will need to provision Azure API Management, Azure Storage, Azure Service Bus, and a local or Azure-based ASP.Net Core application.

Azure API Management

Creating a new instance of Azure APIM takes time to provision but is straight forward: https://learn.microsoft.com/en-us/azure/api-management/get-started-create-service-instance. I created a developer instance.

 

Once APIM is available, create a new HTTP API. In my example, I am only setting the base url for the Service Bus so my Frontend URL is structured to post to a queue called "request":

JeffreyChilberto_0-1702001653460.png

I will cover the policy after we cover the other services.

Azure Storage

I kept things simple and created a publicly accessible blob container named "reply". I do recommend using service identity whenever possible so would recommend this. Have a look at How To: Send Requests to Azure Storage From API Management for an example of how to achieve this.

Azure Service Bus

For my purposes, I created a standard Azure Service Bus and kept all the default settings. I then created a queue called "request".

ASP.Net Core

I ran a local ASP.Net Core 7 application for my testing. In the end, it became a more sophistiated echo where it uploaded to Azure Blob Storage the content it received from the published message. I did learn some things though around how to handle dependency injection with the Azure.Messaging.ServiceBus package.

ServiceListener

I first created a class that would listen to messages published to the Service Bus named ServiceListener:

 

public class ServiceListener : BackgroundService, IDisposable { private readonly ServiceBusProcessor _processor; private readonly BlobServiceClient _blobClient; public ServiceListener(ServiceBusClient client, BlobServiceClient blobClient) { _blobClient = blobClient; var options = new ServiceBusProcessorOptions { AutoCompleteMessages = false, MaxConcurrentCalls = 2 }; _processor = client.CreateProcessor("request", options); _processor.ProcessMessageAsync += MessageHandler; _processor.ProcessErrorAsync += ErrorHandler; } }

 

 

The ServiceListener will receive ServiceBusProcessor and a BlobServiceClient in the constructor. The ServiceBusProcessor will be used to listen to messages posted to the request queue. This is down by registering to the ProcessMessageAsync event. The BlobServiceClient will be used to upload to the Azure Blog Storage.

The ServiceListener implements the BackgroundService as I want it to run for the life of application. This requires the ExecuteAsync() method to be implemented:

 

 

protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await _processor.StartProcessingAsync(); }

 

 

The ServiceListener also implements the IDisposable interface. This is so we politely stop the service when the application stops.

 

public async Task DisposeAsync() { await _processor.StopProcessingAsync(); await _processor.DisposeAsync(); }

 

 

MessageHandler() is used to upload the message content to blob storage. I simply used a naming convention based on the MessageId:

 

internal async Task MessageHandler(ProcessMessageEventArgs args) { string body = args.Message.Body.ToString(); var filename = $"{args.Message.MessageId}.json"; BlobClient blobClient = _blobClient.GetBlobContainerClient("reply").GetBlobClient(filename); await blobClient.UploadAsync(BinaryData.FromString(body), overwrite: true); await args.CompleteMessageAsync(args.Message); }

 

 

It is also required to define a ProcessErrorAsync callback. 

 

Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine(args.ErrorSource); Console.WriteLine(args.FullyQualifiedNamespace); Console.WriteLine(args.EntityPath); Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; }

 

 

To set the ServiceListener so that it starts up correctly, I added the following to the Startup method:

 

services.AddAzureClients(clientsBuilder => { string connectionString = "Endpoint=sb://..."; clientsBuilder.AddServiceBusClient(connectionString) .ConfigureOptions(options => { options.RetryOptions.Delay = TimeSpan.FromMilliseconds(50); options.RetryOptions.MaxDelay = TimeSpan.FromSeconds(5); options.RetryOptions.MaxRetries = 3; }); clientsBuilder.AddBlobServiceClient("DefaultEndpointsProtocol=https;AccountName=...") .ConfigureOptions(options => { options.Retry.MaxRetries = 3; options.Retry.Delay = TimeSpan.FromSeconds(5); options.Retry.MaxDelay = TimeSpan.FromSeconds(10); options.Retry.NetworkTimeout = TimeSpan.FromSeconds(30); }); }); services.AddHostedService<ServiceListener>();

 

 

To use the extensions, you do need to bring in the Microsoft.Extensions.Azure package.

Azure APIM Policy

With the setup done, let's discuss the policy. I broke this up into two steps: inbound and outbound.

inbound

The inbound step will be used to submit the message to the service bus. I did use ServerlessNotes as a basis so give them a visit for some tips.

 

 

<inbound> <base /> <cache-lookup-value key="errorsas" variable-name="cachedSasToken" /> <set-variable name="RequestId" value="@(context.RequestId.ToString())" /> <choose> <when condition="@(context.Variables.GetValueOrDefault&lt;string>("cachedSasToken") == null)"> <cache-store-value key="errorsas" value="@{ string resourceUri = "https://*.servicebus.windows.net/request"; string keyName = "*"; string key = "*"; TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1); var expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + 120); string stringToSign = System.Uri.EscapeDataString(resourceUri) + "\n" + expiry; HMACSHA256 hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)); var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign))); var sasToken = String.Format("SharedAccessSignature sr={0}&amp;sig={1}&amp;se={2}&amp;skn={3}", System.Uri.EscapeDataString(resourceUri), System.Uri.EscapeDataString(signature), expiry, keyName); return sasToken; }" duration="10" /> <cache-lookup-value key="errorsas" variable-name="cachedSasToken" /> </when> </choose> <set-header name="Authorization" exists-action="override"> <value>@(context.Variables.GetValueOrDefault<string>("cachedSasToken"))</value> </set-header> <set-header name="Content-type" exists-action="override"> <value>application/json</value> </set-header> <set-header name="Ocp-Apim-Subscription-Key" exists-action="delete" /> <set-header name="BrokerProperties" exists-action="override"> <value>@{ string request = context.Variables.GetValueOrDefault<string>("RequestId"); return string.Format("{{\"MessageId\":\"{0}\"}}", request); }</value> </set-header> <set-backend-service base-url="https://*.servicebus.windows.net/" /> </inbound>

 

 

There is a lot to unpack here. For the token side, please see the ServerlessNotes site. I did find the behavior less than perfect where I had to submit multiple times for the cache to start working correctly.

What I do want to highlight is the RequestId of the APIM request becomes the MessageId of the Service Bus message. This value will be unique and will allow us to match up the reply with the request.

outbound

In the outbound step, we will retrieve the response from blob storage.

 

 

<outbound> <base /> <set-variable name="BlobFileName" value="@{ var requestId = context.RequestId; return $"https://*.blob.core.windows.net/reply/{requestId}.json"; }" /> <send-request mode="new" timeout="120" response-variable-name="blobdata" ignore-error="true"> <set-url>@((string)context.Variables["BlobFileName"])</set-url> <set-method>GET</set-method> <set-header name="x-ms-version" exists-action="override"> <value>2019-07-07</value> </set-header> <set-header name="x-ms-blob-type" exists-action="override"> <value>BlockBlob</value> </set-header> </send-request> <set-body>@(((IResponse)context.Variables["blobdata"]).Body.As<JObject>().ToString())</set-body> </outbound>

 

 

The outbound is fairly straightforward. We are using the RequestId to retrieve the reply from storage. This example uses a public location to keep things simple. I did set the timeout to 120 seconds but in my cases the handling of the message was subsecond so this was not required.

Conclusion

I performed my testing in the Azure Portal and Postman. In my case, I was just replying with the posted message so my response time was consistently under 200ms. Most responses were close to under 50ms. Note this was between Australia and New Zealand so I anticipate the performance to be better within the same region.

 

In short, this was interesting as I had not seen this done. I am sure others have, so if you have, please put a comment with your thoughts on the approach.

 

Cheers!

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.