Py学习  »  Python

Httpx, 一个神奇的 Python 库-异步客户端

程序员老朱 • 5 月前 • 399 次点击  

HTTPX 是一个功能强大的 Python HTTP 客户端库,提供了同步和异步两种操作模式。本教程将深入介绍异步客户端的使用方法、优势以及在实际项目中的应用场景。

一、什么是异步客户端?

HTTPX 默认提供标准的同步 API,但同时支持异步客户端操作。异步客户端使用 Python 的 async/await 语法,基于事件循环实现非阻塞 I/O 操作。

二、异步编程的优势

异步编程相比传统的多线程模型具有以下显著优势:

  • 「更高的并发性能」:异步模型避免了线程切换的开销,单线程即可处理大量并发连接
  • 「更低的内存消耗」:无需为每个连接创建单独的线程栈
  • 「更简单的并发控制」:避免多线程编程中的竞态条件和锁管理问题
  • 「原生支持长连接」:特别适合 WebSocketSSE(服务器发送事件)等长寿命连接场景

三、适用场景

异步客户端特别适合以下场景:

  1. 「异步 Web 框架集成」:如 FastAPIStarletteSanicQuart 等
  2. 「高并发爬虫和 API 调用」:需要同时处理成百上千个请求
  3. 「实时应用」:如聊天应用、实时通知系统
  4. 「微服务架构」:服务间的异步通信
  5. 「流式数据处理」:处理大文件上传/下载或实时数据流

四、基础使用

4.1. 发起异步请求

要发起异步请求,您需要一个 AsyncClient




    
>>> async with httpx.AsyncClient() as client:
...     r = await client.get('http://httpbin.org/get')
...
>>> r
200 OK]>

为了充分利用连接池的优势,请确保不要实例化多个客户端实例

# ❌ 错误做法:每次循环都创建新的 Client
for i in range(10):
    with httpx.AsyncClient() as client:  # 每次循环都创建新连接

使用一个在需要处传递的作用域内客户端,或者使用一个全局客户端实例。

client = httpx.AsyncClient()
...
await client.aclose()

下面是显示使用AsyncClient示例:

FastAPI中,很多时候会调用外部API资源,为了不阻塞业务的正常请求,我们使用了AsyncClient请求资源。我们可以在

application的 state容器内创建一个httpx,在FastAPI应用程序关闭时,释放并关闭httpx.AsyncClient

@app.on_event("startup")
asyncdef startup():
    app.state.httpx = httpx.AsyncClient()


@app.on_event("shutdown")
asyncdef shutdown():
    await app.state.httpx.aclose()


@app.get("/proxy")
asyncdef proxy(url: str, request: Request):
    asyncwith request.app.state.httpx as client:
        r = await client.get(url)
        return r.json()

4.2. 异步请求方法

所有请求方法都是异步的,因此对于以下所有方法,您都应该使用 response = await client.get(...) 这种形式:

方法
描述
示例
get()
GET 请求
await client.get(url)
post()
POST 请求
await client.post(url, json=data)
put()
PUT 请求
await client.put(url, data=data)
patch()
PATCH 请求
await client.patch(url, data=data)
delete()
DELETE 请求
await client.delete(url)
head()
HEAD 请求
await client.head(url)
options()
OPTIONS 请求
await client.options(url)
request()
通用请求
await client.request(method, url, ...)
send()
发送预构建请求
await client.send(request)

4.2. 共享配置

同时针对请求的超时、限时、请求头、基础url、认证、代理、事件钩子等配置项目可以配置AsyncClient中。在代码块内部

的各种请求方法共享使用。




    
 # 超时设置
 timeout = httpx.Timeout(connect=5.0, read=30.0, write=10.0, pool=1.0)
 # 连接限制
 limits = httpx.Limits(
 max_connections=100,
 max_keepalive_connections=20,
 keepalive_expiry=5.0
 )
 # 默认头部
 headers = {
 "User-Agent": "MyApp/1.0",
 "Accept": "application/json",
 }

#
 基础URL
base_url = "http://httpbin.org/get"
# 认证
auth = ("username", "password")
# 代理
proxies = "http://proxy.example.com:8080"

#
 钩子函数示例
async def log_request(request):
    print(f"请求: {request.method} {request.url}")


async def log_response(response):
    print(f"响应: {response.status_code}")


async def check_status(response):
    if response.status_code >= 400:
        print(f"请求失败: {response.status_code}")
        
# 事件钩子
event_hooks = {
"request": [log_request],
"response": [log_response, check_status],
}

async with httpx.AsyncClient(
        timeout=timeout,
        limits=limits,
        headers=headers,
        base_url=base_url,
        auth=auth,
        proxy=proxies,
        event_hooks=event_hooks) as client:
   ...

五、流式响应

「流式响应」(Streaming Response)是一种数据逐步传输的技术模式,服务器在生成完整数据之前就开始向客户端逐步发送部分数据,让用户能够边接收边处理,而不是等待所有数据就绪后才一次性返回。

「核心特点:」

  1. 「逐步传输」数据被分成多个“块”(chunks),像水流一样持续传输,直到全部发送完成。

  2. 「实时性」用户无需等待全部数据生成即可看到部分结果,适合实时性要求高的场景。

  3. 「低延迟感知」即使整体处理耗时较长,用户也能快速看到初步反馈,提升体验。

  4. 「高效处理大数据」避免大文件或长内容一次性加载导致内存压力,支持边传输边处理。

