diff --git a/src/llm_models/model_client/openai_client.py b/src/llm_models/model_client/openai_client.py index dd92b9e8..ff36eee2 100644 --- a/src/llm_models/model_client/openai_client.py +++ b/src/llm_models/model_client/openai_client.py @@ -7,13 +7,7 @@ from collections.abc import Iterable from typing import Callable, Any, Coroutine, Optional from json_repair import repair_json -from openai import ( - AsyncOpenAI, - APIConnectionError, - APIStatusError, - NOT_GIVEN, - AsyncStream, -) +from openai import AsyncOpenAI, APIConnectionError, APIError, APIStatusError, NOT_GIVEN, AsyncStream from openai.types.chat import ( ChatCompletion, ChatCompletionChunk, @@ -39,6 +33,23 @@ from ..payload_content.tool_option import ToolOption, ToolParam, ToolCall logger = get_logger("OpenAI客户端") +def _extract_status_code_from_api_error(error: APIError) -> int: + """ + 尝试从APIError对象中提取HTTP状态码,无法确定时回退为500。 + """ + status_code = getattr(error, "status_code", None) + if status_code is None: + status_code = getattr(error, "status", None) + if status_code is None: + response = getattr(error, "response", None) + if response is not None: + status_code = getattr(response, "status_code", None) or getattr(response, "status", None) + try: + return int(status_code) + except (TypeError, ValueError): + return 500 + + def _convert_messages(messages: list[Message]) -> list[ChatCompletionMessageParam]: """ 转换消息格式 - 将消息转换为OpenAI API所需的格式 @@ -281,45 +292,52 @@ async def _default_stream_response_handler( if buffer and not buffer.closed: buffer.close() - async for event in resp_stream: - if interrupt_flag and interrupt_flag.is_set(): - # 如果中断量被设置,则抛出ReqAbortException - _insure_buffer_closed() - raise ReqAbortException("请求被外部信号中断") - # 空 choices / usage-only 帧的防御 - if not hasattr(event, "choices") or not event.choices: - if hasattr(event, "usage") and event.usage: + try: + async for event in resp_stream: + if interrupt_flag and interrupt_flag.is_set(): + # 如果中断量被设置,则抛出ReqAbortException + _insure_buffer_closed() + raise ReqAbortException("请求被外部信号中断") + # 空 choices / usage-only 帧的防御 + if not hasattr(event, "choices") or not event.choices: + if hasattr(event, "usage") and event.usage: + _usage_record = ( + event.usage.prompt_tokens or 0, + event.usage.completion_tokens or 0, + event.usage.total_tokens or 0, + ) + continue # 跳过本帧,避免访问 choices[0] + delta = event.choices[0].delta # 获取当前块的delta内容 + + if hasattr(event.choices[0], "finish_reason") and event.choices[0].finish_reason: + finish_reason = event.choices[0].finish_reason + + if hasattr(delta, "reasoning_content") and delta.reasoning_content: # type: ignore + # 标记:有独立的推理内容块 + _has_rc_attr_flag = True + + _in_rc_flag = _process_delta( + delta, + _has_rc_attr_flag, + _in_rc_flag, + _rc_delta_buffer, + _fc_delta_buffer, + _tool_calls_buffer, + ) + + if event.usage: + # 如果有使用情况,则将其存储在APIResponse对象中 _usage_record = ( event.usage.prompt_tokens or 0, event.usage.completion_tokens or 0, event.usage.total_tokens or 0, ) - continue # 跳过本帧,避免访问 choices[0] - delta = event.choices[0].delta # 获取当前块的delta内容 - - if hasattr(event.choices[0], "finish_reason") and event.choices[0].finish_reason: - finish_reason = event.choices[0].finish_reason - - if hasattr(delta, "reasoning_content") and delta.reasoning_content: # type: ignore - # 标记:有独立的推理内容块 - _has_rc_attr_flag = True - - _in_rc_flag = _process_delta( - delta, - _has_rc_attr_flag, - _in_rc_flag, - _rc_delta_buffer, - _fc_delta_buffer, - _tool_calls_buffer, - ) - - if event.usage: - # 如果有使用情况,则将其存储在APIResponse对象中 - _usage_record = ( - event.usage.prompt_tokens or 0, - event.usage.completion_tokens or 0, - event.usage.total_tokens or 0, - ) + except APIError as e: + _insure_buffer_closed() + status_code = _extract_status_code_from_api_error(e) + message = getattr(e, "message", None) or str(e) + logger.warning(f"OpenAI流式响应异常: {message}") + raise RespNotOkException(status_code, message) from e try: return _build_stream_api_resp( @@ -533,6 +551,10 @@ class OpenaiClient(BaseClient): except APIStatusError as e: # 重封装APIError为RespNotOkException raise RespNotOkException(e.status_code, e.message) from e + except APIError as e: + status_code = _extract_status_code_from_api_error(e) + message = getattr(e, "message", None) or str(e) + raise RespNotOkException(status_code, message) from e if usage_record: resp.usage = UsageRecord(