FastAPI 折腾记录
最近重启了某个web的开发工作。
web的结构是 flask 做 api 后端,vue 做前端。
随着 api 越来越多,感觉之前的写法已经太不优雅了, 涉及到很多参数和返回值的手动检验。 因此我决定尝试一下 FastAPI。
g, request, db ?
很快我发现了 FastAPI 第一个很舒服的写法: 可以将需要用到了数据直接写在函数参数中, 而不需要手动从 Query、Body(Form/JSON)、Cookie 中解析。
但同时也发现了另一个问题: 为什么像是 request 这种东西都要从参数获得啊...
好吧。老实说如果我从没用过flask,我可能会觉得这很正常。 但是我确实认为让我把一个参数通过很多层函数,一层层传递到下面,属实有些不可接受。 或者说我希望至少有一个类似 vue 中 provide 和 inject 的事物。
具体的场景是,我希望以下几点同时成立
- 每次请求使用同一个数据库连接
- 能够处理异常,成功则commit, 有异常则rollback
- 不把数据库连接通过参数传递
- 线程/协程安全。不能互相影响
- 不在每个endpoint都写一遍获得数据库对象的代码
其实如果忽略 5 的话,有一个简单的办法是
def use_db():
db = SqlSession()
try:
yield db
db.commit()
except Exception as e:
db.rollback()
raise e
finally:
db.close()
@app.get("/xxx")
def get_xxx(some_args, db: SqlSession = Depends(use_db)):
...
使用generator的这个特性确实很不错。但是我不太想每次都写一个参数。
如果想把一个变量注入全局变量,那么首先要找一个地方存储。 因为每次请求都是coroutine,所以需要一个协程安全的存储位置。
python中有一个工具是contextvars。 所以一个思路就是在depend中添加一个函数,函数将数据库对象注入ContextVars。
于是马上我就发现了问题,end_point函数似乎和dependency函数不是同一个context... 虽然我可以在 denpendency 函数中设置 contextvar, 但是end_point函数看不到这个值。
阅读文档后我发现
当你使用 def 而不是 async def 来声明一个路径操作函数时,它运行在外部的线程池中并等待其结果,而不是直接调用(因为它会阻塞服务器)。
所以我做了一些测试如下(fastapi:0.115.11)
count = 0
class A:
def __init__(self):
global count
count += 1
self.val = count
print(self, " init")
def __del__(self):
print(self, "del")
def __repr__(self):
return f"A({self.val})"
def __str__(self):
return f"A({self.val})"
cv = ContextVar[A]("cv", default=None)
# or def context_on_async()
async def context_on_async():
a = A()
cv.set(a)
yield a
cv.set(None)
app = FastAPI()
@app.get("/", dependencies=[Depends(context_on_async)])
# or def read_root():
async def read_root():
print("read_root")
x = cv.get()
return {"Hello": "World", "cv": str(x)}
| 实验编号 | end_point | dependency | 能否获得 | 对象是否正确销毁 |
|---|---|---|---|---|
| 1 | def | def | No | Yes |
| 2 | def | async | Yes | No |
| 3 | async | def | No | Yes |
| 4 | async | async | Yes | Yes |
也就是说,当 dependency 为 async 时,dependency和end_point函数在同一个 context 中。 但是如果终端处理函数不是async, 就会有奇怪的问题出现。 程序日志如下
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
A(1) init
read_root
INFO: 127.0.0.1:37198 - "GET / HTTP/1.1" 200 OK
A(2) init
read_root
INFO: 127.0.0.1:37204 - "GET / HTTP/1.1" 200 OK
A(3) init
A(2) del
read_root
INFO: 127.0.0.1:37208 - "GET / HTTP/1.1" 200 OK
A(4) init
A(3) del
read_root
INFO: 127.0.0.1:37220 - "GET / HTTP/1.1" 200 OK
^CINFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [152187]
A(1) del
A(4) del
老实说有一种误打误撞找到bug的感觉。
事实也的确如此。
最后经过几个小时的深挖,问题出在一个名为 anyio 的工具库。 原因是两个
- 循环中的变量会一直存活到下次循环
- 第一次请求创建workder时,设置了回调函数,拷贝了第一次的context。
最后也明白了原理。在fastapi的框架中, 如果函数是async的,就会直接运行在原本的context中; 而如果不是,就会拷贝当前的context,传到新线程去运行。
因此如果只要设置依赖函数为async, 此时对context的修改就可以被后面的函数看到。
根据这个原理,我写了一个注入器
_T = TypeVar("T")
class GlobalContextVar(Generic[_T]):
def __init__(self, name: str, dependency: AsyncGenerator):
self.__dict__.setdefault("cvar", None)
self.__dict__.setdefault("app_dependency", None)
self.cvar = ContextVar[_T | None](name)
@wraps(dependency)
async def wrapped(*args, **kwargs):
func = asynccontextmanager(dependency)
async with func(*args, **kwargs) as gen:
self.cvar.set(gen)
yield gen
self.cvar.set(None)
self.app_dependency = wrapped
def __get_inner(self):
return self.cvar.get()
def __getattr__(self, item):
if item in self.__dict__:
return super().__getattr__(item)
return getattr(self.__get_inner(), item)
def __setattr__(self, key, value):
if key in self.__dict__:
return super().__setattr__(key, value)
return setattr(self.__get_inner(), key, value)
def __delattr__(self, item):
delattr(self.__get_inner(), item)
def __repr__(self):
return repr(self.__get_inner())
def __str__(self):
return str(self.__get_inner())
基本思想就是 Proxy 加上 contextvars 注入。
用法如下
async def use_db(): ... # 同上
db: Session = GlobalContextVar[Session]("db", use_db)
app = FastAPI(dependencies=[Depends(db.app_dependency)])