Wankupi's BlogHomepageWankupi's Blog关于

FastAPI 折腾记录

最近重启了某个web的开发工作。

web的结构是 flask 做 api 后端,vue 做前端。

随着 api 越来越多,感觉之前的写法已经太不优雅了, 涉及到很多参数和返回值的手动检验。 因此我决定尝试一下 FastAPI。

g, request, db ?

很快我发现了 FastAPI 第一个很舒服的写法: 可以将需要用到了数据直接写在函数参数中, 而不需要手动从 Query、Body(Form/JSON)、Cookie 中解析。

但同时也发现了另一个问题: 为什么像是 request 这种东西都要从参数获得啊...

好吧。老实说如果我从没用过flask,我可能会觉得这很正常。 但是我确实认为让我把一个参数通过很多层函数,一层层传递到下面,属实有些不可接受。 或者说我希望至少有一个类似 vue 中 provide 和 inject 的事物。

具体的场景是,我希望以下几点同时成立

  1. 每次请求使用同一个数据库连接
  2. 能够处理异常,成功则commit, 有异常则rollback
  3. 不把数据库连接通过参数传递
  4. 线程/协程安全。不能互相影响
  5. 不在每个endpoint都写一遍获得数据库对象的代码

其实如果忽略 5 的话,有一个简单的办法是

python
def use_db():
    db = SqlSession()
    try:
        yield db
        db.commit()
    except Exception as e:
        db.rollback()
        raise e
    finally:
        db.close()

@app.get("/xxx")
def get_xxx(some_args, db: SqlSession = Depends(use_db)):
    ...

使用generator的这个特性确实很不错。但是我不太想每次都写一个参数。

如果想把一个变量注入全局变量,那么首先要找一个地方存储。 因为每次请求都是coroutine,所以需要一个协程安全的存储位置。

python中有一个工具是contextvars。 所以一个思路就是在depend中添加一个函数,函数将数据库对象注入ContextVars。

于是马上我就发现了问题,end_point函数似乎和dependency函数不是同一个context... 虽然我可以在 denpendency 函数中设置 contextvar, 但是end_point函数看不到这个值。

阅读文档后我发现

当你使用 def 而不是 async def 来声明一个路径操作函数时,它运行在外部的线程池中并等待其结果,而不是直接调用(因为它会阻塞服务器)。

所以我做了一些测试如下(fastapi:0.115.11)

python
count = 0
class A:
    def __init__(self):
        global count
        count += 1
        self.val = count
        print(self, " init")
    def __del__(self):
        print(self, "del")
    def __repr__(self):
        return f"A({self.val})"
    def __str__(self):
        return f"A({self.val})"

cv = ContextVar[A]("cv", default=None)
# or def context_on_async()
async def context_on_async():
    a = A()
    cv.set(a)
    yield a
    cv.set(None)

app = FastAPI()
@app.get("/", dependencies=[Depends(context_on_async)])
# or def read_root():
async def read_root():
    print("read_root")
    x = cv.get()
    return {"Hello": "World", "cv": str(x)}
实验编号end_pointdependency能否获得对象是否正确销毁
1defdefNoYes
2defasyncYesNo
3asyncdefNoYes
4asyncasyncYesYes

也就是说,当 dependency 为 async 时,dependency和end_point函数在同一个 context 中。 但是如果终端处理函数不是async, 就会有奇怪的问题出现。 程序日志如下

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
A(1)  init
read_root
INFO:     127.0.0.1:37198 - "GET / HTTP/1.1" 200 OK
A(2)  init
read_root
INFO:     127.0.0.1:37204 - "GET / HTTP/1.1" 200 OK
A(3)  init
A(2) del
read_root
INFO:     127.0.0.1:37208 - "GET / HTTP/1.1" 200 OK
A(4)  init
A(3) del
read_root
INFO:     127.0.0.1:37220 - "GET / HTTP/1.1" 200 OK
^CINFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [152187]
A(1) del
A(4) del

老实说有一种误打误撞找到bug的感觉。

事实也的确如此。

最后经过几个小时的深挖,问题出在一个名为 anyio 的工具库。 原因是两个

  1. 循环中的变量会一直存活到下次循环
  2. 第一次请求创建workder时,设置了回调函数,拷贝了第一次的context。

详情见PR

最后也明白了原理。在fastapi的框架中, 如果函数是async的,就会直接运行在原本的context中; 而如果不是,就会拷贝当前的context,传到新线程去运行。

因此如果只要设置依赖函数为async, 此时对context的修改就可以被后面的函数看到。

根据这个原理,我写了一个注入器

python
_T = TypeVar("T")
class GlobalContextVar(Generic[_T]):
    def __init__(self, name: str, dependency: AsyncGenerator):
        self.__dict__.setdefault("cvar", None)
        self.__dict__.setdefault("app_dependency", None)
        self.cvar = ContextVar[_T | None](name)

        @wraps(dependency)
        async def wrapped(*args, **kwargs):
            func = asynccontextmanager(dependency)
            async with func(*args, **kwargs) as gen:
                self.cvar.set(gen)
                yield gen
                self.cvar.set(None)

        self.app_dependency = wrapped

    def __get_inner(self):
        return self.cvar.get()

    def __getattr__(self, item):
        if item in self.__dict__:
            return super().__getattr__(item)
        return getattr(self.__get_inner(), item)

    def __setattr__(self, key, value):
        if key in self.__dict__:
            return super().__setattr__(key, value)
        return setattr(self.__get_inner(), key, value)

    def __delattr__(self, item):
        delattr(self.__get_inner(), item)

    def __repr__(self):
        return repr(self.__get_inner())

    def __str__(self):
        return str(self.__get_inner())

基本思想就是 Proxy 加上 contextvars 注入。

用法如下

python
async def use_db(): ... # 同上

db: Session = GlobalContextVar[Session]("db", use_db)
app = FastAPI(dependencies=[Depends(db.app_dependency)])