feat:lpmm可选接入memory agent,将memory agent改为标准工具格式,修改llm_utils以兼容

pull/1359/head
SengokuCola 2025-11-13 18:55:37 +08:00
parent e52a81e90b
commit f2819be5e9
18 changed files with 868 additions and 432 deletions

View File

@ -19,7 +19,6 @@ from src.chat.planner_actions.action_manager import ActionManager
from src.chat.heart_flow.hfc_utils import CycleDetail
from src.express.expression_learner import expression_learner_manager
from src.chat.frequency_control.frequency_control import frequency_control_manager
from src.memory_system.curious import check_and_make_question
from src.jargon import extract_and_store_jargon
from src.person_info.person_info import Person
from src.plugin_system.base.component_types import EventType, ActionInfo

View File

@ -30,6 +30,8 @@ DATA_PATH = os.path.join(ROOT_PATH, "data")
qa_manager = None
inspire_manager = None
def get_qa_manager():
    """Return the module-level QA manager singleton.

    Returns None until it is initialised — presumably during lpmm_start_up();
    verify against the startup path.
    """
    return qa_manager
def lpmm_start_up(): # sourcery skip: extract-duplicate-method
# 检查LPMM知识库是否启用

View File

@ -1085,6 +1085,10 @@ class DefaultReplyer:
if not global_config.lpmm_knowledge.enable:
logger.debug("LPMM知识库未启用跳过获取知识库内容")
return ""
if global_config.lpmm_knowledge.lpmm_mode == "agent":
return ""
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
bot_name = global_config.bot.nickname
@ -1102,6 +1106,10 @@ class DefaultReplyer:
model_config=model_config.model_task_config.tool_use,
tool_options=[SearchKnowledgeFromLPMMTool.get_tool_definition()],
)
# logger.info(f"工具调用提示词: {prompt}")
# logger.info(f"工具调用: {tool_calls}")
if tool_calls:
result = await self.tool_executor.execute_tool_call(tool_calls[0], SearchKnowledgeFromLPMMTool())
end_time = time.time()
@ -1109,7 +1117,7 @@ class DefaultReplyer:
logger.debug("从LPMM知识库获取知识失败返回空知识...")
return ""
found_knowledge_from_lpmm = result.get("content", "")
logger.debug(
logger.info(
f"从LPMM知识库获取知识相关信息{found_knowledge_from_lpmm[:100]}...,信息长度: {len(found_knowledge_from_lpmm)}"
)
related_info += found_knowledge_from_lpmm

View File

@ -1030,6 +1030,10 @@ class PrivateReplyer:
if not global_config.lpmm_knowledge.enable:
logger.debug("LPMM知识库未启用跳过获取知识库内容")
return ""
if global_config.lpmm_knowledge.lpmm_mode == "agent":
return ""
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
bot_name = global_config.bot.nickname

View File

@ -77,6 +77,23 @@ def _convert_messages(messages: list[Message]) -> list[ChatCompletionMessagePara
"content": content,
}
if message.role == RoleType.Assistant and getattr(message, "tool_calls", None):
tool_calls_payload: list[dict[str, Any]] = []
for call in message.tool_calls or []:
tool_calls_payload.append(
{
"id": call.call_id,
"type": "function",
"function": {
"name": call.func_name,
"arguments": json.dumps(call.args or {}, ensure_ascii=False),
},
}
)
ret["tool_calls"] = tool_calls_payload
if ret["content"] == []:
ret["content"] = ""
# 添加工具调用ID
if message.role == RoleType.Tool:
if not message.tool_call_id:

View File

@ -1,4 +1,7 @@
from enum import Enum
from typing import List, Optional
from .tool_option import ToolCall
# 设计这系列类的目的是为未来可能的扩展做准备
@ -20,6 +23,7 @@ class Message:
role: RoleType,
content: str | list[tuple[str, str] | str],
tool_call_id: str | None = None,
tool_calls: Optional[List[ToolCall]] = None,
):
"""
初始化消息对象
@ -28,6 +32,13 @@ class Message:
self.role: RoleType = role
self.content: str | list[tuple[str, str] | str] = content
self.tool_call_id: str | None = tool_call_id
self.tool_calls: Optional[List[ToolCall]] = tool_calls
def __str__(self) -> str:
return (
f"Role: {self.role}, Content: {self.content}, "
f"Tool Call ID: {self.tool_call_id}, Tool Calls: {self.tool_calls}"
)
class MessageBuilder:
@ -35,6 +46,7 @@ class MessageBuilder:
self.__role: RoleType = RoleType.User
self.__content: list[tuple[str, str] | str] = []
self.__tool_call_id: str | None = None
self.__tool_calls: Optional[List[ToolCall]] = None
def set_role(self, role: RoleType = RoleType.User) -> "MessageBuilder":
"""
@ -86,12 +98,27 @@ class MessageBuilder:
self.__tool_call_id = tool_call_id
return self
def set_tool_calls(self, tool_calls: List[ToolCall]) -> "MessageBuilder":
    """
    Attach a tool-call list to an assistant message.

    :param tool_calls: non-empty list of ToolCall objects to attach
    :return: this MessageBuilder (fluent interface)
    :raises ValueError: if the role is not Assistant, or the list is empty
    """
    # Tool calls are only meaningful on assistant messages.
    if self.__role != RoleType.Assistant:
        raise ValueError("仅当角色为Assistant时才能设置工具调用列表")
    if not tool_calls:
        raise ValueError("工具调用列表不能为空")
    self.__tool_calls = tool_calls
    return self
def build(self) -> Message:
"""
构建消息对象
:return: Message对象
"""
if len(self.__content) == 0:
if len(self.__content) == 0 and not (
self.__role == RoleType.Assistant and self.__tool_calls
):
raise ValueError("内容不能为空")
if self.__role == RoleType.Tool and self.__tool_call_id is None:
raise ValueError("Tool角色的工具调用ID不能为空")
@ -104,4 +131,5 @@ class MessageBuilder:
else self.__content
),
tool_call_id=self.__tool_call_id,
tool_calls=self.__tool_calls,
)

View File

@ -166,6 +166,57 @@ class LLMRequest:
time_cost=time.time() - start_time,
)
return content or "", (reasoning_content, model_info.name, tool_calls)
async def generate_response_with_message_async(
    self,
    message_factory: Callable[[BaseClient], List[Message]],
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    tools: Optional[List[Dict[str, Any]]] = None,
    raise_when_empty: bool = True,
) -> Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]:
    """
    Asynchronously generate a response from a pre-built message list.

    Args:
        message_factory (Callable[[BaseClient], List[Message]]): factory that
            produces the already-built message list for the target client
        temperature (float, optional): sampling temperature
        max_tokens (int, optional): maximum number of output tokens
        tools (Optional[List[Dict[str, Any]]]): tool definitions exposed to the model
        raise_when_empty (bool): whether to raise when the response is empty.
            NOTE(review): not referenced anywhere in this body — confirm intent.

    Returns:
        Tuple[str, Tuple[str, str, Optional[List[ToolCall]]]]: the content,
        then a tuple of (reasoning content, model name, tool calls)
    """
    start_time = time.time()
    tool_built = self._build_tool_options(tools)
    response, model_info = await self._execute_request(
        request_type=RequestType.RESPONSE,
        message_factory=message_factory,
        temperature=temperature,
        max_tokens=max_tokens,
        tool_options=tool_built,
    )
    logger.debug(f"LLM请求总耗时: {time.time() - start_time}")
    logger.debug(f"LLM生成内容: {response}")
    content = response.content
    reasoning_content = response.reasoning_content or ""
    tool_calls = response.tool_calls
    # Some models embed the reasoning inside the content; extract it only when
    # the API did not return a separate reasoning field.
    if not reasoning_content and content:
        content, extracted_reasoning = self._extract_reasoning(content)
        reasoning_content = extracted_reasoning
    # Record token usage for accounting when the provider reports it.
    if usage := response.usage:
        llm_usage_recorder.record_usage_to_database(
            model_info=model_info,
            model_usage=usage,
            user_id="system",
            request_type=self.request_type,
            endpoint="/chat/completions",
            time_cost=time.time() - start_time,
        )
    return content or "", (reasoning_content, model_info.name, tool_calls)
