由网友(潇洒)分享简介:作为FastAPI的新手,我正在努力测试比我在教程中看到的稍微更难的代码。我使用fastapi_cache模块和Redis如下:from fastapi import Depends, FastAPI, Query, Requestfrom fastapi_cache.backends.redis import C...
作为FastAPI的新手,我正在努力测试比我在教程中看到的稍微更难的代码。我使用fastapi_cache
模块和Redis如下:
from fastapi import Depends, FastAPI, Query, Request
from fastapi_cache.backends.redis import CACHE_KEY, RedisCacheBackend
from fastapi_cache import caches, close_caches
app = FastAPI()
def redis_cache():
return caches.get(CACHE_KEY)
@app.get('/cache')
async def test(
cache: RedisCacheBackend = Depends(redis_cache),
n: int = Query(
...,
gt=-1
)
):
# code that uses redis cache
@app.on_event('startup')
async def on_startup() -> None:
rc = RedisCacheBackend('redis://redis')
caches.set(CACHE_KEY, rc)
@app.on_event('shutdown')
async def on_shutdown() -> None:
await close_caches()
test_main.py如下所示:
import pytest
from httpx import AsyncClient
from .main import app
@pytest.mark.asyncio
async def test_cache():
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.get("/cache?n=150")
当我运行pytest
时,它将cache
变量设置为None
,测试失败。我想我明白为什么代码不起作用了。但我如何修复它才能正确测试我的缓存?
推荐答案
问题是httpx
没有实现生命周期协议并触发startup
事件处理程序。为此,您需要使用LifespanManager
。
安装:pip install asgi_lifespan
代码如下:
import pytest
from asgi_lifespan import LifespanManager
from httpx import AsyncClient
from .main import app
@pytest.mark.asyncio
async def test_cache():
async with LifespanManager(app):
async with AsyncClient(app=app, base_url="http://localhost") as ac:
response = await ac.get("/cache")
更多信息请点击此处:https://github.com/encode/httpx/issues/350
相关推荐
最新文章