diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 4d2291c3..ceb4cfc4 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -774,7 +774,7 @@ class Conversation: self.conversation_info.done_action[-1] if self.conversation_info.done_action else {} # type: ignore ) if last_action_record.get("action") == "send_new_message" and \ - last_action_record.get("status") == "done_no_reply": + last_action_record.get("status") == "done_no_reply": logger.info(f"[私聊][{self.private_name}] 检测到 ReplyGenerator 决定不发送消息,将在下一轮强制使用反思Prompt。") force_reflect_and_act = True # 设置标志,下一轮使用反思prompt # 不需要立即 continue,让循环自然进入下一轮,下一轮的 plan 会用这个标志 diff --git a/src/plugins/PFC/pfc_processor.py b/src/plugins/PFC/pfc_processor.py new file mode 100644 index 00000000..523e2261 --- /dev/null +++ b/src/plugins/PFC/pfc_processor.py @@ -0,0 +1,125 @@ +import traceback + +from maim_message import UserInfo +from src.config.config import global_config +from src.common.logger_manager import get_logger +from ..chat.chat_stream import chat_manager +from typing import Optional, Dict, Any +from .pfc_manager import PFCManager +from src.plugins.chat.message import MessageRecv +from src.plugins.storage.storage import MessageStorage +from datetime import datetime + + +logger = get_logger("pfc_processor") + +async def _handle_error(error: Exception, context: str, message: Optional[MessageRecv] = None) -> None: + """统一的错误处理函数 + + Args: + error: 捕获到的异常 + context: 错误发生的上下文描述 + message: 可选的消息对象,用于记录相关消息内容 + """ + logger.error(f"{context}: {error}") + logger.error(traceback.format_exc()) + if message and hasattr(message, "raw_message"): + logger.error(f"相关消息原始内容: {message.raw_message}") + +class PFCProcessor: + """ PFC 处理器,负责处理接收到的信息并计数""" + + def __init__(self): + """初始化 PFC 处理器,创建消息存储实例""" + self.storage = MessageStorage() + self.pfc_manager = PFCManager.get_instance() + + async def process_message(self, message_data: Dict[str, Any]) -> None: + """处理接收到的原始消息数据 + + 主要流程: + 1. 消息解析与初始化 + 2. 过滤检查 + 3. 消息存储 + 4. 创建 PFC 流 + 5. 日志记录 + + Args: + message_data: 原始消息字符串 + """ + message = None + try: + # 1. 消息解析与初始化 + message = MessageRecv(message_data) + groupinfo = message.message_info.group_info + userinfo = message.message_info.user_info + messageinfo = message.message_info + + logger.trace(f"准备为{userinfo.user_id}创建/获取聊天流") + chat = await chat_manager.get_or_create_stream( + platform=messageinfo.platform, + user_info=userinfo, + group_info=groupinfo, + ) + message.update_chat_stream(chat) + + # 2. 过滤检查 + # 处理消息 + await message.process() + # 过滤词/正则表达式过滤 + if self._check_ban_words(message.processed_plain_text, userinfo) or self._check_ban_regex( + message.raw_message, userinfo + ): + return + + # 3. 消息存储 + await self.storage.store_message(message, chat) + logger.trace(f"存储成功: {message.processed_plain_text}") + + # 4. 创建 PFC 聊天流 + await self._create_pfc_chat(message) + + # 5. 日志记录 + # 将时间戳转换为datetime对象 + current_time = datetime.fromtimestamp(message.message_info.time).strftime("%H:%M:%S") + logger.info( + f"[{current_time}][私聊]{message.message_info.user_info.user_nickname}: {message.processed_plain_text}" + ) + + except Exception as e: + await _handle_error(e, "消息处理失败", message) + + async def _create_pfc_chat(self, message: MessageRecv): + try: + chat_id = str(message.chat_stream.stream_id) + private_name = str(message.message_info.user_info.user_nickname) + + if global_config.enable_pfc_chatting: + await self.pfc_manager.get_or_create_conversation(chat_id, private_name) + + except Exception as e: + logger.error(f"创建PFC聊天失败: {e}") + + @staticmethod + def _check_ban_words(text: str, userinfo: UserInfo) -> bool: + """检查消息中是否包含过滤词""" + for word in global_config.ban_words: + if word in text: + logger.info( + f"[私聊]{userinfo.user_nickname}:{text}" + ) + logger.info(f"[过滤词识别]消息中含有{word},filtered") + return True + return False + + @staticmethod + def _check_ban_regex(text: str, userinfo: UserInfo) -> bool: + """检查消息是否匹配过滤正则表达式""" + for pattern in global_config.ban_msgs_regex: + if pattern.search(text): + logger.info( + f"[私聊]{userinfo.user_nickname}:{text}" + ) + logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered") + return True + return False \ No newline at end of file diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index 9c4a3358..ed22ef2b 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -3,12 +3,10 @@ from typing import Dict, Any from ..moods.moods import MoodManager # 导入情绪管理器 from ...config.config import global_config from .message import MessageRecv -from ..PFC.pfc_manager import PFCManager -from .chat_stream import chat_manager -from .only_message_process import MessageProcessor from src.common.logger_manager import get_logger from ..heartFC_chat.heartflow_processor import HeartFCProcessor +from ..PFC.pfc_processor import PFCProcessor from ..utils.prompt_builder import Prompt, global_prompt_manager import traceback @@ -25,10 +23,7 @@ class ChatBot: self._started = False self.mood_manager = MoodManager.get_instance() # 获取情绪管理器单例 self.heartflow_processor = HeartFCProcessor() # 新增 - - # 创建初始化PFC管理器的任务,会在_ensure_started时执行 - self.only_process_chat = MessageProcessor() - self.pfc_manager = PFCManager.get_instance() + self.pfc_processor = PFCProcessor() async def _ensure_started(self): """确保所有任务已启动""" @@ -37,17 +32,6 @@ class ChatBot: self._started = True - async def _create_pfc_chat(self, message: MessageRecv): - try: - chat_id = str(message.chat_stream.stream_id) - private_name = str(message.message_info.user_info.user_nickname) - - if global_config.enable_pfc_chatting: - await self.pfc_manager.get_or_create_conversation(chat_id, private_name) - - except Exception as e: - logger.error(f"创建PFC聊天失败: {e}") - async def message_process(self, message_data: Dict[str, Any]) -> None: """处理转化后的统一格式消息 这个函数本质是预处理一些数据,根据配置信息和消息内容,预处理消息,并分发到合适的消息处理器中 @@ -118,18 +102,7 @@ class ChatBot: # 是否进入PFC if global_config.enable_pfc_chatting: logger.trace("进入PFC私聊处理流程") - userinfo = message.message_info.user_info - messageinfo = message.message_info - # 创建聊天流 - logger.trace(f"为{userinfo.user_id}创建/获取聊天流") - chat = await chat_manager.get_or_create_stream( - platform=messageinfo.platform, - user_info=userinfo, - group_info=groupinfo, - ) - message.update_chat_stream(chat) - await self.only_process_chat.process_message(message) - await self._create_pfc_chat(message) + await self.pfc_processor.process_message(message_data) # 禁止PFC,进入普通的心流消息处理逻辑 else: logger.trace("进入普通心流私聊处理") diff --git a/src/plugins/chat/only_message_process.py b/src/plugins/chat/only_message_process.py deleted file mode 100644 index b1bb0cea..00000000 --- a/src/plugins/chat/only_message_process.py +++ /dev/null @@ -1,67 +0,0 @@ -from src.common.logger_manager import get_logger -from src.plugins.chat.message import MessageRecv -from src.plugins.storage.storage import MessageStorage -from src.config.config import global_config -from datetime import datetime - -logger = get_logger("pfc") - - -class MessageProcessor: - """消息处理器,负责处理接收到的消息并存储""" - - def __init__(self): - self.storage = MessageStorage() - - @staticmethod - def _check_ban_words(text: str, chat, userinfo) -> bool: - """检查消息中是否包含过滤词""" - for word in global_config.ban_words: - if word in text: - logger.info( - f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}" - ) - logger.info(f"[过滤词识别]消息中含有{word},filtered") - return True - return False - - @staticmethod - def _check_ban_regex(text: str, chat, userinfo) -> bool: - """检查消息是否匹配过滤正则表达式""" - for pattern in global_config.ban_msgs_regex: - if pattern.search(text): - logger.info( - f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}" - ) - logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered") - return True - return False - - async def process_message(self, message: MessageRecv) -> None: - """处理消息并存储 - - Args: - message: 消息对象 - """ - userinfo = message.message_info.user_info - chat = message.chat_stream - - # 处理消息 - await message.process() - - # 过滤词/正则表达式过滤 - if self._check_ban_words(message.processed_plain_text, chat, userinfo) or self._check_ban_regex( - message.raw_message, chat, userinfo - ): - return - - # 存储消息 - await self.storage.store_message(message, chat) - - # 打印消息信息 - mes_name = chat.group_info.group_name if chat.group_info else "私聊" - # 将时间戳转换为datetime对象 - current_time = datetime.fromtimestamp(message.message_info.time).strftime("%H:%M:%S") - logger.info( - f"[{current_time}][{mes_name}]{message.message_info.user_info.user_nickname}: {message.processed_plain_text}" - )