async def get_embedding(self, embedding_input: str) -> Tuple[List[float], str]:
"""

View File

@ -1,215 +0,0 @@
import time
from typing import List, Optional
from src.common.logger import get_logger
from src.chat.utils.chat_message_builder import (
get_raw_msg_by_timestamp_with_chat_inclusive,
build_readable_messages_with_id,
)
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config, global_config
from src.memory_system.memory_utils import parse_md_json
logger = get_logger("curious")
class CuriousDetector:
    """
    Curiosity detector: scans recent chat messages for contradictions,
    conflicts, or unclear statements worth asking a clarification question about.
    """

    def __init__(self, chat_id: str):
        self.chat_id = chat_id
        # Utility model used to analyse the chat transcript.
        self.llm_request = LLMRequest(
            model_set=model_config.model_task_config.utils,
            request_type="curious_detector",
        )
        # Trigger throttling: require at least min_interval_seconds between
        # detections AND at least min_messages new messages in the window.
        self.last_detection_time: float = time.time()
        self.min_interval_seconds: float = 60.0
        self.min_messages: int = 20

    def should_trigger(self) -> bool:
        """Return True when the cooldown elapsed and enough new messages arrived."""
        if time.time() - self.last_detection_time < self.min_interval_seconds:
            return False

        recent_messages = get_raw_msg_by_timestamp_with_chat_inclusive(
            chat_id=self.chat_id,
            timestamp_start=self.last_detection_time,
            timestamp_end=time.time(),
        )
        return bool(recent_messages and len(recent_messages) >= self.min_messages)

    async def detect_questions(self, recent_messages: List) -> Optional[str]:
        """
        Analyse the recent messages and decide whether a question is needed.

        Args:
            recent_messages: list of recent message records

        Returns:
            Optional[str]: the detected question text, or None when nothing
            needs asking (or on any failure)
        """
        try:
            if not recent_messages or len(recent_messages) < 2:
                return None

            # Render the messages into a readable transcript block.
            chat_content_block, _ = build_readable_messages_with_id(
                messages=recent_messages,
                timestamp_mode="normal_no_YMD",
                read_mark=0.0,
                truncate=True,
                show_actions=True,
            )

            # Question-tracking was removed; no existing-question check here.
            # Build the detection prompt (model-facing text, kept verbatim).
            prompt = f"""你是一个严谨的聊天内容分析器。请分析以下聊天记录,检测是否存在需要提问的内容。
