Tech

Chamadas de API LLM assíncronas em Python: um guia abrangente

Como desenvolvedores e cientistas de dta, frequentemente nos vemos precisando interagir com esses modelos poderosos por meio de APIs. No entanto, à medida que nossos aplicativos crescem em complexidade e escala, a necessidade de interações de API eficientes e de alto desempenho se torna essential. É aqui que a programação assíncrona brilha, permitindo-nos maximizar o rendimento e minimizar a latência ao trabalhar com APIs LLM.

Neste guia abrangente, exploraremos o mundo das chamadas de API LLM assíncronas em Python. Abordaremos tudo, desde os conceitos básicos de programação assíncrona até técnicas avançadas para lidar com fluxos de trabalho complexos. Ao ultimate deste artigo, você terá uma sólida compreensão de como aproveitar a programação assíncrona para turbinar seus aplicativos com tecnologia LLM.

Antes de nos aprofundarmos nos detalhes das chamadas de API LLM assíncronas, vamos estabelecer uma base sólida em conceitos de programação assíncrona.

A programação assíncrona permite que várias operações sejam executadas simultaneamente sem bloquear o thread principal de execução. Em Python, isso é obtido principalmente por meio do assíncrono módulo, que fornece uma estrutura para escrever código simultâneo usando corrotinas, loops de eventos e futuros.

Conceitos-chave:

  • Corrotinas: Funções definidas com definição assíncrona que pode ser pausado e retomado.
  • Loop de eventos: O mecanismo de execução central que gerencia e executa tarefas assíncronas.
  • Aguardáveis: Objetos que podem ser usados ​​com a palavra-chave await (coroutines, duties, futures).

Aqui está um exemplo simples para ilustrar esses conceitos:

import asyncio
async def greet(identify):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Howdy, {identify}!")
async def predominant():
    await asyncio.collect(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )
asyncio.run(predominant())

Neste exemplo, definimos uma função assíncrona greet que simula uma operação de E/S com asyncio.sleep(). O predominant função usa asyncio.collect() para executar várias saudações simultaneamente. Apesar do atraso de suspensão, todas as três saudações serão impressas após aproximadamente 1 segundo, demonstrando o poder da execução assíncrona.

A necessidade de assíncrono em chamadas de API LLM

Ao trabalhar com APIs LLM, frequentemente encontramos cenários em que precisamos fazer várias chamadas de API, seja em sequência ou em paralelo. O código síncrono tradicional pode levar a gargalos de desempenho significativos, especialmente ao lidar com operações de alta latência, como solicitações de rede para serviços LLM.

Considere um cenário em que precisamos gerar resumos para 100 artigos diferentes usando uma API LLM. Com uma abordagem síncrona, cada chamada de API seria bloqueada até receber uma resposta, potencialmente levando vários minutos para concluir todas as solicitações. Uma abordagem assíncrona, por outro lado, nos permite iniciar várias chamadas de API simultaneamente, reduzindo drasticamente o tempo geral de execução.

Configurando seu ambiente

Para começar com chamadas de API LLM assíncronas, você precisará configurar seu ambiente Python com as bibliotecas necessárias. Aqui está o que você precisará:

  • Python 3.7 ou superior (para suporte nativo asyncio)
  • aiohttp: Uma biblioteca cliente HTTP assíncrona
  • aberto: O cliente oficial do OpenAI Python (se você estiver usando os modelos GPT do OpenAI)
  • cadeia longa: Uma estrutura para construir aplicativos com LLMs (opcional, mas recomendado para fluxos de trabalho complexos)

Você pode instalar essas dependências usando pip:

pip set up aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

Chamadas básicas de API Async LLM com asyncio e aiohttp

Vamos começar fazendo uma chamada assíncrona simples para uma API LLM usando aiohttp. Usaremos a API GPT-3.5 da OpenAI como exemplo, mas os conceitos se aplicam a outras APIs LLM também.

import asyncio
import aiohttp
from openai import AsyncOpenAI
async def generate_text(immediate, consumer):
    response = await consumer.chat.completions.create(
        mannequin="gpt-3.5-turbo",
        messages=({"position": "person", "content material": immediate})
    )
    return response.selections(0).message.content material
async def predominant():
    prompts = (
        "Clarify quantum computing in easy phrases.",
        "Write a haiku about synthetic intelligence.",
        "Describe the method of photosynthesis."
    )
    
    async with AsyncOpenAI() as consumer:
        duties = (generate_text(immediate, consumer) for immediate in prompts)
        outcomes = await asyncio.collect(*duties)
    
    for immediate, lead to zip(prompts, outcomes):
        print(f"Immediate: {immediate}nResponse: {outcome}n")
