
Asynchronous LLM API Calls in Python: A Comprehensive Guide

As developers and data scientists, we often need to interact with large language models (LLMs) through APIs. However, as our applications grow in complexity and scale, efficient and performant API interactions become crucial. This is where asynchronous programming shines, allowing us to maximize throughput and minimize latency when working with LLM APIs.

In this comprehensive guide, we explore the world of asynchronous LLM API calls in Python. We cover everything from the basics of asynchronous programming to advanced techniques for handling complex workflows. By the end of this article, you will have a good understanding of how to use asynchronous programming to supercharge your LLM-powered applications.

Before we dive into the specifics of asynchronous LLM API calls, we’ll lay a solid foundation in asynchronous programming concepts.

Asynchronous programming allows multiple operations to make progress concurrently without blocking the main thread of execution. In Python, this is mainly achieved via the asyncio module, which provides a framework for writing concurrent code using coroutines, event loops, and futures.

Key Concepts:

  • Coroutines: Functions defined with async def that can be paused and resumed.
  • Event loop: The central execution mechanism that manages and executes asynchronous tasks.
  • Awaitables: Objects that can be used with the await keyword (coroutines, tasks, futures).

Here is a simple example to illustrate these concepts:

import asyncio
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Hello, {name}!")
async def main():
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )
asyncio.run(main())

In this example, we define an asynchronous function greet that simulates an I/O operation with asyncio.sleep(). The main function uses asyncio.gather() to run multiple greetings concurrently. Despite the sleep delay, all three greetings are printed after about 1 second rather than 3, demonstrating the power of asynchronous execution.
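
To make the three kinds of awaitables from the list above concrete, here is a minimal sketch that awaits a coroutine, a task, and a future in turn. The names double and awaitables_demo are made up for illustration and are not part of any library.

```python
import asyncio


async def double(x):
    # A coroutine function: calling it returns a coroutine object
    await asyncio.sleep(0.01)
    return x * 2


async def awaitables_demo():
    # 1. Awaiting a coroutine object runs it to completion
    from_coroutine = await double(1)

    # 2. A Task wraps a coroutine and schedules it on the event loop
    task = asyncio.create_task(double(2))
    from_task = await task

    # 3. A Future is a low-level placeholder whose result is set later
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_later(0.01, future.set_result, 6)
    from_future = await future

    return from_coroutine, from_task, from_future


if __name__ == "__main__":
    print(asyncio.run(awaitables_demo()))
```

In application code you will mostly work with coroutines and tasks; bare futures tend to appear only when bridging callback-based code.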

The need for async in LLM API calls

When working with LLM APIs, we often encounter scenarios where we need to make multiple API calls, either sequentially or in parallel. Traditional synchronous code can lead to significant performance bottlenecks, especially when dealing with high-latency operations such as network requests to LLM services.

Consider a scenario where we need to generate summaries for 100 different articles using an LLM API. A synchronous approach would block on each API call until its response is received, potentially taking several minutes to complete all requests. An asynchronous approach, on the other hand, allows us to have many API calls in flight at once, dramatically reducing the overall execution time.
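
The difference can be sketched without touching a real API. In the snippet below, fake_summarize is a hypothetical stand-in for an LLM call, and the 0.05 second latency is an arbitrary assumption chosen to keep the demo fast.

```python
import asyncio
import time


async def fake_summarize(article, latency=0.05):
    # Stand-in for an LLM API call; the sleep models network latency
    await asyncio.sleep(latency)
    return f"Summary of {article}"


async def summarize_sequentially(articles):
    # Each call waits for the previous one to finish
    return [await fake_summarize(a) for a in articles]


async def summarize_concurrently(articles):
    # All calls are in flight at once
    return await asyncio.gather(*(fake_summarize(a) for a in articles))


def timed(coro):
    # Run a coroutine to completion and report how long it took
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    articles = [f"article {i}" for i in range(10)]
    _, sequential_seconds = timed(summarize_sequentially(articles))
    _, concurrent_seconds = timed(summarize_concurrently(articles))
    print(f"sequential: {sequential_seconds:.2f}s, concurrent: {concurrent_seconds:.2f}s")
```

The sequential version takes roughly the sum of all latencies, while the concurrent version takes roughly the latency of the slowest single call.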

Set up your environment

To get started with asynchronous LLM API calls, you’ll need to set up your Python environment with the necessary libraries. This is what you need:

  • Python 3.7 or higher (for native asyncio support)
  • aiohttp: An asynchronous HTTP client library
  • openai: The official OpenAI Python client (if you use OpenAI’s GPT models)
  • langchain: A framework for building applications with LLMs (optional, but recommended for complex workflows)

You can install these dependencies with pip:

pip install aiohttp openai langchain

Basic Async LLM API calls with asyncio and aiohttp

Let’s start with a simple asynchronous call to an LLM API. We’ll use OpenAI’s GPT-3.5 API through the official AsyncOpenAI client as an example, but the same concepts apply to other LLM APIs, including ones you call directly over HTTP with aiohttp.

import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
async def main():
    prompts = [
        "Explain quantum computing in simple terms.",
        "Write a haiku about artificial intelligence.",
        "Describe the process of photosynthesis."
    ]
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

In this example we define an asynchronous function generate_text which makes a call to the OpenAI API using the AsyncOpenAI client. The main function creates multiple tasks for different prompts and uses asyncio.gather() to run them simultaneously.

This approach allows us to send multiple requests to the LLM API simultaneously, significantly reducing the total time required to process all prompts.

Advanced techniques: batch processing and concurrency control

While the previous example demonstrates the basics of asynchronous LLM API calls, real-world applications often require a more advanced approach. Let’s explore two important techniques: request batching and concurrency control.

Process requests in batches: When dealing with a large number of prompts, it is often better to group them into batches and process one batch at a time rather than launching every request at once. In the example below, each prompt is still sent as its own API call, but only batch_size requests are in flight at any moment, which keeps the load on the API predictable.

import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, client):
    responses = await asyncio.gather(*[
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ) for prompt in batch
    ])
    return [response.choices[0].message.content for response in responses]
async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    batch_size = 10
    
    async with AsyncOpenAI() as client:
        results = []
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i+batch_size]
            batch_results = await process_batch(batch, client)
            results.extend(batch_results)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())
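
The batching loop above can also be factored into a small reusable helper. The following is a sketch only: chunked, run_in_batches, and the delay_between_batches parameter are hypothetical names, and fake_llm_call stands in for the real client call.

```python
import asyncio


def chunked(items, size):
    # Yield consecutive slices of at most `size` items
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def fake_llm_call(prompt):
    # Stand-in for client.chat.completions.create(...)
    await asyncio.sleep(0.01)
    return f"response to: {prompt}"


async def run_in_batches(prompts, batch_size, delay_between_batches=0.0):
    results = []
    for batch in chunked(prompts, batch_size):
        # Requests within a batch run concurrently; batches run one after another
        results.extend(await asyncio.gather(*(fake_llm_call(p) for p in batch)))
        # Optional pause between batches, e.g. to stay under a rate limit
        await asyncio.sleep(delay_between_batches)
    return results


if __name__ == "__main__":
    prompts = [f"Tell me a fact about number {i}" for i in range(25)]
    print(len(asyncio.run(run_in_batches(prompts, batch_size=10))))
```

One trade-off of this design is that each batch waits for its slowest request before the next batch starts, so a single slow call can leave capacity idle.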

Concurrency Control: Although asynchronous programming allows for concurrent execution, it is important to control the level of concurrency to avoid overloading the API server or exceeding rate limits. For this we can use asyncio.Semaphore.

import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client, semaphore):
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(100)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [generate_text(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

In this example, we use a semaphore to limit the number of concurrent requests to five, to avoid overloading the API server.
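
To check that the semaphore really caps concurrency, you can count how many calls are in flight at once. This is a sketch with a simulated request; tracked_call, run_with_limit, and the stats dictionary are hypothetical helpers introduced only for this demonstration.

```python
import asyncio


async def tracked_call(prompt, semaphore, stats):
    async with semaphore:
        # Record how many calls are inside the semaphore right now
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        try:
            await asyncio.sleep(0.01)  # Stand-in for the API request
            return f"response to: {prompt}"
        finally:
            stats["in_flight"] -= 1


async def run_with_limit(prompts, max_concurrent_requests):
    # Create the semaphore inside the running event loop
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    stats = {"in_flight": 0, "peak": 0}
    results = await asyncio.gather(
        *(tracked_call(p, semaphore, stats) for p in prompts)
    )
    return results, stats["peak"]


if __name__ == "__main__":
    prompts = [f"Tell me a fact about number {i}" for i in range(20)]
    results, peak = asyncio.run(run_with_limit(prompts, 5))
    print(f"{len(results)} results, peak concurrency {peak}")
```

Unlike fixed batches, a semaphore starts a new request as soon as any slot frees up, so slow calls do not hold back the rest.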


Error handling and retries in asynchronous LLM calls

When working with external APIs, it is critical to implement robust error handling and retry mechanisms. Let’s improve our code to handle common errors and implement retries with exponential backoff.

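import asyncio
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
class APIError(Exception):
    pass
# reraise=True re-raises the last APIError once attempts run out;
# without it tenacity raises RetryError, which the caller below does not catch
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True)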
async def generate_text_with_retry(prompt, client):
    try:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error occurred: {e}")
        # Chain the original exception so the root cause stays in the traceback
        raise APIError("Failed to generate text") from e
async def process_prompt(prompt, client, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(prompt, client)
            return prompt, result
        except APIError:
            return prompt, "Failed to generate response after multiple attempts."
async def main():
    prompts = [f"Tell me a fact about number {i}" for i in range(20)]
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = [process_prompt(prompt, client, semaphore) for prompt in prompts]
        results = await asyncio.gather(*tasks)
    
    for prompt, result in results:
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

This improved version includes:

  • A custom APIError exception for API-related errors.
  • A generate_text_with_retry function decorated with @retry from the tenacity library, implementing exponential backoff.
  • Error handling in the process_prompt function to detect and report errors.
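
If you prefer not to add a dependency, the same idea can be sketched with the standard library alone. This is an illustrative approximation of exponential backoff with jitter, not a reimplementation of tenacity; retry_with_backoff and make_flaky_call are hypothetical names, and the delays are kept tiny so the demo runs quickly.

```python
import asyncio
import random


async def retry_with_backoff(make_call, max_attempts=3, base_delay=0.01, max_delay=1.0):
    # make_call is a zero-argument function returning a fresh coroutine
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except Exception:  # In real code, catch only errors worth retrying
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            # Delay doubles each attempt (base, 2*base, 4*base), capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Random jitter keeps many clients from retrying in lockstep
            await asyncio.sleep(delay + random.uniform(0, delay))


def make_flaky_call(failures_before_success):
    # Hypothetical stand-in for an API call that fails a few times first
    state = {"calls": 0}

    async def call():
        state["calls"] += 1
        if state["calls"] <= failures_before_success:
            raise ConnectionError("simulated transient failure")
        return f"succeeded on attempt {state['calls']}"

    return call


if __name__ == "__main__":
    print(asyncio.run(retry_with_backoff(make_flaky_call(2))))
```

Passing a function rather than a coroutine matters here: a coroutine object can only be awaited once, so each retry needs a fresh one.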

Optimize performance: streaming responses

For long-form content generation, streaming responses can significantly improve the perceived performance of your application. Instead of waiting for the full response, you can process and display snippets of text as they become available.

import asyncio
from openai import AsyncOpenAI
async def stream_text(prompt, client):
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end='', flush=True)
    
    print("\n")
    return full_response
async def main():
    prompt = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as client:
        result = await stream_text(prompt, client)
    
    print(f"Full response:\n{result}")
asyncio.run(main())

This example shows how to stream the response from the API, printing each part as it arrives. This approach is especially useful for chat applications or any scenario where you want to provide real-time feedback to the user.
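
The consuming side of a stream can be prototyped without an API key by swapping in an async generator. In this sketch, fake_stream is a hypothetical stand-in for the chunk stream returned by the API, and on_chunk is an assumed callback for whatever your application does with each piece.

```python
import asyncio


async def fake_stream(text, delay=0.001):
    # Async generator standing in for the API's chunk stream
    for word in text.split(" "):
        await asyncio.sleep(delay)
        yield word + " "


async def consume_stream(stream, on_chunk):
    parts = []
    async for chunk in stream:
        on_chunk(chunk)      # e.g. push to a UI or a websocket
        parts.append(chunk)  # Collect the pieces and join once at the end
    return "".join(parts).rstrip()


if __name__ == "__main__":
    story = asyncio.run(
        consume_stream(
            fake_stream("A scientist stepped into the machine"),
            lambda chunk: print(chunk, end="", flush=True),
        )
    )
    print(f"\nFull response: {story}")
```

Collecting chunks in a list and joining them once avoids repeated string concatenation, which can matter for very long responses.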


Build asynchronous workflows with LangChain

For more complex LLM-powered applications, the LangChain framework provides a high-level abstraction that simplifies chaining multiple LLM calls together and integrating other tools.

The following example uses LangChain to build a workflow with streaming and asynchronous execution. The AsyncCallbackManager and StreamingStdOutCallbackHandler enable real-time streaming of the generated content:

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager([StreamingStdOutCallbackHandler()]))
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    return await chain.arun(topic=topic)
async def main():
    topics = ["a magical forest", "a futuristic city", "an underwater civilization"]
    tasks = [generate_story(topic) for topic in topics]
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")
asyncio.run(main())

Deliver asynchronous LLM applications with FastAPI

To make your asynchronous LLM application available as a web service, FastAPI is an excellent choice due to its native support for asynchronous operations. Here’s an example of how to create a simple API endpoint for text generation:

import asyncio
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
class GenerationRequest(BaseModel):
    prompt: str
class GenerationResponse(BaseModel):
    generated_text: str
@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": request.prompt}]
    )
    generated_text = response.choices[0].message.content
    
    # Simulate some post-processing in the background
    background_tasks.add_task(log_generation, request.prompt, generated_text)
    
    return GenerationResponse(generated_text=generated_text)
async def log_generation(prompt: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

This FastAPI application creates an endpoint /generate that accepts a prompt and returns generated text. It also shows how to use background tasks for additional processing without blocking the response.

Best practices and common pitfalls

Keep the following best practices in mind when working with asynchronous LLM APIs:

  1. Use connection pooling: When making multiple requests, reuse connections to reduce overhead.
  2. Implement good error handling: Always consider network issues, API errors, and unexpected responses.
  3. Respect the rate limits: Use semaphores or other concurrency control mechanisms to avoid overwhelming the API.
  4. Monitoring and logging: Implement comprehensive logging to track performance and identify issues.
  5. Use streaming for long-form content: It improves the user experience and allows early processing of partial results.
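
As a sketch of the second practice, the snippet below wraps each call in a timeout and collects failures instead of letting the first one abort the whole run. The helper names (call_with_timeout, gather_safely, fake_call) and the timeout values are assumptions made for illustration.

```python
import asyncio


async def call_with_timeout(coro, timeout_seconds):
    # wait_for cancels the call and raises a timeout error if it runs too long
    return await asyncio.wait_for(coro, timeout=timeout_seconds)


async def gather_safely(coros, timeout_seconds=1.0):
    # return_exceptions=True returns failures as values instead of
    # raising the first exception out of gather
    outcomes = await asyncio.gather(
        *(call_with_timeout(c, timeout_seconds) for c in coros),
        return_exceptions=True,
    )
    successes = [o for o in outcomes if not isinstance(o, Exception)]
    failures = [o for o in outcomes if isinstance(o, Exception)]
    return successes, failures


async def fake_call(value, latency=0.01, fail=False):
    # Stand-in for an API request that may be slow or fail
    await asyncio.sleep(latency)
    if fail:
        raise RuntimeError(f"simulated API error for {value}")
    return value


async def demo():
    return await gather_safely(
        [fake_call("a"), fake_call("b", fail=True), fake_call("c", latency=0.5)],
        timeout_seconds=0.1,
    )


if __name__ == "__main__":
    successes, failures = asyncio.run(demo())
    print(f"{len(successes)} succeeded, {len(failures)} failed")
```

Separating successes from failures this way also gives you a natural place to log errors or queue the failed prompts for a retry.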
