feat:重构pf brain chat使用react

pull/1414/head
SengokuCola 2025-12-05 01:51:19 +08:00
parent 95a1712c90
commit cef094e125
3 changed files with 199 additions and 505 deletions

View File

@ -25,7 +25,6 @@ from src.chat.utils.chat_message_builder import (
build_readable_messages_with_id,
get_raw_msg_before_timestamp_with_chat,
)
from src.chat.brain_chat.brain_reply_checker import BrainReplyChecker, BrainLLMReplyChecker
if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages
@ -88,11 +87,6 @@ class BrainChatting:
self.running: bool = False
self._loop_task: Optional[asyncio.Task] = None # 主循环任务
# 轻量级回复检查器(比 PFC 更宽松)
self.reply_checker = BrainReplyChecker(chat_id=self.stream_id)
# 使用 planner 模型的一次性 LLM 检查器
self.llm_reply_checker = BrainLLMReplyChecker(chat_id=self.stream_id, max_retries=1)
# 添加循环信息管理相关的属性
self.history_loop: List[CycleDetail] = []
self._cycle_counter = 0
@ -105,9 +99,6 @@ class BrainChatting:
# 最近一次是否成功进行了 reply用于选择 BrainPlanner 的 Prompt
self._last_successful_reply: bool = False
# 类似 PFC 的 block_and_ignore在该时间点之前不主动参与该聊天
self._ignore_until_timestamp: Optional[float] = None
async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。"""
@ -169,14 +160,7 @@ class BrainChatting:
)
async def _loopbody(self): # sourcery skip: hoist-if-from-if
# 如果当前处于 block_and_ignore 冷却期,直接跳过本轮思考
if self._ignore_until_timestamp and time.time() < self._ignore_until_timestamp:
await asyncio.sleep(0.5)
return True
elif self._ignore_until_timestamp and time.time() >= self._ignore_until_timestamp:
logger.info(f"{self.log_prefix} block_and_ignore 冷却结束,恢复该聊天的正常思考")
self._ignore_until_timestamp = None
# 获取最新消息(用于上下文,但不影响是否调用 observe
recent_messages_list = message_api.get_messages_by_time_in_chat(
chat_id=self.stream_id,
start_time=self.last_read_time,
@ -188,14 +172,22 @@ class BrainChatting:
filter_intercept_message_level=1,
)
# 如果有新消息,更新 last_read_time
if len(recent_messages_list) >= 1:
self.last_read_time = time.time()
await self._observe(recent_messages_list=recent_messages_list)
else:
# Normal模式消息数量不足等待
await asyncio.sleep(0.2)
# 总是执行一次思考迭代(不管有没有新消息)
# wait 动作会在其内部等待,不需要在这里处理
should_continue = await self._observe(recent_messages_list=recent_messages_list)
if not should_continue:
# 选择了 complete_talk停止循环
return True
# 继续下一次迭代(除非选择了 complete_talk
# 短暂等待后再继续,避免过于频繁的循环
await asyncio.sleep(0.1)
return True
async def _send_and_store_reply(
@ -292,9 +284,11 @@ class BrainChatting:
except Exception as e:
logger.error(f"{self.log_prefix} 动作修改失败: {e}")
# 执行planner
# 获取必要信息
is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info()
# 一次思考迭代Think - Act - Observe
# 获取聊天上下文
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id,
timestamp=time.time(),
@ -316,9 +310,7 @@ class BrainChatting:
chat_content_block=chat_content_block,
message_id_list=message_id_list,
interest=global_config.personality.interest,
prompt_key=(
"brain_planner_prompt_follow_up" if self._last_successful_reply else "brain_planner_prompt_initial"
),
prompt_key="brain_planner_prompt_react",
log_prompt=True,
)
continue_flag, modified_message = await events_manager.handle_mai_events(
@ -333,10 +325,14 @@ class BrainChatting:
action_to_use_info = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
last_successful_reply=self._last_successful_reply,
)
# 3. 并行执行所有动作
# 检查是否有 complete_talk 动作(会停止后续迭代)
has_complete_talk = any(
action.action_type == "complete_talk" for action in action_to_use_info
)
# 并行执行所有动作
action_tasks = [
asyncio.create_task(
self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers)
@ -368,7 +364,14 @@ class BrainChatting:
else:
logger.warning(f"{self.log_prefix} 回复动作执行失败")
# 构建最终的循环信息
# 更新观察时间标记
self.action_planner.last_obs_time_mark = time.time()
# 如果选择了 complete_talk标记为完成不再继续迭代
if has_complete_talk:
logger.info(f"{self.log_prefix} 检测到 complete_talk 动作,本次思考完成")
# 构建循环信息
if reply_loop_info:
# 如果有回复信息使用回复的loop_info作为基础
loop_info = reply_loop_info
@ -394,10 +397,16 @@ class BrainChatting:
}
_reply_text = action_reply_text
# 如果选择了 complete_talk返回 False 以停止 _loopbody 的循环
# 否则返回 True让 _loopbody 继续下一次迭代
should_continue = not has_complete_talk
self.end_cycle(loop_info, cycle_timers)
self.print_cycle_info(cycle_timers)
return True
# 如果选择了 complete_talk返回 False 停止循环
# 否则返回 True继续下一次思考迭代
return should_continue
async def _main_chat_loop(self):
"""主循环,持续进行计划并可能回复消息,直到被外部取消。"""
@ -531,12 +540,12 @@ class BrainChatting:
"""执行单个动作的通用函数"""
try:
with Timer(f"动作{action_planner_info.action_type}", cycle_timers):
if action_planner_info.action_type == "no_reply":
# 直接处理no_reply逻辑,不再通过动作系统
reason = action_planner_info.reasoning or "选择不回复"
# logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}")
if action_planner_info.action_type == "complete_talk":
# 直接处理complete_talk逻辑,不再通过动作系统
reason = action_planner_info.reasoning or "选择完成对话"
logger.info(f"{self.log_prefix} 选择完成对话,原因: {reason}")
# 存储no_reply信息到数据库
# 存储complete_talk信息到数据库
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
@ -544,134 +553,142 @@ class BrainChatting:
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason},
action_name="no_reply",
action_name="complete_talk",
)
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
return {"action_type": "complete_talk", "success": True, "reply_text": "", "command": ""}
elif action_planner_info.action_type == "reply":
# 使用规则 + 一次 LLM ReplyChecker 包一层重试逻辑
retry_count = 0
while True:
try:
success, llm_response = await generator_api.generate_reply(
chat_stream=self.chat_stream,
reply_message=action_planner_info.action_message,
available_actions=available_actions,
chosen_actions=chosen_action_plan_infos,
reply_reason=action_planner_info.reasoning or "",
enable_tool=global_config.tool.enable_tool,
request_type="replyer",
from_plugin=False,
)
if not success or not llm_response or not llm_response.reply_set:
if action_planner_info.action_message:
logger.info(
f"{action_planner_info.action_message.processed_plain_text} 的回复生成失败"
)
else:
logger.info("回复生成失败")
return {
"action_type": "reply",
"success": False,
"reply_text": "",
"loop_info": None,
}
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
response_set = llm_response.reply_set
# 预先拼接一次纯文本,供检查使用(与发送逻辑解耦)
preview_text = ""
for reply_content in response_set.reply_data:
if reply_content.content_type != ReplyContentType.TEXT:
continue
data: str = reply_content.content # type: ignore
preview_text += data
# 规则检查(不调用 LLM
rule_suitable, rule_reason, rule_need_retry = self.reply_checker.check(
reply_text=preview_text, retry_count=retry_count
try:
success, llm_response = await generator_api.generate_reply(
chat_stream=self.chat_stream,
reply_message=action_planner_info.action_message,
available_actions=available_actions,
chosen_actions=chosen_action_plan_infos,
reply_reason=action_planner_info.reasoning or "",
enable_tool=global_config.tool.enable_tool,
request_type="replyer",
from_plugin=False,
)
# LLM 检查(使用 planner 模型,一次机会)
llm_suitable, llm_reason, llm_need_retry = await self.llm_reply_checker.check(
reply_text=preview_text, retry_count=retry_count
)
if not success or not llm_response or not llm_response.reply_set:
if action_planner_info.action_message:
logger.info(
f"{action_planner_info.action_message.processed_plain_text} 的回复生成失败"
)
else:
logger.info("回复生成失败")
return {
"action_type": "reply",
"success": False,
"reply_text": "",
"loop_info": None,
}
# 是否需要重生成:只要有一方建议重试,且还在重试次数之内
if (rule_need_retry or llm_need_retry) and retry_count < max(
self.reply_checker.max_retries, self.llm_reply_checker.max_retries
):
retry_count += 1
logger.info(
f"{self.log_prefix} ReplyChecker 建议重试(第 {retry_count} 次),"
f"rule: {rule_reason}; llm: {llm_reason}"
)
continue
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
# 到这里为止,不再重试:即使有一方认为“不太理想”,也只记录原因并放行
if not rule_suitable or not llm_suitable:
logger.info(
f"{self.log_prefix} ReplyChecker 判断回复可能不太理想,"
f"rule: {rule_reason}; llm: {llm_reason},本次仍将发送。"
)
selected_expressions = llm_response.selected_expressions
loop_info, reply_text, _ = await self._send_and_store_reply(
response_set=response_set,
action_message=action_planner_info.action_message, # type: ignore
cycle_timers=cycle_timers,
thinking_id=thinking_id,
actions=chosen_action_plan_infos,
selected_expressions=selected_expressions,
)
# 标记这次循环已经成功进行了回复,下一轮 Planner 使用 follow_up Prompt
self._last_successful_reply = True
return {
"action_type": "reply",
"success": True,
"reply_text": reply_text,
"loop_info": loop_info,
}
response_set = llm_response.reply_set
selected_expressions = llm_response.selected_expressions
loop_info, reply_text, _ = await self._send_and_store_reply(
response_set=response_set,
action_message=action_planner_info.action_message, # type: ignore
cycle_timers=cycle_timers,
thinking_id=thinking_id,
actions=chosen_action_plan_infos,
selected_expressions=selected_expressions,
)
# 标记这次循环已经成功进行了回复
self._last_successful_reply = True
return {
"action_type": "reply",
"success": True,
"reply_text": reply_text,
"loop_info": loop_info,
}
# 其他动作
else:
# 内建 wait / listening / block_and_ignore:不通过插件系统,直接在这里处理
if action_planner_info.action_type in ["wait", "listening", "block_and_ignore"]:
# 内建 wait / listening不通过插件系统直接在这里处理
if action_planner_info.action_type in ["wait", "listening"]:
reason = action_planner_info.reasoning or ""
action_data = action_planner_info.action_data or {}
if action_planner_info.action_type == "block_and_ignore":
# 设置一段时间的忽略窗口,例如 10 分钟
ignore_minutes = 10
self._ignore_until_timestamp = time.time() + ignore_minutes * 60
logger.info(
f"{self.log_prefix} 收到 block_and_ignore 动作,将在接下来 {ignore_minutes} 分钟内不再主动参与该聊天"
if action_planner_info.action_type == "wait":
# 获取等待时间(必填)
wait_seconds = action_data.get("wait_seconds")
if wait_seconds is None:
logger.warning(f"{self.log_prefix} wait 动作缺少 wait_seconds 参数,使用默认值 5 秒")
wait_seconds = 5
else:
try:
wait_seconds = float(wait_seconds)
if wait_seconds < 0:
logger.warning(f"{self.log_prefix} wait_seconds 不能为负数,使用默认值 5 秒")
wait_seconds = 5
except (ValueError, TypeError):
logger.warning(f"{self.log_prefix} wait_seconds 参数格式错误,使用默认值 5 秒")
wait_seconds = 5
logger.info(f"{self.log_prefix} 执行 wait 动作,等待 {wait_seconds}")
# 记录动作信息
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason or f"等待 {wait_seconds}",
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason, "wait_seconds": wait_seconds},
action_name="wait",
)
# 等待指定时间
await asyncio.sleep(wait_seconds)
logger.info(f"{self.log_prefix} wait 动作完成,继续下一次思考")
# 这些动作本身不产生文本回复
self._last_successful_reply = False
return {
"action_type": "wait",
"success": True,
"reply_text": "",
"command": "",
}
# 统一将这三种策略动作记录到数据库,便于后续分析
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason or f"执行动作: {action_planner_info.action_type}",
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason},
action_name=action_planner_info.action_type,
)
# 这些动作本身不产生文本回复
self._last_successful_reply = False
return {
"action_type": action_planner_info.action_type,
"success": True,
"reply_text": "",
"command": "",
}
# listening 已合并到 wait如果遇到则转换为 wait向后兼容
elif action_planner_info.action_type == "listening":
logger.debug(f"{self.log_prefix} 检测到 listening 动作,已合并到 wait自动转换")
# 使用默认等待时间
wait_seconds = 3
logger.info(f"{self.log_prefix} 执行 listening转换为 wait动作等待 {wait_seconds}")
# 记录动作信息
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason or f"倾听并等待 {wait_seconds}",
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason, "wait_seconds": wait_seconds},
action_name="listening",
)
# 等待指定时间
await asyncio.sleep(wait_seconds)
logger.info(f"{self.log_prefix} listening 动作完成,继续下一次思考")
# 这些动作本身不产生文本回复
self._last_successful_reply = False
return {
"action_type": "listening",
"success": True,
"reply_text": "",
"command": "",
}
# 其余动作:走原有插件 Action 体系
with Timer("动作执行", cycle_timers):

View File

@ -35,13 +35,14 @@ install(extra_lines=3)
def init_prompt():
# 初次 / 非连续回复时使用的 Planner Prompt
# ReAct 形式的 Planner Prompt
Prompt(
"""
{time_block}
{name_block}
你的兴趣是{interest}
{chat_context_description}以下是具体的聊天内容
**聊天内容**
{chat_content_block}
@ -58,40 +59,20 @@ reply
"reason":"回复的原因"
}}
no_reply
动作描述
等待保持沉默等待对方发言
{{
"action": "no_reply",
}}
wait
动作描述
在当前轮次暂时不再发言等待对方进一步发言或后续更合适的时机再回复这通常用于你已经表达清楚一轮想给对方留出空间
暂时不再发言等待指定时间后再继续下一次思考适用于以下情况
- 你已经表达清楚一轮想给对方留出空间
- 你感觉对方的话还没说完或者刚刚发了好几条连续消息
- 你想保持安静专注""而不是马上回复
请你根据上下文来判断要等待多久请你灵活判断
{{
"action": "wait",
"target_message_id":"想要作为这次等待依据的消息id通常是对方的最新消息",
"wait_seconds": 等待的秒数必填例如5 表示等待5秒,
"reason":"选择等待的原因"
}}
listening
动作描述
倾听对方继续说话你感觉对方的话还没说完或者刚刚发了好几条连续消息这时你可以选择保持安静专注而不是马上回复
{{
"action": "listening",
"target_message_id":"你正在倾听的那条消息id通常是对方的最新消息",
"reason":"选择倾听的原因"
}}
block_and_ignore
动作描述
当你觉得当前对话让你非常不适存在明显骚扰或恶意时可以选择在一段时间内不再主动回应该对话对方再发消息你也先不理
{{
"action": "block_and_ignore",
"target_message_id":"触发你做出这一决定的消息id",
"reason":"为什么你认为需要暂时屏蔽这段对话"
}}
{action_options_text}
请选择合适的action并说明触发action的消息id和选择该action的原因消息id格式:m+数字
@ -120,97 +101,7 @@ block_and_ignore
```
""",
"brain_planner_prompt_initial",
)
# 刚刚已经回复过,对“要不要继续说 / 追问”更敏感的 Planner Prompt
Prompt(
"""
{time_block}
{name_block}
你的兴趣是{interest}
{chat_context_description}以下是具体的聊天内容
**聊天内容**
{chat_content_block}
**动作记录**
{actions_before_now_block}
**可用的action**
reply
动作描述
在你刚刚已经进行过一次或多次回复的前提下你可以选择
- 继续顺着正在进行的聊天内容进行补充或追问
- 也可以选择暂时不再回复给对方留出回复空间
{{
"action": "reply",
"target_message_id":"想要回复的消息id",
"reason":"继续回复的原因(或者解释为什么当前仍然适合连续发言)"
}}
no_reply
动作描述
保持沉默等待对方发言特别是在你已经连续发言或对方长时间未回复的情况下可以更多考虑这一选项
{{
"action": "no_reply",
}}
wait
动作描述
你刚刚已经发过一轮现在选择暂时不再继续追问或补充给对方更多时间和空间来回应
{{
"action": "wait",
"target_message_id":"想要作为这次等待依据的消息id通常是你刚刚回复的那条或对方的最新消息",
"reason":"为什么此时更适合等待而不是继续连续发言"
}}
listening
动作描述
你感觉对方还有话要说或者刚刚连续发送了多条消息这时你可以选择继续而不是马上再插话
{{
"action": "listening",
"target_message_id":"你正在倾听的那条消息id通常是对方的最新消息",
"reason":"你为什么认为对方还需要继续表达"
}}
block_and_ignore
动作描述
如果你在连续若干轮对话后明确感到这是不友善的骚扰或让你极度不适的对话可以选择在一段时间内不再回应这条对话
{{
"action": "block_and_ignore",
"target_message_id":"触发你做出这一决定的消息id",
"reason":"为什么你认为需要暂时屏蔽这段对话"
}}
{action_options_text}
请选择合适的action并说明触发action的消息id和选择该action的原因消息id格式:m+数字
先输出你的选择思考理由再输出你选择的action理由是一段平文本不要分点精简
**动作选择要求**
请你根据聊天内容,用户的最新消息和以下标准选择合适的动作:
{plan_style}
{moderation_prompt}
请选择所有符合使用要求的action动作用json格式输出如果输出多个json每个json都要单独用```json包裹你可以重复使用同一个动作或不同动作:
**示例**
// 理由文本
```json
{{
"action":"动作名",
"target_message_id":"触发动作的消息id",
//对应参数
}}
```
```json
{{
"action":"动作名",
"target_message_id":"触发动作的消息id",
//对应参数
}}
```
""",
"brain_planner_prompt_follow_up",
"brain_planner_prompt_react",
)
Prompt(
@ -270,10 +161,10 @@ class BrainPlanner:
action_planner_infos = []
try:
action = action_json.get("action", "no_reply")
action = action_json.get("action", "complete_talk")
reasoning = action_json.get("reason", "未提供原因")
action_data = {key: value for key, value in action_json.items() if key not in ["action", "reason"]}
# 非no_reply动作需要target_message_id
# 非complete_talk动作需要target_message_id
target_message = None
if target_message_id := action_json.get("target_message_id"):
@ -290,16 +181,22 @@ class BrainPlanner:
# 验证action是否可用
available_action_names = [action_name for action_name, _ in current_available_actions]
# 内部保留动作(不依赖插件系统)
internal_action_names = ["no_reply", "reply", "wait_time", "wait", "listening", "block_and_ignore"]
# 注意listening 已合并到 wait 中,如果遇到 listening 则转换为 wait
internal_action_names = ["complete_talk", "reply", "wait_time", "wait", "listening"]
# 将 listening 转换为 wait向后兼容
if action == "listening":
logger.debug(f"{self.log_prefix}检测到 listening 动作,已合并到 wait自动转换")
action = "wait"
if action not in internal_action_names and action not in available_action_names:
logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_reply'"
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'complete_talk'"
)
reasoning = (
f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}"
)
action = "no_reply"
action = "complete_talk"
# 创建ActionPlannerInfo对象
# 将列表转换为字典格式
@ -320,7 +217,7 @@ class BrainPlanner:
available_actions_dict = dict(current_available_actions)
action_planner_infos.append(
ActionPlannerInfo(
action_type="no_reply",
action_type="complete_talk",
reasoning=f"解析单个action时出错: {e}",
action_data={},
action_message=None,
@ -334,11 +231,10 @@ class BrainPlanner:
self,
available_actions: Dict[str, ActionInfo],
loop_start_time: float = 0.0,
last_successful_reply: bool = False,
) -> List[ActionPlannerInfo]:
# sourcery skip: use-named-expression
"""
规划器 (Planner): 使用LLM根据上下文决定做出什么动作
规划器 (Planner): 使用LLM根据上下文决定做出什么动作ReAct模式
"""
# 获取聊天上下文
@ -377,10 +273,8 @@ class BrainPlanner:
logger.debug(f"{self.log_prefix}过滤后有{len(filtered_actions)}个可用动作")
# 构建包含所有动作的提示词:根据是否刚刚成功回复来选择不同的 Prompt
prompt_key = (
"brain_planner_prompt_follow_up" if last_successful_reply else "brain_planner_prompt_initial"
)
# 构建包含所有动作的提示词:使用统一的 ReAct Prompt
prompt_key = "brain_planner_prompt_react"
# 这里不记录日志,避免重复打印,由调用方按需控制 log_prompt
prompt, message_id_list = await self.build_planner_prompt(
is_group_chat=is_group_chat,
@ -412,7 +306,7 @@ class BrainPlanner:
message_id_list: List[Tuple[str, "DatabaseMessages"]],
chat_content_block: str = "",
interest: str = "",
prompt_key: str = "brain_planner_prompt_initial",
prompt_key: str = "brain_planner_prompt_react",
log_prompt: bool = False,
) -> tuple[str, List[Tuple[str, "DatabaseMessages"]]]:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
@ -590,7 +484,7 @@ class BrainPlanner:
logger.error(f"{self.log_prefix}LLM 请求执行失败: {req_e}")
return [
ActionPlannerInfo(
action_type="no_reply",
action_type="complete_talk",
reasoning=f"LLM 请求失败,模型出现问题: {req_e}",
action_data={},
action_message=None,
@ -609,16 +503,16 @@ class BrainPlanner:
else:
# 尝试解析为直接的JSON
logger.warning(f"{self.log_prefix}LLM没有返回可用动作: {llm_content}")
actions = self._create_no_reply("LLM没有返回可用动作", available_actions)
actions = self._create_complete_talk("LLM没有返回可用动作", available_actions)
except Exception as json_e:
logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'")
actions = self._create_no_reply(f"解析LLM响应JSON失败: {json_e}", available_actions)
actions = self._create_complete_talk(f"解析LLM响应JSON失败: {json_e}", available_actions)
traceback.print_exc()
else:
actions = self._create_no_reply("规划器没有获得LLM响应", available_actions)
actions = self._create_complete_talk("规划器没有获得LLM响应", available_actions)
# 添加循环开始时间到所有非no_reply动作
# 添加循环开始时间到所有动作
for action in actions:
action.action_data = action.action_data or {}
action.action_data["loop_start_time"] = loop_start_time
@ -629,11 +523,11 @@ class BrainPlanner:
return actions
def _create_no_reply(self, reasoning: str, available_actions: Dict[str, ActionInfo]) -> List[ActionPlannerInfo]:
"""创建no_reply"""
def _create_complete_talk(self, reasoning: str, available_actions: Dict[str, ActionInfo]) -> List[ActionPlannerInfo]:
"""创建complete_talk"""
return [
ActionPlannerInfo(
action_type="no_reply",
action_type="complete_talk",
reasoning=reasoning,
action_data={},
action_message=None,

View File

@ -1,217 +0,0 @@
from __future__ import annotations
import traceback
from typing import Tuple, Optional
import time
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.plugin_system.apis import message_api
from src.llm_models.utils_model import LLMRequest
logger = get_logger("bc_reply_checker")
class BrainReplyChecker:
"""
BrainChat 的轻量级回复检查器
设计目标
- BrainChat 主循环低耦合只依赖 chat_id message_api
- 更宽松只做少量简单检查尽量不阻塞发送
- LLM避免额外的模型调用开销
"""
def __init__(self, chat_id: str, max_retries: int = 1) -> None:
self.chat_id = chat_id
# 比 PFC 更宽松:默认只允许 1 次重试
self.max_retries = max_retries
def _get_last_bot_text(self) -> Optional[str]:
"""
获取当前会话中 Bot 最近一次发送的文本内容如果有
"""
try:
# end_time 必须是数字,这里使用当前时间戳
recent_messages = message_api.get_messages_by_time_in_chat(
chat_id=self.chat_id,
start_time=0,
end_time=time.time(),
limit=20,
limit_mode="latest",
filter_mai=False,
filter_command=False,
filter_intercept_message_level=1,
)
# 使用新配置中的 QQ 账号字段
bot_id = str(global_config.bot.qq_account)
for msg in reversed(recent_messages):
try:
if str(getattr(msg.user_info, "user_id", "")) == bot_id:
text = getattr(msg, "processed_plain_text", None)
if text:
return str(text)
except Exception:
# 单条消息解析失败不影响整体
continue
except Exception as e:
logger.warning(f"[{self.chat_id}] 获取最近 Bot 消息失败: {e}")
return None
def check(
self,
reply_text: str,
retry_count: int = 0,
) -> Tuple[bool, str, bool]:
"""
检查生成的回复是否合适宽松版本
返回:
(suitable, reason, need_retry)
"""
reply_text = reply_text or ""
reply_text = reply_text.strip()
if not reply_text:
return False, "回复内容为空", retry_count < self.max_retries
# 1. 与最近一条 Bot 消息做重复/高度相似检查
last_bot_text = self._get_last_bot_text()
if last_bot_text:
last_bot_text = last_bot_text.strip()
if reply_text == last_bot_text:
logger.info(f"[{self.chat_id}] ReplyChecker: 与上一条 Bot 消息完全相同,尝试重试生成。")
need_retry = retry_count < self.max_retries
return (
not need_retry, # 如果已经没有重试机会,就放行
"回复内容与上一条完全相同",
need_retry,
)
# 2. 粗略长度限制(过长时给一次重试机会,但整体仍偏宽松)
max_len = 300
if len(reply_text) > max_len:
logger.info(f"[{self.chat_id}] ReplyChecker: 回复长度为 {len(reply_text)},超过 {max_len} 字。")
need_retry = retry_count < self.max_retries
return (
not need_retry, # 超过长度但重试耗尽时也允许发送
f"回复内容偏长({len(reply_text)} 字)",
need_retry,
)
# 其他情况全部放行
return True, "通过检查", False
class BrainLLMReplyChecker:
"""
使用 planner 模型做一次轻量 LLM 逻辑检查
- 不参与主决策只作为这句话现在说合适吗的顾问
- 至多触发一次重生成机会
"""
def __init__(self, chat_id: str, max_retries: int = 1) -> None:
self.chat_id = chat_id
self.max_retries = max_retries
# 复用 planner 模型配置
self.llm = LLMRequest(model_set=model_config.model_task_config.planner, request_type="brain_reply_check")
def _build_chat_history_text(self, limit: int = 15) -> str:
"""构造一段简短的聊天文本上下文,供 LLM 参考。"""
try:
recent_messages = message_api.get_messages_by_time_in_chat(
chat_id=self.chat_id,
start_time=0,
end_time=time.time(), # end_time 也必须是数字
limit=limit,
limit_mode="latest",
filter_mai=False,
filter_command=False,
filter_intercept_message_level=1,
)
lines = []
for msg in recent_messages:
try:
user = getattr(msg.user_info, "user_nickname", None) or getattr(
msg.user_info, "user_id", "unknown"
)
text = getattr(msg, "processed_plain_text", "") or ""
if text:
lines.append(f"{user}: {text}")
except Exception:
continue
return "\n".join(lines) if lines else "(当前几乎没有聊天记录)"
except Exception as e:
logger.warning(f"[{self.chat_id}] 构造聊天上下文文本失败: {e}")
return "(构造聊天上下文时出错)"
async def check(self, reply_text: str, retry_count: int = 0) -> Tuple[bool, str, bool]:
"""
使用 planner 模型检查一次回复是否合适
返回:
(suitable, reason, need_retry)
"""
reply_text = (reply_text or "").strip()
if not reply_text:
return False, "回复内容为空", retry_count < self.max_retries
chat_history_text = self._build_chat_history_text()
prompt = f"""你是一个聊天逻辑检查器,使用 JSON 评估下面这条回复是否适合当前上下文。
最近的聊天记录按时间从旧到新
{chat_history_text}
候选回复
{reply_text}
请综合考虑
1. 是否和最近的聊天内容衔接自然
2. 是否明显重复啰嗦或完全没必要
3. 是否有可能被认为不礼貌或不合时宜
4. 是否在当前时机继续说话会打扰对方如果对方已经长时间没回可以宽松一点只要内容自然即可
请只用 JSON 格式回答不要输出多余文字例如
{{
"suitable": true,
"reason": "整体自然得体"
}}
其中
- suitable: 是否建议发送 (true/false)
- reason: 你的简短理由
"""
# 调试:展示用于 LLM 检查的 Prompt
logger.info(f"[{self.chat_id}] BrainLLMReplyChecker Prompt:\n{prompt}")
try:
content, _ = await self.llm.generate_response_async(prompt=prompt)
content = (content or "").strip()
import json
result = json.loads(content)
suitable = bool(result.get("suitable", True))
reason = str(result.get("reason", "未提供原因")).strip() or "未提供原因"
except Exception as e:
logger.warning(f"[{self.chat_id}] LLM 回复检查失败,将默认放行: {e}")
logger.debug(f"[{self.chat_id}] LLM 返回内容: {content[:200] if content else '(空)'}")
logger.debug(traceback.format_exc())
return True, "LLM 检查失败,默认放行", False
if not suitable and retry_count < self.max_retries:
# 给一次重新生成机会
return False, reason, True
# 不适合但已经没有重试机会时,只记录原因但不强制拦截
return True, reason, False