asyncio.run(predominant())

Neste exemplo, definimos uma função assíncrona generate_text que faz uma chamada para a API OpenAI usando o cliente AsyncOpenAI. O predominant a função cria várias tarefas para diferentes prompts e usos asyncio.collect() para executá-los simultaneamente.

Essa abordagem nos permite enviar várias solicitações para a API LLM simultaneamente, reduzindo significativamente o tempo whole necessário para processar todos os prompts.

Técnicas avançadas: Controle de lotes e simultaneidade

Embora o exemplo anterior demonstre os fundamentos das chamadas de API LLM assíncronas, aplicativos do mundo actual geralmente exigem abordagens mais sofisticadas. Vamos explorar duas técnicas importantes: solicitações em lote e controle de simultaneidade.

Solicitações em lote: ao lidar com um grande número de prompts, geralmente é mais eficiente agrupá-los em grupos em vez de enviar solicitações individuais para cada immediate. Isso reduz a sobrecarga de várias chamadas de API e pode levar a um melhor desempenho.

import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, consumer):
    responses = await asyncio.collect(*(
        consumer.chat.completions.create(
            mannequin="gpt-3.5-turbo",
            messages=({"position": "person", "content material": immediate})
        ) for immediate in batch
    ))
    return (response.selections(0).message.content material for response in responses)
async def predominant():
    prompts = (f"Inform me a reality about quantity {i}" for i in vary(100))
    batch_size = 10
    
    async with AsyncOpenAI() as consumer:
        outcomes = ()
        for i in vary(0, len(prompts), batch_size):
            batch = prompts(i:i+batch_size)
            batch_results = await process_batch(batch, consumer)
            outcomes.lengthen(batch_results)
    
    for immediate, lead to zip(prompts, outcomes):
        print(f"Immediate: {immediate}nResponse: {outcome}n")
asyncio.run(predominant())

Controle de simultaneidade: embora a programação assíncrona permita a execução simultânea, é importante controlar o nível de simultaneidade para evitar sobrecarregar o servidor da API ou exceder os limites de taxa. Podemos usar asyncio.Semaphore para esse propósito.

import asyncio
from openai import AsyncOpenAI
async def generate_text(immediate, consumer, semaphore):
    async with semaphore:
        response = await consumer.chat.completions.create(
            mannequin="gpt-3.5-turbo",
            messages=({"position": "person", "content material": immediate})
        )
        return response.selections(0).message.content material
async def predominant():
    prompts = (f"Inform me a reality about quantity {i}" for i in vary(100))
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as consumer:
        duties = (generate_text(immediate, consumer, semaphore) for immediate in prompts)
        outcomes = await asyncio.collect(*duties)
    
    for immediate, lead to zip(prompts, outcomes):
        print(f"Immediate: {immediate}nResponse: {outcome}n")
asyncio.run(predominant())

Neste exemplo, usamos um semáforo para limitar o número de solicitações simultâneas a 5, garantindo que não sobrecarregaremos o servidor da API.

Tratamento de erros e novas tentativas em chamadas LLM assíncronas

Ao trabalhar com APIs externas, é essential implementar mecanismos robustos de tratamento de erros e de repetição. Vamos aprimorar nosso código para lidar com erros comuns e implementar backoff exponencial para repetições.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
class APIError(Exception):
    go
