From 737a2fd2f80fa5526c27ac5b7538fbfe74e407ac Mon Sep 17 00:00:00 2001 From: Bakadax Date: Sun, 4 May 2025 02:29:00 +0800 Subject: [PATCH] modified: src/plugins/models/utils_model.py --- src/plugins/models/utils_model.py | 498 +++++++++++++++++++++--------- 1 file changed, 357 insertions(+), 141 deletions(-) diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py index 8b35cf2e..89ea9463 100644 --- a/src/plugins/models/utils_model.py +++ b/src/plugins/models/utils_model.py @@ -146,12 +146,14 @@ class LLMRequest: try: raw_api_key_config = os.environ[self.model_key_name] self.base_url = os.environ[model["base_url"]] + # --- Gemini 检测 --- self.is_gemini = "googleapis.com" in self.base_url.lower() if self.is_gemini: logger.debug(f"模型 {self.model_name}: 检测到为 Gemini API (Base URL: {self.base_url})") if self.stream: logger.warning(f"模型 {self.model_name}: Gemini 流式输出处理与 OpenAI 不同,暂时强制禁用流式。") self.stream = False + # --- 结束 Gemini 检测 --- # 解析和过滤 API Keys (代码不变) parsed_keys = [] @@ -355,9 +357,10 @@ class LLMRequest: actual_endpoint = endpoint if self.is_gemini: - action = endpoint.lstrip('/') - api_url = f"{self.base_url.rstrip('/')}/{self.model_name}{action}" - stream_mode = False + # Gemini API URL structure: https://generativelanguage.googleapis.com/v1beta/models/{model}:{action} + action = endpoint.lstrip(':') # Remove leading ':' if present + api_url = f"{self.base_url.rstrip('/')}/models/{self.model_name}:{action}" + stream_mode = False # Gemini stream handled differently if needed later else: api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}" stream_mode = self.stream @@ -366,7 +369,9 @@ class LLMRequest: merged_params = {**self.params, **call_params} if payload is None: + # --- 调用 _build_payload 来构建请求体 --- payload = await self._build_payload(prompt, image_base64, image_format, merged_params) + # --- 结束调用 _build_payload --- else: logger.debug("使用外部提供的 payload,忽略单次调用参数合并。") @@ -379,7 +384,7 @@ class LLMRequest: "policy": policy, "payload": payload, "api_url": api_url, - "stream_mode": payload.get("stream", False), + "stream_mode": payload.get("stream", False), # Check payload for stream setting "image_base64": image_base64, "image_format": image_format, "prompt": prompt, @@ -479,6 +484,9 @@ class LLMRequest: if use_proxy: post_kwargs["proxy"] = current_proxy_url + logger.trace(f"模型 {self.model_name}: 发送请求到 {api_url} (代理: {use_proxy})") + logger.trace(f"模型 {self.model_name}: Payload: {json.dumps(await _safely_record(request_content, actual_payload), indent=2)}") # 安全记录Payload + async with session.post(api_url, **post_kwargs) as response: if response.status == 429 and is_key_list: @@ -517,29 +525,34 @@ class LLMRequest: await self._handle_error_response(response, attempt, policy, current_key) if response.status in policy["retry_codes"] and attempt < policy["max_retries"] - 1: - if response.status not in [429, 403]: + if response.status not in [429, 403]: # 429/403 切换逻辑优先 wait_time = policy["base_wait"] * (2**attempt) logger.warning(f"模型 {self.model_name}: 遇到可重试错误 {response.status}, 等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) last_exception = RuntimeError(f"重试错误 {response.status}") - continue + continue # 继续下一次循环尝试 if response.status in policy["abort_codes"] or (response.status in policy["retry_codes"] and attempt >= policy["max_retries"] - 1): if attempt >= policy["max_retries"] - 1 and response.status in policy["retry_codes"]: logger.error(f"模型 {self.model_name}: 达到最大重试次数,最后一次尝试仍为可重试错误 {response.status}。") + # _handle_error_response 内部会 raise 相应的异常 await self._handle_error_response(response, attempt, policy, current_key) + # 如果 _handle_error_response 没有 raise (理论上不应该),则手动 raise await response.read() final_error_msg = f"请求中止或达到最大重试次数,最终状态码: {response.status}" logger.error(final_error_msg) raise RequestAbortException(final_error_msg, response) - response.raise_for_status() + # --- 成功响应处理 --- + response.raise_for_status() # 检查 2xx 状态码,理论上此时应该都是成功的 result = {} if not self.is_gemini and stream_mode: result = await self._handle_stream_output(response) else: result = await response.json() + logger.trace(f"模型 {self.model_name}: 收到响应: {json.dumps(result, indent=2)}") + # --- 调用响应处理器 --- return ( response_handler(result) if response_handler @@ -556,7 +569,7 @@ class LLMRequest: logger.error(f"模型 {self.model_name}: 因权限拒绝 (403) 中止请求: {e}") if is_key_list and not available_keys_pool and e.key_identifier: logger.critical(f" 中止原因是 Key ...{e.key_identifier[-4:]} 触发 403 后已无其他 Key 可用。") - raise e + raise e # 直接抛出,终止执行 except aiohttp.ClientProxyConnectionError as e: # (代码不变) logger.error(f"代理连接错误: {e} (代理地址: {current_proxy_url})") @@ -580,7 +593,7 @@ class LLMRequest: except (PayLoadTooLargeError, RequestAbortException) as e: # (代码不变) logger.error(f"模型 {self.model_name}: 请求处理中遇到关键错误,将中止: {e}") - raise e + raise e # 直接抛出,终止执行 except Exception as e: # (代码不变) last_exception = e @@ -588,35 +601,42 @@ class LLMRequest: if attempt >= policy["max_retries"] - 1: logger.error(f"模型 {self.model_name}: 达到最大重试次数 ({policy['max_retries']}),因非 HTTP 错误失败。") + # 在这里也记录请求详情可能有助于调试 + safe_payload_record = await _safely_record(request_content, actual_payload) + logger.error(f"失败时的请求详情 (Payload): {json.dumps(safe_payload_record, indent=2)}") else: try: + # 重新准备请求内容字典,以便传递给异常处理器 temp_request_content = { "policy": policy, - "payload": actual_payload, + "payload": actual_payload, # 传递当前的 payload "api_url": api_url, "stream_mode": stream_mode, - "image_base64": image_base64, + "image_base64": image_base64, # 传递原始图片信息 "image_format": image_format, - "prompt": prompt, + "prompt": prompt, # 传递原始 prompt + "current_key": current_key # 传递当前使用的 key } + # 传递 api_kwargs 给 _handle_exception 以便重建 payload handled_payload, count_delta = await self._handle_exception( e, attempt, temp_request_content, merged_params=api_kwargs ) if handled_payload: - actual_payload = handled_payload + actual_payload = handled_payload # 更新 payload 以供下次重试 logger.info(f"模型 {self.model_name}: 异常处理更新了 payload,将使用当前 Key 重试。") + # else: payload 未更新,继续使用原 payload 重试 wait_time = policy["base_wait"] * (2**attempt) logger.warning(f"模型 {self.model_name}: 等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) - continue + continue # 继续下一次循环尝试 except (RequestAbortException, PermissionDeniedException) as abort_exception: logger.error(f"模型 {self.model_name}: 异常处理判断需要中止请求: {abort_exception}") - raise abort_exception + raise abort_exception # 终止执行 except RuntimeError as rt_error: logger.error(f"模型 {self.model_name}: 异常处理遇到运行时错误: {rt_error}") - raise rt_error + raise rt_error # 终止执行 # --- 循环结束 --- # (代码不变) @@ -626,13 +646,20 @@ class LLMRequest: logger.error(f"最后遇到的错误是权限拒绝: {str(last_exception)}") raise last_exception logger.error(f"最后遇到的错误: {str(last_exception.__class__.__name__)} - {str(last_exception)}") + # 记录最后失败时的请求信息 + safe_payload_record = await _safely_record(request_content, actual_payload) + logger.error(f"最终失败时的请求详情 (Payload): {json.dumps(safe_payload_record, indent=2)}") raise RuntimeError(f"模型 {self.model_name} 达到最大重试次数,API 请求失败。最后错误: {str(last_exception)}") from last_exception else: + # 理论上不应该到达这里,因为总会有 last_exception 或成功返回 if not available_keys_pool and keys_abandoned_runtime: final_error_msg = f"模型 {self.model_name}: 所有可用 API Keys 均因 403 错误被禁用。" logger.critical(final_error_msg) raise PermissionDeniedException(final_error_msg) else: + # 记录最后失败时的请求信息 + safe_payload_record = await _safely_record(request_content, actual_payload) + logger.error(f"最终失败时的请求详情 (Payload): {json.dumps(safe_payload_record, indent=2)}") raise RuntimeError(f"模型 {self.model_name} 达到最大重试次数,API 请求失败,原因未知。") @@ -748,40 +775,55 @@ class LLMRequest: except Exception as e: error_text = f"(无法读取响应体: {e})" + key_identifier_log = f"...{current_key[-4:]}" if current_key else "N/A" + if status == 403: logger.error( - f"模型 {self.model_name}: 遇到 403 (权限拒绝) 错误。Key: ...{current_key[-4:] if current_key else 'N/A'}. " + f"模型 {self.model_name}: 遇到 403 (权限拒绝) 错误。Key: {key_identifier_log}. " f"响应: {error_text[:200]}" ) + # 抛出 PermissionDeniedException,由 _execute_request 捕获处理切换或中止逻辑 raise PermissionDeniedException(f"模型禁止访问 ({status})", key_identifier=current_key) - elif status in policy["retry_codes"] and status != 429: + elif status in policy["retry_codes"] and status != 429: # 429 由 _execute_request 处理 if status == 413: - logger.warning(f"模型 {self.model_name}: 错误码 413 (Payload Too Large)。Key: ...{current_key[-4:] if current_key else 'N/A'}. 尝试压缩...") + logger.warning(f"模型 {self.model_name}: 错误码 413 (Payload Too Large)。Key: {key_identifier_log}. 尝试压缩...") + # 抛出 PayLoadTooLargeError,由 _execute_request 捕获处理压缩或重试逻辑 raise PayLoadTooLargeError("请求体过大") elif status in [500, 503]: logger.error( - f"模型 {self.model_name}: 服务器内部错误或过载 ({status})。Key: ...{current_key[-4:] if current_key else 'N/A'}. " + f"模型 {self.model_name}: 服务器内部错误或过载 ({status})。Key: {key_identifier_log}. " f"响应: {error_text[:200]}" ) + # 对于 500/503,这里不抛出异常,让 _execute_request 根据重试次数决定是否继续或中止 return else: - logger.warning(f"模型 {self.model_name}: 遇到可重试错误码: {status}. Key: ...{current_key[-4:] if current_key else 'N/A'}") + # 其他可重试错误码 + logger.warning(f"模型 {self.model_name}: 遇到可重试错误码: {status}. Key: {key_identifier_log}") + # 不抛出异常,让 _execute_request 根据重试次数决定是否继续或中止 return elif status in policy["abort_codes"]: logger.error( f"模型 {self.model_name}: 遇到需要中止的错误码: {status} - {error_code_mapping.get(status, '未知错误')}. " - f"Key: ...{current_key[-4:] if current_key else 'N/A'}. 响应: {error_text[:200]}" + f"Key: {key_identifier_log}. 响应: {error_text[:200]}" ) + # 抛出 RequestAbortException,由 _execute_request 捕获并中止 raise RequestAbortException(f"请求出现错误 {status},中止处理", response) else: - logger.error(f"模型 {self.model_name}: 遇到未明确处理的错误码: {status}. Key: ...{current_key[-4:] if current_key else 'N/A'}. 响应: {error_text[:200]}") + # 未在策略中定义的错误码 + logger.error(f"模型 {self.model_name}: 遇到未明确处理的错误码: {status}. Key: {key_identifier_log}. 响应: {error_text[:200]}") try: + # 尝试按 aiohttp 的方式抛出,如果失败则用 RequestAbortException response.raise_for_status() + # 如果 raise_for_status 没抛异常(理论上不应该),手动抛出 raise RequestAbortException(f"未处理的错误状态码 {status}", response) except aiohttp.ClientResponseError as e: + # 优先抛出 aiohttp 的异常,包含更多信息 raise RequestAbortException(f"未处理的错误状态码 {status}: {e.message}", response) from e + except Exception as e: # 兜底 + logger.error(f"在处理未知错误码 {status} 时发生意外: {e}") + raise RequestAbortException(f"处理未处理的错误状态码 {status} 时出错", response) from e async def _handle_exception( @@ -796,6 +838,7 @@ class LLMRequest: if retry_count < policy["max_retries"] - 1: keep_request = True + # 使用传入的 merged_params (即 api_kwargs) 或原始 payload 来重建 params_for_rebuild = merged_params if merged_params is not None else payload if isinstance(exception, PayLoadTooLargeError): @@ -805,59 +848,70 @@ class LLMRequest: if image_base64: compressed_image_base64 = compress_base64_image_by_scale(image_base64) if compressed_image_base64 != image_base64: + # 使用原始 prompt, 新图片, 和 params_for_rebuild 重建 payload new_payload = await self._build_payload( request_content["prompt"], compressed_image_base64, request_content["image_format"], params_for_rebuild ) logger.info("图片压缩成功,将使用压缩后的图片重试。") - return new_payload, 0 + return new_payload, 0 # 返回新 payload else: logger.warning("图片压缩未改变大小或失败。") else: logger.warning("请求体过大但请求中不包含图片,无法压缩。") + # 无法压缩或无图片,返回 None 表示不更新 payload return None, 0 else: logger.error("达到最大重试次数,请求体仍然过大。") raise RuntimeError("请求体过大,压缩或重试后仍然失败。") from exception elif isinstance(exception, (aiohttp.ClientError, asyncio.TimeoutError)): + # 网络或超时错误 if keep_request: logger.error(f"模型 {self.model_name} 网络错误: {str(exception)}") + # 不更新 payload,直接重试 return None, 0 else: logger.critical(f"模型 {self.model_name} 网络错误达到最大重试次数: {str(exception)}") raise RuntimeError(f"网络请求失败: {str(exception)}") from exception elif isinstance(exception, aiohttp.ClientResponseError): + # HTTP 响应错误 (未被 _handle_error_response 捕获的,理论上少见) if keep_request: logger.error( f"模型 {self.model_name} HTTP响应错误 (未被策略覆盖): 状态码: {exception.status}, 错误: {exception.message}" ) try: + # 尝试记录错误响应体 error_text = await exception.response.text() if hasattr(exception, 'response') else str(exception) logger.error(f"服务器错误响应详情: {error_text[:500]}") except Exception as parse_err: logger.warning(f"无法解析服务器错误响应内容: {str(parse_err)}") + # 不更新 payload,直接重试 return None, 0 else: logger.critical( f"模型 {self.model_name} HTTP响应错误达到最大重试次数: 状态码: {exception.status}, 错误: {exception.message}" ) - current_key_placeholder = request_content.get("current_key", "******") + # 记录请求详情 + current_key_placeholder = request_content.get("current_key", "******")[-4:] handled_payload = await _safely_record(request_content, payload) - logger.critical(f"请求头: {await self._build_headers(api_key=current_key_placeholder, no_key=True)} 请求体: {handled_payload}") + logger.critical(f"请求头 (部分): {{'Authorization': 'Bearer ...{current_key_placeholder}' or 'x-goog-api-key': '...{current_key_placeholder}', ...}} 请求体: {json.dumps(handled_payload, indent=2)}") raise RuntimeError( f"模型 {self.model_name} API请求失败: 状态码 {exception.status}, {exception.message}" ) from exception else: + # 其他未知错误 if keep_request: logger.error(f"模型 {self.model_name} 遇到未知错误: {str(exception.__class__.__name__)} - {str(exception)}") + # 不更新 payload,直接重试 return None, 0 else: logger.critical(f"模型 {self.model_name} 请求因未知错误失败: {str(exception.__class__.__name__)} - {str(exception)}") - current_key_placeholder = request_content.get("current_key", "******") + # 记录请求详情 + current_key_placeholder = request_content.get("current_key", "******")[-4:] handled_payload = await _safely_record(request_content, payload) - logger.critical(f"请求头: {await self._build_headers(api_key=current_key_placeholder, no_key=True)} 请求体: {handled_payload}") + logger.critical(f"请求头 (部分): {{'Authorization': 'Bearer ...{current_key_placeholder}' or 'x-goog-api-key': '...{current_key_placeholder}', ...}} 请求体: {json.dumps(handled_payload, indent=2)}") raise RuntimeError(f"模型 {self.model_name} API请求失败: {str(exception)}") from exception @@ -865,14 +919,18 @@ class LLMRequest: """根据模型名称转换合并后的参数,并移除内部参数""" # (代码不变) new_params = dict(merged_params) - new_params.pop("request_type", None) + new_params.pop("request_type", None) # 移除内部使用的 request_type if not self.is_gemini and self.model_name.lower() in self.MODELS_NEEDING_TRANSFORMATION: - new_params.pop("temperature", None) + # --- O1/O3 模型参数转换 --- + new_params.pop("temperature", None) # 移除 temperature if "max_tokens" in new_params: - new_params["max_completion_tokens"] = new_params.pop("max_tokens") + new_params["max_completion_tokens"] = new_params.pop("max_tokens") # 重命名 max_tokens + logger.trace(f"模型 {self.model_name}: 应用 O1/O3 参数转换。") elif self.is_gemini: + # --- Gemini 模型参数转换 --- gen_config = new_params.get("generationConfig", {}) + # 将 OpenAI 风格的参数移入 generationConfig if "temperature" in new_params: gen_config["temperature"] = new_params.pop("temperature") if "max_tokens" in new_params: @@ -881,25 +939,40 @@ class LLMRequest: gen_config["topP"] = new_params.pop("top_p") if "top_k" in new_params: gen_config["topK"] = new_params.pop("top_k") + # 可以在这里添加 stopSequences 等其他 Gemini 参数的转换 + # if "stop" in new_params: + # gen_config["stopSequences"] = new_params.pop("stop") - if gen_config: + if gen_config: # 只有当 gen_config 非空时才更新 new_params["generationConfig"] = gen_config + # 移除 Gemini 不支持的 OpenAI 参数 new_params.pop("frequency_penalty", None) new_params.pop("presence_penalty", None) - new_params.pop("max_completion_tokens", None) + new_params.pop("max_completion_tokens", None) # 移除 O1/O3 的参数(如果存在) + logger.trace(f"模型 {self.model_name}: 应用 Gemini 参数转换。") + else: + # --- 标准 OpenAI 兼容模型 --- + # 确保 max_tokens 或 max_completion_tokens 存在 + if "max_tokens" not in new_params and "max_completion_tokens" not in new_params: + new_params["max_tokens"] = global_config.model_max_output_length + logger.trace(f"模型 {self.model_name}: 未提供 max_tokens,使用全局默认值 {global_config.model_max_output_length}") + elif "max_completion_tokens" in new_params: # 优先使用 O1/O3 风格的(如果存在) + new_params["max_tokens"] = new_params.pop("max_completion_tokens") + logger.trace(f"模型 {self.model_name}: 使用标准 OpenAI 参数。") return new_params async def _build_payload(self, prompt: str, image_base64: str = None, image_format: str = None, merged_params: dict = None) -> dict: - """构建请求体 (区分 Gemini 和 OpenAI),使用合并和转换后的参数""" - # (代码不变) + """构建请求体 (区分 Gemini 和 OpenAI),使用合并和转换后的参数,并为 Gemini 添加安全设置""" if merged_params is None: - merged_params = self.params + merged_params = self.params # 如果没有传入合并参数,使用实例的默认参数 + # --- 参数转换 --- params_copy = await self._transform_parameters(merged_params) if self.is_gemini: + # --- 构建 Gemini Payload --- parts = [] if prompt: parts.append({"text": prompt}) @@ -911,13 +984,28 @@ class LLMRequest: "data": image_base64 } }) + payload = { "contents": [{"parts": parts}], + # 其他转换后的参数 (如 generationConfig) 会在这里展开 **params_copy } - payload.pop("model", None) + payload.pop("model", None) # Gemini 不在顶层传 model + + # --- 添加 Gemini 安全设置 --- + safety_settings = [ + {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, + # 注意:HARM_CATEGORY_CIVIC_INTEGRITY 不是标准的 REST API 类别,故不包含 + ] + payload["safetySettings"] = safety_settings + logger.debug(f"模型 {self.model_name}: 已为 Gemini 请求添加 safetySettings (BLOCK_NONE)。") + # --- 结束添加安全设置 --- else: + # --- 构建 OpenAI Payload --- if image_base64: messages = [ { @@ -937,20 +1025,24 @@ class LLMRequest: payload = { "model": self.model_name, "messages": messages, + # 其他转换后的参数 (如 temperature, max_tokens) 会在这里展开 **params_copy, } - if "max_tokens" not in payload and "max_completion_tokens" not in payload: - if "max_tokens" not in params_copy and "max_completion_tokens" not in params_copy: - payload["max_tokens"] = global_config.model_max_output_length - if "max_completion_tokens" in payload: - payload["max_tokens"] = payload.pop("max_completion_tokens") + # 确保 max_tokens 存在 (已在 _transform_parameters 中处理) + # if "max_tokens" not in payload and "max_completion_tokens" not in payload: + # if "max_tokens" not in params_copy and "max_completion_tokens" not in params_copy: + # payload["max_tokens"] = global_config.model_max_output_length + # if "max_completion_tokens" in payload: # 已在 _transform_parameters 中处理 + # payload["max_tokens"] = payload.pop("max_completion_tokens") + logger.trace(f"构建的 Payload (模型: {self.model_name}, Gemini: {self.is_gemini}): {json.dumps(await _safely_record({'image_base64': image_base64, 'image_format': image_format}, payload), indent=2)}") return payload def _default_response_handler( self, result: dict, user_id: str = "system", request_type: str = None, endpoint: str = "/chat/completions" ) -> Tuple: """默认响应解析 (区分 Gemini 和 OpenAI),并处理函数/工具调用""" + # (代码不变) content = "没有返回结果" reasoning_content = "" tool_calls = None # OpenAI 格式 @@ -983,29 +1075,40 @@ class LLMRequest: # else: function_call 已获取,content 留空或设为特定值 else: - content = "Gemini响应中缺少 content 或 parts" - logger.warning(f"模型 {self.model_name}: Gemini 响应格式不完整 (缺少 content/parts): {result}") + # 检查是否因为安全设置被阻止 + finish_reason = candidate.get("finishReason") + safety_ratings = candidate.get("safetyRatings") + if finish_reason == "SAFETY": + logger.warning(f"模型 {self.model_name}: Gemini 响应因安全设置被阻止。 Safety Ratings: {safety_ratings}") + content = "响应内容因安全原因被过滤。" + elif finish_reason == "RECITATION": + logger.warning(f"模型 {self.model_name}: Gemini 响应因引用限制被阻止。") + content = "响应内容因引用限制被过滤。" + elif finish_reason == "OTHER": + logger.warning(f"模型 {self.model_name}: Gemini 响应因未知原因停止。") + else: + # 既没有 content/parts,也不是已知的 finishReason + content = "Gemini响应中缺少 content 或 parts" + logger.warning(f"模型 {self.model_name}: Gemini 响应格式不完整 (缺少 content/parts 且非已知 finishReason): {result}") - finish_reason = candidate.get("finishReason") - if finish_reason == "SAFETY": - logger.warning(f"模型 {self.model_name}: Gemini 响应因安全设置被阻止。") - content = "响应内容因安全原因被过滤。" - elif finish_reason == "RECITATION": - logger.warning(f"模型 {self.model_name}: Gemini 响应因引用限制被阻止。") - content = "响应内容因引用限制被过滤。" - elif finish_reason == "OTHER": - logger.warning(f"模型 {self.model_name}: Gemini 响应因未知原因停止。") - # finishReason == "TOOL_CODE" or "FUNCTION_CALL" 是正常情况 usage = result.get("usageMetadata", {}) if usage: prompt_tokens = usage.get("promptTokenCount", 0) - completion_tokens = usage.get("candidatesTokenCount", 0) + completion_tokens = usage.get("candidatesTokenCount", 0) # 注意 Gemini key 不同 total_tokens = usage.get("totalTokenCount", 0) - if completion_tokens == 0 and total_tokens > 0: + # Gemini 有时不返回 candidatesTokenCount,尝试计算 + if completion_tokens == 0 and total_tokens > 0 and prompt_tokens > 0: completion_tokens = total_tokens - prompt_tokens else: - logger.warning(f"模型 {self.model_name} (Gemini) 的响应中缺少 'usageMetadata' 信息。") + # 检查 promptFeedback 是否包含 blockReason + prompt_feedback = result.get("promptFeedback") + if prompt_feedback and prompt_feedback.get("blockReason"): + block_reason = prompt_feedback.get("blockReason") + logger.warning(f"模型 {self.model_name} (Gemini): 提示因 '{block_reason}' 被阻止。") + content = f"提示内容因 '{block_reason}' 被阻止。" + else: + logger.warning(f"模型 {self.model_name} (Gemini) 的响应中缺少 'usageMetadata' 信息。") except Exception as e: logger.error(f"解析 Gemini 响应出错: {e} - 响应: {result}") @@ -1045,10 +1148,12 @@ class LLMRequest: total_tokens=total_tokens, user_id=user_id, request_type=request_type, - endpoint=endpoint, + endpoint=endpoint, # 传递 endpoint 用于区分 generateContent 和 embedContent ) else: - logger.warning(f"模型 {self.model_name}: 未能从响应中提取有效的 token 使用信息。") + # 只有在内容不是因安全过滤等原因导致无 token 时才警告 + if "因安全原因被过滤" not in content and "因引用限制被过滤" not in content and "被阻止" not in content: + logger.warning(f"模型 {self.model_name}: 未能从响应中提取有效的 token 使用信息。响应: {result}") # --- 返回结果 (统一格式) --- @@ -1060,15 +1165,21 @@ class LLMRequest: logger.debug(f"检测到 Gemini 函数调用: {function_call}") # 将 Gemini functionCall 转换为 OpenAI tool_calls 格式 # 注意: Gemini 的 functionCall 没有显式的 id 和 type,需要模拟 + try: + # Gemini 的参数在 'args' 中,OpenAI 在 'arguments' (通常是 JSON 字符串) + # 需要将 Gemini 的 dict 参数转换为 JSON 字符串 + arguments_json = json.dumps(function_call.get("args", {})) + except TypeError as e: + logger.error(f"无法将 Gemini 函数调用的参数转换为 JSON: {function_call.get('args', {})} - Error: {e}") + arguments_json = "{}" # 使用空 JSON 对象作为回退 + final_tool_calls = [ { "id": f"call_{random.randint(1000, 9999)}", # 生成一个随机 ID "type": "function", "function": { "name": function_call.get("name"), - # Gemini 的参数在 'args' 中,OpenAI 在 'arguments' (通常是 JSON 字符串) - # 需要将 Gemini 的 dict 参数转换为 JSON 字符串 - "arguments": json.dumps(function_call.get("args", {})) + "arguments": arguments_json } } ] @@ -1076,6 +1187,7 @@ class LLMRequest: if final_tool_calls: # 如果有工具/函数调用,通常 content 为空或包含思考过程,这里返回转换后的调用信息 + # 同时返回 content 和 reasoning_content (可能为空) return content, reasoning_content, final_tool_calls else: # 没有工具/函数调用,返回普通文本响应 @@ -1100,6 +1212,7 @@ class LLMRequest: """构建请求头 (区分 Gemini 和 OpenAI)""" # (代码不变) if no_key: + # 用于日志记录,隐藏实际 key if self.is_gemini: return {"x-goog-api-key": "**********", "Content-Type": "application/json"} else: @@ -1110,8 +1223,10 @@ class LLMRequest: raise ValueError(f"无效的 API key 提供给 _build_headers。") if self.is_gemini: + # Gemini 使用 x-goog-api-key return {"x-goog-api-key": api_key, "Content-Type": "application/json"} else: + # OpenAI 使用 Authorization Bearer token return {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} @@ -1124,14 +1239,20 @@ class LLMRequest: prompt=prompt, user_id=user_id, request_type="chat", - **kwargs + **kwargs # 传递覆盖参数 ) + # _default_response_handler 返回 (content, reasoning) 或 (content, reasoning, tool_calls) if len(response) == 3: content, reasoning_content, tool_calls = response - return content, reasoning_content, self.model_name, tool_calls - else: + # 对于普通 generate_response,即使返回 tool_calls,也只返回前三个元素 + return content, reasoning_content, self.model_name #, tool_calls + elif len(response) == 2: content, reasoning_content = response return content, reasoning_content, self.model_name + else: + logger.error(f"来自 _default_response_handler 的意外响应格式: {response}") + return "处理响应出错", "", self.model_name + async def generate_response_for_image(self, prompt: str, image_base64: str, image_format: str, user_id: str = "system", **kwargs) -> Tuple: """根据输入的提示和图片生成模型的异步响应,支持覆盖参数""" @@ -1144,11 +1265,13 @@ class LLMRequest: image_format=image_format, user_id=user_id, request_type="vision", - **kwargs + **kwargs # 传递覆盖参数 ) # _default_response_handler 现在总是返回至少2个值 if len(response) == 3: - return response # content, reasoning, tool_calls (tool_calls 可能为 None) + content, reasoning, tool_calls = response + # Vision 请求通常不期望 tool_calls,但如果返回了也包含在内 + return content, reasoning #, tool_calls # 保持返回两个值 elif len(response) == 2: content, reasoning = response return content, reasoning # 对于 vision 请求,通常没有 tool_calls @@ -1164,13 +1287,14 @@ class LLMRequest: response = await self._execute_request( endpoint=endpoint, prompt=prompt, - payload=None, - retry_policy=None, - response_handler=None, + payload=None, # 让 _execute_request 内部构建 + retry_policy=None, # 使用默认重试策略 + response_handler=None, # 使用默认响应处理器 user_id=user_id, - request_type=request_type, - **kwargs + request_type=request_type, # 传递请求类型 + **kwargs # 传递覆盖参数 ) + # 返回 _default_response_handler 的原始结果 (通常是 tuple) return response # 修改:实现 Gemini Function Calling 的 Payload 构建 @@ -1178,28 +1302,31 @@ class LLMRequest: """异步方式根据输入的提示和工具生成模型的响应,支持覆盖参数和 Gemini 函数调用""" endpoint = ":generateContent" if self.is_gemini else "/chat/completions" - merged_params = {**self.params, **kwargs} - transformed_params = await self._transform_parameters(merged_params) # 清理 request_type 等 + merged_params = {**self.params, **kwargs} # 合并实例参数和调用时参数 + # --- 构建特定于工具调用的 Payload --- payload = None - if self.is_gemini: # --- 构建 Gemini Function Calling Payload --- logger.debug(f"为 Gemini ({self.model_name}) 构建函数调用请求。") # 1. 转换工具定义 (OpenAI -> Gemini) - # OpenAI tool format: [{"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}] - # Gemini tool format: [{"functionDeclarations": [{"name": ..., "description": ..., "parameters": ...}]}] function_declarations = [] if tools: for tool in tools: if tool.get("type") == "function" and "function" in tool: func_def = tool["function"] # Gemini parameters 使用 OpenAPI Schema,与 OpenAI 基本兼容 - function_declarations.append({ + # 确保 description 存在,即使为空字符串 + gemini_func_def = { "name": func_def.get("name"), - "description": func_def.get("description", ""), # Description is required for Gemini - "parameters": func_def.get("parameters", {"type": "object", "properties": {}}) # Ensure parameters exist - }) + "description": func_def.get("description", ""), + "parameters": func_def.get("parameters", {"type": "object", "properties": {}}) # 确保 parameters 存在 + } + # 移除 description 如果为空 (Gemini 可能不允许空 description) + # if not gemini_func_def["description"]: + # del gemini_func_def["description"] # 实际测试看是否需要 + + function_declarations.append(gemini_func_def) else: logger.warning(f"跳过不支持的工具类型或格式: {tool}") @@ -1207,14 +1334,14 @@ class LLMRequest: logger.error("没有有效的函数声明可用于 Gemini 请求。") return "没有提供有效的函数定义", "", None - gemini_tools = [{"functionDeclarations": function_declarations}] + gemini_tools_config = [{"functionDeclarations": function_declarations}] - # 2. 构建 Gemini Payload - # parts = [{"text": prompt}] # 初始 parts + # 2. 构建 Gemini Payload (需要先转换参数) + transformed_params = await self._transform_parameters(merged_params) # 清理 request_type 等, 转换参数格式 payload = { "contents": [{"parts": [{"text": prompt}]}], # 包含用户提示 - "tools": gemini_tools, - # toolConfig 默认是 AUTO,可以根据需要从 kwargs 获取或硬编码 + "tools": gemini_tools_config, + # toolConfig 可以从 kwargs 获取或使用默认值 # "toolConfig": {"functionCallingConfig": {"mode": "ANY"}}, # 例如强制调用 **transformed_params # 合并其他转换后的参数 (如 generationConfig) } @@ -1222,24 +1349,38 @@ class LLMRequest: payload.pop("messages", None) # 移除 OpenAI 特有的 messages payload.pop("tool_choice", None) # 移除 OpenAI 特有的 tool_choice - logger.trace(f"构建的 Gemini 函数调用 Payload: {json.dumps(payload, indent=2)}") + # --- 添加 Gemini 安全设置 --- + safety_settings = [ + {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}, + ] + payload["safetySettings"] = safety_settings + logger.debug(f"模型 {self.model_name}: 已为 Gemini 函数调用请求添加 safetySettings (BLOCK_NONE)。") + # --- 结束添加安全设置 --- + logger.trace(f"构建的 Gemini 函数调用 Payload: {json.dumps(payload, indent=2)}") else: # --- 构建 OpenAI Tool Calling Payload --- - # (逻辑不变) logger.debug(f"为 OpenAI 兼容模型 ({self.model_name}) 构建工具调用请求。") + # 需要先转换参数 + transformed_params = await self._transform_parameters(merged_params) payload = { "model": self.model_name, "messages": [{"role": "user", "content": prompt}], - **transformed_params, + **transformed_params, # 使用转换后的参数 "tools": tools, - "tool_choice": transformed_params.get("tool_choice", "auto"), + "tool_choice": transformed_params.get("tool_choice", "auto"), # tool_choice 可能在转换时被移除,从转换后的参数获取 } - if "max_completion_tokens" in payload: - payload["max_tokens"] = payload.pop("max_completion_tokens") - if "max_tokens" not in payload: - payload["max_tokens"] = global_config.model_max_output_length + # 确保 max_tokens (已在 _transform_parameters 中处理) + # if "max_completion_tokens" in payload: + # payload["max_tokens"] = payload.pop("max_completion_tokens") + # if "max_tokens" not in payload: + # payload["max_tokens"] = global_config.model_max_output_length + logger.trace(f"构建的 OpenAI 工具调用 Payload: {json.dumps(payload, indent=2)}") # --- 执行请求 --- @@ -1249,16 +1390,17 @@ class LLMRequest: response = await self._execute_request( endpoint=endpoint, - payload=payload, - prompt=prompt, # prompt 仍然需要,用于可能的重试 + payload=payload, # 直接传递构建好的 payload + prompt=prompt, # prompt 仍然需要,用于可能的重试和日志记录 user_id=user_id, - request_type="tool_call", - **kwargs # 传递原始 kwargs 以便在重试时重新合并 + request_type="tool_call", # 设置请求类型 + **kwargs # 传递原始 kwargs 以便在重试时重新合并参数 ) # _default_response_handler 现在会处理 Gemini functionCall 并统一格式 logger.debug(f"模型 {self.model_name} 工具/函数调用返回结果: {response}") + # 解析响应 (与 generate_response 类似) if isinstance(response, tuple) and len(response) == 3: content, reasoning_content, final_tool_calls = response # final_tool_calls 已经是统一的 OpenAI 格式 @@ -1266,7 +1408,7 @@ class LLMRequest: elif isinstance(response, tuple) and len(response) == 2: content, reasoning_content = response logger.debug("收到普通响应,无工具/函数调用") - return content, reasoning_content, None + return content, reasoning_content, None # 返回 None 表示没有工具调用 else: logger.error(f"收到来自 _execute_request/_default_response_handler 的意外响应格式: {response}") return "处理响应时出错", "", None @@ -1282,69 +1424,96 @@ class LLMRequest: api_kwargs = {k: v for k, v in kwargs.items() if k != 'request_type'} if self.is_gemini: + # --- 构建 Gemini Embedding Payload --- endpoint = ":embedContent" + # Gemini embedding 模型名称通常是 'embedding-001' 或类似,不带 'models/' 前缀 + # 但 API URL 需要 'models/' 前缀,这里假设 self.model_name 是正确的 embedding 模型名 payload = { - "model": f"models/{self.model_name}", + "model": f"models/{self.model_name}", # URL 需要 'models/' "content": { "parts": [{"text": text}] }, - **api_kwargs + # Gemini embedding 可能有特定参数,如 task_type + # "taskType": "RETRIEVAL_DOCUMENT", # Example + **api_kwargs # 合并其他调用时参数 } + # 移除 OpenAI 特有的参数 payload.pop("encoding_format", None) payload.pop("input", None) + logger.trace(f"构建的 Gemini Embedding Payload: {json.dumps(payload, indent=2)}") else: + # --- 构建 OpenAI Embedding Payload --- endpoint = "/embeddings" payload = { "model": self.model_name, "input": text, - "encoding_format": "float", - **api_kwargs + "encoding_format": "float", # OpenAI 推荐 float + **api_kwargs # 合并其他调用时参数 } + # 移除 Gemini 特有的参数 payload.pop("content", None) payload.pop("taskType", None) + logger.trace(f"构建的 OpenAI Embedding Payload: {json.dumps(payload, indent=2)}") def embedding_handler(result): # (代码不变) embedding_value = None prompt_tokens = 0 - completion_tokens = 0 + completion_tokens = 0 # Embedding 通常没有 completion tokens total_tokens = 0 if self.is_gemini: + # --- 解析 Gemini Embedding 响应 --- if "embedding" in result and "value" in result["embedding"]: embedding_value = result["embedding"]["value"] + # Gemini embedding API 响应目前不包含 token usage logger.warning(f"模型 {self.model_name} (Gemini Embedding): 响应中未找到明确的 token 使用信息。") + # 尝试从可能的其他字段获取,如果未来 API 更新的话 + usage = result.get("usageMetadata", {}) + prompt_tokens = usage.get("promptTokenCount", 0) + total_tokens = usage.get("totalTokenCount", 0) + else: + # --- 解析 OpenAI Embedding 响应 --- if "data" in result and len(result["data"]) > 0: embedding_value = result["data"][0].get("embedding", None) usage = result.get("usage", {}) if usage: prompt_tokens = usage.get("prompt_tokens", 0) - total_tokens = usage.get("total_tokens", 0) + total_tokens = usage.get("total_tokens", 0) # OpenAI embedding 只有 total_tokens else: logger.warning(f"模型 {self.model_name} (OpenAI Embedding) 的响应中缺少 'usage' 信息。") - self._record_usage( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=total_tokens, - user_id=user_id, - request_type="embedding", - endpoint=endpoint, - ) + # --- 记录 Usage --- + # 只有在获取到 token 信息时才记录 + if prompt_tokens > 0 or total_tokens > 0: + self._record_usage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, # 通常为 0 + total_tokens=total_tokens, + user_id=user_id, + request_type="embedding", + endpoint=endpoint, # 传递 endpoint + ) + elif embedding_value is not None: # 即使没有 token 信息,只要成功获取 embedding 也记录一下(token 为 0) + self._record_usage(0, 0, 0, user_id, "embedding", endpoint) + else: + logger.warning(f"模型 {self.model_name} (Embedding): 未能从响应中提取有效的 embedding 或 token 使用信息。") + + return embedding_value embedding = await self._execute_request( endpoint=endpoint, - payload=payload, - prompt=text, - retry_policy={"max_retries": 2, "base_wait": 6}, - response_handler=embedding_handler, + payload=payload, # 传递构建好的 payload + prompt=text, # prompt 用于日志和可能的重试构建 + retry_policy={"max_retries": 2, "base_wait": 6}, # Embedding 通常重试次数少一些 + response_handler=embedding_handler, # 使用特定的 handler user_id=user_id, - request_type="embedding", - **api_kwargs + request_type="embedding", # 设置请求类型 + **api_kwargs # 传递覆盖参数 ) return embedding @@ -1354,60 +1523,108 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10 # (代码不变) try: image_data = base64.b64decode(base64_data) - if len(image_data) <= target_size * 1.05: + if len(image_data) <= target_size * 1.05: # 允许 5% 的误差 logger.info(f"图片大小 {len(image_data) / 1024:.1f}KB 已足够小,无需压缩。") return base64_data img = Image.open(io.BytesIO(image_data)) img_format = img.format original_width, original_height = img.size - scale = max(0.2, min(1.0, (target_size / len(image_data)) ** 0.5)) + # 调整缩放比例计算,避免除零或无效比例 + if len(image_data) <= 0: + logger.warning("原始图片数据大小为 0,无法计算缩放比例。") + return base64_data + scale = max(0.1, min(1.0, (target_size / len(image_data)) ** 0.5)) # 最小缩放比例设为 0.1 new_width = max(1, int(original_width * scale)) new_height = max(1, int(original_height * scale)) + + logger.info(f"尝试压缩图片: {original_width}x{original_height} ({img_format}) -> 目标尺寸约 {new_width}x{new_height}") + output_buffer = io.BytesIO() save_format = img_format # Default to original format + save_params = {} + # 处理动图 (GIF) if getattr(img, "is_animated", False) and img.n_frames > 1: frames = [] durations = [] loop = img.info.get('loop', 0) - disposal = img.info.get('disposal', 2) + disposal = img.info.get('disposal', 2) # Use default disposal if not specified logger.info(f"检测到 GIF 动图 ({img.n_frames} 帧),尝试按比例压缩...") - for frame_idx in range(img.n_frames): - img.seek(frame_idx) - current_duration = img.info.get('duration', 100) - durations.append(current_duration) - new_frame = img.convert("RGBA").copy() - resized_frame = new_frame.resize((new_width, new_height), Image.Resampling.LANCZOS) - frames.append(resized_frame) + try: + while True: + current_duration = img.info.get('duration', 100) # Default duration 100ms + durations.append(current_duration) + + # Convert frame to RGBA for consistent processing, handle palette transparency + current_frame = img.convert("RGBA") + resized_frame = current_frame.resize((new_width, new_height), Image.Resampling.LANCZOS) + frames.append(resized_frame) + img.seek(img.tell() + 1) # Move to next frame + except EOFError: + pass # End of sequence + except Exception as gif_err: + logger.error(f"处理 GIF 帧时出错: {gif_err}") + return base64_data # Return original on error + if frames: - frames[0].save( - output_buffer, format="GIF", save_all=True, append_images=frames[1:], - optimize=False, duration=durations, loop=loop, disposal=disposal, - transparency=img.info.get('transparency', None), background=img.info.get('background', None) - ) + # Save the animated GIF + # Ensure transparency info is preserved if possible + save_kwargs = { + "save_all": True, + "append_images": frames[1:], + "optimize": False, # Optimization can sometimes break animations + "duration": durations, + "loop": loop, + "disposal": disposal, + } + # Handle transparency for the first frame + transparency = img.info.get('transparency', None) + if transparency is not None: + save_kwargs['transparency'] = transparency + # Handle background color if present + background = img.info.get('background', None) + if background is not None: + save_kwargs['background'] = background + + frames[0].save(output_buffer, format="GIF", **save_kwargs) save_format = "GIF" else: logger.warning("未能处理 GIF 帧。") - return base64_data + return base64_data # Return original if no frames processed else: + # 处理静态图片 + # 保留透明度 (PNG, WEBP) if img.mode in ("RGBA", "LA") or 'transparency' in img.info: + # Convert to RGBA before resizing to preserve alpha channel correctly resized_img = img.convert("RGBA").resize((new_width, new_height), Image.Resampling.LANCZOS) + # Prefer PNG for transparency, but consider WEBP if original was WEBP? (Pillow might not save animated WEBP well) save_format = "PNG" save_params = {"optimize": True} + # 处理无透明度图片 (JPEG, etc.) else: + # Convert to RGB for standard formats resized_img = img.convert("RGB").resize((new_width, new_height), Image.Resampling.LANCZOS) + # Prefer JPEG if original was JPEG for potentially smaller size if img_format and img_format.upper() == "JPEG": save_format = "JPEG" - save_params = {"quality": 85, "optimize": True} + save_params = {"quality": 85, "optimize": True} # Good balance of quality/size else: + # Fallback to PNG for other non-transparent formats save_format = "PNG" save_params = {"optimize": True} + resized_img.save(output_buffer, format=save_format, **save_params) compressed_data = output_buffer.getvalue() - logger.success(f"压缩图片: {original_width}x{original_height} -> {new_width}x{new_height} ({img.format} -> {save_format})") - logger.info(f"压缩前大小: {len(image_data) / 1024:.1f}KB, 压缩后大小: {len(compressed_data) / 1024:.1f}KB (目标: {target_size / 1024:.1f}KB)") - if len(compressed_data) < len(image_data) * 0.95: + final_size_kb = len(compressed_data) / 1024 + original_size_kb = len(image_data) / 1024 + target_size_kb = target_size / 1024 + + logger.success(f"压缩图片: {original_width}x{original_height} ({img.format} -> {save_format})") + logger.info(f"压缩前大小: {original_size_kb:.1f}KB, 压缩后大小: {final_size_kb:.1f}KB (目标: {target_size_kb:.1f}KB)") + + # 只有在压缩后明显变小才返回压缩后的数据 (例如,小于原始大小的 95%) + if final_size_kb < original_size_kb * 0.95: return base64.b64encode(compressed_data).decode("utf-8") else: logger.info("压缩效果不明显或反而增大,返回原始图片。") @@ -1416,5 +1633,4 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10 logger.error(f"压缩图片失败: {str(e)}") import traceback logger.error(traceback.format_exc()) - return base64_data - + return base64_data # 发生任何错误都返回原始数据