HTTPX 是一个功能强大的 Python HTTP 客户端库,提供了同步和异步两种操作模式。本教程将深入介绍异步客户端的使用方法、优势以及在实际项目中的应用场景。
一、什么是异步客户端?
HTTPX 默认提供标准的同步 API,但同时支持异步客户端操作。异步客户端使用 Python 的 async/await 语法,基于事件循环实现非阻塞 I/O 操作。
二、异步编程的优势
异步编程相比传统的多线程模型具有以下显著优势:
- 「更高的并发性能」:异步模型避免了线程切换的开销,单线程即可处理大量并发连接
- 「更低的内存消耗」:无需为每个连接创建单独的线程栈
- 「更简单的并发控制」:避免多线程编程中的竞态条件和锁管理问题
- 「原生支持长连接」:特别适合
WebSocket、SSE(服务器发送事件)等长寿命连接场景
三、适用场景
异步客户端特别适合以下场景:
- 「异步 Web 框架集成」:如
FastAPI、Starlette、Sanic、Quart 等 - 「高并发爬虫和 API 调用」:需要同时处理成百上千个请求
- 「流式数据处理」:处理大文件上传/下载或实时数据流
四、基础使用
4.1. 发起异步请求
要发起异步请求,您需要一个 AsyncClient。
>>> async with httpx.AsyncClient() as client:
... r = await client.get('http://httpbin.org/get')
...
>>> r
200 OK]>
❝为了充分利用连接池的优势,请确保不要实例化多个客户端实例
❞
# ❌ 错误做法:每次循环都创建新的 Client
for i in range(10):
with httpx.AsyncClient() as client: # 每次循环都创建新连接
使用一个在需要处传递的作用域内客户端,或者使用一个全局客户端实例。
client = httpx.AsyncClient()
...
await client.aclose()
下面是显示使用AsyncClient示例:
在FastAPI中,很多时候会调用外部API资源,为了不阻塞业务的正常请求,我们使用了AsyncClient请求资源。我们可以在
application的 state容器内创建一个httpx,在FastAPI应用程序关闭时,释放并关闭httpx.AsyncClient
@app.on_event("startup")
asyncdef startup():
app.state.httpx = httpx.AsyncClient()
@app.on_event("shutdown")
asyncdef shutdown():
await app.state.httpx.aclose()
@app.get("/proxy")
asyncdef proxy(url: str, request: Request):
asyncwith request.app.state.httpx as client:
r = await client.get(url)
return r.json()
4.2. 异步请求方法
所有请求方法都是异步的,因此对于以下所有方法,您都应该使用 response = await client.get(...) 这种形式:
| | |
|---|
get() | | await client.get(url) |
post() | | await client.post(url, json=data) |
put() | | await client.put(url, data=data) |
patch() | | await client.patch(url, data=data) |
delete() | | await client.delete(url) |
head() | | await client.head(url) |
options() | | await client.options(url) |
request() | | await client.request(method, url, ...) |
send() | | await client.send(request) |
4.2. 共享配置
同时针对请求的超时、限时、请求头、基础url、认证、代理、事件钩子等配置项目可以配置AsyncClient中。在代码块内部
的各种请求方法共享使用。
# 超时设置
timeout = httpx.Timeout(connect=5.0, read=30.0, write=10.0, pool=1.0)
# 连接限制
limits = httpx.Limits(
max_connections=100,
max_keepalive_connections=20,
keepalive_expiry=5.0
)
# 默认头部
headers = {
"User-Agent": "MyApp/1.0",
"Accept": "application/json",
}
# 基础URL
base_url = "http://httpbin.org/get"
# 认证
auth = ("username", "password")
# 代理
proxies = "http://proxy.example.com:8080"
# 钩子函数示例
async def log_request(request):
print(f"请求: {request.method} {request.url}")
async def log_response(response):
print(f"响应: {response.status_code}")
async def check_status(response):
if response.status_code >= 400:
print(f"请求失败: {response.status_code}")
# 事件钩子
event_hooks = {
"request": [log_request],
"response": [log_response, check_status],
}
async with httpx.AsyncClient(
timeout=timeout,
limits=limits,
headers=headers,
base_url=base_url,
auth=auth,
proxy=proxies,
event_hooks=event_hooks) as client:
...
五、流式响应
「流式响应」(Streaming Response)是一种数据逐步传输的技术模式,服务器在生成完整数据之前就开始向客户端逐步发送部分数据,让用户能够边接收边处理,而不是等待所有数据就绪后才一次性返回。
「核心特点:」
「逐步传输」数据被分成多个“块”(chunks),像水流一样持续传输,直到全部发送完成。
「实时性」用户无需等待全部数据生成即可看到部分结果,适合实时性要求高的场景。
「低延迟感知」即使整体处理耗时较长,用户也能快速看到初步反馈,提升体验。
「高效处理大数据」避免大文件或长内容一次性加载导致内存压力,支持边传输边处理。
5.1 基础示例
通过client.stream获取远程url,并通过response.aiter_bytes读取部分字节资源。
async def stream_response_basic():
async with httpx.AsyncClient()
as client:
async with client.stream('GET', 'https://api.example.com/large-data') as response:
# 检查响应状态
print(f"状态码: {response.status_code}")
print(f"响应头: {response.headers}")
# 按字节流式处理响应
async for chunk in response.aiter_bytes(chunk_size=1024):
process_chunk(chunk) # 自定义处理函数
5.2 流式响应方法详解
| | |
|---|
aiter_bytes() | | |
aiter_text() | | |
aiter_lines() | | |
aiter_raw() | | |
aread() | | |
aclose() | | |
当上下文块使用不切实际时,可以通过使用 client.send(..., stream=True) 发送一个 [Request 实例] 来进入"手动模式"。
import httpx
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
client = httpx.AsyncClient()
async def home(request):
req = client.build_request("GET", "https://www.example.com/")
r = await client.send(req, stream=True
)
return StreamingResponse(r.aiter_text(), background=BackgroundTask(r.aclose))
当使用这种"手动流式模式"时,开发者有责任确保最终调用 Response.aclose()。如果未能这样做会导致连接保持打开状态,很可能最终导致资源泄漏。
六、流式请求
「流式请求」(Streaming Request)是一种允许客户端或服务器「边生成边发送数据」的通信方式,与流式响应相对应,常用于大文件上传、实时数据推送等场景。
当使用 AsyncClient 实例发送流式请求体时,应该使用异步字节生成器而非普通字节生成器:
async def upload_bytes():
... # 生成字节内容
await client.post(url, content=upload_bytes())
其实Httpx的操作函数和流式响应相同,基本基于非GET请求方法。比如下面代码上传大文件,边获取边上传。
file_path = ""
asyncdef file_generator():
"""异步文件生成器"""
with open(file_path, 'rb') as f:
while chunk := f.read(8192):
yield chunk
await asyncio.sleep(0) # 让出控制权,避免阻塞
asyncwith httpx.AsyncClient() as client:
response = await client.post(
"https://api.example.com/upload",
content=file_generator(), # 使用生成器
headers={"Content-Type": "application/octet-stream"}
)
return response.json()
七、显示传输
HTTPX 允许显式创建传输层实例,这提供了对底层 HTTP 连接行为的细粒度控制。当直接实例化传输对象时,需要使用 httpx.AsyncHTTPTransport。
7.1 什么是传输层?
传输层(Transport)是 HTTPX 底层的网络通信组件,负责:
7.2 为什么需要显式传输实例?
显式创建传输实例的优势:
7.3 示例代码
分离了传输层和客户端请求,关闭连接仅关闭传输层即可。
transport = httpx.AsyncHTTPTransport(
retries=3, # 重试次数
limits=httpx.Limits(
max_connections=100, # 最大连接数
max_keepalive_connections=50, # 最大保持活跃的连接数
keepalive_expiry=5.0 # 连接保持时间(秒)
)
)
try:
# 创建多个使用相同传输的客户端
client1 = httpx.AsyncClient(transport=transport)
client2 = httpx.AsyncClient(transport=transport)
# 使用客户端
response1 = await client1.get("https://httpbin.org/uuid")
response2 = await client2.get("https://httpbin.org/ip")
print(f"Client 1 响应: {response1.json()}")
print(f"Client 2 响应: {response2.json()}")
# 分别关闭客户端
await client1.aclose()
await client2.aclose()
finally:
# 手动关闭传输
await transport.aclose()
而且可以通过transport.get_connection_stats()查看传输层统计信息。
# 获取传输统计信息
stats = transport.get_connection_stats()
print("传输统计信息:")
print(f" 活跃连接数: {stats['active_connections']}")
print(f" 空闲连接数: {stats['idle_connections']}")
print(f" 总连接数: {stats['total_connections']}")
print(f" 请求数: {stats['requests']}")
print(f" 活跃连接数: {stats['active_connections']}")
print(f" 空闲连接数: {stats['idle_connections']}")
HTTPX 异步客户端基于 Python 的 async/await 语法,利用事件循环实现高效的非阻塞 I/O 操作。相比传统多线程,它具有更高的并发性能、更低的内存消耗、更简单的并发控制,并原生支持 WebSocket 等长连接场景,非常适合集成异步 Web 框架、构建高并发爬虫、开发实时应用和处理流式数据。