5.1 基础示例

通过client.stream获取远程url,并通过response.aiter_bytes读取部分字节资源。

async def stream_response_basic():
    async with httpx.AsyncClient()  as client:
        async with client.stream('GET''https://api.example.com/large-data'as response:
            # 检查响应状态
            print(f"状态码: {response.status_code}")
            print(f"响应头: {response.headers}")
            
            # 按字节流式处理响应
            async for chunk in response.aiter_bytes(chunk_size=1024):
                process_chunk(chunk)  # 自定义处理函数
                

5.2 流式响应方法详解

方法
描述
适用场景
aiter_bytes()
按字节迭代
二进制文件下载
aiter_text()
按文本迭代
文本文件、JSON 流
aiter_lines()
按行迭代
日志文件、CSV 数据
aiter_raw()
原始字节流
自定义协议处理
aread()
读取指定大小
分块读取
aclose()
关闭响应
手动管理时使用

当上下文块使用不切实际时,可以通过使用 client.send(..., stream=True) 发送一个 [Request 实例] 来进入"手动模式"。

import httpx
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse

client = httpx.AsyncClient()

async def home(request):
    req = client.build_request("GET""https://www.example.com/")
    r = await client.send(req, stream=True )
    return StreamingResponse(r.aiter_text(), background=BackgroundTask(r.aclose))

当使用这种"手动流式模式"时,开发者有责任确保最终调用 Response.aclose()。如果未能这样做会导致连接保持打开状态,很可能最终导致资源泄漏。

六、流式请求

「流式请求」(Streaming Request)是一种允许客户端或服务器「边生成边发送数据」的通信方式,与流式响应相对应,常用于大文件上传、实时数据推送等场景。

当使用 AsyncClient 实例发送流式请求体时,应该使用异步字节生成器而非普通字节生成器:

async def upload_bytes():
    ...  # 生成字节内容

await client.post(url, content=upload_bytes())

其实Httpx的操作函数和流式响应相同,基本基于非GET请求方法。比如下面代码上传大文件,边获取边上传。

file_path = ""
asyncdef file_generator():
    """异步文件生成器"""
    with open(file_path, 'rb'as f:
        while chunk := f.read(8192):
            yield chunk
            await asyncio.sleep(0)  # 让出控制权,避免阻塞
    
asyncwith httpx.AsyncClient() as client:
    response = await client.post(
        "https://api.example.com/upload",
        content=file_generator(),  # 使用生成器
        headers={"Content-Type""application/octet-stream"}
    )
    return response.json()

七、显示传输

HTTPX 允许显式创建传输层实例,这提供了对底层 HTTP 连接行为的细粒度控制。当直接实例化传输对象时,需要使用 httpx.AsyncHTTPTransport

7.1 什么是传输层?

传输层(Transport)是 HTTPX 底层的网络通信组件,负责:

  • 管理 HTTP 连接池
  • 处理连接复用
  • 实现超时和重试机制
  • 管理 SSL/TLS 连接
  • 处理代理和代理认证

7.2 为什么需要显式传输实例?

显式创建传输实例的优势:

  1. 「细粒度控制」:可以精确配置连接行为
  2. 「连接池复用」:多个客户端共享同一个连接池
  3. 「性能调优」:根据应用需求优化网络参数
  4. 「特殊协议支持」:自定义协议处理
  5. 「监控和调试」:更容易监控网络层行为

7.3 示例代码

分离了传输层和客户端请求,关闭连接仅关闭传输层即可。

transport = httpx.AsyncHTTPTransport(
        retries=3,  # 重试次数
        limits=httpx.Limits(
            max_connections=100,      # 最大连接数
            max_keepalive_connections=50,  # 最大保持活跃的连接数
            keepalive_expiry=5.0      # 连接保持时间(秒)
        )
    )

try:
        # 创建多个使用相同传输的客户端
        client1 = httpx.AsyncClient(transport=transport)
        client2 = httpx.AsyncClient(transport=transport)
        
        # 使用客户端
        response1 = await client1.get("https://httpbin.org/uuid")
        response2 = await client2.get("https://httpbin.org/ip")
        
        print(f"Client 1 响应: {response1.json()}")
        print(f"Client 2 响应: {response2.json()}")
        
        # 分别关闭客户端
        await client1.aclose()
        await client2.aclose()
        
    finally:
        # 手动关闭传输
        await transport.aclose()

而且可以通过transport.get_connection_stats()查看传输层统计信息。

  


    
# 获取传输统计信息
    stats = transport.get_connection_stats()
    print("传输统计信息:")
    print(f"  活跃连接数: {stats['active_connections']}")
    print(f"  空闲连接数: {stats['idle_connections']}")
    print(f"  总连接数: {stats['total_connections']}")
    print(f"  请求数: {stats['requests']}")
    print(f"  活跃连接数: {stats['active_connections']}")
    print(f"  空闲连接数: {stats['idle_connections']}")

HTTPX 异步客户端基于 Python 的 async/await 语法,利用事件循环实现高效的非阻塞 I/O 操作。相比传统多线程,它具有更高的并发性能、更低的内存消耗、更简单的并发控制,并原生支持 WebSocket 等长连接场景,非常适合集成异步 Web 框架、构建高并发爬虫、开发实时应用和处理流式数据。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/190984