diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py index 6bbda587..17ab06e9 100644 --- a/src/chat/heart_flow/heartFC_chat.py +++ b/src/chat/heart_flow/heartFC_chat.py @@ -19,7 +19,6 @@ from src.chat.planner_actions.action_manager import ActionManager from src.chat.heart_flow.hfc_utils import CycleDetail from src.express.expression_learner import expression_learner_manager from src.chat.frequency_control.frequency_control import frequency_control_manager -from src.memory_system.curious import check_and_make_question from src.jargon import extract_and_store_jargon from src.person_info.person_info import Person from src.plugin_system.base.component_types import EventType, ActionInfo diff --git a/src/chat/knowledge/__init__.py b/src/chat/knowledge/__init__.py index 324320f2..b9c96708 100644 --- a/src/chat/knowledge/__init__.py +++ b/src/chat/knowledge/__init__.py @@ -30,6 +30,8 @@ DATA_PATH = os.path.join(ROOT_PATH, "data") qa_manager = None inspire_manager = None +def get_qa_manager(): + return qa_manager def lpmm_start_up(): # sourcery skip: extract-duplicate-method # 检查LPMM知识库是否启用 diff --git a/src/chat/replyer/group_generator.py b/src/chat/replyer/group_generator.py index bcda39b9..627412fb 100644 --- a/src/chat/replyer/group_generator.py +++ b/src/chat/replyer/group_generator.py @@ -1085,6 +1085,10 @@ class DefaultReplyer: if not global_config.lpmm_knowledge.enable: logger.debug("LPMM知识库未启用,跳过获取知识库内容") return "" + + if global_config.lpmm_knowledge.lpmm_mode == "agent": + return "" + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) bot_name = global_config.bot.nickname @@ -1102,6 +1106,10 @@ class DefaultReplyer: model_config=model_config.model_task_config.tool_use, tool_options=[SearchKnowledgeFromLPMMTool.get_tool_definition()], ) + + # logger.info(f"工具调用提示词: {prompt}") + # logger.info(f"工具调用: {tool_calls}") + if tool_calls: result = await self.tool_executor.execute_tool_call(tool_calls[0], SearchKnowledgeFromLPMMTool()) end_time = time.time() @@ -1109,7 +1117,7 @@ class DefaultReplyer: logger.debug("从LPMM知识库获取知识失败,返回空知识...") return "" found_knowledge_from_lpmm = result.get("content", "") - logger.debug( + logger.info( f"从LPMM知识库获取知识,相关信息:{found_knowledge_from_lpmm[:100]}...,信息长度: {len(found_knowledge_from_lpmm)}" ) related_info += found_knowledge_from_lpmm diff --git a/src/chat/replyer/private_generator.py b/src/chat/replyer/private_generator.py index 58928259..08da116e 100644 --- a/src/chat/replyer/private_generator.py +++ b/src/chat/replyer/private_generator.py @@ -1030,6 +1030,10 @@ class PrivateReplyer: if not global_config.lpmm_knowledge.enable: logger.debug("LPMM知识库未启用,跳过获取知识库内容") return "" + + if global_config.lpmm_knowledge.lpmm_mode == "agent": + return "" + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) bot_name = global_config.bot.nickname diff --git a/src/llm_models/model_client/openai_client.py b/src/llm_models/model_client/openai_client.py index 7b350169..edfb6bea 100644 --- a/src/llm_models/model_client/openai_client.py +++ b/src/llm_models/model_client/openai_client.py @@ -77,6 +77,23 @@ def _convert_messages(messages: list[Message]) -> list[ChatCompletionMessagePara "content": content, } + if message.role == RoleType.Assistant and getattr(message, "tool_calls", None): + tool_calls_payload: list[dict[str, Any]] = [] + for call in message.tool_calls or []: + tool_calls_payload.append( + { + "id": call.call_id, + "type": "function", + "function": { + "name": call.func_name, + "arguments": json.dumps(call.args or {}, ensure_ascii=False), + }, + } + ) + ret["tool_calls"] = tool_calls_payload + if ret["content"] == []: + ret["content"] = "" + # 添加工具调用ID if message.role == RoleType.Tool: if not message.tool_call_id: diff --git a/src/llm_models/payload_content/message.py b/src/llm_models/payload_content/message.py index f70c3ded..ddcdf57f 100644 --- a/src/llm_models/payload_content/message.py +++ b/src/llm_models/payload_content/message.py @@ -1,4 +1,7 @@ from enum import Enum +from typing import List, Optional + +from .tool_option import ToolCall # 设计这系列类的目的是为未来可能的扩展做准备 @@ -20,6 +23,7 @@ class Message: role: RoleType, content: str | list[tuple[str, str] | str], tool_call_id: str | None = None, + tool_calls: Optional[List[ToolCall]] = None, ): """ 初始化消息对象 @@ -28,6 +32,13 @@ class Message: self.role: RoleType = role self.content: str | list[tuple[str, str] | str] = content self.tool_call_id: str | None = tool_call_id + self.tool_calls: Optional[List[ToolCall]] = tool_calls + + def __str__(self) -> str: + return ( + f"Role: {self.role}, Content: {self.content}, " + f"Tool Call ID: {self.tool_call_id}, Tool Calls: {self.tool_calls}" + ) class MessageBuilder: @@ -35,6 +46,7 @@ class MessageBuilder: self.__role: RoleType = RoleType.User self.__content: list[tuple[str, str] | str] = [] self.__tool_call_id: str | None = None + self.__tool_calls: Optional[List[ToolCall]] = None def set_role(self, role: RoleType = RoleType.User) -> "MessageBuilder": """ @@ -86,12 +98,27 @@ class MessageBuilder: self.__tool_call_id = tool_call_id return self + def set_tool_calls(self, tool_calls: List[ToolCall]) -> "MessageBuilder": + """ + 设置助手消息的工具调用列表 + :param tool_calls: 工具调用列表 + :return: MessageBuilder对象 + """ + if self.__role != RoleType.Assistant: + raise ValueError("仅当角色为Assistant时才能设置工具调用列表") + if not tool_calls: + raise ValueError("工具调用列表不能为空") + self.__tool_calls = tool_calls + return self + def build(self) -> Message: """ 构建消息对象 :return: Message对象 """ - if len(self.__content) == 0: + if len(self.__content) == 0 and not ( + self.__role == RoleType.Assistant and self.__tool_calls + ): raise ValueError("内容不能为空") if self.__role == RoleType.Tool and self.__tool_call_id is None: raise ValueError("Tool角色的工具调用ID不能为空") @@ -104,4 +131,5 @@ class MessageBuilder: else self.__content ), tool_call_id=self.__tool_call_id, + tool_calls=self.__tool_calls, ) diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py index f161db95..c3b0fac7 100644 --- a/src/llm_models/utils_model.py +++ b/src/llm_models/utils_model.py @@ -166,6 +166,57 @@ class LLMRequest: time_cost=time.time() - start_time, ) return content or "", (reasoning_content, model_info.name, tool_calls) + + async def generate_response_with_message_async( + self, + message_factory: Callable[[BaseClient], List[Message]], + temperature: Optional[float] = None, + max_tokens: Optional[int] = None, + tools: Optional[List[Dict[str, Any]]] = None, + raise_when_empty: bool = True, + ) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]: + """ + 异步生成响应 + Args: + message_factory (Callable[[BaseClient], List[Message]]): 已构建好的消息工厂 + temperature (float, optional): 温度参数 + max_tokens (int, optional): 最大token数 + tools (Optional[List[Dict[str, Any]]]): 工具列表 + raise_when_empty (bool): 当响应为空时是否抛出异常 + Returns: + (Tuple[str, str, str, Optional[List[ToolCall]]]): 响应内容、推理内容、模型名称、工具调用列表 + """ + start_time = time.time() + + tool_built = self._build_tool_options(tools) + + response, model_info = await self._execute_request( + request_type=RequestType.RESPONSE, + message_factory=message_factory, + temperature=temperature, + max_tokens=max_tokens, + tool_options=tool_built, + ) + + logger.debug(f"LLM请求总耗时: {time.time() - start_time}") + logger.debug(f"LLM生成内容: {response}") + + content = response.content + reasoning_content = response.reasoning_content or "" + tool_calls = response.tool_calls + if not reasoning_content and content: + content, extracted_reasoning = self._extract_reasoning(content) + reasoning_content = extracted_reasoning + if usage := response.usage: + llm_usage_recorder.record_usage_to_database( + model_info=model_info, + model_usage=usage, + user_id="system", + request_type=self.request_type, + endpoint="/chat/completions", + time_cost=time.time() - start_time, + ) + return content or "", (reasoning_content, model_info.name, tool_calls) async def get_embedding(self, embedding_input: str) -> Tuple[List[float], str]: """ diff --git a/src/memory_system/curious.py b/src/memory_system/curious.py deleted file mode 100644 index 044c9cdb..00000000 --- a/src/memory_system/curious.py +++ /dev/null @@ -1,215 +0,0 @@ -import time -from typing import List, Optional -from src.common.logger import get_logger -from src.chat.utils.chat_message_builder import ( - get_raw_msg_by_timestamp_with_chat_inclusive, - build_readable_messages_with_id, -) -from src.llm_models.utils_model import LLMRequest -from src.config.config import model_config, global_config -from src.memory_system.memory_utils import parse_md_json - -logger = get_logger("curious") - - -class CuriousDetector: - """ - 好奇心检测器 - 检测聊天记录中的矛盾、冲突或需要提问的内容 - """ - - def __init__(self, chat_id: str): - self.chat_id = chat_id - self.llm_request = LLMRequest( - model_set=model_config.model_task_config.utils, - request_type="curious_detector", - ) - # 触发控制 - self.last_detection_time: float = time.time() - self.min_interval_seconds: float = 60.0 - self.min_messages: int = 20 - - def should_trigger(self) -> bool: - if time.time() - self.last_detection_time < self.min_interval_seconds: - return False - recent_messages = get_raw_msg_by_timestamp_with_chat_inclusive( - chat_id=self.chat_id, - timestamp_start=self.last_detection_time, - timestamp_end=time.time(), - ) - return bool(recent_messages and len(recent_messages) >= self.min_messages) - - async def detect_questions(self, recent_messages: List) -> Optional[str]: - """ - 检测最近消息中是否有需要提问的内容 - - Args: - recent_messages: 最近的消息列表 - - Returns: - Optional[str]: 如果检测到需要提问的内容,返回问题文本;否则返回None - """ - try: - if not recent_messages or len(recent_messages) < 2: - return None - - # 构建聊天内容 - chat_content_block, _ = build_readable_messages_with_id( - messages=recent_messages, - timestamp_mode="normal_no_YMD", - read_mark=0.0, - truncate=True, - show_actions=True, - ) - - # 问题跟踪功能已移除,不再检查已有问题 - - # 构建检测提示词 - prompt = f"""你是一个严谨的聊天内容分析器。请分析以下聊天记录,检测是否存在需要提问的内容。 - -检测条件: -1. 聊天中存在逻辑矛盾或冲突的信息 -2. 有人反对或否定之前提出的信息 -3. 存在观点不一致的情况 -4. 有模糊不清或需要澄清的概念 -5. 有人提出了质疑或反驳 - -**重要限制:** -- 忽略涉及违法、暴力、色情、政治等敏感话题的内容 -- 不要对敏感话题提问 -- 只有在确实存在矛盾或冲突时才提问 -- 如果聊天内容正常,没有矛盾,请输出:NO - -**聊天记录** -{chat_content_block} - -请分析上述聊天记录,如果发现需要提问的内容,请用JSON格式输出: -```json -{{ - "question": "具体的问题描述,要完整描述涉及的概念和问题", - "reason": "为什么需要提问这个问题的理由" -}} -``` - -如果没有需要提问的内容,请只输出:NO""" - - if global_config.debug.show_prompt: - logger.info(f"好奇心检测提示词: {prompt}") - else: - logger.debug("已发送好奇心检测提示词") - - result_text, _ = await self.llm_request.generate_response_async(prompt, temperature=0.3) - - logger.info(f"好奇心检测提示词: {prompt}") - logger.info(f"好奇心检测结果: {result_text}") - - if not result_text: - return None - - result_text = result_text.strip() - - # 检查是否输出NO - if result_text.upper() == "NO": - logger.debug("未检测到需要提问的内容") - return None - - # 尝试解析JSON - try: - questions, reasoning = parse_md_json(result_text) - if questions and len(questions) > 0: - question_data = questions[0] - question = question_data.get("question", "") - reason = question_data.get("reason", "") - - if question and question.strip(): - logger.info(f"检测到需要提问的内容: {question}") - logger.info(f"提问理由: {reason}") - return question - except Exception as e: - logger.warning(f"解析问题JSON失败: {e}") - logger.debug(f"原始响应: {result_text}") - - return None - - except Exception as e: - logger.error(f"好奇心检测失败: {e}") - return None - - async def make_question_from_detection(self, question: str, context: str = "") -> bool: - """ - 将检测到的问题记录(已移除冲突追踪器功能) - - Args: - question: 检测到的问题 - context: 问题上下文 - - Returns: - bool: 是否成功记录 - """ - try: - if not question or not question.strip(): - return False - - # 冲突追踪器功能已移除 - logger.info(f"检测到问题(冲突追踪器已移除): {question}") - return True - - except Exception as e: - logger.error(f"记录问题失败: {e}") - return False - - -class CuriousManager: - def __init__(self) -> None: - self._detectors: dict[str, CuriousDetector] = {} - - def get_detector(self, chat_id: str) -> CuriousDetector: - if chat_id not in self._detectors: - self._detectors[chat_id] = CuriousDetector(chat_id) - return self._detectors[chat_id] - - -curious_manager = CuriousManager() - - -async def check_and_make_question(chat_id: str) -> bool: - """ - 检查聊天记录并生成问题(如果检测到需要提问的内容) - - Args: - chat_id: 聊天ID - recent_messages: 最近的消息列表 - - Returns: - bool: 是否检测到并记录了问题 - """ - try: - detector = curious_manager.get_detector(chat_id) - if not detector.should_trigger(): - return False - - # 拉取窗口内消息 - recent_messages = get_raw_msg_by_timestamp_with_chat_inclusive( - chat_id=chat_id, - timestamp_start=detector.last_detection_time, - timestamp_end=time.time(), - limit=80, - ) - if not recent_messages: - return False - - # 检测是否需要提问 - question = await detector.detect_questions(recent_messages) - - if question: - # 记录问题 - success = await detector.make_question_from_detection(question) - if success: - logger.info(f"成功检测并记录问题: {question}") - detector.last_detection_time = time.time() - return True - - return False - - except Exception as e: - logger.error(f"检查并生成问题失败: {e}") - return False diff --git a/src/memory_system/memory_retrieval.py b/src/memory_system/memory_retrieval.py index 7e9c8ee0..ab4e0f5b 100644 --- a/src/memory_system/memory_retrieval.py +++ b/src/memory_system/memory_retrieval.py @@ -10,11 +10,11 @@ from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.plugin_system.apis import llm_api from src.common.database.database_model import ThinkingBack from json_repair import repair_json -from src.memory_system.retrieval_tools import get_tool_registry, init_all_tools +from src.memory_system.retrieval_tools import get_tool_registry, init_all_tools, register_memory_retrieval_tool +from src.llm_models.payload_content.message import MessageBuilder, RoleType, Message logger = get_logger("memory_retrieval") - def init_memory_retrieval_prompt(): """初始化记忆检索相关的 prompt 模板和工具""" # 首先注册所有工具 @@ -35,73 +35,145 @@ def init_memory_retrieval_prompt(): 2. 是否有需要回忆的内容(比如"之前说过"、"上次"、"以前"等) 3. 是否有需要查找历史信息的问题 4. 是否有问题可以搜集信息帮助你聊天 +5. 对话中是否包含黑话、俚语、缩写等可能需要查询的概念 重要提示: +- **每次只能提出一个问题**,选择最需要查询的关键问题 - 如果"最近已查询的问题和结果"中已经包含了类似的问题,请避免重复生成相同或相似的问题 - 如果之前已经查询过某个问题但未找到答案,可以尝试用不同的方式提问或更具体的问题 - 如果之前已经查询过某个问题并找到了答案,可以直接参考已有结果,不需要重复查询 -如果你认为需要从记忆中检索信息来回答,请根据上下文提出一个或多个具体的问题。 +如果你认为需要从记忆中检索信息来回答,请: +1. 先识别对话中可能需要查询的概念(黑话/俚语/缩写/人名/专有名词等关键词) +2. 然后根据上下文提出**一个**最关键的问题来帮助你回复目标消息 + 问题格式示例: - "xxx在前几天干了什么" - "xxx是什么" - "xxxx和xxx的关系是什么" - "xxx在某个时间点发生了什么" -请输出JSON格式的问题数组。如果不需要检索记忆,则输出空数组[]。 +请输出JSON格式,包含两个字段: +- "concepts": 需要检索的概念列表(字符串数组),如果不需要检索概念则输出空数组[] +- "questions": 问题数组(字符串数组),如果不需要检索记忆则输出空数组[],如果需要检索则只输出包含一个问题的数组 -输出格式示例: +输出格式示例(需要检索时): ```json -[ - "张三在前几天干了什么", - "自然选择是什么", - "李四和王五的关系是什么" -] +{{ + "concepts": ["AAA", "BBB", "CCC"], + "questions": ["张三在前几天干了什么"] +}} ``` -请只输出JSON数组,不要输出其他内容: +输出格式示例(不需要检索时): +```json +{{ + "concepts": [], + "questions": [] +}} +``` + +请只输出JSON对象,不要输出其他内容: """, name="memory_retrieval_question_prompt", ) - # 第二步:ReAct Agent prompt(工具描述会在运行时动态生成) + # 第二步:ReAct Agent prompt(使用function calling,要求先思考再行动) Prompt( """ 你的名字是{bot_name}。现在是{time_now}。 你正在参与聊天,你需要搜集信息来回答问题,帮助你参与聊天。 你需要通过思考(Think)、行动(Action)、观察(Observation)的循环来回答问题。 +**重要限制:** +- 最大查询轮数:5轮(当前第{current_iteration}轮,剩余{remaining_iterations}轮) +- 必须尽快得出答案,避免不必要的查询 +- 思考要简短,直接切入要点 +- 必须严格使用检索到的信息回答问题,不要编造信息 + 当前问题:{question} 已收集的信息: {collected_info} -你可以使用以下工具来查询信息: -{tools_description} +**执行步骤:** -请按照以下格式输出你的思考过程: -```json -{{ - "thought": "你的思考过程,分析当前情况,决定下一步行动", - "actions": [ - {{ - "action_type": {action_types_list}, - "action_params": {{参数名: 参数值}} 或 null - }} - ] -}} -``` +**第一步:思考(Think)** +在思考中分析: +- 当前信息是否足够回答问题? +- 如果足够,在思考中直接给出答案,格式为:final_answer(answer="你的答案内容") +- 如果不够,说明最需要查询什么,并输出为纯文本说明 -重要说明: -- 你可以在一次迭代中执行多个查询,将多个action放在actions数组中 -- 如果只需要执行一个查询,actions数组中只包含一个action即可 -- 如果已经收集到足够的信息可以回答问题,请设置actions为包含一个action_type为"final_answer"的数组,并在action_params中提供答案(例如:{{"answer": "你的答案内容"}})。除非明确找到答案,否则不要设置为final_answer。 -- 如果经过多次查询后,确认无法找到相关信息或答案,请设置actions为包含一个action_type为"no_answer"的数组,并在action_params中说明原因(例如:{{"reason": "无法找到的原因"}})。 +**第二步:行动(Action)** +根据思考结果立即行动: +- 如果思考中已给出final_answer → 无需调用工具,直接结束 +- 如果信息不足 → 调用相应工具查询(可并行调用多个工具) +- 如果多次查询仍无结果 → 在思考中给出no_answer(reason="无法找到答案的原因") -请只输出JSON,不要输出其他内容: +**重要:答案必须在思考中给出,格式为 final_answer(answer="...") 或 no_answer(reason="..."),不要调用工具。** """, name="memory_retrieval_react_prompt", ) + # 第二步:ReAct Agent prompt(使用function calling,要求先思考再行动) + Prompt( + """ +你的名字是{bot_name}。现在是{time_now}。 +你正在参与聊天,你需要搜集信息来回答问题,帮助你参与聊天。 +你需要通过思考(Think)、行动(Action)、观察(Observation)的循环来回答问题。 + +**重要限制:** +- 最大查询轮数:5轮(当前第{current_iteration}轮,剩余{remaining_iterations}轮) +- 必须尽快得出答案,避免不必要的查询 +- 思考要简短,直接切入要点 +- 必须严格使用检索到的信息回答问题,不要编造信息 + +当前问题:{question} + +**执行步骤:** + +**第一步:思考(Think)** +在思考中分析: +- 当前信息是否足够回答问题? +- 如果足够,在思考中直接给出答案,格式为:final_answer(answer="你的答案内容") +- 如果不够,说明最需要查询什么,并输出为纯文本说明 + +**第二步:行动(Action)** +根据思考结果立即行动: +- 如果思考中已给出final_answer → 无需调用工具,直接结束 +- 如果信息不足 → 调用相应工具查询(可并行调用多个工具) +- 如果多次查询仍无结果 → 在思考中给出no_answer(reason="无法找到答案的原因") + +**重要:答案必须在思考中给出,格式为 final_answer(answer="...") 或 no_answer(reason="..."),不要调用工具。** +""", + name="memory_retrieval_react_prompt_head", + ) + + # 额外,如果最后一轮迭代:ReAct Agent prompt(使用function calling,要求先思考再行动) + Prompt( + """ +你的名字是{bot_name}。现在是{time_now}。 +你正在参与聊天,你需要搜集信息来回答问题,帮助你参与聊天。 + +**重要限制:** +- 你已经经过几轮查询,尝试了信息搜集,现在你需要总结信息,选择回答问题或判断问题无法回答 +- 思考要简短,直接切入要点 +- 必须严格使用检索到的信息回答问题,不要编造信息 + +当前问题:{question} +已收集的信息: +{collected_info} + +**执行步骤:** +分析: +- 当前信息是否足够回答问题? +- 如果足够,在思考中直接给出答案,格式为:final_answer(answer="你的答案内容") +- 如果不够,在思考中给出no_answer(reason="无法找到答案的原因") + +**重要:答案必须给出,格式为 final_answer(answer="...") 或 no_answer(reason="...")。** +""", + name="memory_retrieval_react_final_prompt", + ) + def _parse_react_response(response: str) -> Optional[Dict[str, Any]]: """解析ReAct Agent的响应 @@ -156,11 +228,89 @@ def _parse_react_response(response: str) -> Optional[Dict[str, Any]]: return None +async def _retrieve_concepts_with_jargon( + concepts: List[str], + chat_id: str +) -> str: + """对概念列表进行jargon检索 + + Args: + concepts: 概念列表 + chat_id: 聊天ID + + Returns: + str: 检索结果字符串 + """ + if not concepts: + return "" + + from src.jargon.jargon_miner import search_jargon + + results = [] + for concept in concepts: + concept = concept.strip() + if not concept: + continue + + # 先尝试精确匹配 + jargon_results = search_jargon( + keyword=concept, + chat_id=chat_id, + limit=10, + case_sensitive=False, + fuzzy=False + ) + + is_fuzzy_match = False + + # 如果精确匹配未找到,尝试模糊搜索 + if not jargon_results: + jargon_results = search_jargon( + keyword=concept, + chat_id=chat_id, + limit=10, + case_sensitive=False, + fuzzy=True + ) + is_fuzzy_match = True + + if jargon_results: + # 找到结果 + if is_fuzzy_match: + # 模糊匹配 + output_parts = [f"未精确匹配到'{concept}'"] + for result in jargon_results: + found_content = result.get("content", "").strip() + meaning = result.get("meaning", "").strip() + if found_content and meaning: + output_parts.append(f"找到 '{found_content}' 的含义为:{meaning}") + results.append(",".join(output_parts)) + logger.info(f"在jargon库中找到匹配(模糊搜索): {concept},找到{len(jargon_results)}条结果") + else: + # 精确匹配 + output_parts = [] + for result in jargon_results: + meaning = result.get("meaning", "").strip() + if meaning: + output_parts.append(f"'{concept}' 为黑话或者网络简写,含义为:{meaning}") + results.append(";".join(output_parts) if len(output_parts) > 1 else output_parts[0]) + logger.info(f"在jargon库中找到匹配(精确匹配): {concept},找到{len(jargon_results)}条结果") + else: + # 未找到 + results.append(f"未在jargon库中找到'{concept}'的解释") + logger.info(f"在jargon库中未找到匹配: {concept}") + + if results: + return "【概念检索结果】\n" + "\n".join(results) + "\n" + return "" + + async def _react_agent_solve_question( question: str, chat_id: str, max_iterations: int = 5, - timeout: float = 30.0 + timeout: float = 30.0, + initial_info: str = "" ) -> Tuple[bool, str, List[Dict[str, Any]], bool]: """使用ReAct架构的Agent来解决问题 @@ -169,14 +319,16 @@ async def _react_agent_solve_question( chat_id: 聊天ID max_iterations: 最大迭代次数 timeout: 超时时间(秒) + initial_info: 初始信息(如概念检索结果),将作为collected_info的初始值 Returns: Tuple[bool, str, List[Dict[str, Any]], bool]: (是否找到答案, 答案内容, 思考步骤列表, 是否超时) """ start_time = time.time() - collected_info = "" + collected_info = initial_info if initial_info else "" thinking_steps = [] is_timeout = False + conversation_messages: List[Message] = [] for iteration in range(max_iterations): # 检查超时 @@ -185,9 +337,6 @@ async def _react_agent_solve_question( is_timeout = True break - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}") - logger.info(f"ReAct Agent 已收集信息: {collected_info if collected_info else '暂无信息'}") - # 获取工具注册器 tool_registry = get_tool_registry() @@ -197,90 +346,235 @@ async def _react_agent_solve_question( # 获取当前时间 time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - # 构建prompt(动态生成工具描述) + # 计算剩余迭代次数 + current_iteration = iteration + 1 + remaining_iterations = max_iterations - current_iteration + is_final_iteration = current_iteration >= max_iterations + + # 构建prompt(不再需要工具文本描述) + + prompt_type = "memory_retrieval_react_prompt" + if is_final_iteration: + prompt_type = "memory_retrieval_react_final_prompt" + tool_definitions = [] + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: 0(最后一次迭代,不提供工具调用)") + else: + tool_definitions = tool_registry.get_tool_definitions() + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}") + prompt = await global_prompt_manager.format_prompt( - "memory_retrieval_react_prompt", + prompt_type, bot_name=bot_name, time_now=time_now, question=question, collected_info=collected_info if collected_info else "暂无信息", - tools_description=tool_registry.get_tools_description(), - action_types_list=tool_registry.get_action_types_list(), + current_iteration=current_iteration, + remaining_iterations=remaining_iterations, ) + - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 Prompt: {prompt}") + + if not is_final_iteration: + head_prompt = await global_prompt_manager.format_prompt( + "memory_retrieval_react_prompt_head", + bot_name=bot_name, + time_now=time_now, + question=question, + current_iteration=current_iteration, + remaining_iterations=remaining_iterations, + ) + + def message_factory(_client) -> List[Message]: + messages: List[Message] = [] + + system_builder = MessageBuilder() + system_builder.set_role(RoleType.System) + system_builder.add_text_content(head_prompt) + if prompt.strip(): + system_builder.add_text_content(f"\n{prompt}") + messages.append(system_builder.build()) + + messages.extend(conversation_messages) + + for msg in messages: + print(msg) + + return messages + + success, response, reasoning_content, model_name, tool_calls = await llm_api.generate_with_model_with_tools_by_message_factory( + message_factory, + model_config=model_config.model_task_config.tool_use, + tool_options=tool_definitions, + request_type="memory.react", + ) + else: + logger.info(f"ReAct Agent 第 {iteration + 1} 次Prompt: {prompt}") + success, response, reasoning_content, model_name, tool_calls = await llm_api.generate_with_model_with_tools( + prompt, + model_config=model_config.model_task_config.tool_use, + tool_options=tool_definitions, + request_type="memory.react", + ) - # 调用LLM - success, response, reasoning_content, model_name = await llm_api.generate_with_model( - prompt, - model_config=model_config.model_task_config.tool_use, - request_type="memory.react", - ) - - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 LLM响应: {response}") - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 LLM推理: {reasoning_content}") - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 LLM模型: {model_name}") + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}") if not success: logger.error(f"ReAct Agent LLM调用失败: {response}") break + + assistant_message: Optional[Message] = None + if tool_calls: + assistant_builder = MessageBuilder() + assistant_builder.set_role(RoleType.Assistant) + if response and response.strip(): + assistant_builder.add_text_content(response) + assistant_builder.set_tool_calls(tool_calls) + assistant_message = assistant_builder.build() + elif response and response.strip(): + assistant_builder = MessageBuilder() + assistant_builder.set_role(RoleType.Assistant) + assistant_builder.add_text_content(response) + assistant_message = assistant_builder.build() - # 解析响应 - action_info = _parse_react_response(response) - if not action_info: - logger.warning(f"无法解析ReAct响应,迭代{iteration + 1}") - break - - thought = action_info.get("thought", "") - actions = action_info.get("actions", []) - - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考: {thought}") - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 动作数量: {len(actions)}") - - # 记录思考步骤(包含所有actions) + # 记录思考步骤 step = { "iteration": iteration + 1, - "thought": thought, - "actions": actions, + "thought": response, + "actions": [], "observations": [] } - # 检查是否有final_answer或no_answer - for action in actions: - action_type = action.get("action_type", "") - action_params = action.get("action_params", {}) - if action_type == "final_answer": - # Agent认为已经找到答案 - # 从action_params中获取答案,如果没有则使用thought作为后备 - answer = action_params.get("answer", thought) if isinstance(action_params, dict) else thought - step["observations"] = ["找到答案"] - thinking_steps.append(step) - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 找到最终答案: {answer}") - return True, answer, thinking_steps, False - elif action_type == "no_answer": - # Agent确认无法找到答案 - # 从action_params中获取原因,如果没有则使用thought作为后备 - answer = action_params.get("reason", thought) if isinstance(action_params, dict) else thought - step["observations"] = ["确认无法找到答案"] - thinking_steps.append(step) - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 确认无法找到答案: {answer}") - return False, answer, thinking_steps, False + # 优先从思考内容中提取final_answer或no_answer + def extract_quoted_content(text, func_name, param_name): + """从文本中提取函数调用中参数的值,支持单引号和双引号 + + Args: + text: 要搜索的文本 + func_name: 函数名,如 'final_answer' + param_name: 参数名,如 'answer' + + Returns: + 提取的参数值,如果未找到则返回None + """ + if not text: + return None + + # 查找函数调用位置(不区分大小写) + func_pattern = func_name.lower() + text_lower = text.lower() + func_pos = text_lower.find(func_pattern) + if func_pos == -1: + return None + + # 查找参数名和等号 + param_pattern = f'{param_name}=' + param_pos = text_lower.find(param_pattern, func_pos) + if param_pos == -1: + return None + + # 跳过参数名、等号和空白 + start_pos = param_pos + len(param_pattern) + while start_pos < len(text) and text[start_pos] in ' \t\n': + start_pos += 1 + + if start_pos >= len(text): + return None + + # 确定引号类型 + quote_char = text[start_pos] + if quote_char not in ['"', "'"]: + return None + + # 查找匹配的结束引号(考虑转义) + end_pos = start_pos + 1 + while end_pos < len(text): + if text[end_pos] == quote_char: + # 检查是否是转义的引号 + if end_pos > start_pos + 1 and text[end_pos - 1] == '\\': + end_pos += 1 + continue + # 找到匹配的引号 + content = text[start_pos + 1:end_pos] + # 处理转义字符 + content = content.replace('\\"', '"').replace("\\'", "'").replace('\\\\', '\\') + return content + end_pos += 1 + + return None - # 并行执行所有工具 - tool_registry = get_tool_registry() + # 从LLM的直接输出内容中提取final_answer或no_answer + final_answer_content = None + no_answer_reason = None + + # 只检查response(LLM的直接输出内容),不检查reasoning_content + if response: + final_answer_content = extract_quoted_content(response, 'final_answer', 'answer') + if not final_answer_content: + no_answer_reason = extract_quoted_content(response, 'no_answer', 'reason') + + # 如果从输出内容中找到了答案,直接返回 + if final_answer_content: + step["actions"].append({"action_type": "final_answer", "action_params": {"answer": final_answer_content}}) + step["observations"] = ["从LLM输出内容中检测到final_answer"] + thinking_steps.append(step) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 从LLM输出内容中检测到final_answer: {final_answer_content[:100]}...") + return True, final_answer_content, thinking_steps, False + + if no_answer_reason: + step["actions"].append({"action_type": "no_answer", "action_params": {"reason": no_answer_reason}}) + step["observations"] = ["从LLM输出内容中检测到no_answer"] + thinking_steps.append(step) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 从LLM输出内容中检测到no_answer: {no_answer_reason[:100]}...") + return False, no_answer_reason, thinking_steps, False + + if is_final_iteration: + step["actions"].append({"action_type": "no_answer", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}}) + step["observations"] = ["已到达最后一次迭代,无法找到答案"] + thinking_steps.append(step) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 已到达最后一次迭代,无法找到答案") + return False, "已到达最后一次迭代,无法找到答案", thinking_steps, False + + if assistant_message: + conversation_messages.append(assistant_message) + + # 记录思考过程到collected_info中 + if reasoning_content or response: + thought_summary = reasoning_content or (response[:200] if response else "") + if thought_summary: + collected_info += f"\n[思考] {thought_summary}\n" + + # 处理工具调用 + if not tool_calls: + # 没有工具调用,说明LLM在思考中已经给出了答案(已在前面检查),或者需要继续查询 + # 如果思考中没有答案,说明需要继续查询或等待下一轮 + if response and response.strip(): + # 如果响应不为空,记录思考过程,继续下一轮迭代 + step["observations"] = [f"思考完成,但未调用工具。响应: {response}"] + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考完成但未调用工具: {response[:100]}...") + # 继续下一轮迭代,让LLM有机会在思考中给出final_answer或继续查询 + collected_info += f"思考: {response}" + thinking_steps.append(step) + continue + else: + logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 无工具调用且无响应") + step["observations"] = ["无响应且无工具调用"] + thinking_steps.append(step) + break + + # 处理工具调用 tool_tasks = [] - for i, action in enumerate(actions): - action_type = action.get("action_type", "") - action_params = action.get("action_params", {}) + for i, tool_call in enumerate(tool_calls): + tool_name = tool_call.func_name + tool_args = tool_call.args or {} - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 动作 {i+1}/{len(actions)}: {action_type}({action_params})") - - tool = tool_registry.get_tool(action_type) + logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 工具调用 {i+1}/{len(tool_calls)}: {tool_name}({tool_args})") + # 普通工具调用 + tool = tool_registry.get_tool(tool_name) if tool: # 准备工具参数(需要添加chat_id如果工具需要) - tool_params = action_params.copy() + tool_params = tool_args.copy() # 如果工具函数签名需要chat_id,添加它 import inspect @@ -289,35 +583,43 @@ async def _react_agent_solve_question( tool_params["chat_id"] = chat_id # 创建异步任务 - async def execute_single_tool(tool_instance, params, act_type, act_params, iter_num): + async def execute_single_tool(tool_instance, params, tool_name_str, iter_num): try: observation = await tool_instance.execute(**params) - param_str = ", ".join([f"{k}={v}" for k, v in act_params.items()]) - return f"查询{act_type}({param_str})的结果:{observation}" + param_str = ", ".join([f"{k}={v}" for k, v in params.items() if k != "chat_id"]) + return f"查询{tool_name_str}({param_str})的结果:{observation}" except Exception as e: error_msg = f"工具执行失败: {str(e)}" - logger.error(f"ReAct Agent 第 {iter_num + 1} 次迭代 动作 {act_type} {error_msg}") - return f"查询{act_type}失败: {error_msg}" + logger.error(f"ReAct Agent 第 {iter_num + 1} 次迭代 工具 {tool_name_str} {error_msg}") + return f"查询{tool_name_str}失败: {error_msg}" - tool_tasks.append(execute_single_tool(tool, tool_params, action_type, action_params, iteration)) + tool_tasks.append(execute_single_tool(tool, tool_params, tool_name, iteration)) + step["actions"].append({"action_type": tool_name, "action_params": tool_args}) else: - error_msg = f"未知的工具类型: {action_type}" - logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 动作 {i+1}/{len(actions)} {error_msg}") - tool_tasks.append(asyncio.create_task(asyncio.sleep(0, result=f"查询{action_type}失败: {error_msg}"))) + error_msg = f"未知的工具类型: {tool_name}" + logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1}/{len(tool_calls)} {error_msg}") + tool_tasks.append(asyncio.create_task(asyncio.sleep(0, result=f"查询{tool_name}失败: {error_msg}"))) # 并行执行所有工具 if tool_tasks: observations = await asyncio.gather(*tool_tasks, return_exceptions=True) # 处理执行结果 - for i, observation in enumerate(observations): + for i, (tool_call_item, observation) in enumerate(zip(tool_calls, observations)): if isinstance(observation, Exception): observation = f"工具执行异常: {str(observation)}" - logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 动作 {i+1} 执行异常: {observation}") - - step["observations"].append(observation) - collected_info += f"\n{observation}\n" - logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 动作 {i+1} 执行结果: {observation}") + logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1} 执行异常: {observation}") + + observation_text = observation if isinstance(observation, str) else str(observation) + step["observations"].append(observation_text) + collected_info += f"\n{observation_text}\n" + if observation_text.strip(): + tool_builder = MessageBuilder() + tool_builder.set_role(RoleType.Tool) + tool_builder.add_text_content(observation_text) + tool_builder.add_tool_call(tool_call_item.call_id) + conversation_messages.append(tool_builder.build()) + # logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1} 执行结果: {observation_text}") thinking_steps.append(step) @@ -529,7 +831,8 @@ def _store_thinking_back( async def _process_single_question( question: str, chat_id: str, - context: str + context: str, + initial_info: str = "" ) -> Optional[str]: """处理单个问题的查询(包含缓存检查逻辑) @@ -537,6 +840,7 @@ async def _process_single_question( question: 要查询的问题 chat_id: 聊天ID context: 上下文信息 + initial_info: 初始信息(如概念检索结果),将传递给ReAct Agent Returns: Optional[str]: 如果找到答案,返回格式化的结果字符串,否则返回None @@ -582,7 +886,8 @@ async def _process_single_question( question=question, chat_id=chat_id, max_iterations=5, - timeout=120.0 + timeout=120.0, + initial_info=initial_info ) # 存储到数据库(超时时不存储) @@ -661,19 +966,37 @@ async def build_memory_retrieval_prompt( logger.error(f"LLM生成问题失败: {response}") return "" - # 解析问题列表 - questions = _parse_questions_json(response) + # 解析概念列表和问题列表 + concepts, questions = _parse_questions_json(response) + logger.info(f"解析到 {len(concepts)} 个概念: {concepts}") + logger.info(f"解析到 {len(questions)} 个问题: {questions}") + + # 对概念进行jargon检索,作为初始信息 + initial_info = "" + if concepts: + logger.info(f"开始对 {len(concepts)} 个概念进行jargon检索") + initial_info = await _retrieve_concepts_with_jargon(concepts, chat_id) + if initial_info: + logger.info(f"概念检索完成,结果: {initial_info[:200]}...") + else: + logger.info("概念检索未找到任何结果") # 获取缓存的记忆(与question时使用相同的时间窗口和数量限制) cached_memories = _get_cached_memories(chat_id, time_window_seconds=300.0) if not questions: logger.debug("模型认为不需要检索记忆或解析失败") - # 即使没有当次查询,也返回缓存的记忆 + # 即使没有当次查询,也返回缓存的记忆和概念检索结果 + all_results = [] + if initial_info: + all_results.append(initial_info.strip()) if cached_memories: - retrieved_memory = "\n\n".join(cached_memories) + all_results.extend(cached_memories) + + if all_results: + retrieved_memory = "\n\n".join(all_results) end_time = time.time() - logger.info(f"无当次查询,返回缓存记忆,耗时: {(end_time - start_time):.3f}秒,包含 {len(cached_memories)} 条缓存记忆") + logger.info(f"无当次查询,返回缓存记忆和概念检索结果,耗时: {(end_time - start_time):.3f}秒") return f"你回忆起了以下信息:\n{retrieved_memory}\n如果与回复内容相关,可以参考这些回忆的信息。\n" else: return "" @@ -683,12 +1006,13 @@ async def build_memory_retrieval_prompt( # 第二步:并行处理所有问题(固定使用5次迭代/120秒超时) logger.info(f"问题数量: {len(questions)},固定设置最大迭代次数: 5,超时时间: 120秒") - # 并行处理所有问题 + # 并行处理所有问题,将概念检索结果作为初始信息传递 question_tasks = [ _process_single_question( question=question, chat_id=chat_id, - context=message + context=message, + initial_info=initial_info ) for question in questions ] @@ -733,14 +1057,14 @@ async def build_memory_retrieval_prompt( return "" -def _parse_questions_json(response: str) -> List[str]: - """解析问题JSON +def _parse_questions_json(response: str) -> Tuple[List[str], List[str]]: + """解析问题JSON,返回概念列表和问题列表 Args: response: LLM返回的响应 Returns: - List[str]: 问题列表 + Tuple[List[str], List[str]]: (概念列表, 问题列表) """ try: # 尝试提取JSON(可能包含在```json代码块中) @@ -757,17 +1081,28 @@ def _parse_questions_json(response: str) -> List[str]: repaired_json = repair_json(json_str) # 解析JSON - questions = json.loads(repaired_json) + parsed = json.loads(repaired_json) - if not isinstance(questions, list): - logger.warning(f"解析的JSON不是数组格式: {questions}") - return [] + # 只支持新格式:包含concepts和questions的对象 + if not isinstance(parsed, dict): + logger.warning(f"解析的JSON不是对象格式: {parsed}") + return [], [] + + concepts_raw = parsed.get("concepts", []) + questions_raw = parsed.get("questions", []) + + # 确保是列表 + if not isinstance(concepts_raw, list): + concepts_raw = [] + if not isinstance(questions_raw, list): + questions_raw = [] # 确保所有元素都是字符串 - questions = [q for q in questions if isinstance(q, str) and q.strip()] + concepts = [c for c in concepts_raw if isinstance(c, str) and c.strip()] + questions = [q for q in questions_raw if isinstance(q, str) and q.strip()] - return questions + return concepts, questions except Exception as e: logger.error(f"解析问题JSON失败: {e}, 响应内容: {response[:200]}...") - return [] + return [], [] diff --git a/src/memory_system/memory_utils.py b/src/memory_system/memory_utils.py index 59c4a143..af16456b 100644 --- a/src/memory_system/memory_utils.py +++ b/src/memory_system/memory_utils.py @@ -5,6 +5,8 @@ """ import json import re +from datetime import datetime +from typing import Tuple from difflib import SequenceMatcher from src.common.logger import get_logger @@ -106,3 +108,60 @@ def preprocess_text(text: str) -> str: logger.error(f"预处理文本时出错: {e}") return text + + +def parse_datetime_to_timestamp(value: str) -> float: + """ + 接受多种常见格式并转换为时间戳(秒) + 支持示例: + - 2025-09-29 + - 2025-09-29 00:00:00 + - 2025/09/29 00:00 + - 2025-09-29T00:00:00 + """ + value = value.strip() + fmts = [ + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y/%m/%d %H:%M:%S", + "%Y/%m/%d %H:%M", + "%Y-%m-%d", + "%Y/%m/%d", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%dT%H:%M", + ] + last_err = None + for fmt in fmts: + try: + dt = datetime.strptime(value, fmt) + return dt.timestamp() + except Exception as e: + last_err = e + raise ValueError(f"无法解析时间: {value} ({last_err})") + + +def parse_time_range(time_range: str) -> Tuple[float, float]: + """ + 解析时间范围字符串,返回开始和结束时间戳 + + Args: + time_range: 时间范围字符串,格式:"YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS" + + Returns: + Tuple[float, float]: (开始时间戳, 结束时间戳) + """ + if " - " not in time_range: + raise ValueError(f"时间范围格式错误,应为 '开始时间 - 结束时间': {time_range}") + + parts = time_range.split(" - ", 1) + if len(parts) != 2: + raise ValueError(f"时间范围格式错误: {time_range}") + + start_str = parts[0].strip() + end_str = parts[1].strip() + + start_timestamp = parse_datetime_to_timestamp(start_str) + end_timestamp = parse_datetime_to_timestamp(end_str) + + return start_timestamp, end_timestamp + diff --git a/src/memory_system/retrieval_tools/README.md b/src/memory_system/retrieval_tools/README.md index 427e4cc9..50c36dbe 100644 --- a/src/memory_system/retrieval_tools/README.md +++ b/src/memory_system/retrieval_tools/README.md @@ -11,6 +11,7 @@ retrieval_tools/ ├── tool_utils.py # 工具函数库(共用函数) ├── query_jargon.py # 查询jargon工具 ├── query_chat_history.py # 查询聊天历史工具 +├── query_lpmm_knowledge.py # 查询LPMM知识库工具 └── README.md # 本文件 ``` @@ -145,6 +146,11 @@ result = await tool.execute(time_range="2025-01-15 10:00:00 - 2025-01-15 20:00:0 - `time_point` (可选) - 时间点,格式:YYYY-MM-DD HH:MM:SS,用于查询某个时间点附近发生了什么(与time_range二选一) - `time_range` (可选) - 时间范围,格式:'YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS'(与time_point二选一) +### query_lpmm_knowledge +从LPMM知识库中检索与关键词相关的知识内容 +- 参数: + - `query` (必填) - 查询的关键词或问题描述 + ## 注意事项 - 所有工具函数必须是异步函数(`async def`) diff --git a/src/memory_system/retrieval_tools/__init__.py b/src/memory_system/retrieval_tools/__init__.py index 2bb5623c..0651db48 100644 --- a/src/memory_system/retrieval_tools/__init__.py +++ b/src/memory_system/retrieval_tools/__init__.py @@ -13,13 +13,17 @@ from .tool_registry import ( # 导入所有工具的注册函数 from .query_jargon import register_tool as register_query_jargon from .query_chat_history import register_tool as register_query_chat_history - +from .query_lpmm_knowledge import register_tool as register_lpmm_knowledge +from src.config.config import global_config def init_all_tools(): """初始化并注册所有记忆检索工具""" register_query_jargon() register_query_chat_history() + if global_config.lpmm_knowledge.lpmm_mode == "agent": + register_lpmm_knowledge() + __all__ = [ "MemoryRetrievalTool", diff --git a/src/memory_system/retrieval_tools/query_chat_history.py b/src/memory_system/retrieval_tools/query_chat_history.py index f95ee266..85776250 100644 --- a/src/memory_system/retrieval_tools/query_chat_history.py +++ b/src/memory_system/retrieval_tools/query_chat_history.py @@ -9,7 +9,7 @@ from src.common.logger import get_logger from src.common.database.database_model import ChatHistory from src.chat.utils.utils import parse_keywords_string from .tool_registry import register_memory_retrieval_tool -from .tool_utils import parse_datetime_to_timestamp, parse_time_range +from ..memory_utils import parse_datetime_to_timestamp, parse_time_range logger = get_logger("memory_retrieval_tools") @@ -17,7 +17,8 @@ logger = get_logger("memory_retrieval_tools") async def query_chat_history( chat_id: str, keyword: Optional[str] = None, - time_range: Optional[str] = None + time_range: Optional[str] = None, + fuzzy: bool = True ) -> str: """根据时间或关键词在chat_history表中查询聊天记录概述 @@ -27,6 +28,9 @@ async def query_chat_history( time_range: 时间范围或时间点,格式: - 时间范围:"YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS" - 时间点:"YYYY-MM-DD HH:MM:SS"(查询包含该时间点的记录) + fuzzy: 是否使用模糊匹配模式(默认True) + - True: 模糊匹配,只要包含任意一个关键词即匹配(OR关系) + - False: 全匹配,必须包含所有关键词才匹配(AND关系) Returns: str: 查询结果 @@ -62,9 +66,6 @@ async def query_chat_history( # 执行查询 records = list(query.order_by(ChatHistory.start_time.desc()).limit(50)) - if not records: - return "未找到相关聊天记录概述" - # 如果有关键词,进一步过滤 if keyword: # 解析多个关键词(支持空格、逗号等分隔符) @@ -96,24 +97,48 @@ async def query_chat_history( except (json.JSONDecodeError, TypeError, ValueError): pass - # 检查是否包含任意一个关键词(OR关系) + # 根据匹配模式检查关键词 matched = False - for kw in keywords_lower: - if (kw in theme or - kw in summary or - kw in original_text or - any(kw in k for k in record_keywords_list)): - matched = True - break + if fuzzy: + # 模糊匹配:只要包含任意一个关键词即匹配(OR关系) + for kw in keywords_lower: + if (kw in theme or + kw in summary or + kw in original_text or + any(kw in k for k in record_keywords_list)): + matched = True + break + else: + # 全匹配:必须包含所有关键词才匹配(AND关系) + matched = True + for kw in keywords_lower: + kw_matched = (kw in theme or + kw in summary or + kw in original_text or + any(kw in k for k in record_keywords_list)) + if not kw_matched: + matched = False + break if matched: filtered_records.append(record) if not filtered_records: keywords_str = "、".join(keywords_list) - return f"未找到包含关键词'{keywords_str}'的聊天记录概述" + match_mode = "包含任意一个关键词" if fuzzy else "包含所有关键词" + if time_range: + return f"未找到{match_mode}'{keywords_str}'且在指定时间范围内的聊天记录概述" + else: + return f"未找到{match_mode}'{keywords_str}'的聊天记录概述" records = filtered_records + + # 如果没有记录(可能是时间范围查询但没有匹配的记录) + if not records: + if time_range: + return "未找到指定时间范围内的聊天记录概述" + else: + return "未找到相关聊天记录概述" # 对即将返回的记录增加使用计数 records_to_use = records[:3] @@ -168,12 +193,12 @@ def register_tool(): """注册工具""" register_memory_retrieval_tool( name="query_chat_history", - description="根据时间或关键词在chat_history表的聊天记录概述库中查询。可以查询某个时间点发生了什么、某个时间范围内的事件,或根据关键词搜索消息概述", + description="根据时间或关键词在chat_history表的聊天记录概述库中查询。可以查询某个时间点发生了什么、某个时间范围内的事件,或根据关键词搜索消息概述。支持两种匹配模式:模糊匹配(默认,只要包含任意一个关键词即匹配)和全匹配(必须包含所有关键词才匹配)", parameters=[ { "name": "keyword", "type": "string", - "description": "关键词(可选,支持多个关键词,可用空格、逗号、斜杠等分隔,如:'麦麦 百度网盘' 或 '麦麦,百度网盘'。用于在主题、关键词、概括、原文中搜索,只要包含任意一个关键词即匹配)", + "description": "关键词(可选,支持多个关键词,可用空格、逗号、斜杠等分隔,如:'麦麦 百度网盘' 或 '麦麦,百度网盘'。用于在主题、关键词、概括、原文中搜索)", "required": False }, { @@ -181,6 +206,12 @@ def register_tool(): "type": "string", "description": "时间范围或时间点(可选)。格式:'YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS'(时间范围,查询与时间范围有交集的记录)或 'YYYY-MM-DD HH:MM:SS'(时间点,查询包含该时间点的记录)", "required": False + }, + { + "name": "fuzzy", + "type": "boolean", + "description": "是否使用模糊匹配模式(默认True)。True表示模糊匹配(只要包含任意一个关键词即匹配,OR关系),False表示全匹配(必须包含所有关键词才匹配,AND关系)", + "required": False } ], execute_func=query_chat_history diff --git a/src/memory_system/retrieval_tools/query_lpmm_knowledge.py b/src/memory_system/retrieval_tools/query_lpmm_knowledge.py new file mode 100644 index 00000000..aa9268db --- /dev/null +++ b/src/memory_system/retrieval_tools/query_lpmm_knowledge.py @@ -0,0 +1,65 @@ +""" +通过LPMM知识库查询信息 - 工具实现 +""" + +from src.common.logger import get_logger +from src.config.config import global_config +from src.chat.knowledge import get_qa_manager +from .tool_registry import register_memory_retrieval_tool + +logger = get_logger("memory_retrieval_tools") + + +async def query_lpmm_knowledge(query: str) -> str: + """在LPMM知识库中查询相关信息 + + Args: + query: 查询关键词 + + Returns: + str: 查询结果 + """ + try: + content = str(query).strip() + if not content: + return "查询关键词为空" + + if not global_config.lpmm_knowledge.enable: + logger.debug("LPMM知识库未启用") + return "LPMM知识库未启用" + + qa_manager = get_qa_manager() + if qa_manager is None: + logger.debug("LPMM知识库未初始化,跳过查询") + return "LPMM知识库未初始化" + + knowledge_info = await qa_manager.get_knowledge(content) + logger.debug(f"LPMM知识库查询结果: {knowledge_info}") + + if knowledge_info: + return f"你从LPMM知识库中找到以下信息:\n{knowledge_info}" + + return f"在LPMM知识库中未找到与“{content}”相关的信息" + + except Exception as e: + logger.error(f"LPMM知识库查询失败: {e}") + return f"LPMM知识库查询失败:{str(e)}" + + +def register_tool(): + """注册LPMM知识库查询工具""" + register_memory_retrieval_tool( + name="lpmm_search_knowledge", + description="从LPMM知识库中搜索相关信息,适用于需要知识支持的场景。", + parameters=[ + { + "name": "query", + "type": "string", + "description": "需要查询的关键词或问题", + "required": True, + } + ], + execute_func=query_lpmm_knowledge, + ) + + diff --git a/src/memory_system/retrieval_tools/tool_registry.py b/src/memory_system/retrieval_tools/tool_registry.py index 920a1bb6..0bb927f9 100644 --- a/src/memory_system/retrieval_tools/tool_registry.py +++ b/src/memory_system/retrieval_tools/tool_registry.py @@ -3,8 +3,9 @@ 提供统一的工具注册和管理接口 """ -from typing import List, Dict, Any, Optional, Callable, Awaitable +from typing import List, Dict, Any, Optional, Callable, Awaitable, Tuple from src.common.logger import get_logger +from src.llm_models.payload_content.tool_option import ToolParamType logger = get_logger("memory_retrieval_tools") @@ -50,6 +51,48 @@ class MemoryRetrievalTool: async def execute(self, **kwargs) -> str: """执行工具""" return await self.execute_func(**kwargs) + + def get_tool_definition(self) -> Dict[str, Any]: + """获取工具定义,用于LLM function calling + + Returns: + Dict[str, Any]: 工具定义字典,格式与BaseTool一致 + 格式: {"name": str, "description": str, "parameters": List[Tuple]} + """ + # 转换参数格式为元组列表,格式与BaseTool一致 + # 格式: [("param_name", ToolParamType, "description", required, enum_values)] + param_tuples = [] + + for param in self.parameters: + param_name = param.get("name", "") + param_type_str = param.get("type", "string").lower() + param_desc = param.get("description", "") + is_required = param.get("required", False) + enum_values = param.get("enum", None) + + # 转换类型字符串到ToolParamType + type_mapping = { + "string": ToolParamType.STRING, + "integer": ToolParamType.INTEGER, + "int": ToolParamType.INTEGER, + "float": ToolParamType.FLOAT, + "boolean": ToolParamType.BOOLEAN, + "bool": ToolParamType.BOOLEAN, + } + param_type = type_mapping.get(param_type_str, ToolParamType.STRING) + + # 构建参数元组 + param_tuple = (param_name, param_type, param_desc, is_required, enum_values) + param_tuples.append(param_tuple) + + # 构建工具定义,格式与BaseTool.get_tool_definition()一致 + tool_def = { + "name": self.name, + "description": self.description, + "parameters": param_tuples + } + + return tool_def class MemoryRetrievalToolRegistry: @@ -60,6 +103,9 @@ class MemoryRetrievalToolRegistry: def register_tool(self, tool: MemoryRetrievalTool) -> None: """注册工具""" + if tool.name in self.tools: + logger.debug(f"记忆检索工具 {tool.name} 已存在,跳过重复注册") + return self.tools[tool.name] = tool logger.info(f"注册记忆检索工具: {tool.name}") @@ -79,11 +125,19 @@ class MemoryRetrievalToolRegistry: return "\n".join(descriptions) def get_action_types_list(self) -> str: - """获取所有动作类型的列表,用于prompt""" + """获取所有动作类型的列表,用于prompt(已废弃,保留用于兼容)""" action_types = [tool.name for tool in self.tools.values()] action_types.append("final_answer") action_types.append("no_answer") return " 或 ".join([f'"{at}"' for at in action_types]) + + def get_tool_definitions(self) -> List[Dict[str, Any]]: + """获取所有工具的定义列表,用于LLM function calling + + Returns: + List[Dict[str, Any]]: 工具定义列表,每个元素是一个工具定义字典 + """ + return [tool.get_tool_definition() for tool in self.tools.values()] # 全局工具注册器实例 diff --git a/src/memory_system/retrieval_tools/tool_utils.py b/src/memory_system/retrieval_tools/tool_utils.py deleted file mode 100644 index d0ca334f..00000000 --- a/src/memory_system/retrieval_tools/tool_utils.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -工具函数库 -包含所有工具共用的工具函数 -""" - -from datetime import datetime -from typing import Tuple - - -def parse_datetime_to_timestamp(value: str) -> float: - """ - 接受多种常见格式并转换为时间戳(秒) - 支持示例: - - 2025-09-29 - - 2025-09-29 00:00:00 - - 2025/09/29 00:00 - - 2025-09-29T00:00:00 - """ - value = value.strip() - fmts = [ - "%Y-%m-%d %H:%M:%S", - "%Y-%m-%d %H:%M", - "%Y/%m/%d %H:%M:%S", - "%Y/%m/%d %H:%M", - "%Y-%m-%d", - "%Y/%m/%d", - "%Y-%m-%dT%H:%M:%S", - "%Y-%m-%dT%H:%M", - ] - last_err = None - for fmt in fmts: - try: - dt = datetime.strptime(value, fmt) - return dt.timestamp() - except Exception as e: - last_err = e - raise ValueError(f"无法解析时间: {value} ({last_err})") - - -def parse_time_range(time_range: str) -> Tuple[float, float]: - """ - 解析时间范围字符串,返回开始和结束时间戳 - - Args: - time_range: 时间范围字符串,格式:"YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS" - - Returns: - Tuple[float, float]: (开始时间戳, 结束时间戳) - """ - if " - " not in time_range: - raise ValueError(f"时间范围格式错误,应为 '开始时间 - 结束时间': {time_range}") - - parts = time_range.split(" - ", 1) - if len(parts) != 2: - raise ValueError(f"时间范围格式错误: {time_range}") - - start_str = parts[0].strip() - end_str = parts[1].strip() - - start_timestamp = parse_datetime_to_timestamp(start_str) - end_timestamp = parse_datetime_to_timestamp(end_str) - - return start_timestamp, end_timestamp - diff --git a/src/plugin_system/apis/llm_api.py b/src/plugin_system/apis/llm_api.py index e454c486..a4e2dd30 100644 --- a/src/plugin_system/apis/llm_api.py +++ b/src/plugin_system/apis/llm_api.py @@ -7,9 +7,11 @@ success, response, reasoning, model_name = await llm_api.generate_with_model(prompt, model_config) """ -from typing import Tuple, Dict, List, Any, Optional +from typing import Tuple, Dict, List, Any, Optional, Callable from src.common.logger import get_logger from src.llm_models.payload_content.tool_option import ToolCall +from src.llm_models.payload_content.message import Message +from src.llm_models.model_client.base_client import BaseClient from src.llm_models.utils_model import LLMRequest from src.config.config import model_config from src.config.api_ada_configs import TaskConfig @@ -120,3 +122,44 @@ async def generate_with_model_with_tools( error_msg = f"生成内容时出错: {str(e)}" logger.error(f"[LLMAPI] {error_msg}") return False, error_msg, "", "", None + + +async def generate_with_model_with_tools_by_message_factory( + message_factory: Callable[[BaseClient], List[Message]], + model_config: TaskConfig, + tool_options: List[Dict[str, Any]] | None = None, + request_type: str = "plugin.generate", + temperature: Optional[float] = None, + max_tokens: Optional[int] = None, +) -> Tuple[bool, str, str, str, List[ToolCall] | None]: + """使用指定模型和工具生成内容(通过消息工厂构建消息列表) + + Args: + message_factory: 消息工厂函数 + model_config: 模型配置 + tool_options: 工具选项列表 + request_type: 请求类型标识 + temperature: 温度参数 + max_tokens: 最大token数 + + Returns: + Tuple[bool, str, str, str, List[ToolCall] | None]: (是否成功, 生成的内容, 推理过程, 模型名称, 工具调用列表) + """ + try: + model_name_list = model_config.model_list + logger.info(f"[LLMAPI] 使用模型集合 {model_name_list} 生成内容(消息工厂)") + + llm_request = LLMRequest(model_set=model_config, request_type=request_type) + + response, (reasoning_content, model_name, tool_call) = await llm_request.generate_response_with_message_async( + message_factory=message_factory, + tools=tool_options, + temperature=temperature, + max_tokens=max_tokens, + ) + return True, response, reasoning_content, model_name, tool_call + + except Exception as e: + error_msg = f"生成内容时出错: {str(e)}" + logger.error(f"[LLMAPI] {error_msg}") + return False, error_msg, "", "", None diff --git a/src/plugin_system/core/tool_use.py b/src/plugin_system/core/tool_use.py index aad7cad6..a6197137 100644 --- a/src/plugin_system/core/tool_use.py +++ b/src/plugin_system/core/tool_use.py @@ -93,6 +93,14 @@ class ToolExecutor: # 获取可用工具 tools = self._get_tool_definitions() + # 如果没有可用工具,直接返回空内容 + if not tools: + logger.info(f"{self.log_prefix}没有可用工具,直接返回空内容") + if return_details: + return [], [], "" + else: + return [], [], "" + # print(f"tools: {tools}") # 获取当前时间 @@ -116,6 +124,7 @@ class ToolExecutor: response, (reasoning_content, model_name, tool_calls) = await self.llm_model.generate_response_async( prompt=prompt, tools=tools, raise_when_empty=False ) + # 执行工具调用 tool_results, used_tools = await self.execute_tool_calls(tool_calls)