检测条件
1. 聊天中存在逻辑矛盾或冲突的信息
2. 有人反对或否定之前提出的信息
3. 存在观点不一致的情况
4. 有模糊不清或需要澄清的概念
5. 有人提出了质疑或反驳
**重要限制**
- 忽略涉及违法暴力色情政治等敏感话题的内容
- 不要对敏感话题提问
- 只有在确实存在矛盾或冲突时才提问
- 如果聊天内容正常没有矛盾请输出NO
**聊天记录**
{chat_content_block}
请分析上述聊天记录如果发现需要提问的内容请用JSON格式输出
```json
{{
"question": "具体的问题描述,要完整描述涉及的概念和问题",
"reason": "为什么需要提问这个问题的理由"
}}
```
如果没有需要提问的内容请只输出NO"""

            if global_config.debug.show_prompt:
                logger.info(f"好奇心检测提示词: {prompt}")
            else:
                logger.debug("已发送好奇心检测提示词")

            result_text, _ = await self.llm_request.generate_response_async(prompt, temperature=0.3)
            # NOTE: a second, unconditional info-log of the full prompt used to
            # live here; it bypassed the debug.show_prompt guard above and was removed.
            logger.info(f"好奇心检测结果: {result_text}")

            if not result_text:
                return None

            result_text = result_text.strip()

            # The model signals "nothing to ask" with a literal NO.
            if result_text.upper() == "NO":
                logger.debug("未检测到需要提问的内容")
                return None

            # Try to parse the markdown-wrapped JSON answer.
            try:
                questions, reasoning = parse_md_json(result_text)
                if questions and len(questions) > 0:
                    question_data = questions[0]
                    question = question_data.get("question", "")
                    reason = question_data.get("reason", "")
                    if question and question.strip():
                        logger.info(f"检测到需要提问的内容: {question}")
                        logger.info(f"提问理由: {reason}")
                        return question
            except Exception as e:
                logger.warning(f"解析问题JSON失败: {e}")
                logger.debug(f"原始响应: {result_text}")

            return None

        except Exception as e:
            logger.error(f"好奇心检测失败: {e}")
            return None

    async def make_question_from_detection(self, question: str, context: str = "") -> bool:
        """
        Record a detected question (conflict-tracker storage has been removed).

        Args:
            question: the detected question text
            context: question context — currently unused since the tracker removal

        Returns:
            bool: whether the question was accepted
        """
        try:
            if not question or not question.strip():
                return False

            # Conflict-tracker feature removed; only log the question.
            logger.info(f"检测到问题(冲突追踪器已移除): {question}")
            return True

        except Exception as e:
            logger.error(f"记录问题失败: {e}")
            return False
class CuriousManager:
    """Registry of per-chat CuriousDetector instances, created lazily."""

    def __init__(self) -> None:
        # Maps chat_id -> its dedicated detector.
        self._detectors: dict[str, CuriousDetector] = {}

    def get_detector(self, chat_id: str) -> CuriousDetector:
        """Return the detector for *chat_id*, instantiating it on first access."""
        detector = self._detectors.get(chat_id)
        if detector is None:
            detector = CuriousDetector(chat_id)
            self._detectors[chat_id] = detector
        return detector


# Module-level singleton shared by the whole process.
curious_manager = CuriousManager()
async def check_and_make_question(chat_id: str) -> bool:
    """
    Check the recent chat log for the given chat and record a question when
    a contradiction or conflict is detected.

    Args:
        chat_id: chat identifier

    Returns:
        bool: True when a question was detected and successfully recorded
    """
    try:
        detector = curious_manager.get_detector(chat_id)
        if not detector.should_trigger():
            return False

        # Pull the messages that arrived since the last detection window.
        recent_messages = get_raw_msg_by_timestamp_with_chat_inclusive(
            chat_id=chat_id,
            timestamp_start=detector.last_detection_time,
            timestamp_end=time.time(),
            limit=80,
        )

        if not recent_messages:
            return False

        # Ask the detector whether anything is worth questioning.
        question = await detector.detect_questions(recent_messages)
        if question:
            # Record the question.
            success = await detector.make_question_from_detection(question)
            if success:
                logger.info(f"成功检测并记录问题: {question}")
                # The window only advances on success, so failed attempts
                # will re-scan the same span next time.
                detector.last_detection_time = time.time()
                return True

        return False

    except Exception as e:
        logger.error(f"检查并生成问题失败: {e}")
        return False

View File

@ -10,11 +10,11 @@ from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.plugin_system.apis import llm_api
from src.common.database.database_model import ThinkingBack
from json_repair import repair_json
from src.memory_system.retrieval_tools import get_tool_registry, init_all_tools
from src.memory_system.retrieval_tools import get_tool_registry, init_all_tools, register_memory_retrieval_tool
from src.llm_models.payload_content.message import MessageBuilder, RoleType, Message
logger = get_logger("memory_retrieval")
def init_memory_retrieval_prompt():
"""初始化记忆检索相关的 prompt 模板和工具"""
# 首先注册所有工具
@ -35,73 +35,145 @@ def init_memory_retrieval_prompt():
2. 是否有需要回忆的内容比如"之前说过""上次""以前"
3. 是否有需要查找历史信息的问题
4. 是否有问题可以搜集信息帮助你聊天
5. 对话中是否包含黑话俚语缩写等可能需要查询的概念
重要提示
- **每次只能提出一个问题**选择最需要查询的关键问题
- 如果"最近已查询的问题和结果"中已经包含了类似的问题请避免重复生成相同或相似的问题
- 如果之前已经查询过某个问题但未找到答案可以尝试用不同的方式提问或更具体的问题
- 如果之前已经查询过某个问题并找到了答案可以直接参考已有结果不需要重复查询
如果你认为需要从记忆中检索信息来回答请根据上下文提出一个或多个具体的问题
如果你认为需要从记忆中检索信息来回答
1. 先识别对话中可能需要查询的概念黑话/俚语/缩写/人名/专有名词等关键词
2. 然后根据上下文提出**一个**最关键的问题来帮助你回复目标消息
问题格式示例
- "xxx在前几天干了什么"
- "xxx是什么"
- "xxxx和xxx的关系是什么"
- "xxx在某个时间点发生了什么"
请输出JSON格式的问题数组如果不需要检索记忆则输出空数组[]
请输出JSON格式包含两个字段
- "concepts": 需要检索的概念列表字符串数组如果不需要检索概念则输出空数组[]
- "questions": 问题数组字符串数组如果不需要检索记忆则输出空数组[]如果需要检索则只输出包含一个问题的数组
输出格式示例
输出格式示例需要检索时
```json
[
"张三在前几天干了什么",
"自然选择是什么",
"李四和王五的关系是什么"
]
{{
"concepts": ["AAA", "BBB", "CCC"],
"questions": ["张三在前几天干了什么"]
}}
```
请只输出JSON数组不要输出其他内容
输出格式示例不需要检索时
```json
{{
"concepts": [],
"questions": []
}}
```
请只输出JSON对象不要输出其他内容
""",
name="memory_retrieval_question_prompt",
)
# 第二步ReAct Agent prompt工具描述会在运行时动态生成
# 第二步ReAct Agent prompt使用function calling要求先思考再行动
Prompt(
"""
你的名字是{bot_name}现在是{time_now}
你正在参与聊天你需要搜集信息来回答问题帮助你参与聊天
你需要通过思考(Think)行动(Action)观察(Observation)的循环来回答问题
**重要限制**
- 最大查询轮数5当前第{current_iteration}剩余{remaining_iterations}
- 必须尽快得出答案避免不必要的查询
- 思考要简短直接切入要点
- 必须严格使用检索到的信息回答问题不要编造信息
当前问题{question}
已收集的信息
{collected_info}
你可以使用以下工具来查询信息
{tools_description}
**执行步骤**
请按照以下格式输出你的思考过程
```json
{{
"thought": "你的思考过程,分析当前情况,决定下一步行动",
"actions": [
{{
"action_type": {action_types_list},
"action_params": {{参数名: 参数值}} null
}}
]
}}
```
**第一步思考Think**
在思考中分析
- 当前信息是否足够回答问题
- 如果足够在思考中直接给出答案格式为final_answer(answer="你的答案内容")
- 如果不够说明最需要查询什么并输出为纯文本说明
重要说明
- 你可以在一次迭代中执行多个查询将多个action放在actions数组中
- 如果只需要执行一个查询actions数组中只包含一个action即可
- 如果已经收集到足够的信息可以回答问题请设置actions为包含一个action_type为"final_answer"的数组并在action_params中提供答案例如{{"answer": "你的答案内容"}}除非明确找到答案否则不要设置为final_answer
- 如果经过多次查询后确认无法找到相关信息或答案请设置actions为包含一个action_type为"no_answer"的数组并在action_params中说明原因例如{{"reason": "无法找到的原因"}}
**第二步行动Action**
根据思考结果立即行动
- 如果思考中已给出final_answer 无需调用工具直接结束
- 如果信息不足 调用相应工具查询可并行调用多个工具
- 如果多次查询仍无结果 在思考中给出no_answer(reason="无法找到答案的原因")
请只输出JSON不要输出其他内容
**重要答案必须在思考中给出格式为 final_answer(answer="...") no_answer(reason="...")不要调用工具**
""",
name="memory_retrieval_react_prompt",
)
# 第二步ReAct Agent prompt使用function calling要求先思考再行动
Prompt(
"""
你的名字是{bot_name}现在是{time_now}
你正在参与聊天你需要搜集信息来回答问题帮助你参与聊天
你需要通过思考(Think)行动(Action)观察(Observation)的循环来回答问题
**重要限制**
- 最大查询轮数5当前第{current_iteration}剩余{remaining_iterations}
- 必须尽快得出答案避免不必要的查询
- 思考要简短直接切入要点
- 必须严格使用检索到的信息回答问题不要编造信息
当前问题{question}
**执行步骤**
**第一步思考Think**
在思考中分析
- 当前信息是否足够回答问题
- 如果足够在思考中直接给出答案格式为final_answer(answer="你的答案内容")
- 如果不够说明最需要查询什么并输出为纯文本说明
**第二步行动Action**
根据思考结果立即行动
- 如果思考中已给出final_answer 无需调用工具直接结束
- 如果信息不足 调用相应工具查询可并行调用多个工具
- 如果多次查询仍无结果 在思考中给出no_answer(reason="无法找到答案的原因")
**重要答案必须在思考中给出格式为 final_answer(answer="...") no_answer(reason="...")不要调用工具**
""",
name="memory_retrieval_react_prompt_head",
)
# 额外如果最后一轮迭代ReAct Agent prompt使用function calling要求先思考再行动
Prompt(
"""
你的名字是{bot_name}现在是{time_now}
你正在参与聊天你需要搜集信息来回答问题帮助你参与聊天
**重要限制**
- 你已经经过几轮查询尝试了信息搜集现在你需要总结信息选择回答问题或判断问题无法回答
- 思考要简短直接切入要点
- 必须严格使用检索到的信息回答问题不要编造信息
当前问题{question}
已收集的信息
{collected_info}
**执行步骤**
分析
- 当前信息是否足够回答问题
- 如果足够在思考中直接给出答案格式为final_answer(answer="你的答案内容")
- 如果不够在思考中给出no_answer(reason="无法找到答案的原因")
**重要答案必须给出格式为 final_answer(answer="...") no_answer(reason="...")**
""",
name="memory_retrieval_react_final_prompt",
)
def _parse_react_response(response: str) -> Optional[Dict[str, Any]]:
"""解析ReAct Agent的响应
@ -156,11 +228,89 @@ def _parse_react_response(response: str) -> Optional[Dict[str, Any]]:
return None
async def _retrieve_concepts_with_jargon(
concepts: List[str],
chat_id: str
) -> str:
"""对概念列表进行jargon检索
Args:
concepts: 概念列表
chat_id: 聊天ID
Returns:
str: 检索结果字符串
"""
if not concepts:
return ""
from src.jargon.jargon_miner import search_jargon
results = []
for concept in concepts:
concept = concept.strip()
if not concept:
continue
# 先尝试精确匹配
jargon_results = search_jargon(
keyword=concept,
chat_id=chat_id,
limit=10,
case_sensitive=False,
fuzzy=False
)
is_fuzzy_match = False
# 如果精确匹配未找到,尝试模糊搜索
if not jargon_results:
jargon_results = search_jargon(
keyword=concept,
chat_id=chat_id,
limit=10,
case_sensitive=False,
fuzzy=True
)
is_fuzzy_match = True
if jargon_results:
# 找到结果
if is_fuzzy_match:
# 模糊匹配
output_parts = [f"未精确匹配到'{concept}'"]
for result in jargon_results:
found_content = result.get("content", "").strip()
meaning = result.get("meaning", "").strip()
if found_content and meaning:
output_parts.append(f"找到 '{found_content}' 的含义为:{meaning}")
results.append("".join(output_parts))
logger.info(f"在jargon库中找到匹配模糊搜索: {concept},找到{len(jargon_results)}条结果")
else:
# 精确匹配
output_parts = []
for result in jargon_results:
meaning = result.get("meaning", "").strip()
if meaning:
output_parts.append(f"'{concept}' 为黑话或者网络简写,含义为:{meaning}")
results.append("".join(output_parts) if len(output_parts) > 1 else output_parts[0])
logger.info(f"在jargon库中找到匹配精确匹配: {concept},找到{len(jargon_results)}条结果")
else:
# 未找到
results.append(f"未在jargon库中找到'{concept}'的解释")
logger.info(f"在jargon库中未找到匹配: {concept}")
if results:
return "【概念检索结果】\n" + "\n".join(results) + "\n"
return ""
async def _react_agent_solve_question(
question: str,
chat_id: str,
max_iterations: int = 5,
timeout: float = 30.0
timeout: float = 30.0,
initial_info: str = ""
) -> Tuple[bool, str, List[Dict[str, Any]], bool]:
"""使用ReAct架构的Agent来解决问题
@ -169,14 +319,16 @@ async def _react_agent_solve_question(
chat_id: 聊天ID
max_iterations: 最大迭代次数
timeout: 超时时间
initial_info: 初始信息如概念检索结果将作为collected_info的初始值
Returns:
Tuple[bool, str, List[Dict[str, Any]], bool]: (是否找到答案, 答案内容, 思考步骤列表, 是否超时)
"""
start_time = time.time()
collected_info = ""
collected_info = initial_info if initial_info else ""
thinking_steps = []
is_timeout = False
conversation_messages: List[Message] = []
for iteration in range(max_iterations):
# 检查超时
@ -185,9 +337,6 @@ async def _react_agent_solve_question(
is_timeout = True
break
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}")
logger.info(f"ReAct Agent 已收集信息: {collected_info if collected_info else '暂无信息'}")
# 获取工具注册器
tool_registry = get_tool_registry()
@ -197,90 +346,235 @@ async def _react_agent_solve_question(
# 获取当前时间
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构建prompt动态生成工具描述
# 计算剩余迭代次数
current_iteration = iteration + 1
remaining_iterations = max_iterations - current_iteration
is_final_iteration = current_iteration >= max_iterations
# 构建prompt不再需要工具文本描述
prompt_type = "memory_retrieval_react_prompt"
if is_final_iteration:
prompt_type = "memory_retrieval_react_final_prompt"
tool_definitions = []
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: 0最后一次迭代不提供工具调用")
else:
tool_definitions = tool_registry.get_tool_definitions()
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}")
prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_prompt",
prompt_type,
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "暂无信息",
tools_description=tool_registry.get_tools_description(),
action_types_list=tool_registry.get_action_types_list(),
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 Prompt: {prompt}")
if not is_final_iteration:
head_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_prompt_head",
bot_name=bot_name,
time_now=time_now,
question=question,
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
)
def message_factory(_client) -> List[Message]:
messages: List[Message] = []
system_builder = MessageBuilder()
system_builder.set_role(RoleType.System)
system_builder.add_text_content(head_prompt)
if prompt.strip():
system_builder.add_text_content(f"\n{prompt}")
messages.append(system_builder.build())
messages.extend(conversation_messages)
for msg in messages:
print(msg)
return messages
success, response, reasoning_content, model_name, tool_calls = await llm_api.generate_with_model_with_tools_by_message_factory(
message_factory,
model_config=model_config.model_task_config.tool_use,
tool_options=tool_definitions,
request_type="memory.react",
)
else:
logger.info(f"ReAct Agent 第 {iteration + 1} 次Prompt: {prompt}")
success, response, reasoning_content, model_name, tool_calls = await llm_api.generate_with_model_with_tools(
prompt,
model_config=model_config.model_task_config.tool_use,
tool_options=tool_definitions,
request_type="memory.react",
)
# 调用LLM
success, response, reasoning_content, model_name = await llm_api.generate_with_model(
prompt,
model_config=model_config.model_task_config.tool_use,
request_type="memory.react",
)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 LLM响应: {response}")
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 LLM推理: {reasoning_content}")
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 LLM模型: {model_name}")
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}")
if not success:
logger.error(f"ReAct Agent LLM调用失败: {response}")
break
assistant_message: Optional[Message] = None
if tool_calls:
assistant_builder = MessageBuilder()
assistant_builder.set_role(RoleType.Assistant)
if response and response.strip():
assistant_builder.add_text_content(response)
assistant_builder.set_tool_calls(tool_calls)
assistant_message = assistant_builder.build()
elif response and response.strip():
assistant_builder = MessageBuilder()
assistant_builder.set_role(RoleType.Assistant)
assistant_builder.add_text_content(response)
assistant_message = assistant_builder.build()
# 解析响应
action_info = _parse_react_response(response)
if not action_info:
logger.warning(f"无法解析ReAct响应迭代{iteration + 1}")
break
thought = action_info.get("thought", "")
actions = action_info.get("actions", [])
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考: {thought}")
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 动作数量: {len(actions)}")
# 记录思考步骤包含所有actions
# 记录思考步骤
step = {
"iteration": iteration + 1,
"thought": thought,
"actions": actions,
"thought": response,
"actions": [],
"observations": []
}
# 检查是否有final_answer或no_answer
for action in actions:
action_type = action.get("action_type", "")
action_params = action.get("action_params", {})
if action_type == "final_answer":
# Agent认为已经找到答案
# 从action_params中获取答案如果没有则使用thought作为后备
answer = action_params.get("answer", thought) if isinstance(action_params, dict) else thought
step["observations"] = ["找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 找到最终答案: {answer}")
return True, answer, thinking_steps, False
elif action_type == "no_answer":
# Agent确认无法找到答案
# 从action_params中获取原因如果没有则使用thought作为后备
answer = action_params.get("reason", thought) if isinstance(action_params, dict) else thought
step["observations"] = ["确认无法找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 确认无法找到答案: {answer}")
return False, answer, thinking_steps, False
# 优先从思考内容中提取final_answer或no_answer
def extract_quoted_content(text, func_name, param_name):
    """Extract the quoted value of ``param_name`` from a ``func_name(...)`` call.

    Supports both single and double quotes and backslash-escaped quotes,
    e.g. final_answer(answer="...") yields the "..." payload.

    Args:
        text: text to scan
        func_name: function name to locate, matched case-insensitively
        param_name: parameter name whose quoted value is wanted

    Returns:
        The extracted value, or None when no well-formed match exists.
    """
    if not text:
        return None

    # Locate the function call case-insensitively, then the parameter after it.
    lowered = text.lower()
    call_at = lowered.find(func_name.lower())
    if call_at == -1:
        return None

    marker = f'{param_name}='
    marker_at = lowered.find(marker, call_at)
    if marker_at == -1:
        return None

    # Skip whitespace between '=' and the opening quote.
    cursor = marker_at + len(marker)
    while cursor < len(text) and text[cursor] in ' \t\n':
        cursor += 1
    if cursor >= len(text):
        return None

    # The value must start with a quote character.
    quote = text[cursor]
    if quote not in ('"', "'"):
        return None

    # Scan for the matching closing quote, skipping backslash-escaped ones.
    scan = cursor + 1
    while scan < len(text):
        if text[scan] == quote:
            if scan > cursor + 1 and text[scan - 1] == '\\':
                scan += 1
                continue
            payload = text[cursor + 1:scan]
            # Undo the escape sequences the model may have emitted.
            payload = payload.replace('\\"', '"').replace("\\'", "'").replace('\\\\', '\\')
            return payload
        scan += 1

    return None
# 并行执行所有工具
tool_registry = get_tool_registry()
# 从LLM的直接输出内容中提取final_answer或no_answer
final_answer_content = None
no_answer_reason = None
# 只检查responseLLM的直接输出内容不检查reasoning_content
if response:
final_answer_content = extract_quoted_content(response, 'final_answer', 'answer')
if not final_answer_content:
no_answer_reason = extract_quoted_content(response, 'no_answer', 'reason')
# 如果从输出内容中找到了答案,直接返回
if final_answer_content:
step["actions"].append({"action_type": "final_answer", "action_params": {"answer": final_answer_content}})
step["observations"] = ["从LLM输出内容中检测到final_answer"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 从LLM输出内容中检测到final_answer: {final_answer_content[:100]}...")
return True, final_answer_content, thinking_steps, False
if no_answer_reason:
step["actions"].append({"action_type": "no_answer", "action_params": {"reason": no_answer_reason}})
step["observations"] = ["从LLM输出内容中检测到no_answer"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 从LLM输出内容中检测到no_answer: {no_answer_reason[:100]}...")
return False, no_answer_reason, thinking_steps, False
if is_final_iteration:
step["actions"].append({"action_type": "no_answer", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}})
step["observations"] = ["已到达最后一次迭代,无法找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 已到达最后一次迭代,无法找到答案")
return False, "已到达最后一次迭代,无法找到答案", thinking_steps, False
if assistant_message:
conversation_messages.append(assistant_message)
# 记录思考过程到collected_info中
if reasoning_content or response:
thought_summary = reasoning_content or (response[:200] if response else "")
if thought_summary:
collected_info += f"\n[思考] {thought_summary}\n"
# 处理工具调用
if not tool_calls:
# 没有工具调用说明LLM在思考中已经给出了答案已在前面检查或者需要继续查询
# 如果思考中没有答案,说明需要继续查询或等待下一轮
if response and response.strip():
# 如果响应不为空,记录思考过程,继续下一轮迭代
step["observations"] = [f"思考完成,但未调用工具。响应: {response}"]
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考完成但未调用工具: {response[:100]}...")
# 继续下一轮迭代让LLM有机会在思考中给出final_answer或继续查询
collected_info += f"思考: {response}"
thinking_steps.append(step)
continue
else:
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 无工具调用且无响应")
step["observations"] = ["无响应且无工具调用"]
thinking_steps.append(step)
break
# 处理工具调用
tool_tasks = []
for i, action in enumerate(actions):
action_type = action.get("action_type", "")
action_params = action.get("action_params", {})
for i, tool_call in enumerate(tool_calls):
tool_name = tool_call.func_name
tool_args = tool_call.args or {}
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 动作 {i+1}/{len(actions)}: {action_type}({action_params})")
tool = tool_registry.get_tool(action_type)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 工具调用 {i+1}/{len(tool_calls)}: {tool_name}({tool_args})")
# 普通工具调用
tool = tool_registry.get_tool(tool_name)
if tool:
# 准备工具参数需要添加chat_id如果工具需要
tool_params = action_params.copy()
tool_params = tool_args.copy()
# 如果工具函数签名需要chat_id添加它
import inspect
@ -289,35 +583,43 @@ async def _react_agent_solve_question(
tool_params["chat_id"] = chat_id
# 创建异步任务
async def execute_single_tool(tool_instance, params, act_type, act_params, iter_num):
async def execute_single_tool(tool_instance, params, tool_name_str, iter_num):
try:
observation = await tool_instance.execute(**params)
param_str = ", ".join([f"{k}={v}" for k, v in act_params.items()])
return f"查询{act_type}({param_str})的结果:{observation}"
param_str = ", ".join([f"{k}={v}" for k, v in params.items() if k != "chat_id"])
return f"查询{tool_name_str}({param_str})的结果:{observation}"
except Exception as e:
error_msg = f"工具执行失败: {str(e)}"
logger.error(f"ReAct Agent 第 {iter_num + 1} 次迭代 动作 {act_type} {error_msg}")
return f"查询{act_type}失败: {error_msg}"
logger.error(f"ReAct Agent 第 {iter_num + 1} 次迭代 工具 {tool_name_str} {error_msg}")
return f"查询{tool_name_str}失败: {error_msg}"
tool_tasks.append(execute_single_tool(tool, tool_params, action_type, action_params, iteration))
tool_tasks.append(execute_single_tool(tool, tool_params, tool_name, iteration))
step["actions"].append({"action_type": tool_name, "action_params": tool_args})
else:
error_msg = f"未知的工具类型: {action_type}"
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 动作 {i+1}/{len(actions)} {error_msg}")
tool_tasks.append(asyncio.create_task(asyncio.sleep(0, result=f"查询{action_type}失败: {error_msg}")))
error_msg = f"未知的工具类型: {tool_name}"
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1}/{len(tool_calls)} {error_msg}")
tool_tasks.append(asyncio.create_task(asyncio.sleep(0, result=f"查询{tool_name}失败: {error_msg}")))
# 并行执行所有工具
if tool_tasks:
observations = await asyncio.gather(*tool_tasks, return_exceptions=True)
# 处理执行结果
for i, observation in enumerate(observations):
for i, (tool_call_item, observation) in enumerate(zip(tool_calls, observations)):
if isinstance(observation, Exception):
observation = f"工具执行异常: {str(observation)}"
logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 动作 {i+1} 执行异常: {observation}")
step["observations"].append(observation)
collected_info += f"\n{observation}\n"
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 动作 {i+1} 执行结果: {observation}")
logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1} 执行异常: {observation}")
observation_text = observation if isinstance(observation, str) else str(observation)
step["observations"].append(observation_text)
collected_info += f"\n{observation_text}\n"
if observation_text.strip():
tool_builder = MessageBuilder()
tool_builder.set_role(RoleType.Tool)
tool_builder.add_text_content(observation_text)
tool_builder.add_tool_call(tool_call_item.call_id)
conversation_messages.append(tool_builder.build())
# logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 工具 {i+1} 执行结果: {observation_text}")
thinking_steps.append(step)
@ -529,7 +831,8 @@ def _store_thinking_back(
async def _process_single_question(
question: str,
chat_id: str,
context: str
context: str,
initial_info: str = ""
) -> Optional[str]:
"""处理单个问题的查询(包含缓存检查逻辑)
@ -537,6 +840,7 @@ async def _process_single_question(
question: 要查询的问题
chat_id: 聊天ID
context: 上下文信息
initial_info: 初始信息如概念检索结果将传递给ReAct Agent
Returns:
Optional[str]: 如果找到答案返回格式化的结果字符串否则返回None
@ -582,7 +886,8 @@ async def _process_single_question(
question=question,
chat_id=chat_id,
max_iterations=5,
timeout=120.0
timeout=120.0,
initial_info=initial_info
)
# 存储到数据库(超时时不存储)
@ -661,19 +966,37 @@ async def build_memory_retrieval_prompt(
logger.error(f"LLM生成问题失败: {response}")
return ""
# 解析问题列表
questions = _parse_questions_json(response)
# 解析概念列表和问题列表
concepts, questions = _parse_questions_json(response)
logger.info(f"解析到 {len(concepts)} 个概念: {concepts}")
logger.info(f"解析到 {len(questions)} 个问题: {questions}")
# 对概念进行jargon检索作为初始信息
initial_info = ""
if concepts:
logger.info(f"开始对 {len(concepts)} 个概念进行jargon检索")
initial_info = await _retrieve_concepts_with_jargon(concepts, chat_id)
if initial_info:
logger.info(f"概念检索完成,结果: {initial_info[:200]}...")
else:
logger.info("概念检索未找到任何结果")
# 获取缓存的记忆与question时使用相同的时间窗口和数量限制
cached_memories = _get_cached_memories(chat_id, time_window_seconds=300.0)
if not questions:
logger.debug("模型认为不需要检索记忆或解析失败")
# 即使没有当次查询,也返回缓存的记忆
# 即使没有当次查询,也返回缓存的记忆和概念检索结果
all_results = []
if initial_info:
all_results.append(initial_info.strip())
if cached_memories:
retrieved_memory = "\n\n".join(cached_memories)
all_results.extend(cached_memories)
if all_results:
retrieved_memory = "\n\n".join(all_results)
end_time = time.time()
logger.info(f"无当次查询,返回缓存记忆,耗时: {(end_time - start_time):.3f}秒,包含 {len(cached_memories)} 条缓存记忆")
logger.info(f"无当次查询,返回缓存记忆和概念检索结果,耗时: {(end_time - start_time):.3f}")
return f"你回忆起了以下信息:\n{retrieved_memory}\n如果与回复内容相关,可以参考这些回忆的信息。\n"
else:
return ""
@ -683,12 +1006,13 @@ async def build_memory_retrieval_prompt(
# 第二步并行处理所有问题固定使用5次迭代/120秒超时
logger.info(f"问题数量: {len(questions)},固定设置最大迭代次数: 5超时时间: 120秒")
# 并行处理所有问题
# 并行处理所有问题,将概念检索结果作为初始信息传递
question_tasks = [
_process_single_question(
question=question,
chat_id=chat_id,
context=message
context=message,
initial_info=initial_info
)
for question in questions
]
@ -733,14 +1057,14 @@ async def build_memory_retrieval_prompt(
return ""
def _parse_questions_json(response: str) -> List[str]:
"""解析问题JSON
def _parse_questions_json(response: str) -> Tuple[List[str], List[str]]:
"""解析问题JSON,返回概念列表和问题列表
Args:
response: LLM返回的响应
Returns:
List[str]: 问题列表
Tuple[List[str], List[str]]: (概念列表, 问题列表)
"""
try:
# 尝试提取JSON可能包含在```json代码块中
@ -757,17 +1081,28 @@ def _parse_questions_json(response: str) -> List[str]:
repaired_json = repair_json(json_str)
# 解析JSON
questions = json.loads(repaired_json)
parsed = json.loads(repaired_json)
if not isinstance(questions, list):
logger.warning(f"解析的JSON不是数组格式: {questions}")
return []
# 只支持新格式包含concepts和questions的对象
if not isinstance(parsed, dict):
logger.warning(f"解析的JSON不是对象格式: {parsed}")
return [], []
concepts_raw = parsed.get("concepts", [])
questions_raw = parsed.get("questions", [])
# 确保是列表
if not isinstance(concepts_raw, list):
concepts_raw = []
if not isinstance(questions_raw, list):
questions_raw = []
# 确保所有元素都是字符串
questions = [q for q in questions if isinstance(q, str) and q.strip()]
concepts = [c for c in concepts_raw if isinstance(c, str) and c.strip()]
questions = [q for q in questions_raw if isinstance(q, str) and q.strip()]
return questions
return concepts, questions
except Exception as e:
logger.error(f"解析问题JSON失败: {e}, 响应内容: {response[:200]}...")
return []
return [], []

View File

@ -5,6 +5,8 @@
"""
import json
import re
from datetime import datetime
from typing import Tuple
from difflib import SequenceMatcher
from src.common.logger import get_logger
@ -106,3 +108,60 @@ def preprocess_text(text: str) -> str:
logger.error(f"预处理文本时出错: {e}")
return text
def parse_datetime_to_timestamp(value: str) -> float:
    """Parse a datetime string in one of several common formats into a Unix timestamp.

    Supported examples:
        - 2025-09-29
        - 2025-09-29 00:00:00
        - 2025/09/29 00:00
        - 2025-09-29T00:00:00

    Args:
        value: The datetime string; surrounding whitespace is ignored.

    Returns:
        float: The corresponding POSIX timestamp (interpreted in the local timezone).

    Raises:
        ValueError: If the string matches none of the supported formats.
    """
    value = value.strip()
    # Candidate formats, tried in order; the first successful parse wins.
    fmts = [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y/%m/%d %H:%M:%S",
        "%Y/%m/%d %H:%M",
        "%Y-%m-%d",
        "%Y/%m/%d",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M",
    ]
    last_err = None
    for fmt in fmts:
        try:
            return datetime.strptime(value, fmt).timestamp()
        # strptime only raises ValueError on a format mismatch; catching the
        # broad Exception (as before) could mask unrelated bugs.
        except ValueError as e:
            last_err = e
    raise ValueError(f"无法解析时间: {value} ({last_err})")
def parse_time_range(time_range: str) -> Tuple[float, float]:
    """Split a "start - end" time-range string into two Unix timestamps.

    Args:
        time_range: Range string of the form
            "YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS"; each side accepts any
            format understood by ``parse_datetime_to_timestamp``.

    Returns:
        Tuple[float, float]: (start timestamp, end timestamp).

    Raises:
        ValueError: If the " - " separator is missing or either side fails to parse.
    """
    if " - " not in time_range:
        raise ValueError(f"时间范围格式错误,应为 '开始时间 - 结束时间': {time_range}")
    # " - " is guaranteed present here, so split(..., 1) always yields exactly
    # two parts; the former len(parts) != 2 re-check was unreachable dead code.
    start_str, end_str = time_range.split(" - ", 1)
    start_timestamp = parse_datetime_to_timestamp(start_str.strip())
    end_timestamp = parse_datetime_to_timestamp(end_str.strip())
    return start_timestamp, end_timestamp

View File

@ -11,6 +11,7 @@ retrieval_tools/
├── tool_utils.py # 工具函数库(共用函数)
├── query_jargon.py # 查询jargon工具
├── query_chat_history.py # 查询聊天历史工具
├── query_lpmm_knowledge.py # 查询LPMM知识库工具
└── README.md # 本文件
```
@ -145,6 +146,11 @@ result = await tool.execute(time_range="2025-01-15 10:00:00 - 2025-01-15 20:00:0
- `time_point` (可选) - 时间点格式YYYY-MM-DD HH:MM:SS用于查询某个时间点附近发生了什么与time_range二选一
- `time_range` (可选) - 时间范围,格式:'YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS'与time_point二选一
### query_lpmm_knowledge
从LPMM知识库中检索与关键词相关的知识内容
- 参数:
- `query` (必填) - 查询的关键词或问题描述
## 注意事项
- 所有工具函数必须是异步函数(`async def`

View File

@ -13,13 +13,17 @@ from .tool_registry import (
# 导入所有工具的注册函数
from .query_jargon import register_tool as register_query_jargon
from .query_chat_history import register_tool as register_query_chat_history
from .query_lpmm_knowledge import register_tool as register_lpmm_knowledge
from src.config.config import global_config
def init_all_tools():
    """Initialize and register all memory-retrieval tools."""
    register_query_jargon()
    register_query_chat_history()
    # Expose LPMM knowledge search as an agent tool only in "agent" mode;
    # NOTE(review): in that mode the replyer appears to skip its own LPMM
    # retrieval (it returns early when lpmm_mode == "agent") — confirm.
    if global_config.lpmm_knowledge.lpmm_mode == "agent":
        register_lpmm_knowledge()
__all__ = [
"MemoryRetrievalTool",

View File

@ -9,7 +9,7 @@ from src.common.logger import get_logger
from src.common.database.database_model import ChatHistory
from src.chat.utils.utils import parse_keywords_string
from .tool_registry import register_memory_retrieval_tool
from .tool_utils import parse_datetime_to_timestamp, parse_time_range
from ..memory_utils import parse_datetime_to_timestamp, parse_time_range
logger = get_logger("memory_retrieval_tools")
@ -17,7 +17,8 @@ logger = get_logger("memory_retrieval_tools")
async def query_chat_history(
chat_id: str,
keyword: Optional[str] = None,
time_range: Optional[str] = None
time_range: Optional[str] = None,
fuzzy: bool = True
) -> str:
"""根据时间或关键词在chat_history表中查询聊天记录概述
@ -27,6 +28,9 @@ async def query_chat_history(
time_range: 时间范围或时间点格式
- 时间范围"YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS"
- 时间点"YYYY-MM-DD HH:MM:SS"查询包含该时间点的记录
fuzzy: 是否使用模糊匹配模式默认True
- True: 模糊匹配只要包含任意一个关键词即匹配OR关系
- False: 全匹配必须包含所有关键词才匹配AND关系
Returns:
str: 查询结果
@ -62,9 +66,6 @@ async def query_chat_history(
# 执行查询
records = list(query.order_by(ChatHistory.start_time.desc()).limit(50))
if not records:
return "未找到相关聊天记录概述"
# 如果有关键词,进一步过滤
if keyword:
# 解析多个关键词(支持空格、逗号等分隔符)
@ -96,24 +97,48 @@ async def query_chat_history(
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 检查是否包含任意一个关键词OR关系
# 根据匹配模式检查关键词
matched = False
for kw in keywords_lower:
if (kw in theme or
kw in summary or
kw in original_text or
any(kw in k for k in record_keywords_list)):
matched = True
break
if fuzzy:
# 模糊匹配只要包含任意一个关键词即匹配OR关系
for kw in keywords_lower:
if (kw in theme or
kw in summary or
kw in original_text or
any(kw in k for k in record_keywords_list)):
matched = True
break
else:
# 全匹配必须包含所有关键词才匹配AND关系
matched = True
for kw in keywords_lower:
kw_matched = (kw in theme or
kw in summary or
kw in original_text or
any(kw in k for k in record_keywords_list))
if not kw_matched:
matched = False
break
if matched:
filtered_records.append(record)
if not filtered_records:
keywords_str = "".join(keywords_list)
return f"未找到包含关键词'{keywords_str}'的聊天记录概述"
match_mode = "包含任意一个关键词" if fuzzy else "包含所有关键词"
if time_range:
return f"未找到{match_mode}'{keywords_str}'且在指定时间范围内的聊天记录概述"
else:
return f"未找到{match_mode}'{keywords_str}'的聊天记录概述"
records = filtered_records
# 如果没有记录(可能是时间范围查询但没有匹配的记录)
if not records:
if time_range:
return "未找到指定时间范围内的聊天记录概述"
else:
return "未找到相关聊天记录概述"
# 对即将返回的记录增加使用计数
records_to_use = records[:3]
@ -168,12 +193,12 @@ def register_tool():
"""注册工具"""
register_memory_retrieval_tool(
name="query_chat_history",
description="根据时间或关键词在chat_history表的聊天记录概述库中查询。可以查询某个时间点发生了什么、某个时间范围内的事件或根据关键词搜索消息概述",
description="根据时间或关键词在chat_history表的聊天记录概述库中查询。可以查询某个时间点发生了什么、某个时间范围内的事件或根据关键词搜索消息概述。支持两种匹配模式:模糊匹配(默认,只要包含任意一个关键词即匹配)和全匹配(必须包含所有关键词才匹配)",
parameters=[
{
"name": "keyword",
"type": "string",
"description": "关键词(可选,支持多个关键词,可用空格、逗号、斜杠等分隔,如:'麦麦 百度网盘''麦麦,百度网盘'。用于在主题、关键词、概括、原文中搜索,只要包含任意一个关键词即匹配",
"description": "关键词(可选,支持多个关键词,可用空格、逗号、斜杠等分隔,如:'麦麦 百度网盘''麦麦,百度网盘'。用于在主题、关键词、概括、原文中搜索",
"required": False
},
{
@ -181,6 +206,12 @@ def register_tool():
"type": "string",
"description": "时间范围或时间点(可选)。格式:'YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS'(时间范围,查询与时间范围有交集的记录)或 'YYYY-MM-DD HH:MM:SS'(时间点,查询包含该时间点的记录)",
"required": False
},
{
"name": "fuzzy",
"type": "boolean",
"description": "是否使用模糊匹配模式默认True。True表示模糊匹配只要包含任意一个关键词即匹配OR关系False表示全匹配必须包含所有关键词才匹配AND关系",
"required": False
}
],
execute_func=query_chat_history

View File

@ -0,0 +1,65 @@
"""
通过LPMM知识库查询信息 - 工具实现
"""
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.knowledge import get_qa_manager
from .tool_registry import register_memory_retrieval_tool
logger = get_logger("memory_retrieval_tools")
async def query_lpmm_knowledge(query: str) -> str:
    """Search the LPMM knowledge base for information related to *query*.

    Args:
        query: Keyword or question to look up.

    Returns:
        str: The retrieved knowledge text, or a human-readable message
        explaining why nothing was returned (empty query, feature disabled,
        manager not initialized, no matches, or an internal error).
    """
    try:
        keyword = str(query).strip()
        if not keyword:
            return "查询关键词为空"

        # Feature flag: bail out early when the knowledge base is turned off.
        if not global_config.lpmm_knowledge.enable:
            logger.debug("LPMM知识库未启用")
            return "LPMM知识库未启用"

        manager = get_qa_manager()
        if manager is None:
            logger.debug("LPMM知识库未初始化跳过查询")
            return "LPMM知识库未初始化"

        knowledge = await manager.get_knowledge(keyword)
        logger.debug(f"LPMM知识库查询结果: {knowledge}")
        if not knowledge:
            return f"在LPMM知识库中未找到与“{keyword}”相关的信息"
        return f"你从LPMM知识库中找到以下信息\n{knowledge}"
    except Exception as e:
        logger.error(f"LPMM知识库查询失败: {e}")
        return f"LPMM知识库查询失败{str(e)}"
def register_tool():
    """Register the LPMM knowledge search tool with the memory-retrieval registry."""
    params = [
        {
            "name": "query",
            "type": "string",
            "description": "需要查询的关键词或问题",
            "required": True,
        },
    ]
    register_memory_retrieval_tool(
        name="lpmm_search_knowledge",
        description="从LPMM知识库中搜索相关信息适用于需要知识支持的场景。",
        parameters=params,
        execute_func=query_lpmm_knowledge,
    )

View File

@ -3,8 +3,9 @@
提供统一的工具注册和管理接口
"""
from typing import List, Dict, Any, Optional, Callable, Awaitable
from typing import List, Dict, Any, Optional, Callable, Awaitable, Tuple
from src.common.logger import get_logger
from src.llm_models.payload_content.tool_option import ToolParamType
logger = get_logger("memory_retrieval_tools")
@ -50,6 +51,48 @@ class MemoryRetrievalTool:
async def execute(self, **kwargs) -> str:
"""执行工具"""
return await self.execute_func(**kwargs)
def get_tool_definition(self) -> Dict[str, Any]:
    """Build the tool definition used for LLM function calling.

    Returns:
        Dict[str, Any]: {"name": str, "description": str, "parameters": List[Tuple]},
        matching the format of BaseTool.get_tool_definition(). Each parameter
        tuple is (name, ToolParamType, description, required, enum_values).
    """
    # Mapping from loose type strings to ToolParamType; hoisted out of the
    # loop (the original rebuilt this dict once per parameter). Unknown type
    # strings fall back to STRING, as before.
    type_mapping = {
        "string": ToolParamType.STRING,
        "integer": ToolParamType.INTEGER,
        "int": ToolParamType.INTEGER,
        "float": ToolParamType.FLOAT,
        "boolean": ToolParamType.BOOLEAN,
        "bool": ToolParamType.BOOLEAN,
    }
    param_tuples = [
        (
            param.get("name", ""),
            type_mapping.get(param.get("type", "string").lower(), ToolParamType.STRING),
            param.get("description", ""),
            param.get("required", False),
            param.get("enum", None),
        )
        for param in self.parameters
    ]
    return {
        "name": self.name,
        "description": self.description,
        "parameters": param_tuples,
    }
class MemoryRetrievalToolRegistry:
@ -60,6 +103,9 @@ class MemoryRetrievalToolRegistry:
def register_tool(self, tool: MemoryRetrievalTool) -> None:
    """Add *tool* to the registry, keyed by its name; duplicates are ignored."""
    name = tool.name
    if name in self.tools:
        # Idempotent: re-registering an existing tool name is a silent no-op.
        logger.debug(f"记忆检索工具 {name} 已存在,跳过重复注册")
        return
    self.tools[name] = tool
    logger.info(f"注册记忆检索工具: {name}")
@ -79,11 +125,19 @@ class MemoryRetrievalToolRegistry:
return "\n".join(descriptions)
def get_action_types_list(self) -> str:
"""获取所有动作类型的列表用于prompt"""
"""获取所有动作类型的列表用于prompt(已废弃,保留用于兼容)"""
action_types = [tool.name for tool in self.tools.values()]
action_types.append("final_answer")
action_types.append("no_answer")
return "".join([f'"{at}"' for at in action_types])
def get_tool_definitions(self) -> List[Dict[str, Any]]:
    """Collect the function-calling definition of every registered tool.

    Returns:
        List[Dict[str, Any]]: One definition dict per registered tool.
    """
    definitions = []
    for registered in self.tools.values():
        definitions.append(registered.get_tool_definition())
    return definitions
# 全局工具注册器实例

View File

@ -1,64 +0,0 @@
"""
工具函数库
包含所有工具共用的工具函数
"""
from datetime import datetime
from typing import Tuple
def parse_datetime_to_timestamp(value: str) -> float:
    """Parse a datetime string into a Unix timestamp.

    Accepts several common formats, for example:
        - 2025-09-29
        - 2025-09-29 00:00:00
        - 2025/09/29 00:00
        - 2025-09-29T00:00:00

    Raises:
        ValueError: if none of the supported formats match.
    """
    value = value.strip()
    # Candidate formats, tried in order; the first successful parse wins.
    fmts = [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y/%m/%d %H:%M:%S",
        "%Y/%m/%d %H:%M",
        "%Y-%m-%d",
        "%Y/%m/%d",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M",
    ]
    last_err = None
    for fmt in fmts:
        try:
            dt = datetime.strptime(value, fmt)
            return dt.timestamp()
        except Exception as e:
            # Remember the last failure so the final error is informative.
            last_err = e
    raise ValueError(f"无法解析时间: {value} ({last_err})")
def parse_time_range(time_range: str) -> Tuple[float, float]:
    """Split a "start - end" time-range string into two Unix timestamps.

    Args:
        time_range: Range string of the form
            "YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS".

    Returns:
        Tuple[float, float]: (start timestamp, end timestamp).

    Raises:
        ValueError: if the separator is missing or either side fails to parse.
    """
    if " - " not in time_range:
        raise ValueError(f"时间范围格式错误,应为 '开始时间 - 结束时间': {time_range}")
    parts = time_range.split(" - ", 1)
    # NOTE(review): this branch is unreachable — split(" - ", 1) always yields
    # two parts once the guard above passed; kept only as a defensive check.
    if len(parts) != 2:
        raise ValueError(f"时间范围格式错误: {time_range}")
    start_str = parts[0].strip()
    end_str = parts[1].strip()
    start_timestamp = parse_datetime_to_timestamp(start_str)
    end_timestamp = parse_datetime_to_timestamp(end_str)
    return start_timestamp, end_timestamp

View File

@ -7,9 +7,11 @@
success, response, reasoning, model_name = await llm_api.generate_with_model(prompt, model_config)
"""
from typing import Tuple, Dict, List, Any, Optional
from typing import Tuple, Dict, List, Any, Optional, Callable
from src.common.logger import get_logger
from src.llm_models.payload_content.tool_option import ToolCall
from src.llm_models.payload_content.message import Message
from src.llm_models.model_client.base_client import BaseClient
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
from src.config.api_ada_configs import TaskConfig
@ -120,3 +122,44 @@ async def generate_with_model_with_tools(
error_msg = f"生成内容时出错: {str(e)}"
logger.error(f"[LLMAPI] {error_msg}")
return False, error_msg, "", "", None
async def generate_with_model_with_tools_by_message_factory(
    message_factory: Callable[[BaseClient], List[Message]],
    model_config: TaskConfig,
    tool_options: List[Dict[str, Any]] | None = None,
    request_type: str = "plugin.generate",
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
) -> Tuple[bool, str, str, str, List[ToolCall] | None]:
    """Generate content with tool support, building messages via a factory.

    Args:
        message_factory: Callable that produces the message list for a client.
        model_config: Task-level model configuration (provides the model set).
        tool_options: Optional tool definitions for function calling.
        request_type: Identifier tag attached to the request.
        temperature: Optional sampling-temperature override.
        max_tokens: Optional maximum token count.

    Returns:
        Tuple[bool, str, str, str, List[ToolCall] | None]:
        (success, content, reasoning, model name, tool calls). On failure the
        content slot carries the error message and the remaining slots are empty.
    """
    try:
        model_name_list = model_config.model_list
        logger.info(f"[LLMAPI] 使用模型集合 {model_name_list} 生成内容(消息工厂)")
        request = LLMRequest(model_set=model_config, request_type=request_type)
        content, (reasoning, used_model, calls) = await request.generate_response_with_message_async(
            message_factory=message_factory,
            tools=tool_options,
            temperature=temperature,
            max_tokens=max_tokens,
        )
    except Exception as e:
        error_msg = f"生成内容时出错: {str(e)}"
        logger.error(f"[LLMAPI] {error_msg}")
        return False, error_msg, "", "", None
    return True, content, reasoning, used_model, calls

View File

@ -93,6 +93,14 @@ class ToolExecutor:
# 获取可用工具
tools = self._get_tool_definitions()
# 如果没有可用工具,直接返回空内容
if not tools:
logger.info(f"{self.log_prefix}没有可用工具,直接返回空内容")
if return_details:
return [], [], ""
else:
return [], [], ""
# print(f"tools: {tools}")
# 获取当前时间
@ -116,6 +124,7 @@ class ToolExecutor:
response, (reasoning_content, model_name, tool_calls) = await self.llm_model.generate_response_async(
prompt=prompt, tools=tools, raise_when_empty=False
)
# 执行工具调用
tool_results, used_tools = await self.execute_tool_calls(tool_calls)