Azure Functions: Support for HTTP Streams in Python is now in Preview!


HTTP streams let you accept and return data from your HTTP endpoints using the FastAPI request and response APIs enabled in your functions. These APIs let the host process large data in HTTP messages as chunks, instead of reading an entire message into memory.

 

This feature makes it possible to handle large data streams, build OpenAI integrations, deliver dynamic content, and support other core HTTP scenarios that require real-time interactions over HTTP. You can also use FastAPI response types with HTTP streams. Without HTTP streams, the size of your HTTP requests and responses is limited by the memory restrictions you can hit when processing entire message payloads in memory.

 

To get started, you need a function app that uses the Python v2 programming model, along with versions of the Azure Functions runtime and Core Tools that support this preview feature.

Then, enable HTTP streaming in your Azure function app. HTTP streams are disabled by default, so you need to enable the feature in your application settings and update your code to use the FastAPI package.

 

  1. Add the azurefunctions-extensions-http-fastapi extension package to the requirements.txt file in the project.

  2. Add the following code to the function_app.py file in the project, which imports the FastAPI extension:

    from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
  3. When deploying, add these application settings:

    "PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
    "PYTHON_ENABLE_INIT_INDEXING": "1"

    When running locally, you also need to add these same settings to the local.settings.json project file.
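
For example, when running locally, a minimal local.settings.json with these settings might look like this (the storage connection string below is a placeholder for local development):

    {
      "IsEncrypted": false,
      "Values": {
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
        "PYTHON_ENABLE_INIT_INDEXING": "1"
      }
    }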

 

The following example code snippets show how to use HTTP streams with Azure Functions in Python.

 

This example is an HTTP triggered function that streams HTTP response data. You might use these capabilities to support scenarios like sending event data through a pipeline for real-time visualization, or detecting anomalies in large sets of data and providing instant notifications.

 

import time
import azure.functions as func
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


def generate_count():
    """Generate a stream of chronological numbers."""
    count = 0
    while True:
        yield f"counting, {count}\n\n"
        count += 1
        time.sleep(1)  # Pace the stream so clients receive one count per second

@app.route(route="stream", methods=[func.HttpMethod.GET])
async def stream_count(req: Request) -> StreamingResponse:
    """Endpoint to stream of chronological numbers."""
    return StreamingResponse(generate_count(), media_type="text/event-stream")
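
If you run the app locally with Core Tools (which listens on port 7071 by default), you can watch the stream with curl's no-buffering flag:

    curl -N http://localhost:7071/api/stream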

 

This example is an HTTP triggered function that receives and processes streaming data from a client in real time. It demonstrates streaming upload capabilities that can be helpful for scenarios like processing continuous data streams and handling event data from IoT devices.

 

import azure.functions as func
from azurefunctions.extensions.http.fastapi import JSONResponse, Request

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.route(route="streaming_upload", methods=[func.HttpMethod.POST])
async def streaming_upload(req: Request) -> JSONResponse:
    """Handle streaming upload requests."""
    # Process each chunk of data as it arrives
    async for chunk in req.stream():
        process_data_chunk(chunk)

    # Once all data is received, return a JSON response indicating successful processing
    return JSONResponse({"status": "Data uploaded and processed successfully"})


def process_data_chunk(chunk: bytes):
    """Process each data chunk."""
    # Add custom processing logic here
    pass

 

Note: you must use an HTTP client library to make streaming calls to a function's FastAPI endpoints. The client tool or browser you're using might not natively support streaming, or it might return only the first chunk of data. You can use a client script like the one below to send streaming data to an HTTP endpoint.
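
For example, the following sketch uses the third-party httpx library to stream a request body to the streaming_upload endpoint from the previous example; the local URL and file name here are assumptions:

import httpx  # pip install httpx

# Local URL for the streaming_upload example above (assumes the default Core Tools port)
url = "http://localhost:7071/api/streaming_upload"

def stream_file(path: str):
    """Yield the file in chunks so httpx streams the request body."""
    with open(path, "rb") as f:
        while chunk := f.read(4096):
            yield chunk

with httpx.Client() as client:
    # Passing a generator as `content` makes httpx send a chunked, streaming upload
    response = client.post(url, content=stream_file("data.bin"))
    print(response.status_code, response.json())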

 
This example uses Azure OpenAI to stream a response to a prompt asking for the 100 most populous U.S. cities. With HTTP streaming, the response arrives progressively, and the client can process the data as it's received.

import azure.functions as func
import openai
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio
import os

# Azure Function App
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

endpoint = os.environ["AZURE_OPEN_AI_ENDPOINT"]
api_key = os.environ["AZURE_OPEN_AI_API_KEY"]

# Azure OpenAI
deployment = os.environ["AZURE_OPEN_AI_DEPLOYMENT_MODEL"]
temperature = 0.7

client = openai.AsyncAzureOpenAI(
    azure_endpoint=endpoint,
    api_key=api_key,
    api_version="2023-09-01-preview"
)

# Stream data from Azure OpenAI
async def stream_processor(response):
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:  # Yield only chunks that contain generated text
                await asyncio.sleep(0.1)  # Small delay to smooth out the stream
                yield delta.content


# HTTP streaming Azure Function
@app.route(route="stream-cities", methods=[func.HttpMethod.GET])
async def stream_openai_text(req: Request) -> StreamingResponse:
    prompt = "List the 100 most populous cities in the United States."
    azure_open_ai_response = await client.chat.completions.create(
        model=deployment,
        temperature=temperature,
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    return StreamingResponse(stream_processor(azure_open_ai_response), media_type="text/event-stream")
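
Because this endpoint returns a server-sent event stream, a streaming-capable client shows the city list appearing progressively. For example, with the httpx library (the URL assumes a local run on the default Core Tools port):

import httpx  # pip install httpx

# Stream the response from the stream-cities example above
with httpx.stream("GET", "http://localhost:7071/api/stream-cities", timeout=None) as response:
    for text in response.iter_text():
        print(text, end="", flush=True)  # Print tokens as they arrive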
 
Thanks for reading along! To learn more about HTTP streaming support in Azure Functions using Python, check out the developer reference guide. For questions and comments, create an issue in our Azure Functions Python GitHub repository.
