From 01b06ed302b18ad540be00a8b741815ad5a1af2d Mon Sep 17 00:00:00 2001
From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com>
Date: Mon, 25 Aug 2025 19:22:00 +0000
Subject: [PATCH] fix: clean up unused exceptions, use RespNotOkException with
 a status code instead, and move 429 and 5xx error handling from "hard
 failure" back to "retryable"
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
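
Retry handling is now split into two layers: _attempt_request_on_model (the
"executor") retries transient failures -- 429, 5xx, network errors, empty
responses -- against a single model, while _execute_request (the
"dispatcher") selects models and fails over whenever the executor raises the
new ModelAttemptFailed exception. A minimal sketch of that contract follows;
`call`, `models`, `TransientError`, MAX_RETRY and RETRY_INTERVAL are
hypothetical stand-ins for the real client call, model list, exception types
and provider configuration:

    import asyncio

    MAX_RETRY, RETRY_INTERVAL = 3, 1.0

    class TransientError(Exception): ...      # stand-in for 429/5xx/network errors

    class ModelAttemptFailed(Exception): ...  # as added in exceptions.py below

    async def call(model: str) -> str:        # stand-in for the real client call
        raise TransientError(model)

    async def executor(model: str) -> str:
        for _ in range(MAX_RETRY):            # retry the SAME model
            try:
                return await call(model)
            except TransientError:
                await asyncio.sleep(RETRY_INTERVAL)
        raise ModelAttemptFailed(f"{model}: retries exhausted")

    async def dispatcher(models: list[str]) -> str:
        for model in models:                  # fail over ACROSS models
            try:
                return await executor(model)
            except ModelAttemptFailed:
                continue
        raise RuntimeError("all models failed")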
---
 src/llm_models/exceptions.py  |  45 +---
 src/llm_models/utils_model.py | 417 +++++++++++++--------------------
 2 files changed, 167 insertions(+), 295 deletions(-)

diff --git a/src/llm_models/exceptions.py b/src/llm_models/exceptions.py
index ff847ad8..bf1c88de 100644
--- a/src/llm_models/exceptions.py
+++ b/src/llm_models/exceptions.py
@@ -65,39 +65,6 @@ class RespParseException(Exception):
         return self.message or "Unknown error while parsing the response; check that the correct parsing method is configured"
 
 
-class PayLoadTooLargeError(Exception):
-    """Custom exception for oversized request bodies"""
-
-    def __init__(self, message: str):
-        super().__init__(message)
-        self.message = message
-
-    def __str__(self):
-        return "Request body too large; try compressing images or reducing the input."
-
-
-class RequestAbortException(Exception):
-    """Custom exception for aborted requests"""
-
-    def __init__(self, message: str):
-        super().__init__(message)
-        self.message = message
-
-    def __str__(self):
-        return self.message
-
-
-class PermissionDeniedException(Exception):
-    """Custom exception for permission-denied errors"""
-
-    def __init__(self, message: str):
-        super().__init__(message)
-        self.message = message
-
-    def __str__(self):
-        return self.message
-
-
 class EmptyResponseException(Exception):
     """Empty response content"""
 
@@ -107,3 +74,15 @@ class EmptyResponseException(Exception):
 
     def __str__(self):
         return self.message
+
+
+class ModelAttemptFailed(Exception):
+    """Raised by the "executor" after all retries on a single model have failed,
+    telling the "dispatcher" to switch to another model."""
+
+    def __init__(self, message: str, original_exception: Exception | None = None):
+        super().__init__(message)
+        self.message = message
+        self.original_exception = original_exception
+
+    def __str__(self):
+        return self.message
\ No newline at end of file
diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py
index 96dbd290..a5e1a615 100644
--- a/src/llm_models/utils_model.py
+++ b/src/llm_models/utils_model.py
@@ -17,10 +17,9 @@ from .model_client.base_client import BaseClient, APIResponse, client_registry
 from .utils import compress_messages, llm_usage_recorder
 from .exceptions import (
     NetworkConnectionError,
-    ReqAbortException,
     RespNotOkException,
-    RespParseException,
     EmptyResponseException,
+    ModelAttemptFailed,
 )
 
 install(extra_lines=3)
@@ -77,32 +76,27 @@ class LLMRequest:
         Returns:
             (Tuple[str, str, str, Optional[List[ToolCall]]]): response content, reasoning content, model name, tool call list
         """
-        # Model selection
         start_time = time.time()
-        model_info, api_provider, client = self._select_model()
 
-        # Build the request body
-        message_builder = MessageBuilder()
-        message_builder.add_text_content(prompt)
-        message_builder.add_image_content(
-            image_base64=image_base64, image_format=image_format, support_formats=client.get_support_image_formats()
-        )
-        messages = [message_builder.build()]
+        def message_factory(client: BaseClient) -> List[Message]:
+            message_builder = MessageBuilder()
+            message_builder.add_text_content(prompt)
+            message_builder.add_image_content(
+                image_base64=image_base64,
+                image_format=image_format,
+                support_formats=client.get_support_image_formats()
+            )
+            return [message_builder.build()]
 
-        # Send the request and handle the result
-        response = await self._execute_request(
-            api_provider=api_provider,
-            client=client,
+        response, model_info = await self._execute_request(
             request_type=RequestType.RESPONSE,
-            model_info=model_info,
-            message_list=messages,
+            message_factory=message_factory,
             temperature=temperature,
             max_tokens=max_tokens,
         )
         content = response.content or ""
         reasoning_content = response.reasoning_content or ""
         tool_calls = response.tool_calls
 
-        # Extract <think>-tag reasoning from the content (backward compatibility)
         if not reasoning_content and content:
             content, extracted_reasoning = self._extract_reasoning(content)
             reasoning_content = extracted_reasoning
@@ -125,15 +119,8 @@ class LLMRequest:
         Returns:
             (Optional[str]): the generated text description, or None
         """
-        # Model selection
-        model_info, api_provider, client = self._select_model()
-
-        # Send the request and handle the result
-        response = await self._execute_request(
-            api_provider=api_provider,
-            client=client,
+        response, _ = await self._execute_request(
             request_type=RequestType.AUDIO,
-            model_info=model_info,
             audio_base64=voice_base64,
         )
         return response.content or None
@@ -152,43 +139,35 @@ class LLMRequest:
             prompt (str): the prompt
             temperature (float, optional): sampling temperature
             max_tokens (int, optional): maximum number of tokens
+            tools (Optional[List[Dict[str, Any]]]): tool list
+            raise_when_empty (bool): whether to raise when the response is empty
         Returns:
             (Tuple[str, str, str, Optional[List[ToolCall]]]): response content, reasoning content, model name, tool call list
         """
-        # Build the request body
         start_time = time.time()
-
-        message_builder = MessageBuilder()
-        message_builder.add_text_content(prompt)
-        messages = [message_builder.build()]
+
+        def message_factory(client: BaseClient) -> List[Message]:
+            message_builder = MessageBuilder()
+            message_builder.add_text_content(prompt)
+            return [message_builder.build()]
 
         tool_built = self._build_tool_options(tools)
 
-        # Model selection
-        model_info, api_provider, client = self._select_model()
-
-        # Send the request and handle the result
-        logger.debug(f"LLM model selection took: {model_info.name} {time.time() - start_time}")
-
-        response = await self._execute_request(
-            api_provider=api_provider,
-            client=client,
+        response, model_info = await self._execute_request(
             request_type=RequestType.RESPONSE,
-            model_info=model_info,
-            message_list=messages,
+            message_factory=message_factory,
            temperature=temperature,
             max_tokens=max_tokens,
             tool_options=tool_built,
         )
+        logger.debug(f"Total LLM request time: {time.time() - start_time}")
 
         content = response.content
         reasoning_content = response.reasoning_content or ""
         tool_calls = response.tool_calls
 
-        # Extract <think>-tag reasoning from the content (backward compatibility)
         if not reasoning_content and content:
             content, extracted_reasoning = self._extract_reasoning(content)
             reasoning_content = extracted_reasoning
-
         if usage := response.usage:
             llm_usage_recorder.record_usage_to_database(
                 model_info=model_info,
@@ -198,31 +177,22 @@ class LLMRequest:
                 endpoint="/chat/completions",
                 time_cost=time.time() - start_time,
             )
-
-        return content or "", (reasoning_content, model_info.name, tool_calls)
+        return content, (reasoning_content, model_info.name, tool_calls)
 
     async def get_embedding(self, embedding_input: str) -> Tuple[List[float], str]:
-        """Get an embedding vector
+        """
+        Get an embedding vector
         Args:
             embedding_input (str): the text to embed
         Returns:
             (Tuple[List[float], str]): (embedding vector, name of the model used)
         """
-        # No message body to build; the input text is used directly
         start_time = time.time()
-        model_info, api_provider, client = self._select_model()
-
-        # Send the request and handle the result
-        response = await self._execute_request(
-            api_provider=api_provider,
-            client=client,
+        response, model_info = await self._execute_request(
             request_type=RequestType.EMBEDDING,
-            model_info=model_info,
             embedding_input=embedding_input,
         )
-
         embedding = response.embedding
-
         if usage := response.usage:
             llm_usage_recorder.record_usage_to_database(
                 model_info=model_info,
@@ -232,59 +202,61 @@ class LLMRequest:
                 endpoint="/embeddings",
                 time_cost=time.time() - start_time,
             )
-
         if not embedding:
             raise RuntimeError("Failed to get the embedding")
-
         return embedding, model_info.name
 
-    def _select_model(self) -> Tuple[ModelInfo, APIProvider, BaseClient]:
+    def _select_model(self, exclude_models: set | None = None) -> Tuple[ModelInfo, APIProvider, BaseClient]:
         """
         Select a model based on total tokens and penalty values
         """
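+        # Drop models that have already failed during this request so the
+        # dispatcher never re-selects them for the same call.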
+        available_models = {
+            model: scores
+            for model, scores in self.model_usage.items()
+            if not exclude_models or model not in exclude_models
+        }
+        if not available_models:
+            raise RuntimeError("No models left to select: every model has already been tried and failed.")
+
         least_used_model_name = min(
-            self.model_usage,
-            key=lambda k: self.model_usage[k][0] + self.model_usage[k][1] * 300 + self.model_usage[k][2] * 1000,
+            available_models,
+            key=lambda k: available_models[k][0] + available_models[k][1] * 300 + available_models[k][2] * 1000,
         )
         model_info = model_config.get_model_info(least_used_model_name)
         api_provider = model_config.get_provider(model_info.api_provider)
-
-        # For embedding tasks, force a new client instance to avoid event-loop issues
-        force_new_client = self.request_type == "embedding"
+        force_new_client = (self.request_type == "embedding")
         client = client_registry.get_client_class_instance(api_provider, force_new=force_new_client)
-
         logger.debug(f"Selected model for this request: {model_info.name}")
         total_tokens, penalty, usage_penalty = self.model_usage[model_info.name]
-        self.model_usage[model_info.name] = (total_tokens, penalty, usage_penalty + 1)  # bump the usage penalty to avoid back-to-back use
+        self.model_usage[model_info.name] = (total_tokens, penalty, usage_penalty + 1)
         return model_info, api_provider, client
 
-    async def _execute_request(
+    async def _attempt_request_on_model(
         self,
+        model_info: ModelInfo,
         api_provider: APIProvider,
         client: BaseClient,
         request_type: RequestType,
-        model_info: ModelInfo,
-        message_list: List[Message] | None = None,
-        tool_options: list[ToolOption] | None = None,
-        response_format: RespFormat | None = None,
-        stream_response_handler: Optional[Callable] = None,
-        async_response_parser: Optional[Callable] = None,
-        temperature: Optional[float] = None,
-        max_tokens: Optional[int] = None,
-        embedding_input: str = "",
-        audio_base64: str = "",
+        message_list: List[Message],
+        tool_options: list[ToolOption] | None,
+        response_format: RespFormat | None,
+        stream_response_handler: Optional[Callable],
+        async_response_parser: Optional[Callable],
+        temperature: Optional[float],
+        max_tokens: Optional[int],
+        embedding_input: str | None,
+        audio_base64: str | None,
+        compressed_messages: Optional[List[Message]] = None,
    ) -> APIResponse:
         """
-        The method that actually executes the request
-
-        Contains the retry and exception handling logic
+        Run the request against a single model, retrying transient errors.
+        Returns an APIResponse on success; on failure (retries exhausted or a
+        hard error) raises ModelAttemptFailed so the dispatcher can switch models.
         """
         retry_remain = api_provider.max_retry
-        compressed_messages: Optional[List[Message]] = None
+
         while retry_remain > 0:
             try:
                 if request_type == RequestType.RESPONSE:
-                    assert message_list is not None, "message_list cannot be None for response requests"
                     return await client.get_response(
                         model_info=model_info,
                         message_list=(compressed_messages or message_list),
                         tool_options=tool_options,
@@ -297,202 +269,123 @@ class LLMRequest:
                         extra_params=model_info.extra_params,
                     )
                 elif request_type == RequestType.EMBEDDING:
-                    assert embedding_input, "embedding_input cannot be empty for embedding requests"
+                    assert embedding_input is not None
                     return await client.get_embedding(
                         model_info=model_info,
                         embedding_input=embedding_input,
                         extra_params=model_info.extra_params,
                     )
                 elif request_type == RequestType.AUDIO:
-                    assert audio_base64 is not None, "audio_base64 cannot be None for audio requests"
+                    assert audio_base64 is not None
                     return await client.get_audio_transcriptions(
                         model_info=model_info,
                         audio_base64=audio_base64,
                         extra_params=model_info.extra_params,
                     )
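+            # Transient failures (empty responses, network errors) burn one
+            # retry on this model, then back off and try the same model again.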
+            except (EmptyResponseException, NetworkConnectionError) as e:
+                retry_remain -= 1
+                if retry_remain <= 0:
+                    logger.error(f"Model '{model_info.name}' still failed after exhausting retries for transient errors.")
+                    raise ModelAttemptFailed(f"Model '{model_info.name}' exhausted its retries", original_exception=e) from e
+
+                logger.warning(f"Model '{model_info.name}' hit a retryable error: {str(e)}. Retries left: {retry_remain}")
+                await asyncio.sleep(api_provider.retry_interval)
+
+            except RespNotOkException as e:
+                # Retryable HTTP errors
+                if e.status_code == 429 or e.status_code >= 500:
+                    retry_remain -= 1
+                    if retry_remain <= 0:
+                        logger.error(f"Model '{model_info.name}' still failed after a {e.status_code} error and exhausting its retries.")
+                        raise ModelAttemptFailed(f"Model '{model_info.name}' exhausted its retries", original_exception=e) from e
+
+                    logger.warning(f"Model '{model_info.name}' hit a retryable HTTP error: {str(e)}. Retries left: {retry_remain}")
+                    await asyncio.sleep(api_provider.retry_interval)
+                    continue
+
+                # Special-case 413: try compressing the messages
+                if e.status_code == 413 and message_list and not compressed_messages:
+                    logger.warning(f"Model '{model_info.name}' returned 413 (request body too large); compressing and retrying...")
+                    # Compressing the messages does not consume a retry
+                    compressed_messages = compress_messages(message_list)
+                    continue
+
+                # Non-retryable HTTP errors
+                logger.warning(f"Model '{model_info.name}' hit a non-retryable HTTP error: {str(e)}")
+                raise ModelAttemptFailed(f"Model '{model_info.name}' hit a hard error", original_exception=e) from e
+
             except Exception as e:
-                logger.debug(f"Request failed: {str(e)}")
-                # Handle the exception
+                logger.error(traceback.format_exc())
+
+                logger.warning(f"Model '{model_info.name}' hit an unknown non-retryable error: {str(e)}")
+                raise ModelAttemptFailed(f"Model '{model_info.name}' hit a hard error", original_exception=e) from e
+
+        raise ModelAttemptFailed(f"Model '{model_info.name}' was never attempted because its retry count is configured as 0 or less.")
+
+    async def _execute_request(
+        self,
+        request_type: RequestType,
+        message_factory: Optional[Callable[[BaseClient], List[Message]]] = None,
+        tool_options: list[ToolOption] | None = None,
+        response_format: RespFormat | None = None,
+        stream_response_handler: Optional[Callable] = None,
+        async_response_parser: Optional[Callable] = None,
+        temperature: Optional[float] = None,
+        max_tokens: Optional[int] = None,
+        embedding_input: str | None = None,
+        audio_base64: str | None = None,
+    ) -> Tuple[APIResponse, ModelInfo]:
+        """
+        Dispatcher: handles model selection and failover across models.
+        """
+        failed_models_this_request = set()
+        max_attempts = len(self.model_for_task.model_list)
+        last_exception: Optional[Exception] = None
+        compressed_messages: Optional[List[Message]] = None
+
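+        # One failover pass per configured model: pick the cheapest model that
+        # has not failed yet, rebuild the messages for its client, and let the
+        # executor handle the per-model retries.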
+        for _attempt in range(max_attempts):
+            model_info, api_provider, client = self._select_model(exclude_models=failed_models_this_request)
+
+            message_list = []
+            if message_factory:
+                message_list = message_factory(client)
+
+            try:
+                response = await self._attempt_request_on_model(
+                    model_info, api_provider, client, request_type,
+                    message_list=message_list,
+                    tool_options=tool_options,
+                    response_format=response_format,
+                    stream_response_handler=stream_response_handler,
+                    async_response_parser=async_response_parser,
+                    temperature=temperature,
+                    max_tokens=max_tokens,
+                    embedding_input=embedding_input,
+                    audio_base64=audio_base64,
+                    compressed_messages=compressed_messages,
+                )
+                return response, model_info
+
+            except ModelAttemptFailed as e:
+                last_exception = e.original_exception or e
+                logger.warning(f"Model '{model_info.name}' attempt failed; switching to the next model. Reason: {e}")
                 total_tokens, penalty, usage_penalty = self.model_usage[model_info.name]
                 self.model_usage[model_info.name] = (total_tokens, penalty + 1, usage_penalty)
+                failed_models_this_request.add(model_info.name)
 
-                wait_interval, compressed_messages = self._default_exception_handler(
-                    e,
-                    self.task_name,
-                    model_name=model_info.name,
-                    remain_try=retry_remain,
-                    retry_interval=api_provider.retry_interval,
-                    messages=(message_list, compressed_messages is not None) if message_list else None,
-                )
+                if isinstance(last_exception, RespNotOkException) and last_exception.status_code == 400:
+                    logger.error("Received an unrecoverable client error (400); aborting all attempts.")
+                    raise last_exception from e
 
-                if wait_interval == -1:
-                    retry_remain = 0  # stop retrying
-                elif wait_interval > 0:
-                    logger.info(f"Waiting {wait_interval} seconds before retrying...")
-                    await asyncio.sleep(wait_interval)
             finally:
-                # in finally to prevent an infinite loop
-                retry_remain -= 1
-                total_tokens, penalty, usage_penalty = self.model_usage[model_info.name]
-                self.model_usage[model_info.name] = (total_tokens, penalty, usage_penalty - 1)  # request finished; lower the usage penalty
-        logger.error(f"Model '{model_info.name}' request failed after the maximum of {api_provider.max_retry} retries")
-        raise RuntimeError("Request failed: maximum retries reached")
+                total_tokens, penalty, usage_penalty = self.model_usage[model_info.name]
+                if usage_penalty > 0:
+                    self.model_usage[model_info.name] = (total_tokens, penalty, usage_penalty - 1)
 
-    def _default_exception_handler(
-        self,
-        e: Exception,
-        task_name: str,
-        model_name: str,
-        remain_try: int,
-        retry_interval: int = 10,
-        messages: Tuple[List[Message], bool] | None = None,
-    ) -> Tuple[int, List[Message] | None]:
-        """
-        Default exception handler
-        Args:
-            e (Exception): the exception object
-            task_name (str): task name
-            model_name (str): model name
-            remain_try (int): remaining attempts
-            retry_interval (int): retry interval
-            messages (tuple[list[Message], bool] | None): (message list, whether it was already compressed)
-        Returns:
-            (wait interval (0 = no wait, -1 = stop requesting this model), new message list (for compressed messages))
-        """
-
-        if isinstance(e, NetworkConnectionError):  # network connection error
-            return self._check_retry(
-                remain_try,
-                retry_interval,
-                can_retry_msg=f"Task '{task_name}' model '{model_name}': connection error, retrying in {retry_interval}s",
-                cannot_retry_msg=f"Task '{task_name}' model '{model_name}': connection error, maximum retries exceeded; check the network connection and the URL",
-            )
-        elif isinstance(e, EmptyResponseException):  # empty response
-            return self._check_retry(
-                remain_try,
-                retry_interval,
-                can_retry_msg=f"Task '{task_name}' model '{model_name}': empty response, retrying in {retry_interval}s. Reason: {e}",
-                cannot_retry_msg=f"Task '{task_name}' model '{model_name}': empty response, maximum retries exceeded; giving up",
-            )
-        elif isinstance(e, ReqAbortException):
-            logger.warning(f"Task '{task_name}' model '{model_name}': request aborted, details: {str(e.message)}")
-            return -1, None  # stop requesting this model
-        elif isinstance(e, RespNotOkException):
-            return self._handle_resp_not_ok(
-                e,
-                task_name,
-                model_name,
-                remain_try,
-                retry_interval,
-                messages,
-            )
-        elif isinstance(e, RespParseException):
-            # response parse error
-            logger.error(f"Task '{task_name}' model '{model_name}': response parse error, details: {e.message}")
-            logger.debug(f"Extra info: {str(e.ext_info)}")
-            return -1, None  # stop requesting this model
-        else:
-            print(traceback.format_exc())
-            logger.error(f"Task '{task_name}' model '{model_name}': unknown exception, details: {str(e)}")
-            return -1, None  # stop requesting this model
-
-    def _check_retry(
-        self,
-        remain_try: int,
-        retry_interval: int,
-        can_retry_msg: str,
-        cannot_retry_msg: str,
-        can_retry_callable: Callable | None = None,
-        **kwargs,
-    ) -> Tuple[int, List[Message] | None]:
-        """Helper: check whether a retry is possible
-        Args:
-            remain_try (int): remaining attempts
-            retry_interval (int): retry interval
-            can_retry_msg (str): message to log when a retry is possible
-            cannot_retry_msg (str): message to log when no retry is possible
-            can_retry_callable (Callable | None): callable to invoke before retrying (if any)
-            **kwargs: extra arguments
-
-        Returns:
-            (Tuple[int, List[Message] | None]): (wait interval (0 = no wait, -1 = stop requesting this model), new message list (for compressed messages))
-        """
-        if remain_try > 0:
-            # retries remain
-            logger.warning(f"{can_retry_msg}")
-            if can_retry_callable is not None:
-                return retry_interval, can_retry_callable(**kwargs)
-            else:
-                return retry_interval, None
-        else:
-            # maximum retries reached
-            logger.warning(f"{cannot_retry_msg}")
-            return -1, None  # stop requesting this model
-
-    def _handle_resp_not_ok(
-        self,
-        e: RespNotOkException,
-        task_name: str,
-        model_name: str,
-        remain_try: int,
-        retry_interval: int = 10,
-        messages: tuple[list[Message], bool] | None = None,
-    ):
-        """
-        Handle a non-OK HTTP response
-        Args:
-            e (RespNotOkException): the response exception object
-            task_name (str): task name
-            model_name (str): model name
-            remain_try (int): remaining attempts
-            retry_interval (int): retry interval
-            messages (tuple[list[Message], bool] | None): (message list, whether it was already compressed)
-        Returns:
-            (wait interval (0 = no wait, -1 = stop requesting this model), new message list (for compressed messages))
-        """
-        # HTTP error response
-        if e.status_code in [400, 401, 402, 403, 404]:
-            # client error
-            logger.warning(
-                f"Task '{task_name}' model '{model_name}': request failed, status code {e.status_code}, message: {e.message}"
-            )
-            return -1, None  # stop requesting this model
-        elif e.status_code == 413:
-            if messages and not messages[1]:
-                # messages exist and are not yet compressed; try compressing them
-                return self._check_retry(
-                    remain_try,
-                    0,
-                    can_retry_msg=f"Task '{task_name}' model '{model_name}': request body too large, compressing messages and retrying",
-                    cannot_retry_msg=f"Task '{task_name}' model '{model_name}': request body still too large after compression; giving up",
-                    can_retry_callable=compress_messages,
-                    messages=messages[0],
-                )
-            # nothing left to compress
-            logger.warning(f"Task '{task_name}' model '{model_name}': request body too large and cannot be compressed; giving up.")
-            return -1, None
-        elif e.status_code == 429:
-            # rate limited
-            return self._check_retry(
-                remain_try,
-                retry_interval,
-                can_retry_msg=f"Task '{task_name}' model '{model_name}': rate limited, retrying in {retry_interval}s",
-                cannot_retry_msg=f"Task '{task_name}' model '{model_name}': rate limited, maximum retries exceeded; giving up",
-            )
-        elif e.status_code >= 500:
-            # server error
-            return self._check_retry(
-                remain_try,
-                retry_interval,
-                can_retry_msg=f"Task '{task_name}' model '{model_name}': server error, retrying in {retry_interval}s",
-                cannot_retry_msg=f"Task '{task_name}' model '{model_name}': server error, maximum retries exceeded; try again later",
-            )
-        else:
-            # unknown error
-            logger.warning(
-                f"Task '{task_name}' model '{model_name}': unknown error, status code {e.status_code}, message: {e.message}"
-            )
-            return -1, None
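+        # Reaching this point means every candidate model was tried once and failed.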
+        logger.error(f"All {max_attempts} models failed.")
+        if last_exception:
+            raise last_exception
+        raise RuntimeError("Request failed: every available model has been tried and failed.")
 
     def _build_tool_options(self, tools: Optional[List[Dict[str, Any]]]) -> Optional[List[ToolOption]]:
         # sourcery skip: extract-method
@@ -537,4 +430,4 @@ class LLMRequest:
         match = re.search(r"(?:<think>)?(.*?)</think>", content, re.DOTALL)
         content = re.sub(r"(?:<think>)?.*?</think>", "", content, flags=re.DOTALL, count=1).strip()
         reasoning = match[1].strip() if match else ""
-        return content, reasoning
+        return content, reasoning
\ No newline at end of file