fix: 捕获OpenAI流式错误并触发重试

pull/1327/head
magisk317 2025-10-27 20:22:58 +08:00
parent 4d5456ed4b
commit cb40ff6a0e
1 changed files with 63 additions and 41 deletions

View File

@ -7,13 +7,7 @@ from collections.abc import Iterable
from typing import Callable, Any, Coroutine, Optional
from json_repair import repair_json
from openai import (
AsyncOpenAI,
APIConnectionError,
APIStatusError,
NOT_GIVEN,
AsyncStream,
)
from openai import AsyncOpenAI, APIConnectionError, APIError, APIStatusError, NOT_GIVEN, AsyncStream
from openai.types.chat import (
ChatCompletion,
ChatCompletionChunk,
@ -39,6 +33,23 @@ from ..payload_content.tool_option import ToolOption, ToolParam, ToolCall
logger = get_logger("OpenAI客户端")
def _extract_status_code_from_api_error(error: APIError) -> int:
"""
尝试从APIError对象中提取HTTP状态码无法确定时回退为500
"""
status_code = getattr(error, "status_code", None)
if status_code is None:
status_code = getattr(error, "status", None)
if status_code is None:
response = getattr(error, "response", None)
if response is not None:
status_code = getattr(response, "status_code", None) or getattr(response, "status", None)
try:
return int(status_code)
except (TypeError, ValueError):
return 500
def _convert_messages(messages: list[Message]) -> list[ChatCompletionMessageParam]:
"""
转换消息格式 - 将消息转换为OpenAI API所需的格式
@ -281,45 +292,52 @@ async def _default_stream_response_handler(
if buffer and not buffer.closed:
buffer.close()
async for event in resp_stream:
if interrupt_flag and interrupt_flag.is_set():
# 如果中断量被设置则抛出ReqAbortException
_insure_buffer_closed()
raise ReqAbortException("请求被外部信号中断")
# 空 choices / usage-only 帧的防御
if not hasattr(event, "choices") or not event.choices:
if hasattr(event, "usage") and event.usage:
try:
async for event in resp_stream:
if interrupt_flag and interrupt_flag.is_set():
# 如果中断量被设置则抛出ReqAbortException
_insure_buffer_closed()
raise ReqAbortException("请求被外部信号中断")
# 空 choices / usage-only 帧的防御
if not hasattr(event, "choices") or not event.choices:
if hasattr(event, "usage") and event.usage:
_usage_record = (
event.usage.prompt_tokens or 0,
event.usage.completion_tokens or 0,
event.usage.total_tokens or 0,
)
continue # 跳过本帧,避免访问 choices[0]
delta = event.choices[0].delta # 获取当前块的delta内容
if hasattr(event.choices[0], "finish_reason") and event.choices[0].finish_reason:
finish_reason = event.choices[0].finish_reason
if hasattr(delta, "reasoning_content") and delta.reasoning_content: # type: ignore
# 标记:有独立的推理内容块
_has_rc_attr_flag = True
_in_rc_flag = _process_delta(
delta,
_has_rc_attr_flag,
_in_rc_flag,
_rc_delta_buffer,
_fc_delta_buffer,
_tool_calls_buffer,
)
if event.usage:
# 如果有使用情况则将其存储在APIResponse对象中
_usage_record = (
event.usage.prompt_tokens or 0,
event.usage.completion_tokens or 0,
event.usage.total_tokens or 0,
)
continue # 跳过本帧,避免访问 choices[0]
delta = event.choices[0].delta # 获取当前块的delta内容
if hasattr(event.choices[0], "finish_reason") and event.choices[0].finish_reason:
finish_reason = event.choices[0].finish_reason
if hasattr(delta, "reasoning_content") and delta.reasoning_content: # type: ignore
# 标记:有独立的推理内容块
_has_rc_attr_flag = True
_in_rc_flag = _process_delta(
delta,
_has_rc_attr_flag,
_in_rc_flag,
_rc_delta_buffer,
_fc_delta_buffer,
_tool_calls_buffer,
)
if event.usage:
# 如果有使用情况则将其存储在APIResponse对象中
_usage_record = (
event.usage.prompt_tokens or 0,
event.usage.completion_tokens or 0,
event.usage.total_tokens or 0,
)
except APIError as e:
_insure_buffer_closed()
status_code = _extract_status_code_from_api_error(e)
message = getattr(e, "message", None) or str(e)
logger.warning(f"OpenAI流式响应异常: {message}")
raise RespNotOkException(status_code, message) from e
try:
return _build_stream_api_resp(
@ -533,6 +551,10 @@ class OpenaiClient(BaseClient):
except APIStatusError as e:
# 重封装APIError为RespNotOkException
raise RespNotOkException(e.status_code, e.message) from e
except APIError as e:
status_code = _extract_status_code_from_api_error(e)
message = getattr(e, "message", None) or str(e)
raise RespNotOkException(status_code, message) from e
if usage_record:
resp.usage = UsageRecord(