From 87f1637081d79500ac576fe213fc9a9f7a447a71 Mon Sep 17 00:00:00 2001 From: Bakadax Date: Sun, 4 May 2025 02:42:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=B3=E9=97=AD=E5=AE=89=E5=85=A8=E8=BF=87?= =?UTF-8?q?=E6=BB=A4=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/models/utils_model.py | 491 +++++++++--------------------- 1 file changed, 143 insertions(+), 348 deletions(-) diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py index 89ea9463..ff6222f5 100644 --- a/src/plugins/models/utils_model.py +++ b/src/plugins/models/utils_model.py @@ -146,14 +146,12 @@ class LLMRequest: try: raw_api_key_config = os.environ[self.model_key_name] self.base_url = os.environ[model["base_url"]] - # --- Gemini 检测 --- self.is_gemini = "googleapis.com" in self.base_url.lower() if self.is_gemini: logger.debug(f"模型 {self.model_name}: 检测到为 Gemini API (Base URL: {self.base_url})") if self.stream: logger.warning(f"模型 {self.model_name}: Gemini 流式输出处理与 OpenAI 不同,暂时强制禁用流式。") self.stream = False - # --- 结束 Gemini 检测 --- # 解析和过滤 API Keys (代码不变) parsed_keys = [] @@ -357,10 +355,9 @@ class LLMRequest: actual_endpoint = endpoint if self.is_gemini: - # Gemini API URL structure: https://generativelanguage.googleapis.com/v1beta/models/{model}:{action} - action = endpoint.lstrip(':') # Remove leading ':' if present - api_url = f"{self.base_url.rstrip('/')}/models/{self.model_name}:{action}" - stream_mode = False # Gemini stream handled differently if needed later + action = endpoint.lstrip('/') + api_url = f"{self.base_url.rstrip('/')}/{self.model_name}{action}" + stream_mode = False else: api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}" stream_mode = self.stream @@ -369,9 +366,7 @@ class LLMRequest: merged_params = {**self.params, **call_params} if payload is None: - # --- 调用 _build_payload 来构建请求体 --- payload = await self._build_payload(prompt, image_base64, image_format, merged_params) - # --- 结束调用 _build_payload --- else: logger.debug("使用外部提供的 payload,忽略单次调用参数合并。") @@ -384,7 +379,7 @@ class LLMRequest: "policy": policy, "payload": payload, "api_url": api_url, - "stream_mode": payload.get("stream", False), # Check payload for stream setting + "stream_mode": payload.get("stream", False), "image_base64": image_base64, "image_format": image_format, "prompt": prompt, @@ -484,9 +479,6 @@ class LLMRequest: if use_proxy: post_kwargs["proxy"] = current_proxy_url - logger.trace(f"模型 {self.model_name}: 发送请求到 {api_url} (代理: {use_proxy})") - logger.trace(f"模型 {self.model_name}: Payload: {json.dumps(await _safely_record(request_content, actual_payload), indent=2)}") # 安全记录Payload - async with session.post(api_url, **post_kwargs) as response: if response.status == 429 and is_key_list: @@ -525,34 +517,29 @@ class LLMRequest: await self._handle_error_response(response, attempt, policy, current_key) if response.status in policy["retry_codes"] and attempt < policy["max_retries"] - 1: - if response.status not in [429, 403]: # 429/403 切换逻辑优先 + if response.status not in [429, 403]: wait_time = policy["base_wait"] * (2**attempt) logger.warning(f"模型 {self.model_name}: 遇到可重试错误 {response.status}, 等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) last_exception = RuntimeError(f"重试错误 {response.status}") - continue # 继续下一次循环尝试 + continue if response.status in policy["abort_codes"] or (response.status in policy["retry_codes"] and attempt >= policy["max_retries"] - 1): if attempt >= policy["max_retries"] - 1 and response.status in policy["retry_codes"]: logger.error(f"模型 {self.model_name}: 达到最大重试次数,最后一次尝试仍为可重试错误 {response.status}。") - # _handle_error_response 内部会 raise 相应的异常 await self._handle_error_response(response, attempt, policy, current_key) - # 如果 _handle_error_response 没有 raise (理论上不应该),则手动 raise await response.read() final_error_msg = f"请求中止或达到最大重试次数,最终状态码: {response.status}" logger.error(final_error_msg) raise RequestAbortException(final_error_msg, response) - # --- 成功响应处理 --- - response.raise_for_status() # 检查 2xx 状态码,理论上此时应该都是成功的 + response.raise_for_status() result = {} if not self.is_gemini and stream_mode: result = await self._handle_stream_output(response) else: result = await response.json() - logger.trace(f"模型 {self.model_name}: 收到响应: {json.dumps(result, indent=2)}") - # --- 调用响应处理器 --- return ( response_handler(result) if response_handler @@ -569,7 +556,7 @@ class LLMRequest: logger.error(f"模型 {self.model_name}: 因权限拒绝 (403) 中止请求: {e}") if is_key_list and not available_keys_pool and e.key_identifier: logger.critical(f" 中止原因是 Key ...{e.key_identifier[-4:]} 触发 403 后已无其他 Key 可用。") - raise e # 直接抛出,终止执行 + raise e except aiohttp.ClientProxyConnectionError as e: # (代码不变) logger.error(f"代理连接错误: {e} (代理地址: {current_proxy_url})") @@ -593,7 +580,7 @@ class LLMRequest: except (PayLoadTooLargeError, RequestAbortException) as e: # (代码不变) logger.error(f"模型 {self.model_name}: 请求处理中遇到关键错误,将中止: {e}") - raise e # 直接抛出,终止执行 + raise e except Exception as e: # (代码不变) last_exception = e @@ -601,42 +588,35 @@ class LLMRequest: if attempt >= policy["max_retries"] - 1: logger.error(f"模型 {self.model_name}: 达到最大重试次数 ({policy['max_retries']}),因非 HTTP 错误失败。") - # 在这里也记录请求详情可能有助于调试 - safe_payload_record = await _safely_record(request_content, actual_payload) - logger.error(f"失败时的请求详情 (Payload): {json.dumps(safe_payload_record, indent=2)}") else: try: - # 重新准备请求内容字典,以便传递给异常处理器 temp_request_content = { "policy": policy, - "payload": actual_payload, # 传递当前的 payload + "payload": actual_payload, "api_url": api_url, "stream_mode": stream_mode, - "image_base64": image_base64, # 传递原始图片信息 + "image_base64": image_base64, "image_format": image_format, - "prompt": prompt, # 传递原始 prompt - "current_key": current_key # 传递当前使用的 key + "prompt": prompt, } - # 传递 api_kwargs 给 _handle_exception 以便重建 payload handled_payload, count_delta = await self._handle_exception( e, attempt, temp_request_content, merged_params=api_kwargs ) if handled_payload: - actual_payload = handled_payload # 更新 payload 以供下次重试 + actual_payload = handled_payload logger.info(f"模型 {self.model_name}: 异常处理更新了 payload,将使用当前 Key 重试。") - # else: payload 未更新,继续使用原 payload 重试 wait_time = policy["base_wait"] * (2**attempt) logger.warning(f"模型 {self.model_name}: 等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) - continue # 继续下一次循环尝试 + continue except (RequestAbortException, PermissionDeniedException) as abort_exception: logger.error(f"模型 {self.model_name}: 异常处理判断需要中止请求: {abort_exception}") - raise abort_exception # 终止执行 + raise abort_exception except RuntimeError as rt_error: logger.error(f"模型 {self.model_name}: 异常处理遇到运行时错误: {rt_error}") - raise rt_error # 终止执行 + raise rt_error # --- 循环结束 --- # (代码不变) @@ -646,20 +626,13 @@ class LLMRequest: logger.error(f"最后遇到的错误是权限拒绝: {str(last_exception)}") raise last_exception logger.error(f"最后遇到的错误: {str(last_exception.__class__.__name__)} - {str(last_exception)}") - # 记录最后失败时的请求信息 - safe_payload_record = await _safely_record(request_content, actual_payload) - logger.error(f"最终失败时的请求详情 (Payload): {json.dumps(safe_payload_record, indent=2)}") raise RuntimeError(f"模型 {self.model_name} 达到最大重试次数,API 请求失败。最后错误: {str(last_exception)}") from last_exception else: - # 理论上不应该到达这里,因为总会有 last_exception 或成功返回 if not available_keys_pool and keys_abandoned_runtime: final_error_msg = f"模型 {self.model_name}: 所有可用 API Keys 均因 403 错误被禁用。" logger.critical(final_error_msg) raise PermissionDeniedException(final_error_msg) else: - # 记录最后失败时的请求信息 - safe_payload_record = await _safely_record(request_content, actual_payload) - logger.error(f"最终失败时的请求详情 (Payload): {json.dumps(safe_payload_record, indent=2)}") raise RuntimeError(f"模型 {self.model_name} 达到最大重试次数,API 请求失败,原因未知。") @@ -775,55 +748,40 @@ class LLMRequest: except Exception as e: error_text = f"(无法读取响应体: {e})" - key_identifier_log = f"...{current_key[-4:]}" if current_key else "N/A" - if status == 403: logger.error( - f"模型 {self.model_name}: 遇到 403 (权限拒绝) 错误。Key: {key_identifier_log}. " + f"模型 {self.model_name}: 遇到 403 (权限拒绝) 错误。Key: ...{current_key[-4:] if current_key else 'N/A'}. " f"响应: {error_text[:200]}" ) - # 抛出 PermissionDeniedException,由 _execute_request 捕获处理切换或中止逻辑 raise PermissionDeniedException(f"模型禁止访问 ({status})", key_identifier=current_key) - elif status in policy["retry_codes"] and status != 429: # 429 由 _execute_request 处理 + elif status in policy["retry_codes"] and status != 429: if status == 413: - logger.warning(f"模型 {self.model_name}: 错误码 413 (Payload Too Large)。Key: {key_identifier_log}. 尝试压缩...") - # 抛出 PayLoadTooLargeError,由 _execute_request 捕获处理压缩或重试逻辑 + logger.warning(f"模型 {self.model_name}: 错误码 413 (Payload Too Large)。Key: ...{current_key[-4:] if current_key else 'N/A'}. 尝试压缩...") raise PayLoadTooLargeError("请求体过大") elif status in [500, 503]: logger.error( - f"模型 {self.model_name}: 服务器内部错误或过载 ({status})。Key: {key_identifier_log}. " + f"模型 {self.model_name}: 服务器内部错误或过载 ({status})。Key: ...{current_key[-4:] if current_key else 'N/A'}. " f"响应: {error_text[:200]}" ) - # 对于 500/503,这里不抛出异常,让 _execute_request 根据重试次数决定是否继续或中止 return else: - # 其他可重试错误码 - logger.warning(f"模型 {self.model_name}: 遇到可重试错误码: {status}. Key: {key_identifier_log}") - # 不抛出异常,让 _execute_request 根据重试次数决定是否继续或中止 + logger.warning(f"模型 {self.model_name}: 遇到可重试错误码: {status}. Key: ...{current_key[-4:] if current_key else 'N/A'}") return elif status in policy["abort_codes"]: logger.error( f"模型 {self.model_name}: 遇到需要中止的错误码: {status} - {error_code_mapping.get(status, '未知错误')}. " - f"Key: {key_identifier_log}. 响应: {error_text[:200]}" + f"Key: ...{current_key[-4:] if current_key else 'N/A'}. 响应: {error_text[:200]}" ) - # 抛出 RequestAbortException,由 _execute_request 捕获并中止 raise RequestAbortException(f"请求出现错误 {status},中止处理", response) else: - # 未在策略中定义的错误码 - logger.error(f"模型 {self.model_name}: 遇到未明确处理的错误码: {status}. Key: {key_identifier_log}. 响应: {error_text[:200]}") + logger.error(f"模型 {self.model_name}: 遇到未明确处理的错误码: {status}. Key: ...{current_key[-4:] if current_key else 'N/A'}. 响应: {error_text[:200]}") try: - # 尝试按 aiohttp 的方式抛出,如果失败则用 RequestAbortException response.raise_for_status() - # 如果 raise_for_status 没抛异常(理论上不应该),手动抛出 raise RequestAbortException(f"未处理的错误状态码 {status}", response) except aiohttp.ClientResponseError as e: - # 优先抛出 aiohttp 的异常,包含更多信息 raise RequestAbortException(f"未处理的错误状态码 {status}: {e.message}", response) from e - except Exception as e: # 兜底 - logger.error(f"在处理未知错误码 {status} 时发生意外: {e}") - raise RequestAbortException(f"处理未处理的错误状态码 {status} 时出错", response) from e async def _handle_exception( @@ -838,7 +796,6 @@ class LLMRequest: if retry_count < policy["max_retries"] - 1: keep_request = True - # 使用传入的 merged_params (即 api_kwargs) 或原始 payload 来重建 params_for_rebuild = merged_params if merged_params is not None else payload if isinstance(exception, PayLoadTooLargeError): @@ -848,70 +805,59 @@ class LLMRequest: if image_base64: compressed_image_base64 = compress_base64_image_by_scale(image_base64) if compressed_image_base64 != image_base64: - # 使用原始 prompt, 新图片, 和 params_for_rebuild 重建 payload new_payload = await self._build_payload( request_content["prompt"], compressed_image_base64, request_content["image_format"], params_for_rebuild ) logger.info("图片压缩成功,将使用压缩后的图片重试。") - return new_payload, 0 # 返回新 payload + return new_payload, 0 else: logger.warning("图片压缩未改变大小或失败。") else: logger.warning("请求体过大但请求中不包含图片,无法压缩。") - # 无法压缩或无图片,返回 None 表示不更新 payload return None, 0 else: logger.error("达到最大重试次数,请求体仍然过大。") raise RuntimeError("请求体过大,压缩或重试后仍然失败。") from exception elif isinstance(exception, (aiohttp.ClientError, asyncio.TimeoutError)): - # 网络或超时错误 if keep_request: logger.error(f"模型 {self.model_name} 网络错误: {str(exception)}") - # 不更新 payload,直接重试 return None, 0 else: logger.critical(f"模型 {self.model_name} 网络错误达到最大重试次数: {str(exception)}") raise RuntimeError(f"网络请求失败: {str(exception)}") from exception elif isinstance(exception, aiohttp.ClientResponseError): - # HTTP 响应错误 (未被 _handle_error_response 捕获的,理论上少见) if keep_request: logger.error( f"模型 {self.model_name} HTTP响应错误 (未被策略覆盖): 状态码: {exception.status}, 错误: {exception.message}" ) try: - # 尝试记录错误响应体 error_text = await exception.response.text() if hasattr(exception, 'response') else str(exception) logger.error(f"服务器错误响应详情: {error_text[:500]}") except Exception as parse_err: logger.warning(f"无法解析服务器错误响应内容: {str(parse_err)}") - # 不更新 payload,直接重试 return None, 0 else: logger.critical( f"模型 {self.model_name} HTTP响应错误达到最大重试次数: 状态码: {exception.status}, 错误: {exception.message}" ) - # 记录请求详情 - current_key_placeholder = request_content.get("current_key", "******")[-4:] + current_key_placeholder = request_content.get("current_key", "******") handled_payload = await _safely_record(request_content, payload) - logger.critical(f"请求头 (部分): {{'Authorization': 'Bearer ...{current_key_placeholder}' or 'x-goog-api-key': '...{current_key_placeholder}', ...}} 请求体: {json.dumps(handled_payload, indent=2)}") + logger.critical(f"请求头: {await self._build_headers(api_key=current_key_placeholder, no_key=True)} 请求体: {handled_payload}") raise RuntimeError( f"模型 {self.model_name} API请求失败: 状态码 {exception.status}, {exception.message}" ) from exception else: - # 其他未知错误 if keep_request: logger.error(f"模型 {self.model_name} 遇到未知错误: {str(exception.__class__.__name__)} - {str(exception)}") - # 不更新 payload,直接重试 return None, 0 else: logger.critical(f"模型 {self.model_name} 请求因未知错误失败: {str(exception.__class__.__name__)} - {str(exception)}") - # 记录请求详情 - current_key_placeholder = request_content.get("current_key", "******")[-4:] + current_key_placeholder = request_content.get("current_key", "******") handled_payload = await _safely_record(request_content, payload) - logger.critical(f"请求头 (部分): {{'Authorization': 'Bearer ...{current_key_placeholder}' or 'x-goog-api-key': '...{current_key_placeholder}', ...}} 请求体: {json.dumps(handled_payload, indent=2)}") + logger.critical(f"请求头: {await self._build_headers(api_key=current_key_placeholder, no_key=True)} 请求体: {handled_payload}") raise RuntimeError(f"模型 {self.model_name} API请求失败: {str(exception)}") from exception @@ -919,18 +865,14 @@ class LLMRequest: """根据模型名称转换合并后的参数,并移除内部参数""" # (代码不变) new_params = dict(merged_params) - new_params.pop("request_type", None) # 移除内部使用的 request_type + new_params.pop("request_type", None) if not self.is_gemini and self.model_name.lower() in self.MODELS_NEEDING_TRANSFORMATION: - # --- O1/O3 模型参数转换 --- - new_params.pop("temperature", None) # 移除 temperature + new_params.pop("temperature", None) if "max_tokens" in new_params: - new_params["max_completion_tokens"] = new_params.pop("max_tokens") # 重命名 max_tokens - logger.trace(f"模型 {self.model_name}: 应用 O1/O3 参数转换。") + new_params["max_completion_tokens"] = new_params.pop("max_tokens") elif self.is_gemini: - # --- Gemini 模型参数转换 --- gen_config = new_params.get("generationConfig", {}) - # 将 OpenAI 风格的参数移入 generationConfig if "temperature" in new_params: gen_config["temperature"] = new_params.pop("temperature") if "max_tokens" in new_params: @@ -939,40 +881,25 @@ class LLMRequest: gen_config["topP"] = new_params.pop("top_p") if "top_k" in new_params: gen_config["topK"] = new_params.pop("top_k") - # 可以在这里添加 stopSequences 等其他 Gemini 参数的转换 - # if "stop" in new_params: - # gen_config["stopSequences"] = new_params.pop("stop") - if gen_config: # 只有当 gen_config 非空时才更新 + if gen_config: new_params["generationConfig"] = gen_config - # 移除 Gemini 不支持的 OpenAI 参数 new_params.pop("frequency_penalty", None) new_params.pop("presence_penalty", None) - new_params.pop("max_completion_tokens", None) # 移除 O1/O3 的参数(如果存在) - logger.trace(f"模型 {self.model_name}: 应用 Gemini 参数转换。") - else: - # --- 标准 OpenAI 兼容模型 --- - # 确保 max_tokens 或 max_completion_tokens 存在 - if "max_tokens" not in new_params and "max_completion_tokens" not in new_params: - new_params["max_tokens"] = global_config.model_max_output_length - logger.trace(f"模型 {self.model_name}: 未提供 max_tokens,使用全局默认值 {global_config.model_max_output_length}") - elif "max_completion_tokens" in new_params: # 优先使用 O1/O3 风格的(如果存在) - new_params["max_tokens"] = new_params.pop("max_completion_tokens") - logger.trace(f"模型 {self.model_name}: 使用标准 OpenAI 参数。") + new_params.pop("max_completion_tokens", None) return new_params async def _build_payload(self, prompt: str, image_base64: str = None, image_format: str = None, merged_params: dict = None) -> dict: - """构建请求体 (区分 Gemini 和 OpenAI),使用合并和转换后的参数,并为 Gemini 添加安全设置""" + """构建请求体 (区分 Gemini 和 OpenAI),使用合并和转换后的参数""" + # (代码不变) if merged_params is None: - merged_params = self.params # 如果没有传入合并参数,使用实例的默认参数 + merged_params = self.params - # --- 参数转换 --- params_copy = await self._transform_parameters(merged_params) if self.is_gemini: - # --- 构建 Gemini Payload --- parts = [] if prompt: parts.append({"text": prompt}) @@ -984,28 +911,24 @@ class LLMRequest: "data": image_base64 } }) - payload = { "contents": [{"parts": parts}], - # 其他转换后的参数 (如 generationConfig) 会在这里展开 **params_copy } - payload.pop("model", None) # Gemini 不在顶层传 model - + payload.pop("model", None) # --- 添加 Gemini 安全设置 --- safety_settings = [ {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, - # 注意:HARM_CATEGORY_CIVIC_INTEGRITY 不是标准的 REST API 类别,故不包含 + {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}, ] payload["safetySettings"] = safety_settings - logger.debug(f"模型 {self.model_name}: 已为 Gemini 请求添加 safetySettings (BLOCK_NONE)。") + logger.debug(f"模型 {self.model_name}: 已为 Gemini 函数调用请求添加 safetySettings (BLOCK_NONE)。") # --- 结束添加安全设置 --- else: - # --- 构建 OpenAI Payload --- if image_base64: messages = [ { @@ -1025,24 +948,20 @@ class LLMRequest: payload = { "model": self.model_name, "messages": messages, - # 其他转换后的参数 (如 temperature, max_tokens) 会在这里展开 **params_copy, } - # 确保 max_tokens 存在 (已在 _transform_parameters 中处理) - # if "max_tokens" not in payload and "max_completion_tokens" not in payload: - # if "max_tokens" not in params_copy and "max_completion_tokens" not in params_copy: - # payload["max_tokens"] = global_config.model_max_output_length - # if "max_completion_tokens" in payload: # 已在 _transform_parameters 中处理 - # payload["max_tokens"] = payload.pop("max_completion_tokens") + if "max_tokens" not in payload and "max_completion_tokens" not in payload: + if "max_tokens" not in params_copy and "max_completion_tokens" not in params_copy: + payload["max_tokens"] = global_config.model_max_output_length + if "max_completion_tokens" in payload: + payload["max_tokens"] = payload.pop("max_completion_tokens") - logger.trace(f"构建的 Payload (模型: {self.model_name}, Gemini: {self.is_gemini}): {json.dumps(await _safely_record({'image_base64': image_base64, 'image_format': image_format}, payload), indent=2)}") return payload def _default_response_handler( self, result: dict, user_id: str = "system", request_type: str = None, endpoint: str = "/chat/completions" ) -> Tuple: """默认响应解析 (区分 Gemini 和 OpenAI),并处理函数/工具调用""" - # (代码不变) content = "没有返回结果" reasoning_content = "" tool_calls = None # OpenAI 格式 @@ -1075,40 +994,29 @@ class LLMRequest: # else: function_call 已获取,content 留空或设为特定值 else: - # 检查是否因为安全设置被阻止 - finish_reason = candidate.get("finishReason") - safety_ratings = candidate.get("safetyRatings") - if finish_reason == "SAFETY": - logger.warning(f"模型 {self.model_name}: Gemini 响应因安全设置被阻止。 Safety Ratings: {safety_ratings}") - content = "响应内容因安全原因被过滤。" - elif finish_reason == "RECITATION": - logger.warning(f"模型 {self.model_name}: Gemini 响应因引用限制被阻止。") - content = "响应内容因引用限制被过滤。" - elif finish_reason == "OTHER": - logger.warning(f"模型 {self.model_name}: Gemini 响应因未知原因停止。") - else: - # 既没有 content/parts,也不是已知的 finishReason - content = "Gemini响应中缺少 content 或 parts" - logger.warning(f"模型 {self.model_name}: Gemini 响应格式不完整 (缺少 content/parts 且非已知 finishReason): {result}") + content = "Gemini响应中缺少 content 或 parts" + logger.warning(f"模型 {self.model_name}: Gemini 响应格式不完整 (缺少 content/parts): {result}") + finish_reason = candidate.get("finishReason") + if finish_reason == "SAFETY": + logger.warning(f"模型 {self.model_name}: Gemini 响应因安全设置被阻止。") + content = "响应内容因安全原因被过滤。" + elif finish_reason == "RECITATION": + logger.warning(f"模型 {self.model_name}: Gemini 响应因引用限制被阻止。") + content = "响应内容因引用限制被过滤。" + elif finish_reason == "OTHER": + logger.warning(f"模型 {self.model_name}: Gemini 响应因未知原因停止。") + # finishReason == "TOOL_CODE" or "FUNCTION_CALL" 是正常情况 usage = result.get("usageMetadata", {}) if usage: prompt_tokens = usage.get("promptTokenCount", 0) - completion_tokens = usage.get("candidatesTokenCount", 0) # 注意 Gemini key 不同 + completion_tokens = usage.get("candidatesTokenCount", 0) total_tokens = usage.get("totalTokenCount", 0) - # Gemini 有时不返回 candidatesTokenCount,尝试计算 - if completion_tokens == 0 and total_tokens > 0 and prompt_tokens > 0: + if completion_tokens == 0 and total_tokens > 0: completion_tokens = total_tokens - prompt_tokens else: - # 检查 promptFeedback 是否包含 blockReason - prompt_feedback = result.get("promptFeedback") - if prompt_feedback and prompt_feedback.get("blockReason"): - block_reason = prompt_feedback.get("blockReason") - logger.warning(f"模型 {self.model_name} (Gemini): 提示因 '{block_reason}' 被阻止。") - content = f"提示内容因 '{block_reason}' 被阻止。" - else: - logger.warning(f"模型 {self.model_name} (Gemini) 的响应中缺少 'usageMetadata' 信息。") + logger.warning(f"模型 {self.model_name} (Gemini) 的响应中缺少 'usageMetadata' 信息。") except Exception as e: logger.error(f"解析 Gemini 响应出错: {e} - 响应: {result}") @@ -1148,12 +1056,10 @@ class LLMRequest: total_tokens=total_tokens, user_id=user_id, request_type=request_type, - endpoint=endpoint, # 传递 endpoint 用于区分 generateContent 和 embedContent + endpoint=endpoint, ) else: - # 只有在内容不是因安全过滤等原因导致无 token 时才警告 - if "因安全原因被过滤" not in content and "因引用限制被过滤" not in content and "被阻止" not in content: - logger.warning(f"模型 {self.model_name}: 未能从响应中提取有效的 token 使用信息。响应: {result}") + logger.warning(f"模型 {self.model_name}: 未能从响应中提取有效的 token 使用信息。") # --- 返回结果 (统一格式) --- @@ -1165,21 +1071,15 @@ class LLMRequest: logger.debug(f"检测到 Gemini 函数调用: {function_call}") # 将 Gemini functionCall 转换为 OpenAI tool_calls 格式 # 注意: Gemini 的 functionCall 没有显式的 id 和 type,需要模拟 - try: - # Gemini 的参数在 'args' 中,OpenAI 在 'arguments' (通常是 JSON 字符串) - # 需要将 Gemini 的 dict 参数转换为 JSON 字符串 - arguments_json = json.dumps(function_call.get("args", {})) - except TypeError as e: - logger.error(f"无法将 Gemini 函数调用的参数转换为 JSON: {function_call.get('args', {})} - Error: {e}") - arguments_json = "{}" # 使用空 JSON 对象作为回退 - final_tool_calls = [ { "id": f"call_{random.randint(1000, 9999)}", # 生成一个随机 ID "type": "function", "function": { "name": function_call.get("name"), - "arguments": arguments_json + # Gemini 的参数在 'args' 中,OpenAI 在 'arguments' (通常是 JSON 字符串) + # 需要将 Gemini 的 dict 参数转换为 JSON 字符串 + "arguments": json.dumps(function_call.get("args", {})) } } ] @@ -1187,7 +1087,6 @@ class LLMRequest: if final_tool_calls: # 如果有工具/函数调用,通常 content 为空或包含思考过程,这里返回转换后的调用信息 - # 同时返回 content 和 reasoning_content (可能为空) return content, reasoning_content, final_tool_calls else: # 没有工具/函数调用,返回普通文本响应 @@ -1212,7 +1111,6 @@ class LLMRequest: """构建请求头 (区分 Gemini 和 OpenAI)""" # (代码不变) if no_key: - # 用于日志记录,隐藏实际 key if self.is_gemini: return {"x-goog-api-key": "**********", "Content-Type": "application/json"} else: @@ -1223,10 +1121,8 @@ class LLMRequest: raise ValueError(f"无效的 API key 提供给 _build_headers。") if self.is_gemini: - # Gemini 使用 x-goog-api-key return {"x-goog-api-key": api_key, "Content-Type": "application/json"} else: - # OpenAI 使用 Authorization Bearer token return {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} @@ -1239,20 +1135,14 @@ class LLMRequest: prompt=prompt, user_id=user_id, request_type="chat", - **kwargs # 传递覆盖参数 + **kwargs ) - # _default_response_handler 返回 (content, reasoning) 或 (content, reasoning, tool_calls) if len(response) == 3: content, reasoning_content, tool_calls = response - # 对于普通 generate_response,即使返回 tool_calls,也只返回前三个元素 - return content, reasoning_content, self.model_name #, tool_calls - elif len(response) == 2: + return content, reasoning_content, self.model_name, tool_calls + else: content, reasoning_content = response return content, reasoning_content, self.model_name - else: - logger.error(f"来自 _default_response_handler 的意外响应格式: {response}") - return "处理响应出错", "", self.model_name - async def generate_response_for_image(self, prompt: str, image_base64: str, image_format: str, user_id: str = "system", **kwargs) -> Tuple: """根据输入的提示和图片生成模型的异步响应,支持覆盖参数""" @@ -1265,13 +1155,11 @@ class LLMRequest: image_format=image_format, user_id=user_id, request_type="vision", - **kwargs # 传递覆盖参数 + **kwargs ) # _default_response_handler 现在总是返回至少2个值 if len(response) == 3: - content, reasoning, tool_calls = response - # Vision 请求通常不期望 tool_calls,但如果返回了也包含在内 - return content, reasoning #, tool_calls # 保持返回两个值 + return response # content, reasoning, tool_calls (tool_calls 可能为 None) elif len(response) == 2: content, reasoning = response return content, reasoning # 对于 vision 请求,通常没有 tool_calls @@ -1287,14 +1175,13 @@ class LLMRequest: response = await self._execute_request( endpoint=endpoint, prompt=prompt, - payload=None, # 让 _execute_request 内部构建 - retry_policy=None, # 使用默认重试策略 - response_handler=None, # 使用默认响应处理器 + payload=None, + retry_policy=None, + response_handler=None, user_id=user_id, - request_type=request_type, # 传递请求类型 - **kwargs # 传递覆盖参数 + request_type=request_type, + **kwargs ) - # 返回 _default_response_handler 的原始结果 (通常是 tuple) return response # 修改:实现 Gemini Function Calling 的 Payload 构建 @@ -1302,31 +1189,28 @@ class LLMRequest: """异步方式根据输入的提示和工具生成模型的响应,支持覆盖参数和 Gemini 函数调用""" endpoint = ":generateContent" if self.is_gemini else "/chat/completions" - merged_params = {**self.params, **kwargs} # 合并实例参数和调用时参数 + merged_params = {**self.params, **kwargs} + transformed_params = await self._transform_parameters(merged_params) # 清理 request_type 等 - # --- 构建特定于工具调用的 Payload --- payload = None + if self.is_gemini: # --- 构建 Gemini Function Calling Payload --- logger.debug(f"为 Gemini ({self.model_name}) 构建函数调用请求。") # 1. 转换工具定义 (OpenAI -> Gemini) + # OpenAI tool format: [{"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}] + # Gemini tool format: [{"functionDeclarations": [{"name": ..., "description": ..., "parameters": ...}]}] function_declarations = [] if tools: for tool in tools: if tool.get("type") == "function" and "function" in tool: func_def = tool["function"] # Gemini parameters 使用 OpenAPI Schema,与 OpenAI 基本兼容 - # 确保 description 存在,即使为空字符串 - gemini_func_def = { + function_declarations.append({ "name": func_def.get("name"), - "description": func_def.get("description", ""), - "parameters": func_def.get("parameters", {"type": "object", "properties": {}}) # 确保 parameters 存在 - } - # 移除 description 如果为空 (Gemini 可能不允许空 description) - # if not gemini_func_def["description"]: - # del gemini_func_def["description"] # 实际测试看是否需要 - - function_declarations.append(gemini_func_def) + "description": func_def.get("description", ""), # Description is required for Gemini + "parameters": func_def.get("parameters", {"type": "object", "properties": {}}) # Ensure parameters exist + }) else: logger.warning(f"跳过不支持的工具类型或格式: {tool}") @@ -1334,14 +1218,14 @@ class LLMRequest: logger.error("没有有效的函数声明可用于 Gemini 请求。") return "没有提供有效的函数定义", "", None - gemini_tools_config = [{"functionDeclarations": function_declarations}] + gemini_tools = [{"functionDeclarations": function_declarations}] - # 2. 构建 Gemini Payload (需要先转换参数) - transformed_params = await self._transform_parameters(merged_params) # 清理 request_type 等, 转换参数格式 + # 2. 构建 Gemini Payload + # parts = [{"text": prompt}] # 初始 parts payload = { "contents": [{"parts": [{"text": prompt}]}], # 包含用户提示 - "tools": gemini_tools_config, - # toolConfig 可以从 kwargs 获取或使用默认值 + "tools": gemini_tools, + # toolConfig 默认是 AUTO,可以根据需要从 kwargs 获取或硬编码 # "toolConfig": {"functionCallingConfig": {"mode": "ANY"}}, # 例如强制调用 **transformed_params # 合并其他转换后的参数 (如 generationConfig) } @@ -1349,38 +1233,24 @@ class LLMRequest: payload.pop("messages", None) # 移除 OpenAI 特有的 messages payload.pop("tool_choice", None) # 移除 OpenAI 特有的 tool_choice - # --- 添加 Gemini 安全设置 --- - safety_settings = [ - {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, - {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, - {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, - {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, - {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}, - ] - payload["safetySettings"] = safety_settings - logger.debug(f"模型 {self.model_name}: 已为 Gemini 函数调用请求添加 safetySettings (BLOCK_NONE)。") - # --- 结束添加安全设置 --- - logger.trace(f"构建的 Gemini 函数调用 Payload: {json.dumps(payload, indent=2)}") + else: # --- 构建 OpenAI Tool Calling Payload --- + # (逻辑不变) logger.debug(f"为 OpenAI 兼容模型 ({self.model_name}) 构建工具调用请求。") - # 需要先转换参数 - transformed_params = await self._transform_parameters(merged_params) payload = { "model": self.model_name, "messages": [{"role": "user", "content": prompt}], - **transformed_params, # 使用转换后的参数 + **transformed_params, "tools": tools, - "tool_choice": transformed_params.get("tool_choice", "auto"), # tool_choice 可能在转换时被移除,从转换后的参数获取 + "tool_choice": transformed_params.get("tool_choice", "auto"), } - # 确保 max_tokens (已在 _transform_parameters 中处理) - # if "max_completion_tokens" in payload: - # payload["max_tokens"] = payload.pop("max_completion_tokens") - # if "max_tokens" not in payload: - # payload["max_tokens"] = global_config.model_max_output_length - logger.trace(f"构建的 OpenAI 工具调用 Payload: {json.dumps(payload, indent=2)}") + if "max_completion_tokens" in payload: + payload["max_tokens"] = payload.pop("max_completion_tokens") + if "max_tokens" not in payload: + payload["max_tokens"] = global_config.model_max_output_length # --- 执行请求 --- @@ -1390,17 +1260,16 @@ class LLMRequest: response = await self._execute_request( endpoint=endpoint, - payload=payload, # 直接传递构建好的 payload - prompt=prompt, # prompt 仍然需要,用于可能的重试和日志记录 + payload=payload, + prompt=prompt, # prompt 仍然需要,用于可能的重试 user_id=user_id, - request_type="tool_call", # 设置请求类型 - **kwargs # 传递原始 kwargs 以便在重试时重新合并参数 + request_type="tool_call", + **kwargs # 传递原始 kwargs 以便在重试时重新合并 ) # _default_response_handler 现在会处理 Gemini functionCall 并统一格式 logger.debug(f"模型 {self.model_name} 工具/函数调用返回结果: {response}") - # 解析响应 (与 generate_response 类似) if isinstance(response, tuple) and len(response) == 3: content, reasoning_content, final_tool_calls = response # final_tool_calls 已经是统一的 OpenAI 格式 @@ -1408,7 +1277,7 @@ class LLMRequest: elif isinstance(response, tuple) and len(response) == 2: content, reasoning_content = response logger.debug("收到普通响应,无工具/函数调用") - return content, reasoning_content, None # 返回 None 表示没有工具调用 + return content, reasoning_content, None else: logger.error(f"收到来自 _execute_request/_default_response_handler 的意外响应格式: {response}") return "处理响应时出错", "", None @@ -1424,96 +1293,69 @@ class LLMRequest: api_kwargs = {k: v for k, v in kwargs.items() if k != 'request_type'} if self.is_gemini: - # --- 构建 Gemini Embedding Payload --- endpoint = ":embedContent" - # Gemini embedding 模型名称通常是 'embedding-001' 或类似,不带 'models/' 前缀 - # 但 API URL 需要 'models/' 前缀,这里假设 self.model_name 是正确的 embedding 模型名 payload = { - "model": f"models/{self.model_name}", # URL 需要 'models/' + "model": f"models/{self.model_name}", "content": { "parts": [{"text": text}] }, - # Gemini embedding 可能有特定参数,如 task_type - # "taskType": "RETRIEVAL_DOCUMENT", # Example - **api_kwargs # 合并其他调用时参数 + **api_kwargs } - # 移除 OpenAI 特有的参数 payload.pop("encoding_format", None) payload.pop("input", None) - logger.trace(f"构建的 Gemini Embedding Payload: {json.dumps(payload, indent=2)}") else: - # --- 构建 OpenAI Embedding Payload --- endpoint = "/embeddings" payload = { "model": self.model_name, "input": text, - "encoding_format": "float", # OpenAI 推荐 float - **api_kwargs # 合并其他调用时参数 + "encoding_format": "float", + **api_kwargs } - # 移除 Gemini 特有的参数 payload.pop("content", None) payload.pop("taskType", None) - logger.trace(f"构建的 OpenAI Embedding Payload: {json.dumps(payload, indent=2)}") def embedding_handler(result): # (代码不变) embedding_value = None prompt_tokens = 0 - completion_tokens = 0 # Embedding 通常没有 completion tokens + completion_tokens = 0 total_tokens = 0 if self.is_gemini: - # --- 解析 Gemini Embedding 响应 --- if "embedding" in result and "value" in result["embedding"]: embedding_value = result["embedding"]["value"] - # Gemini embedding API 响应目前不包含 token usage logger.warning(f"模型 {self.model_name} (Gemini Embedding): 响应中未找到明确的 token 使用信息。") - # 尝试从可能的其他字段获取,如果未来 API 更新的话 - usage = result.get("usageMetadata", {}) - prompt_tokens = usage.get("promptTokenCount", 0) - total_tokens = usage.get("totalTokenCount", 0) - else: - # --- 解析 OpenAI Embedding 响应 --- if "data" in result and len(result["data"]) > 0: embedding_value = result["data"][0].get("embedding", None) usage = result.get("usage", {}) if usage: prompt_tokens = usage.get("prompt_tokens", 0) - total_tokens = usage.get("total_tokens", 0) # OpenAI embedding 只有 total_tokens + total_tokens = usage.get("total_tokens", 0) else: logger.warning(f"模型 {self.model_name} (OpenAI Embedding) 的响应中缺少 'usage' 信息。") - # --- 记录 Usage --- - # 只有在获取到 token 信息时才记录 - if prompt_tokens > 0 or total_tokens > 0: - self._record_usage( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, # 通常为 0 - total_tokens=total_tokens, - user_id=user_id, - request_type="embedding", - endpoint=endpoint, # 传递 endpoint - ) - elif embedding_value is not None: # 即使没有 token 信息,只要成功获取 embedding 也记录一下(token 为 0) - self._record_usage(0, 0, 0, user_id, "embedding", endpoint) - else: - logger.warning(f"模型 {self.model_name} (Embedding): 未能从响应中提取有效的 embedding 或 token 使用信息。") - - + self._record_usage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + user_id=user_id, + request_type="embedding", + endpoint=endpoint, + ) return embedding_value embedding = await self._execute_request( endpoint=endpoint, - payload=payload, # 传递构建好的 payload - prompt=text, # prompt 用于日志和可能的重试构建 - retry_policy={"max_retries": 2, "base_wait": 6}, # Embedding 通常重试次数少一些 - response_handler=embedding_handler, # 使用特定的 handler + payload=payload, + prompt=text, + retry_policy={"max_retries": 2, "base_wait": 6}, + response_handler=embedding_handler, user_id=user_id, - request_type="embedding", # 设置请求类型 - **api_kwargs # 传递覆盖参数 + request_type="embedding", + **api_kwargs ) return embedding @@ -1523,108 +1365,60 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10 # (代码不变) try: image_data = base64.b64decode(base64_data) - if len(image_data) <= target_size * 1.05: # 允许 5% 的误差 + if len(image_data) <= target_size * 1.05: logger.info(f"图片大小 {len(image_data) / 1024:.1f}KB 已足够小,无需压缩。") return base64_data img = Image.open(io.BytesIO(image_data)) img_format = img.format original_width, original_height = img.size - # 调整缩放比例计算,避免除零或无效比例 - if len(image_data) <= 0: - logger.warning("原始图片数据大小为 0,无法计算缩放比例。") - return base64_data - scale = max(0.1, min(1.0, (target_size / len(image_data)) ** 0.5)) # 最小缩放比例设为 0.1 + scale = max(0.2, min(1.0, (target_size / len(image_data)) ** 0.5)) new_width = max(1, int(original_width * scale)) new_height = max(1, int(original_height * scale)) - - logger.info(f"尝试压缩图片: {original_width}x{original_height} ({img_format}) -> 目标尺寸约 {new_width}x{new_height}") - output_buffer = io.BytesIO() save_format = img_format # Default to original format - save_params = {} - # 处理动图 (GIF) if getattr(img, "is_animated", False) and img.n_frames > 1: frames = [] durations = [] loop = img.info.get('loop', 0) - disposal = img.info.get('disposal', 2) # Use default disposal if not specified + disposal = img.info.get('disposal', 2) logger.info(f"检测到 GIF 动图 ({img.n_frames} 帧),尝试按比例压缩...") - try: - while True: - current_duration = img.info.get('duration', 100) # Default duration 100ms - durations.append(current_duration) - - # Convert frame to RGBA for consistent processing, handle palette transparency - current_frame = img.convert("RGBA") - resized_frame = current_frame.resize((new_width, new_height), Image.Resampling.LANCZOS) - frames.append(resized_frame) - img.seek(img.tell() + 1) # Move to next frame - except EOFError: - pass # End of sequence - except Exception as gif_err: - logger.error(f"处理 GIF 帧时出错: {gif_err}") - return base64_data # Return original on error - + for frame_idx in range(img.n_frames): + img.seek(frame_idx) + current_duration = img.info.get('duration', 100) + durations.append(current_duration) + new_frame = img.convert("RGBA").copy() + resized_frame = new_frame.resize((new_width, new_height), Image.Resampling.LANCZOS) + frames.append(resized_frame) if frames: - # Save the animated GIF - # Ensure transparency info is preserved if possible - save_kwargs = { - "save_all": True, - "append_images": frames[1:], - "optimize": False, # Optimization can sometimes break animations - "duration": durations, - "loop": loop, - "disposal": disposal, - } - # Handle transparency for the first frame - transparency = img.info.get('transparency', None) - if transparency is not None: - save_kwargs['transparency'] = transparency - # Handle background color if present - background = img.info.get('background', None) - if background is not None: - save_kwargs['background'] = background - - frames[0].save(output_buffer, format="GIF", **save_kwargs) + frames[0].save( + output_buffer, format="GIF", save_all=True, append_images=frames[1:], + optimize=False, duration=durations, loop=loop, disposal=disposal, + transparency=img.info.get('transparency', None), background=img.info.get('background', None) + ) save_format = "GIF" else: logger.warning("未能处理 GIF 帧。") - return base64_data # Return original if no frames processed + return base64_data else: - # 处理静态图片 - # 保留透明度 (PNG, WEBP) if img.mode in ("RGBA", "LA") or 'transparency' in img.info: - # Convert to RGBA before resizing to preserve alpha channel correctly resized_img = img.convert("RGBA").resize((new_width, new_height), Image.Resampling.LANCZOS) - # Prefer PNG for transparency, but consider WEBP if original was WEBP? (Pillow might not save animated WEBP well) save_format = "PNG" save_params = {"optimize": True} - # 处理无透明度图片 (JPEG, etc.) else: - # Convert to RGB for standard formats resized_img = img.convert("RGB").resize((new_width, new_height), Image.Resampling.LANCZOS) - # Prefer JPEG if original was JPEG for potentially smaller size if img_format and img_format.upper() == "JPEG": save_format = "JPEG" - save_params = {"quality": 85, "optimize": True} # Good balance of quality/size + save_params = {"quality": 85, "optimize": True} else: - # Fallback to PNG for other non-transparent formats save_format = "PNG" save_params = {"optimize": True} - resized_img.save(output_buffer, format=save_format, **save_params) compressed_data = output_buffer.getvalue() - final_size_kb = len(compressed_data) / 1024 - original_size_kb = len(image_data) / 1024 - target_size_kb = target_size / 1024 - - logger.success(f"压缩图片: {original_width}x{original_height} ({img.format} -> {save_format})") - logger.info(f"压缩前大小: {original_size_kb:.1f}KB, 压缩后大小: {final_size_kb:.1f}KB (目标: {target_size_kb:.1f}KB)") - - # 只有在压缩后明显变小才返回压缩后的数据 (例如,小于原始大小的 95%) - if final_size_kb < original_size_kb * 0.95: + logger.success(f"压缩图片: {original_width}x{original_height} -> {new_width}x{new_height} ({img.format} -> {save_format})") + logger.info(f"压缩前大小: {len(image_data) / 1024:.1f}KB, 压缩后大小: {len(compressed_data) / 1024:.1f}KB (目标: {target_size / 1024:.1f}KB)") + if len(compressed_data) < len(image_data) * 0.95: return base64.b64encode(compressed_data).decode("utf-8") else: logger.info("压缩效果不明显或反而增大,返回原始图片。") @@ -1633,4 +1427,5 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10 logger.error(f"压缩图片失败: {str(e)}") import traceback logger.error(traceback.format_exc()) - return base64_data # 发生任何错误都返回原始数据 + return base64_data +