feat: optimize the timeout settings for memory queries

pull/1414/head
SengokuCola 2025-12-06 18:59:45 +08:00
parent 83ec86f28b
commit 441fc0b742
8 changed files with 474 additions and 371 deletions

View File

@ -303,90 +303,6 @@ class HeartFChatting:
return loop_info, reply_text, cycle_timers
async def _run_planner_without_reply(
self,
available_actions: Dict[str, ActionInfo],
cycle_timers: Dict[str, float],
) -> List[ActionPlannerInfo]:
"""执行planner但不包含reply动作用于并行执行场景提及时使用简化版提示词"""
try:
with Timer("规划器", cycle_timers):
action_to_use_info = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
is_mentioned=True, # 标记为提及时,使用简化版提示词
)
# 过滤掉reply动作虽然提及时不应该有reply但为了安全还是过滤一下
return [action for action in action_to_use_info if action.action_type != "reply"]
except Exception as e:
logger.error(f"{self.log_prefix} Planner执行失败: {e}")
traceback.print_exc()
return []
async def _generate_mentioned_reply(
self,
force_reply_message: "DatabaseMessages",
thinking_id: str,
cycle_timers: Dict[str, float],
available_actions: Dict[str, ActionInfo],
) -> Dict[str, Any]:
"""当被提及时,独立生成回复的任务"""
try:
self.questioned = False
# 重置连续 no_reply 计数
self.consecutive_no_reply_count = 0
reason = ""
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
action_done=True,
thinking_id=thinking_id,
action_data={},
action_name="reply",
action_reasoning=reason,
)
with Timer("提及回复生成", cycle_timers):
success, llm_response = await generator_api.generate_reply(
chat_stream=self.chat_stream,
reply_message=force_reply_message,
available_actions=available_actions,
chosen_actions=[], # 独立回复不依赖planner的动作
reply_reason=reason,
enable_tool=global_config.tool.enable_tool,
request_type="replyer",
from_plugin=False,
reply_time_point=self.last_read_time,
)
if not success or not llm_response or not llm_response.reply_set:
logger.warning(f"{self.log_prefix} 提及回复生成失败")
return {"action_type": "reply", "success": False, "result": "提及回复生成失败", "loop_info": None}
response_set = llm_response.reply_set
selected_expressions = llm_response.selected_expressions
loop_info, reply_text, _ = await self._send_and_store_reply(
response_set=response_set,
action_message=force_reply_message,
cycle_timers=cycle_timers,
thinking_id=thinking_id,
actions=[], # 独立回复不依赖planner的动作
selected_expressions=selected_expressions,
)
self.last_active_time = time.time()
return {
"action_type": "reply",
"success": True,
"result": f"你回复内容{reply_text}",
"loop_info": loop_info,
}
except Exception as e:
logger.error(f"{self.log_prefix} 提及回复生成异常: {e}")
traceback.print_exc()
return {"action_type": "reply", "success": False, "result": f"提及回复生成异常: {e}", "loop_info": None}
async def _observe(
self, # interest_value: float = 0.0,
recent_messages_list: Optional[List["DatabaseMessages"]] = None,
@ -438,95 +354,50 @@ class HeartFChatting:
except Exception as e:
logger.error(f"{self.log_prefix} 动作修改失败: {e}")
# 如果被提及让回复生成和planner并行执行
if force_reply_message:
logger.info(f"{self.log_prefix} 检测到提及回复生成与planner并行执行")
# 执行planner
is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info()
# 并行执行planner和回复生成
planner_task = asyncio.create_task(
self._run_planner_without_reply(
available_actions=available_actions,
cycle_timers=cycle_timers,
)
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
filter_intercept_message_level=1,
)
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal_no_YMD",
read_mark=self.action_planner.last_obs_time_mark,
truncate=True,
show_actions=True,
)
prompt_info = await self.action_planner.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
current_available_actions=available_actions,
chat_content_block=chat_content_block,
message_id_list=message_id_list,
interest=global_config.personality.interest,
)
continue_flag, modified_message = await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
)
if not continue_flag:
return False
if modified_message and modified_message._modify_flags.modify_llm_prompt:
prompt_info = (modified_message.llm_prompt, prompt_info[1])
with Timer("规划器", cycle_timers):
action_to_use_info = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
)
reply_task = asyncio.create_task(
self._generate_mentioned_reply(
force_reply_message=force_reply_message,
thinking_id=thinking_id,
cycle_timers=cycle_timers,
available_actions=available_actions,
)
)
# 等待两个任务完成
planner_result, reply_result = await asyncio.gather(planner_task, reply_task, return_exceptions=True)
# 处理planner结果
if isinstance(planner_result, BaseException):
logger.error(f"{self.log_prefix} Planner执行异常: {planner_result}")
action_to_use_info = []
else:
action_to_use_info = planner_result
# 处理回复结果
if isinstance(reply_result, BaseException):
logger.error(f"{self.log_prefix} 回复生成异常: {reply_result}")
reply_result = {
"action_type": "reply",
"success": False,
"result": "回复生成异常",
"loop_info": None,
}
else:
# 正常流程只执行planner
is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info()
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
filter_intercept_message_level=1,
)
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal_no_YMD",
read_mark=self.action_planner.last_obs_time_mark,
truncate=True,
show_actions=True,
)
prompt_info = await self.action_planner.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
current_available_actions=available_actions,
chat_content_block=chat_content_block,
message_id_list=message_id_list,
interest=global_config.personality.interest,
)
continue_flag, modified_message = await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
)
if not continue_flag:
return False
if modified_message and modified_message._modify_flags.modify_llm_prompt:
prompt_info = (modified_message.llm_prompt, prompt_info[1])
with Timer("规划器", cycle_timers):
action_to_use_info = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
)
reply_result = None
# 只在提及情况下过滤掉planner返回的reply动作提及时已有独立回复生成
if force_reply_message:
action_to_use_info = [action for action in action_to_use_info if action.action_type != "reply"]
logger.info(
f"{self.log_prefix} 决定执行{len(action_to_use_info)}个动作: {' '.join([a.action_type for a in action_to_use_info])}"
)
# 3. 并行执行所有动作不包括replyreply已经独立执行
# 3. 并行执行所有动作
action_tasks = [
asyncio.create_task(
self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers)
@ -537,10 +408,6 @@ class HeartFChatting:
# 并行执行所有任务
results = await asyncio.gather(*action_tasks, return_exceptions=True)
# 如果有独立的回复结果,添加到结果列表中
if reply_result:
results = list(results) + [reply_result]
# 处理执行结果
reply_loop_info = None
reply_text_from_reply = ""
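
In the hunk above, the mention path no longer runs the planner and the reply generator one after the other: both are wrapped in asyncio tasks and awaited together, with `return_exceptions=True` so a failure in one does not cancel the other. A minimal sketch of that pattern, using hypothetical stand-in coroutines in place of `_run_planner_without_reply` and `_generate_mentioned_reply`:

```python
import asyncio
from typing import Any, Dict, List

async def run_planner_without_reply() -> List[Any]:
    # stand-in: plan actions, with the "reply" action filtered out
    return []

async def generate_mentioned_reply() -> Dict[str, Any]:
    # stand-in: generate the reply independently of the planner
    return {"action_type": "reply", "success": True, "result": "...", "loop_info": None}

async def handle_mention():
    planner_task = asyncio.create_task(run_planner_without_reply())
    reply_task = asyncio.create_task(generate_mentioned_reply())
    # return_exceptions=True: exceptions come back as results instead of propagating
    planner_result, reply_result = await asyncio.gather(
        planner_task, reply_task, return_exceptions=True
    )
    actions = [] if isinstance(planner_result, BaseException) else planner_result
    reply = None if isinstance(reply_result, BaseException) else reply_result
    return actions, reply

# asyncio.run(handle_mention())
```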

View File

@ -260,6 +260,9 @@ class MemoryConfig(ConfigBase):
max_agent_iterations: int = 5
"""Agent最多迭代轮数最低为1"""
agent_timeout_seconds: float = 120.0
"""Agent超时时间"""
enable_jargon_detection: bool = True
"""记忆检索过程中是否启用黑话识别"""
@ -270,6 +273,8 @@ class MemoryConfig(ConfigBase):
"""验证配置值"""
if self.max_agent_iterations < 1:
raise ValueError(f"max_agent_iterations 必须至少为1当前值: {self.max_agent_iterations}")
if self.agent_timeout_seconds <= 0:
raise ValueError(f"agent_timeout_seconds 必须大于0当前值: {self.agent_timeout_seconds}")
@dataclass
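
The new `agent_timeout_seconds` field is validated here and, further down in this commit, replaces the hard-coded `timeout=120.0` passed to the ReAct agent, which checks elapsed time at the top of every iteration. A minimal sketch of that enforcement, assuming a simplified loop body:

```python
import time

def run_react_loop(max_iterations: int, timeout: float) -> bool:
    """Sketch of the iteration/timeout gate; returns False when the time budget runs out."""
    start_time = time.time()
    for iteration in range(max_iterations):
        # mirrors the agent's per-iteration check: time.time() - start_time > timeout
        if time.time() - start_time > timeout:
            return False  # the real agent then falls through to a final evaluation step
        # ... one planning / tool-calling round would go here ...
    return True
```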

View File

@ -319,9 +319,74 @@ class ExpressionLearner:
parsed = json.loads(repaired)
else:
parsed = repaired
except Exception:
logger.error(f"解析表达风格 JSON 失败,原始响应:{response}")
return []
except Exception as parse_error:
# 如果解析失败,尝试修复中文引号问题
# 使用状态机方法,在 JSON 字符串值内部将中文引号替换为转义的英文引号
try:
def fix_chinese_quotes_in_json(text):
"""使用状态机修复 JSON 字符串值中的中文引号"""
result = []
i = 0
in_string = False
escape_next = False
while i < len(text):
char = text[i]
if escape_next:
# 当前字符是转义字符后的字符,直接添加
result.append(char)
escape_next = False
i += 1
continue
if char == '\\':
# 转义字符
result.append(char)
escape_next = True
i += 1
continue
if char == '"' and not escape_next:
# 遇到英文引号,切换字符串状态
in_string = not in_string
result.append(char)
i += 1
continue
if in_string:
# 在字符串值内部,将中文引号替换为转义的英文引号
if char == '"': # 中文左引号
result.append('\\"')
elif char == '"': # 中文右引号
result.append('\\"')
else:
result.append(char)
else:
# 不在字符串内,直接添加
result.append(char)
i += 1
return ''.join(result)
fixed_raw = fix_chinese_quotes_in_json(raw)
# 再次尝试解析
if fixed_raw.startswith("[") and fixed_raw.endswith("]"):
parsed = json.loads(fixed_raw)
else:
repaired = repair_json(fixed_raw)
if isinstance(repaired, str):
parsed = json.loads(repaired)
else:
parsed = repaired
except Exception as fix_error:
logger.error(f"解析表达风格 JSON 失败,初始错误: {type(parse_error).__name__}: {str(parse_error)}")
logger.error(f"修复中文引号后仍失败,错误: {type(fix_error).__name__}: {str(fix_error)}")
logger.error(f"解析表达风格 JSON 失败,原始响应:{response}")
logger.error(f"处理后的 JSON 字符串前500字符{raw[:500]}")
return []
if isinstance(parsed, dict):
parsed_list = [parsed]

View File

@ -48,7 +48,7 @@ def _init_prompt() -> None:
- 中文词语的缩写用几个汉字概括一个词汇或含义例如社死内卷
JSON 数组输出元素为对象严格按以下结构
请你提取出可能的黑话最多10
请你提取出可能的黑话最多30个黑话请尽量提取所有
[
{{"content": "词条", "msg_id": "m12"}}, // msg_id 必须与上方聊天中展示的ID完全一致
{{"content": "词条2", "msg_id": "m15"}}
@ -168,19 +168,24 @@ class JargonMiner:
self.chat_id = chat_id
self.last_learning_time: float = time.time()
# 频率控制,可按需调整
self.min_messages_for_learning: int = 10
self.min_learning_interval: float = 20
self.min_messages_for_learning: int = 30
self.min_learning_interval: float = 60
self.llm = LLMRequest(
model_set=model_config.model_task_config.utils,
request_type="jargon.extract",
)
self.llm_inference = LLMRequest(
model_set=model_config.model_task_config.utils,
request_type="jargon.inference",
)
# 初始化stream_name作为类属性避免重复提取
chat_manager = get_chat_manager()
stream_name = chat_manager.get_stream_name(self.chat_id)
self.stream_name = stream_name if stream_name else self.chat_id
self.cache_limit = 100
self.cache_limit = 50
self.cache: OrderedDict[str, None] = OrderedDict()
# 黑话提取锁,防止并发执行
@ -276,7 +281,7 @@ class JargonMiner:
raw_content_list=raw_content_text,
)
response1, _ = await self.llm.generate_response_async(prompt1, temperature=0.3)
response1, _ = await self.llm_inference.generate_response_async(prompt1, temperature=0.3)
if not response1:
logger.warning(f"jargon {content} 推断1失败无响应")
return
@ -313,7 +318,7 @@ class JargonMiner:
content=content,
)
response2, _ = await self.llm.generate_response_async(prompt2, temperature=0.3)
response2, _ = await self.llm_inference.generate_response_async(prompt2, temperature=0.3)
if not response2:
logger.warning(f"jargon {content} 推断2失败无响应")
return
@ -360,7 +365,7 @@ class JargonMiner:
if global_config.debug.show_jargon_prompt:
logger.info(f"jargon {content} 比较提示词: {prompt3}")
response3, _ = await self.llm.generate_response_async(prompt3, temperature=0.3)
response3, _ = await self.llm_inference.generate_response_async(prompt3, temperature=0.3)
if not response3:
logger.warning(f"jargon {content} 比较失败:无响应")
return
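
The `__init__` hunk above makes jargon mining less aggressive (30 messages and a 60-second interval between learning passes instead of 10 and 20, with a smaller cache) and routes the three inference prompts through a dedicated `llm_inference` request. The gate that consumes those thresholds is not shown in this diff; a hypothetical sketch of how such a frequency check typically works:

```python
import time

def should_run_learning(new_message_count: int,
                        last_learning_time: float,
                        min_messages_for_learning: int = 30,
                        min_learning_interval: float = 60.0) -> bool:
    """Hypothetical throttle: only start a jargon-mining pass after enough
    new messages have accumulated and enough time has passed since the last run."""
    if new_message_count < min_messages_for_learning:
        return False
    if time.time() - last_learning_time < min_learning_interval:
        return False
    return True
```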

View File

@ -1,6 +1,7 @@
import time
import json
import asyncio
import re
from typing import List, Dict, Any, Optional, Tuple, Set
from src.common.logger import get_logger
from src.config.config import global_config, model_config
@ -77,20 +78,12 @@ def init_memory_retrieval_prompt():
问题要说明前因后果和上下文使其全面且精准
输出格式示例需要检索时
输出格式示例
```json
{{
"questions": ["张三在前几天干了什么"] #问题数组(字符串数组),如果不需要检索记忆则输出空数组[],如果需要检索则只输出包含一个问题的数组
}}
```
输出格式示例不需要检索时
```json
{{
"questions": []
}}
```
请只输出JSON对象不要输出其他内容
""",
name="memory_retrieval_question_prompt",
@ -104,17 +97,16 @@ def init_memory_retrieval_prompt():
已收集的信息
{collected_info}
**执行步骤**
**工具说明**
- 如果涉及过往事件或者查询某个过去可能提到过的概念或者某段时间发生的事件可以使用聊天记录查询工具查询过往事件
- 如果涉及人物可以使用人物信息查询工具查询人物信息
- 如果没有可靠信息且查询时间充足或者不确定查询类别也可以使用lpmm知识库查询作为辅助信息
- **如果信息不足需要使用tool说明需要查询什么并输出为纯文本说明然后调用相应工具查询可并行调用多个工具**
- **如果当前已收集的信息足够回答问题且能找到明确答案调用found_answer工具标记已找到答案**
**思考**
- 你可以对查询思路给出简短的思考思考要简短直接切入要点
- 如果信息不足你必须给出使用什么工具进行查询
- 如果信息足够你必须调用found_answer工具
- 先思考当前信息是否足够回答问题
- 如果信息不足则需要使用tool查询信息你必须给出使用什么工具进行查询
- 如果当前已收集的信息足够或信息不足确定无法找到答案你必须调用finish_search工具结束查询
""",
name="memory_retrieval_react_prompt_head",
)
@ -128,14 +120,12 @@ def init_memory_retrieval_prompt():
已收集的信息
{collected_info}
**执行步骤**
分析
- 当前信息是否足够回答问题
- **如果信息足够且能找到明确答案**在思考中直接给出答案格式为found_answer(answer="你的答案内容")
- **如果信息不足或无法找到答案**在思考中给出not_enough_info(reason="信息不足或无法找到答案的原因")
**重要规则**
- 你已经经过几轮查询尝试了信息搜集现在你需要总结信息选择回答问题或判断问题无法回答
- 必须严格使用检索到的信息回答问题不要编造信息
- 答案必须精简不要过多解释
- **只有在检索到明确具体的答案时才使用found_answer**
@ -167,7 +157,7 @@ def _log_conversation_messages(
# 如果有head_prompt先添加为第一条消息
if head_prompt:
msg_info = "========================================\n[消息 1] 角色: System 内容类型: 文本\n-----------------------------"
msg_info = "========================================\n[消息 1] 角色: System\n-----------------------------"
msg_info += f"\n{head_prompt}"
log_lines.append(msg_info)
start_idx = 2
@ -180,19 +170,6 @@ def _log_conversation_messages(
for idx, msg in enumerate(conversation_messages, start_idx):
role_name = msg.role.value if hasattr(msg.role, "value") else str(msg.role)
# # 处理内容 - 显示完整内容,不截断
# if isinstance(msg.content, str):
# full_content = msg.content
# content_type = "文本"
# elif isinstance(msg.content, list):
# text_parts = [item for item in msg.content if isinstance(item, str)]
# image_count = len([item for item in msg.content if isinstance(item, tuple)])
# full_content = "".join(text_parts) if text_parts else ""
# content_type = f"混合({len(text_parts)}段文本, {image_count}张图片)"
# else:
# full_content = str(msg.content)
# content_type = "未知"
# 构建单条消息的日志信息
# msg_info = f"\n========================================\n[消息 {idx}] 角色: {role_name} 内容类型: {content_type}\n-----------------------------"
msg_info = f"\n========================================\n[消息 {idx}] 角色: {role_name}\n-----------------------------"
@ -205,14 +182,12 @@ def _log_conversation_messages(
if msg.tool_calls:
msg_info += f"\n 工具调用: {len(msg.tool_calls)}"
for tool_call in msg.tool_calls:
msg_info += f"\n - {tool_call}"
msg_info += f"\n - {tool_call.func_name}: {json.dumps(tool_call.args, ensure_ascii=False)}"
if msg.tool_call_id:
msg_info += f"\n 工具调用ID: {msg.tool_call_id}"
# if msg.tool_call_id:
# msg_info += f"\n 工具调用ID: {msg.tool_call_id}"
log_lines.append(msg_info)
total_count = len(conversation_messages) + (1 if head_prompt else 0)
@ -257,6 +232,7 @@ async def _react_agent_solve_question(
conversation_messages: List[Message] = []
first_head_prompt: Optional[str] = None # 保存第一次使用的head_prompt用于日志显示
# 正常迭代max_iterations 次(最终评估单独处理,不算在迭代中)
for iteration in range(max_iterations):
# 检查超时
if time.time() - start_time > timeout:
@ -276,7 +252,6 @@ async def _react_agent_solve_question(
# 计算剩余迭代次数
current_iteration = iteration + 1
remaining_iterations = max_iterations - current_iteration
is_final_iteration = current_iteration >= max_iterations
# 提取函数调用中参数的值,支持单引号和双引号
def extract_quoted_content(text, func_name, param_name):
@ -336,114 +311,10 @@ async def _react_agent_solve_question(
return None
# 如果是最后一次迭代使用final_prompt进行总结
if is_final_iteration:
evaluation_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_final_prompt",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "暂无信息",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
if global_config.debug.show_memory_prompt:
logger.info(f"ReAct Agent 最终评估Prompt: {evaluation_prompt}")
eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools(
evaluation_prompt,
model_config=model_config.model_task_config.tool_use,
tool_options=[], # 最终评估阶段不提供工具
request_type="memory.react.final",
)
if not eval_success:
logger.error(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段 LLM调用失败: {eval_response}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status="未找到答案最终评估阶段LLM调用失败",
)
return False, "最终评估阶段LLM调用失败", thinking_steps, False
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估响应: {eval_response}"
)
# 从最终评估响应中提取found_answer或not_enough_info
found_answer_content = None
not_enough_info_reason = None
if eval_response:
found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer")
if not found_answer_content:
not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason")
# 如果找到答案,返回
if found_answer_content:
eval_step = {
"iteration": iteration + 1,
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}],
"observations": ["最终评估阶段检测到found_answer"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段找到关于问题{question}的答案: {found_answer_content}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"找到答案:{found_answer_content}",
)
return True, found_answer_content, thinking_steps, False
# 如果评估为not_enough_info返回空字符串不返回任何信息
if not_enough_info_reason:
eval_step = {
"iteration": iteration + 1,
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}],
"observations": ["最终评估阶段检测到not_enough_info"]
}
thinking_steps.append(eval_step)
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代 最终评估阶段判断信息不足: {not_enough_info_reason}"
)
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"未找到答案:{not_enough_info_reason}",
)
return False, "", thinking_steps, False
# 如果没有明确判断视为not_enough_info返回空字符串不返回任何信息
eval_step = {
"iteration": iteration + 1,
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "not_enough_info", "action_params": {"reason": "已到达最后一次迭代,无法找到答案"}}],
"observations": ["已到达最后一次迭代,无法找到答案"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 已到达最后一次迭代,无法找到答案")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status="未找到答案:已到达最后一次迭代,无法找到答案",
)
return False, "", thinking_steps, False
# 前n-1次迭代使用head_prompt决定调用哪些工具包含found_answer工具
# 正常迭代使用head_prompt决定调用哪些工具包含finish_search工具
tool_definitions = tool_registry.get_tool_definitions()
logger.info(
f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具数量: {len(tool_definitions)}"
)
# tool_names = [tool_def["name"] for tool_def in tool_definitions]
# logger.debug(f"ReAct Agent 第 {iteration + 1} 次迭代,问题: {question}|可用工具: {', '.join(tool_names)} (共{len(tool_definitions)}个)")
# head_prompt应该只构建一次使用初始的collected_info后续迭代都复用同一个
if first_head_prompt is None:
@ -493,15 +364,15 @@ async def _react_agent_solve_question(
request_type="memory.react",
)
logger.debug(
f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}"
)
# logger.info(
# f"ReAct Agent 第 {iteration + 1} 次迭代 模型: {model_name} ,调用工具数量: {len(tool_calls) if tool_calls else 0} ,调用工具响应: {response}"
# )
if not success:
logger.error(f"ReAct Agent LLM调用失败: {response}")
break
# 注意这里会检查found_answer工具调用如果检测到found_answer工具会直接返回答案
# 注意这里会检查finish_search工具调用如果检测到finish_search工具会根据found_answer参数决定返回答案或退出查询
assistant_message: Optional[Message] = None
if tool_calls:
@ -531,8 +402,102 @@ async def _react_agent_solve_question(
# 处理工具调用
if not tool_calls:
# 如果没有工具调用,记录思考过程,继续下一轮迭代(下一轮会再次评估)
# 如果没有工具调用,检查响应文本中是否包含finish_search函数调用格式
if response and response.strip():
# 尝试从文本中解析finish_search函数调用
def parse_finish_search_from_text(text: str):
"""从文本中解析finish_search函数调用返回(found_answer, answer)元组,如果未找到则返回(None, None)"""
if not text:
return None, None
# 查找finish_search函数调用位置不区分大小写
func_pattern = "finish_search"
text_lower = text.lower()
func_pos = text_lower.find(func_pattern)
if func_pos == -1:
return None, None
# 查找函数调用的开始和结束位置
# 从func_pos开始向后查找左括号
start_pos = text.find("(", func_pos)
if start_pos == -1:
return None, None
# 查找匹配的右括号(考虑嵌套)
paren_count = 0
end_pos = start_pos
for i in range(start_pos, len(text)):
if text[i] == "(":
paren_count += 1
elif text[i] == ")":
paren_count -= 1
if paren_count == 0:
end_pos = i
break
else:
# 没有找到匹配的右括号
return None, None
# 提取函数参数部分
params_text = text[start_pos + 1 : end_pos]
# 解析found_answer参数布尔值可能是true/false/True/False
found_answer = None
found_answer_patterns = [
r"found_answer\s*=\s*true",
r"found_answer\s*=\s*True",
r"found_answer\s*=\s*false",
r"found_answer\s*=\s*False",
]
for pattern in found_answer_patterns:
match = re.search(pattern, params_text, re.IGNORECASE)
if match:
found_answer = "true" in match.group(0).lower()
break
# 解析answer参数字符串使用extract_quoted_content
answer = extract_quoted_content(text, "finish_search", "answer")
return found_answer, answer
parsed_found_answer, parsed_answer = parse_finish_search_from_text(response)
if parsed_found_answer is not None:
# 检测到finish_search函数调用格式
if parsed_found_answer:
# 找到了答案
if parsed_answer:
step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": True, "answer": parsed_answer}})
step["observations"] = ["检测到finish_search文本格式调用找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search文本格式找到关于问题{question}的答案: {parsed_answer}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"找到答案:{parsed_answer}",
)
return True, parsed_answer, thinking_steps, False
else:
# found_answer为True但没有提供answer视为错误继续迭代
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 finish_search文本格式found_answer为True但未提供answer")
else:
# 未找到答案,直接退出查询
step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": False}})
step["observations"] = ["检测到finish_search文本格式调用未找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search文本格式判断未找到答案")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status="未找到答案通过finish_search文本格式判断未找到答案",
)
return False, "", thinking_steps, False
# 如果没有检测到finish_search格式记录思考过程继续下一轮迭代
step["observations"] = [f"思考完成,但未调用工具。响应: {response}"]
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 思考完成但未调用工具: {response}")
collected_info += f"思考: {response}"
@ -543,29 +508,51 @@ async def _react_agent_solve_question(
continue
# 处理工具调用
# 首先检查是否有found_answer工具调用如果有则立即返回不再处理其他工具
found_answer_from_tool = None
# 首先检查是否有finish_search工具调用如果有则立即返回不再处理其他工具
finish_search_found = None
finish_search_answer = None
for tool_call in tool_calls:
tool_name = tool_call.func_name
tool_args = tool_call.args or {}
if tool_name == "found_answer":
found_answer_from_tool = tool_args.get("answer", "")
if found_answer_from_tool:
step["actions"].append({"action_type": "found_answer", "action_params": {"answer": found_answer_from_tool}})
step["observations"] = ["检测到found_answer工具调用"]
if tool_name == "finish_search":
finish_search_found = tool_args.get("found_answer", False)
finish_search_answer = tool_args.get("answer", "")
if finish_search_found:
# 找到了答案
if finish_search_answer:
step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": True, "answer": finish_search_answer}})
step["observations"] = ["检测到finish_search工具调用找到答案"]
thinking_steps.append(step)
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search工具找到关于问题{question}的答案: {finish_search_answer}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"找到答案:{finish_search_answer}",
)
return True, finish_search_answer, thinking_steps, False
else:
# found_answer为True但没有提供answer视为错误
logger.warning(f"ReAct Agent 第 {iteration + 1} 次迭代 finish_search工具found_answer为True但未提供answer")
else:
# 未找到答案,直接退出查询
step["actions"].append({"action_type": "finish_search", "action_params": {"found_answer": False}})
step["observations"] = ["检测到finish_search工具调用未找到答案"]
thinking_steps.append(step)
logger.debug(f"ReAct Agent 第 {iteration + 1} 次迭代 通过found_answer工具找到关于问题{question}的答案: {found_answer_from_tool}")
logger.info(f"ReAct Agent 第 {iteration + 1} 次迭代 通过finish_search工具判断未找到答案")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"找到答案:{found_answer_from_tool}",
final_status="未找到答案通过finish_search工具判断未找到答案",
)
return True, found_answer_from_tool, thinking_steps, False
return False, "", thinking_steps, False
# 如果没有found_answer工具调用或者found_answer工具调用没有答案继续处理其他工具
# 如果没有finish_search工具调用,继续处理其他工具
tool_tasks = []
for i, tool_call in enumerate(tool_calls):
tool_name = tool_call.func_name
@ -575,8 +562,8 @@ async def _react_agent_solve_question(
f"ReAct Agent 第 {iteration + 1} 次迭代 工具调用 {i + 1}/{len(tool_calls)}: {tool_name}({tool_args})"
)
# 跳过found_answer工具调用(已经在上面处理过了)
if tool_name == "found_answer":
# 跳过finish_search工具调用(已经在上面处理过了)
if tool_name == "finish_search":
continue
# 普通工具调用
@ -649,24 +636,186 @@ async def _react_agent_solve_question(
thinking_steps.append(step)
# 达到最大迭代次数或超时但Agent没有明确返回found_answer
# 迭代超时应该直接视为not_enough_info而不是使用已有信息
# 只有Agent明确返回found_answer时才认为找到了答案
if collected_info:
logger.warning(
f"ReAct Agent达到最大迭代次数或超时但未明确返回found_answer。已收集信息: {collected_info[:100]}..."
)
# 正常迭代结束后,如果达到最大迭代次数或超时,执行最终评估
# 最终评估单独处理,不算在迭代中
should_do_final_evaluation = False
if is_timeout:
logger.warning("ReAct Agent超时直接视为not_enough_info")
else:
logger.warning("ReAct Agent达到最大迭代次数直接视为not_enough_info")
should_do_final_evaluation = True
logger.warning(f"ReAct Agent超时已迭代{iteration + 1}次,进入最终评估")
elif iteration + 1 >= max_iterations:
should_do_final_evaluation = True
logger.info(f"ReAct Agent达到最大迭代次数已迭代{iteration + 1}次),进入最终评估")
# React完成时输出消息列表
timeout_reason = "超时" if is_timeout else "达到最大迭代次数"
if should_do_final_evaluation:
# 获取必要变量用于最终评估
tool_registry = get_tool_registry()
bot_name = global_config.bot.nickname
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
current_iteration = iteration + 1
remaining_iterations = 0
# 提取函数调用中参数的值,支持单引号和双引号
def extract_quoted_content(text, func_name, param_name):
"""从文本中提取函数调用中参数的值,支持单引号和双引号
Args:
text: 要搜索的文本
func_name: 函数名 'found_answer'
param_name: 参数名 'answer'
Returns:
提取的参数值如果未找到则返回None
"""
if not text:
return None
# 查找函数调用位置(不区分大小写)
func_pattern = func_name.lower()
text_lower = text.lower()
func_pos = text_lower.find(func_pattern)
if func_pos == -1:
return None
# 查找参数名和等号
param_pattern = f"{param_name}="
param_pos = text_lower.find(param_pattern, func_pos)
if param_pos == -1:
return None
# 跳过参数名、等号和空白
start_pos = param_pos + len(param_pattern)
while start_pos < len(text) and text[start_pos] in " \t\n":
start_pos += 1
if start_pos >= len(text):
return None
# 确定引号类型
quote_char = text[start_pos]
if quote_char not in ['"', "'"]:
return None
# 查找匹配的结束引号(考虑转义)
end_pos = start_pos + 1
while end_pos < len(text):
if text[end_pos] == quote_char:
# 检查是否是转义的引号
if end_pos > start_pos + 1 and text[end_pos - 1] == "\\":
end_pos += 1
continue
# 找到匹配的引号
content = text[start_pos + 1 : end_pos]
# 处理转义字符
content = content.replace('\\"', '"').replace("\\'", "'").replace("\\\\", "\\")
return content
end_pos += 1
return None
# 执行最终评估
evaluation_prompt = await global_prompt_manager.format_prompt(
"memory_retrieval_react_final_prompt",
bot_name=bot_name,
time_now=time_now,
question=question,
collected_info=collected_info if collected_info else "暂无信息",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
eval_success, eval_response, eval_reasoning_content, eval_model_name, eval_tool_calls = await llm_api.generate_with_model_with_tools(
evaluation_prompt,
model_config=model_config.model_task_config.tool_use,
tool_options=[], # 最终评估阶段不提供工具
request_type="memory.react.final",
)
if not eval_success:
logger.error(f"ReAct Agent 最终评估阶段 LLM调用失败: {eval_response}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status="未找到答案最终评估阶段LLM调用失败",
)
return False, "最终评估阶段LLM调用失败", thinking_steps, is_timeout
if global_config.debug.show_memory_prompt:
logger.info(f"ReAct Agent 最终评估Prompt: {evaluation_prompt}")
logger.info(f"ReAct Agent 最终评估响应: {eval_response}")
# 从最终评估响应中提取found_answer或not_enough_info
found_answer_content = None
not_enough_info_reason = None
if eval_response:
found_answer_content = extract_quoted_content(eval_response, "found_answer", "answer")
if not found_answer_content:
not_enough_info_reason = extract_quoted_content(eval_response, "not_enough_info", "reason")
# 如果找到答案,返回(找到答案时,无论是否超时,都视为成功完成)
if found_answer_content:
eval_step = {
"iteration": current_iteration,
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "found_answer", "action_params": {"answer": found_answer_content}}],
"observations": ["最终评估阶段检测到found_answer"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 最终评估阶段找到关于问题{question}的答案: {found_answer_content}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"找到答案:{found_answer_content}",
)
return True, found_answer_content, thinking_steps, False
# 如果评估为not_enough_info返回空字符串不返回任何信息
if not_enough_info_reason:
eval_step = {
"iteration": current_iteration,
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "not_enough_info", "action_params": {"reason": not_enough_info_reason}}],
"observations": ["最终评估阶段检测到not_enough_info"]
}
thinking_steps.append(eval_step)
logger.info(f"ReAct Agent 最终评估阶段判断信息不足: {not_enough_info_reason}")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"未找到答案:{not_enough_info_reason}",
)
return False, "", thinking_steps, is_timeout
# 如果没有明确判断视为not_enough_info返回空字符串不返回任何信息
eval_step = {
"iteration": current_iteration,
"thought": f"[最终评估] {eval_response}",
"actions": [{"action_type": "not_enough_info", "action_params": {"reason": "已到达最大迭代次数,无法找到答案"}}],
"observations": ["已到达最大迭代次数,无法找到答案"]
}
thinking_steps.append(eval_step)
logger.info("ReAct Agent 已到达最大迭代次数,无法找到答案")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status="未找到答案:已到达最大迭代次数,无法找到答案",
)
return False, "", thinking_steps, is_timeout
# 如果正常迭代过程中提前找到答案返回,不会到达这里
# 如果正常迭代结束但没有触发最终评估(理论上不应该发生),直接返回
logger.warning("ReAct Agent正常迭代结束但未触发最终评估")
_log_conversation_messages(
conversation_messages,
head_prompt=first_head_prompt,
final_status=f"未找到答案:{timeout_reason}",
final_status="未找到答案:正常迭代结束",
)
return False, "", thinking_steps, is_timeout
@ -851,7 +1000,7 @@ async def _process_single_question(
question=question,
chat_id=chat_id,
max_iterations=global_config.memory.max_agent_iterations,
timeout=120.0,
timeout=global_config.memory.agent_timeout_seconds,
initial_info=question_initial_info,
initial_jargon_concepts=jargon_concepts_for_agent,
)
@ -967,9 +1116,10 @@ async def build_memory_retrieval_prompt(
logger.info(f"无当次查询,不返回任何结果,耗时: {(end_time - start_time):.3f}")
return ""
# 第二步:并行处理所有问题(使用配置的最大迭代次数/120秒超时
# 第二步:并行处理所有问题(使用配置的最大迭代次数和超时时间
max_iterations = global_config.memory.max_agent_iterations
logger.debug(f"问题数量: {len(questions)},设置最大迭代次数: {max_iterations},超时时间: 120秒")
timeout_seconds = global_config.memory.agent_timeout_seconds
logger.debug(f"问题数量: {len(questions)},设置最大迭代次数: {max_iterations},超时时间: {timeout_seconds}")
# 并行处理所有问题,将概念检索结果作为初始信息传递
question_tasks = [

View File

@ -14,7 +14,7 @@ from .tool_registry import (
from .query_chat_history import register_tool as register_query_chat_history
from .query_lpmm_knowledge import register_tool as register_lpmm_knowledge
from .query_person_info import register_tool as register_query_person_info
from .found_answer import register_tool as register_found_answer
from .found_answer import register_tool as register_finish_search
from src.config.config import global_config
@ -22,7 +22,7 @@ def init_all_tools():
"""初始化并注册所有记忆检索工具"""
register_query_chat_history()
register_query_person_info()
register_found_answer() # 注册found_answer工具
register_finish_search() # 注册finish_search工具
if global_config.lpmm_knowledge.lpmm_mode == "agent":
register_lpmm_knowledge()

View File

@ -1,5 +1,5 @@
"""
found_answer工具 - 用于在记忆检索过程中标记找到答案
finish_search工具 - 用于在记忆检索过程中结束查询
"""
from src.common.logger import get_logger
@ -8,33 +8,43 @@ from .tool_registry import register_memory_retrieval_tool
logger = get_logger("memory_retrieval_tools")
async def found_answer(answer: str) -> str:
"""标记已找到问题的答案
async def finish_search(found_answer: bool, answer: str = "") -> str:
"""结束查询
Args:
answer: 找到的答案内容
found_answer: 是否找到了答案
answer: 如果找到了答案提供答案内容如果未找到可以为空
Returns:
str: 确认信息
"""
# 这个工具主要用于标记,实际答案会通过返回值传递
logger.info(f"找到答案: {answer}")
return f"已确认找到答案: {answer}"
if found_answer:
logger.info(f"找到答案: {answer}")
return f"已确认找到答案: {answer}"
else:
logger.info("未找到答案,结束查询")
return "未找到答案,查询结束"
def register_tool():
"""注册found_answer工具"""
"""注册finish_search工具"""
register_memory_retrieval_tool(
name="found_answer",
description="当你在已收集的信息中找到了问题的明确答案时,调用此工具标记已找到答案。只有在检索到明确、具体的答案时才使用此工具,不要编造信息。",
name="finish_search",
description="当你决定结束查询时调用此工具。如果找到了明确答案设置found_answer为true并在answer中提供答案如果未找到答案设置found_answer为false。只有在检索到明确、具体的答案时才设置found_answer为true,不要编造信息。",
parameters=[
{
"name": "found_answer",
"type": "boolean",
"description": "是否找到了答案",
"required": True,
},
{
"name": "answer",
"type": "string",
"description": "找到的答案内容,必须基于已收集的信息,不要编造",
"required": True,
"description": "如果found_answer为true提供找到的答案内容,必须基于已收集的信息,不要编造如果found_answer为false可以为空",
"required": False,
},
],
execute_func=found_answer,
execute_func=finish_search,
)
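
With this change the agent ends a retrieval explicitly in both directions: `finish_search(found_answer=True, answer=...)` when a grounded answer exists, `finish_search(found_answer=False)` to give up, replacing the one-way `found_answer` tool. A quick usage sketch of the coroutine defined above (illustrative answer text, assuming `finish_search` is in scope):

```python
import asyncio

async def demo():
    # success path: an answer grounded in the collected information
    print(await finish_search(found_answer=True, answer="the date mentioned in last week's chat"))
    # give-up path: no answer found, end the search without guessing
    print(await finish_search(found_answer=False))

asyncio.run(demo())
```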

View File

@ -109,7 +109,8 @@ talk_value_rules = [
include_planner_reasoning = false # 是否将planner推理加入replyer默认关闭不加入
[memory]
max_agent_iterations = 3 # 记忆思考深度最低为1不深入思考
max_agent_iterations = 2 # 记忆思考深度最低为1
agent_timeout_seconds = 45.0 # 最长回忆时间(秒)
enable_jargon_detection = true # 记忆检索过程中是否启用黑话识别
global_memory = false # 是否允许记忆检索进行全局查询
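
Taken together with the `MemoryConfig.__post_init__` validation earlier in this commit, the two settings above fail fast at load time if misconfigured. A minimal stand-in showing the effect (hypothetical class name, mirroring the checks added in the config hunk):

```python
from dataclasses import dataclass

@dataclass
class MemoryConfigSketch:  # hypothetical stand-in for the real MemoryConfig
    max_agent_iterations: int = 2
    agent_timeout_seconds: float = 45.0

    def __post_init__(self):
        if self.max_agent_iterations < 1:
            raise ValueError(f"max_agent_iterations must be at least 1, got {self.max_agent_iterations}")
        if self.agent_timeout_seconds <= 0:
            raise ValueError(f"agent_timeout_seconds must be greater than 0, got {self.agent_timeout_seconds}")

MemoryConfigSketch()                           # ok: defaults match the TOML values above
# MemoryConfigSketch(agent_timeout_seconds=0)  # would raise ValueError at construction
```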