Como desenvolvedores e cientistas de dta, frequentemente nos vemos precisando interagir com esses modelos poderosos por meio de APIs. No entanto, à medida que nossos aplicativos crescem em complexidade e escala, a necessidade de interações de API eficientes e de alto desempenho se torna essential. É aqui que a programação assíncrona brilha, permitindo-nos maximizar o rendimento e minimizar a latência ao trabalhar com APIs LLM.
Neste guia abrangente, exploraremos o mundo das chamadas de API LLM assíncronas em Python. Abordaremos tudo, desde os conceitos básicos de programação assíncrona até técnicas avançadas para lidar com fluxos de trabalho complexos. Ao ultimate deste artigo, você terá uma sólida compreensão de como aproveitar a programação assíncrona para turbinar seus aplicativos com tecnologia LLM.
Antes de nos aprofundarmos nos detalhes das chamadas de API LLM assíncronas, vamos estabelecer uma base sólida em conceitos de programação assíncrona.
A programação assíncrona permite que várias operações sejam executadas simultaneamente sem bloquear o thread principal de execução. Em Python, isso é obtido principalmente por meio do assíncrono módulo, que fornece uma estrutura para escrever código simultâneo usando corrotinas, loops de eventos e futuros.
Conceitos-chave:
- Corrotinas: Funções definidas com definição assíncrona que pode ser pausado e retomado.
- Loop de eventos: O mecanismo de execução central que gerencia e executa tarefas assíncronas.
- Aguardáveis: Objetos que podem ser usados com a palavra-chave await (coroutines, duties, futures).
Aqui está um exemplo simples para ilustrar esses conceitos:
import asyncio async def greet(identify): await asyncio.sleep(1) # Simulate an I/O operation print(f"Howdy, {identify}!") async def predominant(): await asyncio.collect( greet("Alice"), greet("Bob"), greet("Charlie") ) asyncio.run(predominant())
Neste exemplo, definimos uma função assíncrona greet
que simula uma operação de E/S com asyncio.sleep()
. O predominant
função usa asyncio.collect()
para executar várias saudações simultaneamente. Apesar do atraso de suspensão, todas as três saudações serão impressas após aproximadamente 1 segundo, demonstrando o poder da execução assíncrona.
A necessidade de assíncrono em chamadas de API LLM
Ao trabalhar com APIs LLM, frequentemente encontramos cenários em que precisamos fazer várias chamadas de API, seja em sequência ou em paralelo. O código síncrono tradicional pode levar a gargalos de desempenho significativos, especialmente ao lidar com operações de alta latência, como solicitações de rede para serviços LLM.
Considere um cenário em que precisamos gerar resumos para 100 artigos diferentes usando uma API LLM. Com uma abordagem síncrona, cada chamada de API seria bloqueada até receber uma resposta, potencialmente levando vários minutos para concluir todas as solicitações. Uma abordagem assíncrona, por outro lado, nos permite iniciar várias chamadas de API simultaneamente, reduzindo drasticamente o tempo geral de execução.
Configurando seu ambiente
Para começar com chamadas de API LLM assíncronas, você precisará configurar seu ambiente Python com as bibliotecas necessárias. Aqui está o que você precisará:
- Python 3.7 ou superior (para suporte nativo asyncio)
- aiohttp: Uma biblioteca cliente HTTP assíncrona
- aberto: O cliente oficial do OpenAI Python (se você estiver usando os modelos GPT do OpenAI)
- cadeia longa: Uma estrutura para construir aplicativos com LLMs (opcional, mas recomendado para fluxos de trabalho complexos)
Você pode instalar essas dependências usando pip:
pip set up aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
Chamadas básicas de API Async LLM com asyncio e aiohttp
Vamos começar fazendo uma chamada assíncrona simples para uma API LLM usando aiohttp. Usaremos a API GPT-3.5 da OpenAI como exemplo, mas os conceitos se aplicam a outras APIs LLM também.
import asyncio import aiohttp from openai import AsyncOpenAI async def generate_text(immediate, consumer): response = await consumer.chat.completions.create( mannequin="gpt-3.5-turbo", messages=({"position": "person", "content material": immediate}) ) return response.selections(0).message.content material async def predominant(): prompts = ( "Clarify quantum computing in easy phrases.", "Write a haiku about synthetic intelligence.", "Describe the method of photosynthesis." ) async with AsyncOpenAI() as consumer: duties = (generate_text(immediate, consumer) for immediate in prompts) outcomes = await asyncio.collect(*duties) for immediate, lead to zip(prompts, outcomes): print(f"Immediate: {immediate}nResponse: {outcome}n") asyncio.run(predominant())
Neste exemplo, definimos uma função assíncrona generate_text
que faz uma chamada para a API OpenAI usando o cliente AsyncOpenAI. O predominant
a função cria várias tarefas para diferentes prompts e usos asyncio.collect()
para executá-los simultaneamente.
Essa abordagem nos permite enviar várias solicitações para a API LLM simultaneamente, reduzindo significativamente o tempo whole necessário para processar todos os prompts.
Técnicas avançadas: Controle de lotes e simultaneidade
Embora o exemplo anterior demonstre os fundamentos das chamadas de API LLM assíncronas, aplicativos do mundo actual geralmente exigem abordagens mais sofisticadas. Vamos explorar duas técnicas importantes: solicitações em lote e controle de simultaneidade.
Solicitações em lote: ao lidar com um grande número de prompts, geralmente é mais eficiente agrupá-los em grupos em vez de enviar solicitações individuais para cada immediate. Isso reduz a sobrecarga de várias chamadas de API e pode levar a um melhor desempenho.
import asyncio from openai import AsyncOpenAI async def process_batch(batch, consumer): responses = await asyncio.collect(*( consumer.chat.completions.create( mannequin="gpt-3.5-turbo", messages=({"position": "person", "content material": immediate}) ) for immediate in batch )) return (response.selections(0).message.content material for response in responses) async def predominant(): prompts = (f"Inform me a reality about quantity {i}" for i in vary(100)) batch_size = 10 async with AsyncOpenAI() as consumer: outcomes = () for i in vary(0, len(prompts), batch_size): batch = prompts(i:i+batch_size) batch_results = await process_batch(batch, consumer) outcomes.lengthen(batch_results) for immediate, lead to zip(prompts, outcomes): print(f"Immediate: {immediate}nResponse: {outcome}n") asyncio.run(predominant())
Controle de simultaneidade: embora a programação assíncrona permita a execução simultânea, é importante controlar o nível de simultaneidade para evitar sobrecarregar o servidor da API ou exceder os limites de taxa. Podemos usar asyncio.Semaphore para esse propósito.
import asyncio from openai import AsyncOpenAI async def generate_text(immediate, consumer, semaphore): async with semaphore: response = await consumer.chat.completions.create( mannequin="gpt-3.5-turbo", messages=({"position": "person", "content material": immediate}) ) return response.selections(0).message.content material async def predominant(): prompts = (f"Inform me a reality about quantity {i}" for i in vary(100)) max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as consumer: duties = (generate_text(immediate, consumer, semaphore) for immediate in prompts) outcomes = await asyncio.collect(*duties) for immediate, lead to zip(prompts, outcomes): print(f"Immediate: {immediate}nResponse: {outcome}n") asyncio.run(predominant())
Neste exemplo, usamos um semáforo para limitar o número de solicitações simultâneas a 5, garantindo que não sobrecarregaremos o servidor da API.
Tratamento de erros e novas tentativas em chamadas LLM assíncronas
Ao trabalhar com APIs externas, é essential implementar mecanismos robustos de tratamento de erros e de repetição. Vamos aprimorar nosso código para lidar com erros comuns e implementar backoff exponencial para repetições.
import asyncio import random from openai import AsyncOpenAI from tenacity import retry, stop_after_attempt, wait_exponential class APIError(Exception): go @retry(cease=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def generate_text_with_retry(immediate, consumer): strive: response = await consumer.chat.completions.create( mannequin="gpt-3.5-turbo", messages=({"position": "person", "content material": immediate}) ) return response.selections(0).message.content material besides Exception as e: print(f"Error occurred: {e}") elevate APIError("Didn't generate textual content") async def process_prompt(immediate, consumer, semaphore): async with semaphore: strive: outcome = await generate_text_with_retry(immediate, consumer) return immediate, outcome besides APIError: return immediate, "Didn't generate response after a number of makes an attempt." async def predominant(): prompts = (f"Inform me a reality about quantity {i}" for i in vary(20)) max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as consumer: duties = (process_prompt(immediate, consumer, semaphore) for immediate in prompts) outcomes = await asyncio.collect(*duties) for immediate, lead to outcomes: print(f"Immediate: {immediate}nResponse: {outcome}n") asyncio.run(predominant())
Esta versão aprimorada inclui:
- Um costume
APIError
exceção para erros relacionados à API. - UM
generate_text_with_retry
função decorada com@retry
da biblioteca tenacity, implementando o backoff exponencial. - Tratamento de erros no
process_prompt
função para capturar e relatar falhas.
Otimizando o desempenho: respostas de streaming
Para geração de conteúdo de formato longo, as respostas de streaming podem melhorar significativamente o desempenho percebido do seu aplicativo. Em vez de esperar pela resposta inteira, você pode processar e exibir pedaços de texto conforme eles se tornam disponíveis.
import asyncio from openai import AsyncOpenAI async def stream_text(immediate, consumer): stream = await consumer.chat.completions.create( mannequin="gpt-3.5-turbo", messages=({"position": "person", "content material": immediate}), stream=True ) full_response = "" async for chunk in stream: if chunk.selections(0).delta.content material is just not None: content material = chunk.selections(0).delta.content material full_response += content material print(content material, finish='', flush=True) print("n") return full_response async def predominant(): immediate = "Write a brief story a couple of time-traveling scientist." async with AsyncOpenAI() as consumer: outcome = await stream_text(immediate, consumer) print(f"Full response:n{outcome}") asyncio.run(predominant())
Este exemplo demonstra como transmitir a resposta da API, imprimindo cada pedaço conforme ele chega. Essa abordagem é particularmente útil para aplicativos de bate-papo ou qualquer cenário em que você queira fornecer suggestions em tempo actual ao usuário.
Construindo fluxos de trabalho assíncronos com LangChain
Para aplicativos mais complexos alimentados por LLM, o framework LangChain fornece uma abstração de alto nível que simplifica o processo de encadeamento de múltiplas chamadas LLM e integração de outras ferramentas. Vamos dar uma olhada em um exemplo de uso do LangChain com capacidades assíncronas:
Este exemplo mostra como o LangChain pode ser usado para criar fluxos de trabalho mais complexos com streaming e execução assíncrona. O AsyncCallbackManager
e StreamingStdOutCallbackHandler
permitir streaming em tempo actual do conteúdo gerado.
import asyncio from langchain.llms import OpenAI from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain.callbacks.supervisor import AsyncCallbackManager from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler async def generate_story(matter): llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager((StreamingStdOutCallbackHandler()))) immediate = PromptTemplate( input_variables=("matter"), template="Write a brief story about {matter}." ) chain = LLMChain(llm=llm, immediate=immediate) return await chain.arun(matter=matter) async def predominant(): matters = ("a magical forest", "a futuristic metropolis", "an underwater civilization") duties = (generate_story(matter) for matter in matters) tales = await asyncio.collect(*duties) for matter, story in zip(matters, tales): print(f"nTopic: {matter}nStory: {story}n{'='*50}n") asyncio.run(predominant())
Atendendo aplicativos LLM assíncronos com FastAPI
Para tornar seu aplicativo LLM assíncrono disponível como um serviço internet, o FastAPI é uma ótima escolha devido ao seu suporte nativo para operações assíncronas. Aqui está um exemplo de como criar um endpoint de API simples para geração de texto:
from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel from openai import AsyncOpenAI app = FastAPI() consumer = AsyncOpenAI() class GenerationRequest(BaseModel): immediate: str class GenerationResponse(BaseModel): generated_text: str @app.submit("/generate", response_model=GenerationResponse) async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks): response = await consumer.chat.completions.create( mannequin="gpt-3.5-turbo", messages=({"position": "person", "content material": request.immediate}) ) generated_text = response.selections(0).message.content material # Simulate some post-processing within the background background_tasks.add_task(log_generation, request.immediate, generated_text) return GenerationResponse(generated_text=generated_text) async def log_generation(immediate: str, generated_text: str): # Simulate logging or extra processing await asyncio.sleep(2) print(f"Logged: Immediate '{immediate}' generated textual content of size {len(generated_text)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
Este aplicativo FastAPI cria um ponto de extremidade /generate
que aceita um immediate e retorna texto gerado. Ele também demonstra como usar tarefas em segundo plano para processamento adicional sem bloquear a resposta.
Melhores práticas e armadilhas comuns
Ao trabalhar com APIs LLM assíncronas, tenha estas práticas recomendadas em mente:
- Usar pool de conexão: Ao fazer várias solicitações, reutilize conexões para reduzir a sobrecarga.
- Implementar tratamento de erros adequado: Sempre leve em consideração problemas de rede, erros de API e respostas inesperadas.
- Respeite os limites de taxa: Use semáforos ou outros mecanismos de controle de simultaneidade para evitar sobrecarregar a API.
- Monitorar e registrar: Implemente registro abrangente para monitorar o desempenho e identificar problemas.
- Use streaming para conteúdo longo: Melhora a experiência do usuário e permite o processamento antecipado de resultados parciais.