From 5c1f0b0dfa5d3bad2adf841ad25bd37ce0751a0d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com>
Date: Tue, 19 Aug 2025 00:39:54 +0800
Subject: [PATCH 1/5] Update requirements.txt

---
 requirements.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements.txt b/requirements.txt
index 999bd5fd..0a5e9a3d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -47,3 +47,4 @@ reportportal-client
 scikit-learn
 seaborn
 structlog
+google.geai

From de67810950fcd357620c6a9832dcfb8a9a2af520 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com>
Date: Tue, 19 Aug 2025 00:42:40 +0800
Subject: [PATCH 2/5] Update requirements.txt

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 0a5e9a3d..721cf95f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -47,4 +47,4 @@ reportportal-client
 scikit-learn
 seaborn
 structlog
-google.geai
+google.genai

From 938e17ea154f17860c86dcbb6b94a6ef4be9d708 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com>
Date: Tue, 19 Aug 2025 16:12:25 +0800
Subject: [PATCH 3/5] Update model_config_template.toml

---
 template/model_config_template.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/template/model_config_template.toml b/template/model_config_template.toml
index 92ac8881..0d756314 100644
--- a/template/model_config_template.toml
+++ b/template/model_config_template.toml
@@ -5,7 +5,7 @@ version = "1.3.0"
 
 [[api_providers]] # API服务提供商(可以配置多个)
 name = "DeepSeek" # API服务商名称(可随意命名,在models的api-provider中需使用这个命名)
-base_url = "https://api.deepseek.cn/v1" # API服务商的BaseURL
+base_url = "https://api.deepseek.com/v1" # API服务商的BaseURL
 api_key = "your-api-key-here" # API密钥(请替换为实际的API密钥)
 client_type = "openai" # 请求客户端(可选,默认值为"openai",使用gimini等Google系模型时请配置为"gemini")
 max_retry = 2 # 最大重试次数(单个模型API调用失败,最多重试的次数)

From a8ff08e2a72d66947ec5af7cfc62e39163d7396a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com>
Date: Tue, 19 Aug 2025 16:59:51 +0800
Subject: [PATCH 4/5] Optimize async handling to avoid event-loop issues and
 enhance error logging
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 scripts/import_openie.py                     | 26 ++++++++-
 src/chat/knowledge/embedding_store.py        | 58 ++++++++++++--------
 src/chat/utils/utils.py                      |  1 +
 src/llm_models/model_client/base_client.py   | 11 +++-
 src/llm_models/model_client/openai_client.py |  6 ++
 src/llm_models/utils_model.py                |  6 +-
 6 files changed, 82 insertions(+), 26 deletions(-)

diff --git a/scripts/import_openie.py b/scripts/import_openie.py
index fe9f5269..c4367892 100644
--- a/scripts/import_openie.py
+++ b/scripts/import_openie.py
@@ -6,6 +6,7 @@
 
 import sys
 import os
+import asyncio
 from time import sleep
 
 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
@@ -172,7 +173,7 @@ def handle_import_openie(openie_data: OpenIE, embed_manager: EmbeddingManager, k
     return True
 
 
-def main(): # sourcery skip: dict-comprehension
+async def main_async(): # sourcery skip: dict-comprehension
     # 新增确认提示
     print("=== 重要操作确认 ===")
     print("OpenIE导入时会大量发送请求,可能会撞到请求速度上限,请注意选用的模型")
@@ -239,6 +240,29 @@
     return None
 
 
+def main():
+    """主函数 - 设置新的事件循环并运行异步主函数"""
+    # 检查是否有现有的事件循环
+    try:
+        loop = asyncio.get_running_loop()
+        if loop.is_closed():
+            # 如果事件循环已关闭,创建新的
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+    except RuntimeError:
+        # 没有运行的事件循环,创建新的
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+    try:
+        # 在新的事件循环中运行异步主函数
+        loop.run_until_complete(main_async())
+    finally:
+        # 确保事件循环被正确关闭
+        if not loop.is_closed():
+            loop.close()
+
+
 if __name__ == "__main__":
     # logger.info(f"111111111111111111111111{ROOT_PATH}")
     main()

diff --git a/src/chat/knowledge/embedding_store.py b/src/chat/knowledge/embedding_store.py
index d0f6e774..dec5b595 100644
--- a/src/chat/knowledge/embedding_store.py
+++ b/src/chat/knowledge/embedding_store.py
@@ -117,30 +117,36 @@ class EmbeddingStore:
         self.idx2hash = None
 
     def _get_embedding(self, s: str) -> List[float]:
-        """获取字符串的嵌入向量,处理异步调用"""
+        """获取字符串的嵌入向量,使用完全同步的方式避免事件循环问题"""
+        # 创建新的事件循环并在完成后立即关闭
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
         try:
-            # 尝试获取当前事件循环
-            asyncio.get_running_loop()
-            # 如果在事件循环中,使用线程池执行
-            import concurrent.futures
-
-            def run_in_thread():
-                return asyncio.run(get_embedding(s))
-
-            with concurrent.futures.ThreadPoolExecutor() as executor:
-                future = executor.submit(run_in_thread)
-                result = future.result()
-                if result is None:
-                    logger.error(f"获取嵌入失败: {s}")
-                    return []
-                return result
-        except RuntimeError:
-            # 没有运行的事件循环,直接运行
-            result = asyncio.run(get_embedding(s))
-            if result is None:
+            # 创建新的LLMRequest实例
+            from src.llm_models.utils_model import LLMRequest
+            from src.config.config import model_config
+
+            llm = LLMRequest(model_set=model_config.model_task_config.embedding, request_type="embedding")
+
+            # 使用新的事件循环运行异步方法
+            embedding, _ = loop.run_until_complete(llm.get_embedding(s))
+
+            if embedding and len(embedding) > 0:
+                return embedding
+            else:
                 logger.error(f"获取嵌入失败: {s}")
                 return []
-            return result
+
+        except Exception as e:
+            logger.error(f"获取嵌入时发生异常: {s}, 错误: {e}")
+            return []
+        finally:
+            # 确保事件循环被正确关闭
+            try:
+                loop.close()
+            except Exception:
+                pass
 
     def _get_embeddings_batch_threaded(self, strs: List[str], chunk_size: int = 10, max_workers: int = 10, progress_callback=None) -> List[Tuple[str, List[float]]]:
         """使用多线程批量获取嵌入向量
@@ -181,8 +187,14 @@ class EmbeddingStore:
 
         for i, s in enumerate(chunk_strs):
             try:
-                # 直接使用异步函数
-                embedding = asyncio.run(llm.get_embedding(s))
+                # 在线程中创建独立的事件循环
+                loop = asyncio.new_event_loop()
+                asyncio.set_event_loop(loop)
+                try:
+                    embedding = loop.run_until_complete(llm.get_embedding(s))
+                finally:
+                    loop.close()
+
                 if embedding and len(embedding) > 0:
                     chunk_results.append((start_idx + i, s, embedding[0])) # embedding[0] 是实际的向量
                 else:

diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py
index 55ab3b44..9dca4089 100644
--- a/src/chat/utils/utils.py
+++ b/src/chat/utils/utils.py
@@ -111,6 +111,7 @@ def is_mentioned_bot_in_message(message: MessageRecv) -> tuple[bool, float]:
 
 async def get_embedding(text, request_type="embedding") -> Optional[List[float]]:
     """获取文本的embedding向量"""
+    # 每次都创建新的LLMRequest实例以避免事件循环冲突
     llm = LLMRequest(model_set=model_config.model_task_config.embedding, request_type=request_type)
     try:
         embedding, _ = await llm.get_embedding(text)

diff --git a/src/llm_models/model_client/base_client.py b/src/llm_models/model_client/base_client.py
index 97c34546..807f6484 100644
--- a/src/llm_models/model_client/base_client.py
+++ b/src/llm_models/model_client/base_client.py
@@ -159,14 +159,23 @@ class ClientRegistry:
 
         return decorator
 
-    def get_client_class_instance(self, api_provider: APIProvider) -> BaseClient:
+    def get_client_class_instance(self, api_provider: APIProvider, force_new=False) -> BaseClient:
         """
         获取注册的API客户端实例
         Args:
             api_provider: APIProvider实例
+            force_new: 是否强制创建新实例(用于解决事件循环问题)
         Returns:
             BaseClient: 注册的API客户端实例
         """
+        # 如果强制创建新实例,直接创建不使用缓存
+        if force_new:
+            if client_class := self.client_registry.get(api_provider.client_type):
+                return client_class(api_provider)
+            else:
+                raise KeyError(f"'{api_provider.client_type}' 类型的 Client 未注册")
+
+        # 正常的缓存逻辑
         if api_provider.name not in self.client_instance_cache:
             if client_class := self.client_registry.get(api_provider.client_type):
                 self.client_instance_cache[api_provider.name] = client_class(api_provider)

diff --git a/src/llm_models/model_client/openai_client.py b/src/llm_models/model_client/openai_client.py
index c580899a..bba00f94 100644
--- a/src/llm_models/model_client/openai_client.py
+++ b/src/llm_models/model_client/openai_client.py
@@ -388,6 +388,7 @@ class OpenaiClient(BaseClient):
             base_url=api_provider.base_url,
             api_key=api_provider.api_key,
             max_retries=0,
+            timeout=api_provider.timeout,
         )
 
     async def get_response(
@@ -520,6 +521,11 @@
                 extra_body=extra_params,
             )
         except APIConnectionError as e:
+            # 添加详细的错误信息以便调试
+            logger.error(f"OpenAI API连接错误(嵌入模型): {str(e)}")
+            logger.error(f"错误类型: {type(e)}")
+            if hasattr(e, '__cause__') and e.__cause__:
+                logger.error(f"底层错误: {str(e.__cause__)}")
             raise NetworkConnectionError() from e
         except APIStatusError as e:
             # 重封装APIError为RespNotOkException

diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py
index e8e4db5f..f0229c2c 100644
--- a/src/llm_models/utils_model.py
+++ b/src/llm_models/utils_model.py
@@ -248,7 +248,11 @@ class LLMRequest:
             )
         model_info = model_config.get_model_info(least_used_model_name)
         api_provider = model_config.get_provider(model_info.api_provider)
-        client = client_registry.get_client_class_instance(api_provider)
+
+        # 对于嵌入任务,强制创建新的客户端实例以避免事件循环问题
+        force_new_client = (self.request_type == "embedding")
+        client = client_registry.get_client_class_instance(api_provider, force_new=force_new_client)
+
        logger.debug(f"选择请求模型: {model_info.name}")
         total_tokens, penalty, usage_penalty = self.model_usage[model_info.name]
         self.model_usage[model_info.name] = (total_tokens, penalty, usage_penalty + 1) # 增加使用惩罚值防止连续使用

From 64839559190d98830df558293ccef3c4692aaaac Mon Sep 17 00:00:00 2001
From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com>
Date: Sun, 24 Aug 2025 16:11:03 +0000
Subject: [PATCH 5/5] fix(llm): Add retry mechanism for empty API responses

---
 src/llm_models/exceptions.py                 | 11 +++++
 src/llm_models/model_client/gemini_client.py | 46 ++++++++++++--------
 src/llm_models/model_client/openai_client.py |  9 +++-
 src/llm_models/utils_model.py                | 21 ++++++---
 4 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/src/llm_models/exceptions.py b/src/llm_models/exceptions.py
index 5b04f58c..ff847ad8 100644
--- a/src/llm_models/exceptions.py
+++ b/src/llm_models/exceptions.py
@@ -96,3 +96,14 @@ class PermissionDeniedException(Exception):
 
     def __str__(self):
         return self.message
+
+
+class EmptyResponseException(Exception):
+    """响应内容为空"""
+
+    def __init__(self, message: str = "响应内容为空,这可能是一个临时性问题"):
+        super().__init__(message)
+        self.message = message
+
+    def __str__(self):
+        return self.message

diff --git a/src/llm_models/model_client/gemini_client.py b/src/llm_models/model_client/gemini_client.py
index d253d29c..2b2d9183 100644
--- a/src/llm_models/model_client/gemini_client.py
+++ b/src/llm_models/model_client/gemini_client.py
@@ -37,6 +37,7 @@ from ..exceptions import (
     NetworkConnectionError,
     RespNotOkException,
     ReqAbortException,
+    EmptyResponseException,
 )
 from ..payload_content.message import Message, RoleType
 from ..payload_content.resp_format import RespFormat, RespFormatType
@@ -224,6 +225,9 @@ def _build_stream_api_resp(
 
         resp.tool_calls.append(ToolCall(call_id, function_name, arguments))
 
+    if not resp.content and not resp.tool_calls:
+        raise EmptyResponseException()
+
     return resp
 
 
@@ -284,26 +288,27 @@
     """
     api_response = APIResponse()
 
-    if not hasattr(resp, "candidates") or not resp.candidates:
-        raise RespParseException(resp, "响应解析失败,缺失candidates字段")
 
+    # 解析思考内容
     try:
-        if resp.candidates[0].content and resp.candidates[0].content.parts:
-            for part in resp.candidates[0].content.parts:
-                if not part.text:
-                    continue
-                if part.thought:
-                    api_response.reasoning_content = (
-                        api_response.reasoning_content + part.text if api_response.reasoning_content else part.text
-                    )
+        if (candidates := getattr(resp, "candidates", None)) and candidates:
+            if candidates[0].content and candidates[0].content.parts:
+                for part in candidates[0].content.parts:
+                    if not part.text:
+                        continue
+                    if part.thought:
+                        api_response.reasoning_content = (
+                            api_response.reasoning_content + part.text if api_response.reasoning_content else part.text
+                        )
     except Exception as e:
         logger.warning(f"解析思考内容时发生错误: {e},跳过解析")
 
-    if resp.text:
-        api_response.content = resp.text
+    # 解析响应内容
+    api_response.content = getattr(resp, "text", None)
 
-    if resp.function_calls:
+    # 解析工具调用
+    if function_calls := getattr(resp, "function_calls", None):
         api_response.tool_calls = []
-        for call in resp.function_calls:
+        for call in function_calls:
             try:
                 if not isinstance(call.args, dict):
                     raise RespParseException(resp, "响应解析失败,工具调用参数无法解析为字典类型")
@@ -313,17 +318,22 @@
             except Exception as e:
                 raise RespParseException(resp, "响应解析失败,无法解析工具调用参数") from e
 
-    if resp.usage_metadata:
+    # 解析使用情况
+    if usage_metadata := getattr(resp, "usage_metadata", None):
         _usage_record = (
-            resp.usage_metadata.prompt_token_count or 0,
-            (resp.usage_metadata.candidates_token_count or 0) + (resp.usage_metadata.thoughts_token_count or 0),
-            resp.usage_metadata.total_token_count or 0,
+            usage_metadata.prompt_token_count or 0,
+            (usage_metadata.candidates_token_count or 0) + (usage_metadata.thoughts_token_count or 0),
+            usage_metadata.total_token_count or 0,
         )
     else:
         _usage_record = None
 
     api_response.raw_data = resp
 
+    # 最终的、唯一的空响应检查
+    if not api_response.content and not api_response.tool_calls:
+        raise EmptyResponseException("响应中既无文本内容也无工具调用")
+
     return api_response, _usage_record

diff --git a/src/llm_models/model_client/openai_client.py b/src/llm_models/model_client/openai_client.py
index bba00f94..51bb692f 100644
--- a/src/llm_models/model_client/openai_client.py
+++ b/src/llm_models/model_client/openai_client.py
@@ -30,6 +30,7 @@ from ..exceptions import (
     NetworkConnectionError,
     RespNotOkException,
     ReqAbortException,
+    EmptyResponseException,
 )
 from ..payload_content.message import Message, RoleType
 from ..payload_content.resp_format import RespFormat
@@ -235,6 +236,9 @@ def _build_stream_api_resp(
 
         resp.tool_calls.append(ToolCall(call_id, function_name, arguments))
 
+    if not resp.content and not resp.tool_calls:
+        raise EmptyResponseException()
+
     return resp
 
 
@@ -332,7 +336,7 @@
     api_response = APIResponse()
 
     if not hasattr(resp, "choices") or len(resp.choices) == 0:
-        raise RespParseException(resp, "响应解析失败,缺失choices字段")
+        raise EmptyResponseException("响应解析失败,缺失choices字段或choices列表为空")
 
     message_part = resp.choices[0].message
     if hasattr(message_part, "reasoning_content") and message_part.reasoning_content: # type: ignore
@@ -377,6 +381,9 @@
     # 将原始响应存储在原始数据中
     api_response.raw_data = resp
 
+    if not api_response.content and not api_response.tool_calls:
+        raise EmptyResponseException()
+
     return api_response, _usage_record

diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py
index 1125e9fd..7ab76969 100644
--- a/src/llm_models/utils_model.py
+++ b/src/llm_models/utils_model.py
@@ -14,7 +14,13 @@ from .payload_content.resp_format import RespFormat
 from .payload_content.tool_option import ToolOption, ToolCall, ToolOptionBuilder, ToolParamType
 from .model_client.base_client import BaseClient, APIResponse, client_registry
 from .utils import compress_messages, llm_usage_recorder
-from .exceptions import NetworkConnectionError, ReqAbortException, RespNotOkException, RespParseException
+from .exceptions import (
+    NetworkConnectionError,
+    ReqAbortException,
+    RespNotOkException,
+    RespParseException,
+    EmptyResponseException,
+)
 
 install(extra_lines=3)
 
@@ -192,12 +198,6 @@ class LLMRequest:
                 endpoint="/chat/completions",
                 time_cost=time.time() - start_time,
             )
-
-            if not content:
-                if raise_when_empty:
-                    logger.warning(f"生成的响应为空, 请求类型: {self.request_type}")
-                    raise RuntimeError("生成的响应为空")
-                content = "生成的响应为空,请检查模型配置或输入内容是否正确"
 
             return content, (reasoning_content, model_info.name, tool_calls)
 
@@ -367,6 +367,13 @@
                 can_retry_msg=f"任务-'{task_name}' 模型-'{model_name}': 连接异常,将于{retry_interval}秒后重试",
                 cannot_retry_msg=f"任务-'{task_name}' 模型-'{model_name}': 连接异常,超过最大重试次数,请检查网络连接状态或URL是否正确",
             )
+        elif isinstance(e, EmptyResponseException): # 空响应错误
+            return self._check_retry(
+                remain_try,
+                retry_interval,
+                can_retry_msg=f"任务-'{task_name}' 模型-'{model_name}': 收到空响应,将于{retry_interval}秒后重试。原因: {e}",
+                cannot_retry_msg=f"任务-'{task_name}' 模型-'{model_name}': 收到空响应,超过最大重试次数,放弃请求",
+            )
         elif isinstance(e, ReqAbortException):
             logger.warning(f"任务-'{task_name}' 模型-'{model_name}': 请求被中断,详细信息-{str(e.message)}")
             return -1, None # 不再重试请求该模型
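
Note: PATCH 5/5 turns an empty API response from a silent fallback string into a retryable error. The script below is a minimal, self-contained sketch of that control flow, raising EmptyResponseException when a response carries neither content nor tool calls and retrying on a fixed interval until the budget is spent. It is not the project's actual code: simulate_llm_call and the retry parameters are hypothetical stand-ins for client.get_response() and the values configured per API provider, and the remain_try bookkeeping only loosely mirrors what utils_model.py passes to LLMRequest._check_retry.

import asyncio


class EmptyResponseException(Exception):
    """Raised when a response carries neither text content nor tool calls."""


async def simulate_llm_call(state: dict) -> str:
    # Hypothetical stand-in for BaseClient.get_response(); returns an empty
    # payload on the first two attempts to mimic a flaky upstream API.
    state["attempts"] += 1
    return "" if state["attempts"] < 3 else "hello"


async def request_with_retry(max_retry: int = 2, retry_interval: float = 1.0) -> str:
    # max_retry + 1 total attempts, in the spirit of the patched retry logic.
    state = {"attempts": 0}
    remain_try = max_retry + 1
    while True:
        remain_try -= 1
        try:
            content = await simulate_llm_call(state)
            if not content:
                raise EmptyResponseException("响应内容为空")
            return content
        except EmptyResponseException:
            if remain_try <= 0:
                raise  # retry budget spent: surface the error to the caller
            await asyncio.sleep(retry_interval)


if __name__ == "__main__":
    # Two empty responses, then success on the third attempt.
    print(asyncio.run(request_with_retry()))

Under these assumptions the script prints "hello" after two retries; with max_retry = 1 it would exhaust its budget and re-raise instead, which corresponds to the cannot_retry path the patch logs as 超过最大重试次数.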