@retry(cease=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(immediate, consumer):
    strive:
        response = await consumer.chat.completions.create(
            mannequin="gpt-3.5-turbo",
            messages=({"position": "person", "content material": immediate})
        )
        return response.selections(0).message.content material
    besides Exception as e:
        print(f"Error occurred: {e}")
        elevate APIError("Didn't generate textual content")
async def process_prompt(immediate, consumer, semaphore):
    async with semaphore:
        strive:
            outcome = await generate_text_with_retry(immediate, consumer)
            return immediate, outcome
        besides APIError:
            return immediate, "Didn't generate response after a number of makes an attempt."
async def predominant():
    prompts = (f"Inform me a reality about quantity {i}" for i in vary(20))
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as consumer:
        duties = (process_prompt(immediate, consumer, semaphore) for immediate in prompts)
        outcomes = await asyncio.collect(*duties)
    
    for immediate, lead to outcomes:
        print(f"Immediate: {immediate}nResponse: {outcome}n")
asyncio.run(predominant())

Esta versão aprimorada inclui:

  • Um costume APIError exceção para erros relacionados à API.
  • UM generate_text_with_retry função decorada com @retry da biblioteca tenacity, implementando o backoff exponencial.
  • Tratamento de erros no process_prompt função para capturar e relatar falhas.

Otimizando o desempenho: respostas de streaming

Para geração de conteúdo de formato longo, as respostas de streaming podem melhorar significativamente o desempenho percebido do seu aplicativo. Em vez de esperar pela resposta inteira, você pode processar e exibir pedaços de texto conforme eles se tornam disponíveis.

import asyncio
from openai import AsyncOpenAI
async def stream_text(immediate, consumer):
    stream = await consumer.chat.completions.create(
        mannequin="gpt-3.5-turbo",
        messages=({"position": "person", "content material": immediate}),
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.selections(0).delta.content material is just not None:
            content material = chunk.selections(0).delta.content material
            full_response += content material
            print(content material, finish='', flush=True)
    
    print("n")
    return full_response
async def predominant():
    immediate = "Write a brief story a couple of time-traveling scientist."
    
    async with AsyncOpenAI() as consumer:
        outcome = await stream_text(immediate, consumer)
    
    print(f"Full response:n{outcome}")
asyncio.run(predominant())

Este exemplo demonstra como transmitir a resposta da API, imprimindo cada pedaço conforme ele chega. Essa abordagem é particularmente útil para aplicativos de bate-papo ou qualquer cenário em que você queira fornecer suggestions em tempo actual ao usuário.

Construindo fluxos de trabalho assíncronos com LangChain

Para aplicativos mais complexos alimentados por LLM, o framework LangChain fornece uma abstração de alto nível que simplifica o processo de encadeamento de múltiplas chamadas LLM e integração de outras ferramentas. Vamos dar uma olhada em um exemplo de uso do LangChain com capacidades assíncronas:

Este exemplo mostra como o LangChain pode ser usado para criar fluxos de trabalho mais complexos com streaming e execução assíncrona. O AsyncCallbackManager e StreamingStdOutCallbackHandler permitir streaming em tempo actual do conteúdo gerado.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.supervisor import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def generate_story(matter):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager((StreamingStdOutCallbackHandler())))
    immediate = PromptTemplate(
        input_variables=("matter"),
        template="Write a brief story about {matter}."
    )
    chain = LLMChain(llm=llm, immediate=immediate)
    return await chain.arun(matter=matter)
async def predominant():
    matters = ("a magical forest", "a futuristic metropolis", "an underwater civilization")
    duties = (generate_story(matter) for matter in matters)
    tales = await asyncio.collect(*duties)
    
    for matter, story in zip(matters, tales):
        print(f"nTopic: {matter}nStory: {story}n{'='*50}n")
asyncio.run(predominant())

Atendendo aplicativos LLM assíncronos com FastAPI

Para tornar seu aplicativo LLM assíncrono disponível como um serviço internet, o FastAPI é uma ótima escolha devido ao seu suporte nativo para operações assíncronas. Aqui está um exemplo de como criar um endpoint de API simples para geração de texto:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
app = FastAPI()
consumer = AsyncOpenAI()
class GenerationRequest(BaseModel):
    immediate: str
class GenerationResponse(BaseModel):
    generated_text: str
@app.submit("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await consumer.chat.completions.create(
        mannequin="gpt-3.5-turbo",
        messages=({"position": "person", "content material": request.immediate})
    )
    generated_text = response.selections(0).message.content material
    
    # Simulate some post-processing within the background
    background_tasks.add_task(log_generation, request.immediate, generated_text)
    
    return GenerationResponse(generated_text=generated_text)
async def log_generation(immediate: str, generated_text: str):
    # Simulate logging or extra processing
    await asyncio.sleep(2)
    print(f"Logged: Immediate '{immediate}' generated textual content of size {len(generated_text)}")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Este aplicativo FastAPI cria um ponto de extremidade /generate que aceita um immediate e retorna texto gerado. Ele também demonstra como usar tarefas em segundo plano para processamento adicional sem bloquear a resposta.

Melhores práticas e armadilhas comuns

Ao trabalhar com APIs LLM assíncronas, tenha estas práticas recomendadas em mente:

  1. Usar pool de conexão: Ao fazer várias solicitações, reutilize conexões para reduzir a sobrecarga.
  2. Implementar tratamento de erros adequado: Sempre leve em consideração problemas de rede, erros de API e respostas inesperadas.
  3. Respeite os limites de taxa: Use semáforos ou outros mecanismos de controle de simultaneidade para evitar sobrecarregar a API.
  4. Monitorar e registrar: Implemente registro abrangente para monitorar o desempenho e identificar problemas.
  5. Use streaming para conteúdo longo: Melhora a experiência do usuário e permite o processamento antecipado de resultados parciais.

Unite AI Mobile Newsletter 1

Artigos relacionados

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button