我正在使用Python3.10和FastAPI0.92.0
编写服务器发送事件(SSE)流api。这是Python代码的样子:
from fastapi import APIRouter, FastAPI, Header
from src.chat.completions import chat_stream
from fastapi.responses import StreamingResponse
router = APIRouter()
@router.get("/v1/completions",response_class=StreamingResponse)
def stream_chat(q: str, authorization: str = Header(None)):
auth_mode, auth_token = authorization.split(' ')
if auth_token is None:
return "Authorization token is missing"
answer = chat_stream(q, auth_token)
return StreamingResponse(answer, media_type="text/event-stream")
这是chat_stream
函数:
import openai
def chat_stream(question: str, key: str):
openai.api_key = key
# create a completion
completion = openai.Completion.create(model="text-davinci-003",
prompt=question,
stream=True)
return completion
当我使用此命令调用api时:
curl -N -H "Authorization: Bearer sk-the openai key" https://chat.poemhub.top/v1/completions?q=hello
服务器端显示以下错误:
INFO: 123.146.17.54:0 - "GET /v1/completions?q=hello HTTP/1.0" 200 OK
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/uvicorn/protocols/http/h11_impl.py", line 429, in run_asgi
result = await app( # type: ignore[func-returns-value]
File "/usr/local/lib/python3.10/dist-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/fastapi/applications.py", line 276, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 69, in app
await response(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 270, in __call__
async with anyio.create_task_group() as task_group:
File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 273, in wrap
await func()
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 264, in stream_response
chunk = chunk.encode(self.charset)
File "/usr/local/lib/python3.10/dist-packages/openai/openai_object.py", line 61, in __getattr__
raise AttributeError(*err.args)
AttributeError: encode
为什么会出现这个错误?我该怎么做才能修复它?
如FastAPI的留档中所述,StreamingResponse
采用异步
生成器或普通生成器/迭代器并流式传输响应主体。如本答案所述,在任何一种情况下,FastAPI仍将异步工作-如果传递给StreamingResponse
的生成器不是异步的,FastAPI/Starlette将在单独的线程中运行生成器(请参阅此处的相关实现),使用iterate_in_threadpool()
,然后将被await
ed(参见iterate_in_threadpool()
实现)。有关FastAPI中def
与async def
的更多详细信息,以及在async def
endpoint内运行阻塞操作的解决方案,请查看此详细答案。
StreamingResponse
是Response
的子类,它以bytes
格式流式传输响应主体。因此,如果通过生成器传递的内容不是bytes
格式,FastAPI/Starlette将尝试将其编码/转换为bytes
(使用默认的utf-8
编码方案)。以下是相关实现的代码片段:
async for chunk in self.body_iterator:
if not isinstance(chunk, bytes):
chunk = chunk.encode(self.charset)
await send({"type": "http.response.body", "body": chunk, "more_body": True})
但是,如果迭代器/生成器中的块
不是可以编码的str
格式,则会引发AtbanteError
(例如,AtbanteError:…没有属性'encode'
),类似于此答案中描述的内容(参见选项2,注释3)。此外,如果块包含utf-8
编码范围之外的字符,您可能会遇到UnicodeEncodeError:…编解码器无法对字符
进行编码。因此,给定您提供的错误回溯中的AtbanteError: encode
,您很可能传递的是str
以外的对象。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
import time
app = FastAPI()
@app.get('/')
def main():
def gen():
while True:
yield (json.dumps({'msg': 'Hello World!'}) + '\n\n').encode('utf-8')
time.sleep(0.5)
return StreamingResponse(gen(), media_type='text/event-stream')