一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 3.6+ 并基于标准的 Python 类型提示
入门 安装 FastAPI 1 $ pip install "fastapi[all]"
可以分开来安装 假如你想将应用程序部署到生产环境,你可能要执行以下操作:
并且安装 uvicorn
来作为服务器:
1 $ pip install "uvicorn[standard]"
运行代码 1 $ uvicorn main:app --reload
Python: 3.9.5
FastAPI: 0.103.1
最小程序 下面代码会直接启动http服务,也可以使用 uvicorn main:app --reload
1 2 3 4 from fastapi import FastAPIimport uvicornapp = FastAPI()
添加一个 API 的示例
1 2 3 4 5 6 7 @app.get("/" ) async def root (): return {"message" : "Hello World" } if __name__ == '__main__' : uvicorn.run(app='main:app' , reload=True )
路径参数 最基本的路径参数 1 2 3 4 @app.get("/items/{item_id}" ) async def read_item (item_id ): return {"item_id" : item_id}
多个路径参数 1 2 3 4 @app.get("/items/{item_id}/{user_id}" ) async def read_item (item_id, user_id ): return {"item_id" : item_id, "user_id" : user_id}
有类型的路径参数 1 2 3 4 @app.get("/items/{item_id}" ) async def read_item (item_id: int ): return {"item_id" : item_id}
文件路径参数 1 2 3 4 @app.get("/file/{file_path:path}" ) async def read_item (file_path ): return {"file_path" : file_path}
查询参数 带默认值的查询参数 1 2 3 4 5 6 fake_items_db = [{"item_name" : "Foo" }, {"item_name" : "Bar" }] @app.get("/items/" ) async def read_item (skip: int = 0 , limit: int = 10 ): return fake_items_db[skip: skip + limit]
可选查询参数 1 2 3 4 5 6 7 8 from typing import Union @app.get("/items/{item_id}" ) async def read_item (item_id: str , q: Union [str , None ] = None ): if q: return {"item_id" : item_id, "q" : q} return {"item_id" : item_id}
多路径多查询参数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @app.get("/users/{user_id}/items/{item_id}" ) async def read_user_item ( user_id: int , item_id: str , q: Union [str , None ] = None , short: bool = False ): item = {"item_id" : item_id, "owner_id" : user_id} if q: item.update({"q" : q}) if not short: item.update( {"description" : "这是一个令人惊叹的项目,有很长的描述" } ) return item
必需查询参数 1 2 3 4 5 @app.get("/items/{item_id}" ) async def read_user_item (item_id: str , needy: str ): item = {"item_id" : item_id, "needy" : needy} return item
请求体 1 2 3 4 5 6 7 8 9 10 11 12 13 from pydantic import BaseModelfrom typing import Union class Item (BaseModel ): name: str = '小明' description: Union [str , None ] = None price: float tax: Union [float , None ] = None @app.post("/items/" ) async def create_item (item: Item ): print (item.name) return item
调用 1 2 3 4 5 6 7 8 9 10 curl -X 'POST' \ 'http://127.0.0.1:8000/items/' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "name": "小明", "description": "string", "price": 0, "tax": 0 }'
查询参数和字符串校验 1 2 3 4 5 6 7 8 9 10 from fastapi import Query@app.get("/items/" ) async def read_items ( q: Union [str , None ] = Query(default=None , max_length=50 ) ): results = {"items" : [{"item_id" : "Foo" }, {"item_id" : "Bar" }]} if q: results.update({"q" : q}) return results
参数列表
参数
含义
类型
default
默认值
任意类型或…
max_length
最大长度
int
min_length
最小长度
int
pattern
正则匹配
string
alias
别名参数
string
deprecated
准备弃用参数
bool
多个相同的查询参数 1 2 3 4 5 6 7 @app.get("/items/" ) async def read_items ( q: Union [List [str ], None ] = Query(default=None ) ): query_items = {"q" : q} return query_items
路径参数和数值校验 Path 用法基本和 Query 相同,参考:FastAPI官方文档
导入 Path 1 2 from fastapi import FastAPI, Path, Queryfrom typing_extensions import Annotated
声明元数据 1 2 3 4 5 6 7 8 9 @app.get("/items/{item_id}" ) async def read_items ( item_id: Annotated[int , Path(title="要获取的项目的 ID" )], q: Annotated[str | None , Query(alias="item-query" )] = None , ): results = {"item_id" : item_id} if q: results.update({"q" : q}) return results
参数列表
参数
含义
类型
...
和 Query 具有相同参数
…
ge
大于等于
int float
gt
大于
int float
le
小于等于
int float
le
小于等于
int float
title
api文档的标题
string
其他参数 都具有 Query
的参数,max_length
、min_length
等
Cookie参数 1 2 3 4 5 6 7 from fastapi import Cookie@app.get("/items/" ) async def read_items ( ads_id: Annotated[Union [str , None ], Cookie( )] = None ): return {"ads_id" : ads_id}
1 2 3 4 5 6 7 8 from fastapi import Header@app.get("/items/" ) async def read_items ( user_agent: Annotated[Union [str , None ], Header( )] = None , items_id: Annotated[Union [int , None ], Header(ge=1 )] = None ): return {"User-Agent" : user_agent, "items_id" : items_id}
表单数据 接收的不是 JSON,而是表单字段时,要使用 Form。
安装 1 $ pip install python-multipart
HTML 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > </head > <body > <form method ="post" action ="http://127.0.0.1:8000/login" > <span > 账号:</span > <input type ="text" name ="username" > <br > <span > 密码:</span > <input type ="password" name ="password" > <br > <input type ="submit" value ="登录" > </form > </body > </html >
FastAPI 1 2 3 4 5 6 7 8 from fastapi import FastAPI, Formimport uvicornapp = FastAPI() @app.post("/login/" ) async def login (username: str = Form( ), password: str = Form( ) ): return {"username" : username} if __name__ == '__main__' : uvicorn.run(app='main:app' , reload=True )
文件上传 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from fastapi import FastAPI, UploadFilefrom fastapi.responses import HTMLResponse@app.post("/uploadfile/" ) async def create_upload_file (file: UploadFile ): print (file.file.read().decode()) return {"filenames" : file.filename, "type" : str (type (file.file))} @app.get("/" ) async def main (): content = """<body> <form action="/uploadfile/" enctype="multipart/form-data" method="post"> <input name="file" type="file" multiple> <input type="submit"> </form> </body>""" return HTMLResponse(content=content)
UploadFile 属性
属性名
含义
返回
filename
文件名
上传的文件名
content_type
内容类型
MIME
类型
file
文件
SpooledTemporaryFile 具有 read
,write
方法
UploadFile async 方法
方法名
含义
write(data)
把 data
写入文件
read(size)
按指定数量的字节读取文件内容
seek(offset)
移动至文件 offset
(int
)字节处的位置
close()
关闭文件
依赖项 依赖项使用场景
共享业务逻辑(复用相同的代码逻辑)
共享数据库连接
实现安全、验证、角色权限
等……
创建依赖项 1 2 3 4 5 from typing import Union from fastapi import Depends, FastAPIapp = FastAPI()
read_items
和 read_users
方法依赖 common_parameters
白话就是 read_items
和 read_users
都需要 q
,skip
,limit
查询参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 async def common_parameters ( q: Union [str , None ] = None , skip: int = 0 , limit: int = 100 ): return {"q" : q, "skip" : skip, "limit" : limit} @app.get("/items/" ) async def read_items ( commons: dict = Depends(common_parameters ) ): return commons @app.get("/users/" ) async def read_users ( commons: dict = Depends(common_parameters ) ): return commons
类作为依赖项 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from typing import Union from fastapi import Depends, FastAPIapp = FastAPI() fake_items_db = [{"item_name" : "Foo" }, {"item_name" : "Bar" }] class CommonQueryParams : def __init__ ( self, q: Union [str , None ] = None , skip: int = 0 , limit: int = 100 ): self.q = q self.skip = skip self.limit = limit
read_itemsx
接收一个 commons
参数,类型是 CommonQueryParams
CommonQueryParams
接收三个参数,这三个参数是调用 api 的时候传
1 2 3 4 5 6 7 8 9 10 @app.get("/items/" ) async def read_items ( commons: CommonQueryParams = Depends(CommonQueryParams ) ): response = {} if commons.q: response.update({"q" : commons.q}) items = fake_items_db[commons.skip : commons.skip + commons.limit] response.update({"items" : items}) return response
还可以简写 1 2 3 4 5 6 7 8 9 10 11 @app.get("/items/" ) async def read_items ( commons: CommonQueryParams = Depends( ) ): response = {} if commons.q: response.update({"q" : commons.q}) items = fake_items_db[commons.skip : commons.skip + commons.limit] response.update({"items" : items}) return response
子依赖项 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from typing import Union from fastapi import Cookie, Depends, FastAPIapp = FastAPI() def query_extractor (q: Union [str , None ] = None ): return q def query_or_cookie_extractor ( q: str = Depends(query_extractor ), last_query: Union [str , None ] = Cookie(default=None ), ): if not q: return last_query return q @app.get("/items/" ) async def read_query ( query_or_default: str = Depends(query_or_cookie_extractor ) ): return {"q_or_cookie" : query_or_default}
不使用缓存 使用 use_cache = False
参数不使用缓存数据,不使用 use_cache = False
,value
和 value1
是一样的
1 2 3 4 5 6 7 8 9 10 11 12 13 def result_value (): value = randint(1 , 99 ) return value def get_value ( value: int = Depends(result_value, use_cache=False ), value1: int = Depends(result_value, use_cache=False ) ): return value, value1 @app.get('/value/' ) async def needy_dependency (value: tuple = Depends(get_value ) ): return {"value" : value}
全局依赖项 1 2 3 4 5 6 7 8 9 10 from fastapi import Depends, FastAPI, Header, HTTPExceptionasync def verify_token (x_token: str = Header( ) ): if x_token != "fake-super-secret-token" : raise HTTPException(status_code=400 , detail="X-Token 标头无效" ) async def verify_key (x_key: str = Header( ) ): if x_key != "fake-super-secret-key" : raise HTTPException(status_code=400 , detail="X-Key 标头无效" ) return x_key
全局依赖项很有用,后面的安全性就可以使用全局依赖项
1 2 3 4 5 6 7 8 9 10 11 app = FastAPI( dependencies=[Depends(verify_token), Depends(verify_key)] ) @app.get("/items/" ) async def read_items (): return [{"item" : "Portal Gun" }, {"item" : "Plumbus" }] @app.get("/users/" ) async def read_users (): return [{"username" : "Rick" }, {"username" : "Morty" }]
安全性 基于 Token 的认证 1 2 3 4 5 from fastapi import FastAPI, Depends, HTTPExceptionfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom pydantic import BaseModelapp = FastAPI()
使用 OAuth2PasswordBearer 创建一个 token 依赖
1 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token" )
假设这是你的用户数据库
1 2 3 4 5 6 7 8 9 fake_users_db = { "johndoe" : { "username" : "johndoe" , "full_name" : "John Doe" , "email" : "johndoe@example.com" , "hashed_password" : "fakehashedsecret" , "disabled" : False , } }
创建一个用户模型
1 2 3 4 5 class User (BaseModel ): username: str email: str full_name: str disabled: bool
创建一个简单的认证函数
1 2 3 4 5 6 7 8 9 10 11 12 def fake_hash_password (password: str ): return "fakehashed" + password def get_user (db, username: str ): if username in db: user_dict = db[username] return User(**user_dict) def fake_decode_token (token: str ): return get_user(fake_users_db, token)
创建一个依赖,用于从请求中获取 token 并验证用户
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 async def get_current_user (token: str = Depends(oauth2_scheme ) ): user = fake_decode_token(token) if not user: raise HTTPException( status_code=401 , detail="Invalid authentication credentials" , headers={"WWW-Authenticate" : "Bearer" }, ) return user @app.post("/token" ) async def login (form_data: OAuth2PasswordRequestForm = Depends( ) ): user = get_user(fake_users_db, form_data.username) if not user or user.hashed_password != fake_hash_password(form_data.password): raise HTTPException(status_code=400 , detail="Incorrect username or password" ) return {"access_token" : user.username, "token_type" : "bearer" } @app.get("/users/me" ) async def read_users_me (current_user: User = Depends(get_current_user ) ): return current_user
使用 OAuth2PasswordBearer 来创建一个简单的 token 认证流程。
HTTPS 和证书 1 2 3 from fastapi import FastAPIapp = FastAPI()
在生产环境中,你应该使用一个真正的证书和私钥,你可以从像 Let’s Encrypt 这样的证书颁发机构获得免费的证书,或者使用 OpenSSL 生成自签名证书
1 2 3 @app.get("/https" ) async def read_https (): return {"message" : "Hello, HTTPS!" }
启动服务器时,使用以下命令来指定证书和私钥:
1 uvicorn main:app --host 0.0.0.0 --port 443 --ssl-keyfile /path/to/your/key.pem --ssl-certfile /path/to/your/cert.pem
FastAPI 默认支持 HTTPS,你只需要提供证书和私钥即可。
待更新
参考