diff --git a/src/chat/brain_chat/brain_chat.py b/src/chat/brain_chat/brain_chat.py index b5d2cec7..68790ae0 100644 --- a/src/chat/brain_chat/brain_chat.py +++ b/src/chat/brain_chat/brain_chat.py @@ -1,570 +1,572 @@ -import asyncio -import time -import traceback -import random -from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING -from rich.traceback import install - -from src.config.config import global_config -from src.common.logger import get_logger -from src.common.data_models.info_data_model import ActionPlannerInfo -from src.common.data_models.message_data_model import ReplyContentType -from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager -from src.chat.utils.prompt_builder import global_prompt_manager -from src.chat.utils.timer_calculator import Timer -from src.chat.brain_chat.brain_planner import BrainPlanner -from src.chat.planner_actions.action_modifier import ActionModifier -from src.chat.planner_actions.action_manager import ActionManager -from src.chat.heart_flow.hfc_utils import CycleDetail -from src.express.expression_learner import expression_learner_manager -from src.person_info.person_info import Person -from src.plugin_system.base.component_types import EventType, ActionInfo -from src.plugin_system.core import events_manager -from src.plugin_system.apis import generator_api, send_api, message_api, database_api -from src.chat.utils.chat_message_builder import ( - build_readable_messages_with_id, - get_raw_msg_before_timestamp_with_chat, -) - -if TYPE_CHECKING: - from src.common.data_models.database_data_model import DatabaseMessages - from src.common.data_models.message_data_model import ReplySetModel - - -ERROR_LOOP_INFO = { - "loop_plan_info": { - "action_result": { - "action_type": "error", - "action_data": {}, - "reasoning": "循环处理失败", - }, - }, - "loop_action_info": { - "action_taken": False, - "reply_text": "", - "command": "", - "taken_time": time.time(), - }, -} - - -install(extra_lines=3) - -# 注释:原来的动作修改超时常量已移除,因为改为顺序执行 - -logger = get_logger("bc") # Logger Name Changed - - -class BrainChatting: - """ - 管理一个连续的私聊Brain Chat循环 - 用于在特定聊天流中生成回复。 - """ - - def __init__(self, chat_id: str): - """ - BrainChatting 初始化函数 - - 参数: - chat_id: 聊天流唯一标识符(如stream_id) - on_stop_focus_chat: 当收到stop_focus_chat命令时调用的回调函数 - performance_version: 性能记录版本号,用于区分不同启动版本 - """ - # 基础属性 - self.stream_id: str = chat_id # 聊天流ID - self.chat_stream: ChatStream = get_chat_manager().get_stream(self.stream_id) # type: ignore - if not self.chat_stream: - raise ValueError(f"无法找到聊天流: {self.stream_id}") - self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]" - - self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id) - - self.action_manager = ActionManager() - self.action_planner = BrainPlanner(chat_id=self.stream_id, action_manager=self.action_manager) - self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) - - # 循环控制内部状态 - self.running: bool = False - self._loop_task: Optional[asyncio.Task] = None # 主循环任务 - - # 添加循环信息管理相关的属性 - self.history_loop: List[CycleDetail] = [] - self._cycle_counter = 0 - self._current_cycle_detail: CycleDetail = None # type: ignore - - self.last_read_time = time.time() - 2 - - self.more_plan = False - - async def start(self): - """检查是否需要启动主循环,如果未激活则启动。""" - - # 如果循环已经激活,直接返回 - if self.running: - logger.debug(f"{self.log_prefix} BrainChatting 已激活,无需重复启动") - return - - try: - # 标记为活动状态,防止重复启动 - self.running = True - - self._loop_task = asyncio.create_task(self._main_chat_loop()) - self._loop_task.add_done_callback(self._handle_loop_completion) - logger.info(f"{self.log_prefix} BrainChatting 启动完成") - - except Exception as e: - # 启动失败时重置状态 - self.running = False - self._loop_task = None - logger.error(f"{self.log_prefix} BrainChatting 启动失败: {e}") - raise - - def _handle_loop_completion(self, task: asyncio.Task): - """当 _hfc_loop 任务完成时执行的回调。""" - try: - if exception := task.exception(): - logger.error(f"{self.log_prefix} BrainChatting: 脱离了聊天(异常): {exception}") - logger.error(traceback.format_exc()) # Log full traceback for exceptions - else: - logger.info(f"{self.log_prefix} BrainChatting: 脱离了聊天 (外部停止)") - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} BrainChatting: 结束了聊天") - - def start_cycle(self) -> Tuple[Dict[str, float], str]: - self._cycle_counter += 1 - self._current_cycle_detail = CycleDetail(self._cycle_counter) - self._current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}" - cycle_timers = {} - return cycle_timers, self._current_cycle_detail.thinking_id - - def end_cycle(self, loop_info, cycle_timers): - self._current_cycle_detail.set_loop_info(loop_info) - self.history_loop.append(self._current_cycle_detail) - self._current_cycle_detail.timers = cycle_timers - self._current_cycle_detail.end_time = time.time() - - def print_cycle_info(self, cycle_timers): - # 记录循环信息和计时器结果 - timer_strings = [] - for name, elapsed in cycle_timers.items(): - formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - logger.info( - f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," - f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒" # type: ignore - + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") - ) - - async def _loopbody(self): # sourcery skip: hoist-if-from-if - recent_messages_list = message_api.get_messages_by_time_in_chat( - chat_id=self.stream_id, - start_time=self.last_read_time, - end_time=time.time(), - limit=20, - limit_mode="latest", - filter_mai=True, - filter_command=True, - ) - - if len(recent_messages_list) >= 1: - self.last_read_time = time.time() - await self._observe(recent_messages_list=recent_messages_list) - - else: - # Normal模式:消息数量不足,等待 - await asyncio.sleep(0.2) - return True - return True - - async def _send_and_store_reply( - self, - response_set: "ReplySetModel", - action_message: "DatabaseMessages", - cycle_timers: Dict[str, float], - thinking_id, - actions, - selected_expressions: Optional[List[int]] = None, - ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: - with Timer("回复发送", cycle_timers): - reply_text = await self._send_response( - reply_set=response_set, - message_data=action_message, - selected_expressions=selected_expressions, - ) - - # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 - platform = action_message.chat_info.platform - if platform is None: - platform = getattr(self.chat_stream, "platform", "unknown") - - person = Person(platform=platform, user_id=action_message.user_info.user_id) - person_name = person.person_name - action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" - - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=action_prompt_display, - action_done=True, - thinking_id=thinking_id, - action_data={"reply_text": reply_text}, - action_name="reply", - ) - - # 构建循环信息 - loop_info: Dict[str, Any] = { - "loop_plan_info": { - "action_result": actions, - }, - "loop_action_info": { - "action_taken": True, - "reply_text": reply_text, - "command": "", - "taken_time": time.time(), - }, - } - - return loop_info, reply_text, cycle_timers - - async def _observe( - self, # interest_value: float = 0.0, - recent_messages_list: Optional[List["DatabaseMessages"]] = None, - ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if - if recent_messages_list is None: - recent_messages_list = [] - _reply_text = "" # 初始化reply_text变量,避免UnboundLocalError - - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - await self.expression_learner.trigger_learning_for_chat() - - cycle_timers, thinking_id = self.start_cycle() - logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考") - - # 第一步:动作检查 - available_actions: Dict[str, ActionInfo] = {} - try: - await self.action_modifier.modify_actions() - available_actions = self.action_manager.get_using_actions() - except Exception as e: - logger.error(f"{self.log_prefix} 动作修改失败: {e}") - - # 执行planner - is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=self.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.6), - ) - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=message_list_before_now, - timestamp_mode="normal_no_YMD", - read_mark=self.action_planner.last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - prompt_info = await self.action_planner.build_planner_prompt( - is_group_chat=is_group_chat, - chat_target_info=chat_target_info, - current_available_actions=available_actions, - chat_content_block=chat_content_block, - message_id_list=message_id_list, - interest=global_config.personality.interest, - ) - continue_flag, modified_message = await events_manager.handle_mai_events( - EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id - ) - if not continue_flag: - return False - if modified_message and modified_message._modify_flags.modify_llm_prompt: - prompt_info = (modified_message.llm_prompt, prompt_info[1]) - - with Timer("规划器", cycle_timers): - action_to_use_info = await self.action_planner.plan( - loop_start_time=self.last_read_time, - available_actions=available_actions, - ) - - # 3. 并行执行所有动作 - action_tasks = [ - asyncio.create_task( - self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) - ) - for action in action_to_use_info - ] - - # 并行执行所有任务 - results = await asyncio.gather(*action_tasks, return_exceptions=True) - - # 处理执行结果 - reply_loop_info = None - reply_text_from_reply = "" - action_success = False - action_reply_text = "" - - for result in results: - if isinstance(result, BaseException): - logger.error(f"{self.log_prefix} 动作执行异常: {result}") - continue - - if result["action_type"] != "reply": - action_success = result["success"] - action_reply_text = result["reply_text"] - elif result["action_type"] == "reply": - if result["success"]: - reply_loop_info = result["loop_info"] - reply_text_from_reply = result["reply_text"] - else: - logger.warning(f"{self.log_prefix} 回复动作执行失败") - - # 构建最终的循环信息 - if reply_loop_info: - # 如果有回复信息,使用回复的loop_info作为基础 - loop_info = reply_loop_info - # 更新动作执行信息 - loop_info["loop_action_info"].update( - { - "action_taken": action_success, - "taken_time": time.time(), - } - ) - _reply_text = reply_text_from_reply - else: - # 没有回复信息,构建纯动作的loop_info - loop_info = { - "loop_plan_info": { - "action_result": action_to_use_info, - }, - "loop_action_info": { - "action_taken": action_success, - "reply_text": action_reply_text, - "taken_time": time.time(), - }, - } - _reply_text = action_reply_text - - self.end_cycle(loop_info, cycle_timers) - self.print_cycle_info(cycle_timers) - - return True - - async def _main_chat_loop(self): - """主循环,持续进行计划并可能回复消息,直到被外部取消。""" - try: - while self.running: - # 主循环 - success = await self._loopbody() - await asyncio.sleep(0.1) - if not success: - break - except asyncio.CancelledError: - # 设置了关闭标志位后被取消是正常流程 - logger.info(f"{self.log_prefix} 麦麦已关闭聊天") - except Exception: - logger.error(f"{self.log_prefix} 麦麦聊天意外错误,将于3s后尝试重新启动") - print(traceback.format_exc()) - await asyncio.sleep(3) - self._loop_task = asyncio.create_task(self._main_chat_loop()) - logger.error(f"{self.log_prefix} 结束了当前聊天循环") - - async def _handle_action( - self, - action: str, - reasoning: str, - action_data: dict, - cycle_timers: Dict[str, float], - thinking_id: str, - action_message: Optional["DatabaseMessages"] = None, - ) -> tuple[bool, str, str]: - """ - 处理规划动作,使用动作工厂创建相应的动作处理器 - - 参数: - action: 动作类型 - reasoning: 决策理由 - action_data: 动作数据,包含不同动作需要的参数 - cycle_timers: 计时器字典 - thinking_id: 思考ID - - 返回: - tuple[bool, str, str]: (是否执行了动作, 思考消息ID, 命令) - """ - try: - # 使用工厂创建动作处理器实例 - try: - action_handler = self.action_manager.create_action( - action_name=action, - action_data=action_data, - action_reasoning=reasoning, - cycle_timers=cycle_timers, - thinking_id=thinking_id, - chat_stream=self.chat_stream, - log_prefix=self.log_prefix, - action_message=action_message, - ) - except Exception as e: - logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}") - traceback.print_exc() - return False, "", "" - - if not action_handler: - logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}") - return False, "", "" - +import asyncio +import time +import traceback +import random +from typing import List, Optional, Dict, Any, Tuple, TYPE_CHECKING +from rich.traceback import install + +from src.config.config import global_config +from src.common.logger import get_logger +from src.common.data_models.info_data_model import ActionPlannerInfo +from src.common.data_models.message_data_model import ReplyContentType +from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager +from src.chat.utils.prompt_builder import global_prompt_manager +from src.chat.utils.timer_calculator import Timer +from src.chat.brain_chat.brain_planner import BrainPlanner +from src.chat.planner_actions.action_modifier import ActionModifier +from src.chat.planner_actions.action_manager import ActionManager +from src.chat.heart_flow.hfc_utils import CycleDetail +from src.express.expression_learner import expression_learner_manager +from src.person_info.person_info import Person +from src.plugin_system.base.component_types import EventType, ActionInfo +from src.plugin_system.core import events_manager +from src.plugin_system.apis import generator_api, send_api, message_api, database_api +from src.chat.utils.chat_message_builder import ( + build_readable_messages_with_id, + get_raw_msg_before_timestamp_with_chat, +) + +if TYPE_CHECKING: + from src.common.data_models.database_data_model import DatabaseMessages + from src.common.data_models.message_data_model import ReplySetModel + + +ERROR_LOOP_INFO = { + "loop_plan_info": { + "action_result": { + "action_type": "error", + "action_data": {}, + "reasoning": "循环处理失败", + }, + }, + "loop_action_info": { + "action_taken": False, + "reply_text": "", + "command": "", + "taken_time": time.time(), + }, +} + + +install(extra_lines=3) + +# 注释:原来的动作修改超时常量已移除,因为改为顺序执行 + +logger = get_logger("bc") # Logger Name Changed + + +class BrainChatting: + """ + 管理一个连续的私聊Brain Chat循环 + 用于在特定聊天流中生成回复。 + """ + + def __init__(self, chat_id: str): + """ + BrainChatting 初始化函数 + + 参数: + chat_id: 聊天流唯一标识符(如stream_id) + on_stop_focus_chat: 当收到stop_focus_chat命令时调用的回调函数 + performance_version: 性能记录版本号,用于区分不同启动版本 + """ + # 基础属性 + self.stream_id: str = chat_id # 聊天流ID + self.chat_stream: ChatStream = get_chat_manager().get_stream(self.stream_id) # type: ignore + if not self.chat_stream: + raise ValueError(f"无法找到聊天流: {self.stream_id}") + self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]" + + self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id) + + self.action_manager = ActionManager() + self.action_planner = BrainPlanner(chat_id=self.stream_id, action_manager=self.action_manager) + self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) + + # 循环控制内部状态 + self.running: bool = False + self._loop_task: Optional[asyncio.Task] = None # 主循环任务 + + # 添加循环信息管理相关的属性 + self.history_loop: List[CycleDetail] = [] + self._cycle_counter = 0 + self._current_cycle_detail: CycleDetail = None # type: ignore + + self.last_read_time = time.time() - 2 + + self.more_plan = False + + async def start(self): + """检查是否需要启动主循环,如果未激活则启动。""" + + # 如果循环已经激活,直接返回 + if self.running: + logger.debug(f"{self.log_prefix} BrainChatting 已激活,无需重复启动") + return + + try: + # 标记为活动状态,防止重复启动 + self.running = True + + self._loop_task = asyncio.create_task(self._main_chat_loop()) + self._loop_task.add_done_callback(self._handle_loop_completion) + logger.info(f"{self.log_prefix} BrainChatting 启动完成") + + except Exception as e: + # 启动失败时重置状态 + self.running = False + self._loop_task = None + logger.error(f"{self.log_prefix} BrainChatting 启动失败: {e}") + raise + + def _handle_loop_completion(self, task: asyncio.Task): + """当 _hfc_loop 任务完成时执行的回调。""" + try: + if exception := task.exception(): + logger.error(f"{self.log_prefix} BrainChatting: 脱离了聊天(异常): {exception}") + logger.error(traceback.format_exc()) # Log full traceback for exceptions + else: + logger.info(f"{self.log_prefix} BrainChatting: 脱离了聊天 (外部停止)") + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} BrainChatting: 结束了聊天") + + def start_cycle(self) -> Tuple[Dict[str, float], str]: + self._cycle_counter += 1 + self._current_cycle_detail = CycleDetail(self._cycle_counter) + self._current_cycle_detail.thinking_id = f"tid{str(round(time.time(), 2))}" + cycle_timers = {} + return cycle_timers, self._current_cycle_detail.thinking_id + + def end_cycle(self, loop_info, cycle_timers): + self._current_cycle_detail.set_loop_info(loop_info) + self.history_loop.append(self._current_cycle_detail) + self._current_cycle_detail.timers = cycle_timers + self._current_cycle_detail.end_time = time.time() + + def print_cycle_info(self, cycle_timers): + # 记录循环信息和计时器结果 + timer_strings = [] + for name, elapsed in cycle_timers.items(): + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + timer_strings.append(f"{name}: {formatted_time}") + + logger.info( + f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," + f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒" # type: ignore + + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") + ) + + async def _loopbody(self): # sourcery skip: hoist-if-from-if + recent_messages_list = message_api.get_messages_by_time_in_chat( + chat_id=self.stream_id, + start_time=self.last_read_time, + end_time=time.time(), + limit=20, + limit_mode="latest", + filter_mai=True, + filter_command=True, + ) + + if len(recent_messages_list) >= 1: + self.last_read_time = time.time() + await self._observe(recent_messages_list=recent_messages_list) + + else: + # Normal模式:消息数量不足,等待 + await asyncio.sleep(0.2) + return True + return True + + async def _send_and_store_reply( + self, + response_set: "ReplySetModel", + action_message: "DatabaseMessages", + cycle_timers: Dict[str, float], + thinking_id, + actions, + selected_expressions: Optional[List[int]] = None, + ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: + with Timer("回复发送", cycle_timers): + reply_text = await self._send_response( + reply_set=response_set, + message_data=action_message, + selected_expressions=selected_expressions, + ) + + # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 + platform = action_message.chat_info.platform + if platform is None: + platform = getattr(self.chat_stream, "platform", "unknown") + + person = Person(platform=platform, user_id=action_message.user_info.user_id) + person_name = person.person_name + action_prompt_display = f"你对{person_name}进行了回复:{reply_text}" + + await database_api.store_action_info( + chat_stream=self.chat_stream, + action_build_into_prompt=False, + action_prompt_display=action_prompt_display, + action_done=True, + thinking_id=thinking_id, + action_data={"reply_text": reply_text}, + action_name="reply", + ) + + # 构建循环信息 + loop_info: Dict[str, Any] = { + "loop_plan_info": { + "action_result": actions, + }, + "loop_action_info": { + "action_taken": True, + "reply_text": reply_text, + "command": "", + "taken_time": time.time(), + }, + } + + return loop_info, reply_text, cycle_timers + + async def _observe( + self, # interest_value: float = 0.0, + recent_messages_list: Optional[List["DatabaseMessages"]] = None, + ) -> bool: # sourcery skip: merge-else-if-into-elif, remove-redundant-if + if recent_messages_list is None: + recent_messages_list = [] + _reply_text = "" # 初始化reply_text变量,避免UnboundLocalError + + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + await self.expression_learner.trigger_learning_for_chat() + + cycle_timers, thinking_id = self.start_cycle() + logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考") + + # 第一步:动作检查 + available_actions: Dict[str, ActionInfo] = {} + try: + await self.action_modifier.modify_actions() + available_actions = self.action_manager.get_using_actions() + except Exception as e: + logger.error(f"{self.log_prefix} 动作修改失败: {e}") + + # 执行planner + is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() + + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.6), + ) + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=message_list_before_now, + timestamp_mode="normal_no_YMD", + read_mark=self.action_planner.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + + prompt_info = await self.action_planner.build_planner_prompt( + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + current_available_actions=available_actions, + chat_content_block=chat_content_block, + message_id_list=message_id_list, + interest=global_config.personality.interest, + ) + continue_flag, modified_message = await events_manager.handle_mai_events( + EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id + ) + if not continue_flag: + return False + if modified_message and modified_message._modify_flags.modify_llm_prompt: + prompt_info = (modified_message.llm_prompt, prompt_info[1]) + + with Timer("规划器", cycle_timers): + action_to_use_info = await self.action_planner.plan( + loop_start_time=self.last_read_time, + available_actions=available_actions, + ) + + # 3. 并行执行所有动作 + action_tasks = [ + asyncio.create_task( + self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) + ) + for action in action_to_use_info + ] + + # 并行执行所有任务 + results = await asyncio.gather(*action_tasks, return_exceptions=True) + + # 处理执行结果 + reply_loop_info = None + reply_text_from_reply = "" + action_success = False + action_reply_text = "" + + for result in results: + if isinstance(result, BaseException): + logger.error(f"{self.log_prefix} 动作执行异常: {result}") + continue + + if result["action_type"] != "reply": + action_success = result["success"] + action_reply_text = result["reply_text"] + elif result["action_type"] == "reply": + if result["success"]: + reply_loop_info = result["loop_info"] + reply_text_from_reply = result["reply_text"] + else: + logger.warning(f"{self.log_prefix} 回复动作执行失败") + + # 构建最终的循环信息 + if reply_loop_info: + # 如果有回复信息,使用回复的loop_info作为基础 + loop_info = reply_loop_info + # 更新动作执行信息 + loop_info["loop_action_info"].update( + { + "action_taken": action_success, + "taken_time": time.time(), + } + ) + _reply_text = reply_text_from_reply + else: + # 没有回复信息,构建纯动作的loop_info + loop_info = { + "loop_plan_info": { + "action_result": action_to_use_info, + }, + "loop_action_info": { + "action_taken": action_success, + "reply_text": action_reply_text, + "taken_time": time.time(), + }, + } + _reply_text = action_reply_text + + self.end_cycle(loop_info, cycle_timers) + self.print_cycle_info(cycle_timers) + + return True + + async def _main_chat_loop(self): + """主循环,持续进行计划并可能回复消息,直到被外部取消。""" + try: + while self.running: + # 主循环 + success = await self._loopbody() + await asyncio.sleep(0.1) + if not success: + break + except asyncio.CancelledError: + # 设置了关闭标志位后被取消是正常流程 + logger.info(f"{self.log_prefix} 麦麦已关闭聊天") + except Exception: + logger.error(f"{self.log_prefix} 麦麦聊天意外错误,将于3s后尝试重新启动") + print(traceback.format_exc()) + await asyncio.sleep(3) + self._loop_task = asyncio.create_task(self._main_chat_loop()) + logger.error(f"{self.log_prefix} 结束了当前聊天循环") + + async def _handle_action( + self, + action: str, + reasoning: str, + action_data: dict, + cycle_timers: Dict[str, float], + thinking_id: str, + action_message: Optional["DatabaseMessages"] = None, + ) -> tuple[bool, str, str]: + """ + 处理规划动作,使用动作工厂创建相应的动作处理器 + + 参数: + action: 动作类型 + reasoning: 决策理由 + action_data: 动作数据,包含不同动作需要的参数 + cycle_timers: 计时器字典 + thinking_id: 思考ID + + 返回: + tuple[bool, str, str]: (是否执行了动作, 思考消息ID, 命令) + """ + try: + # 使用工厂创建动作处理器实例 + try: + action_handler = self.action_manager.create_action( + action_name=action, + action_data=action_data, + action_reasoning=reasoning, + cycle_timers=cycle_timers, + thinking_id=thinking_id, + chat_stream=self.chat_stream, + log_prefix=self.log_prefix, + action_message=action_message, + ) + except Exception as e: + logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}") + traceback.print_exc() + return False, "", "" + + if not action_handler: + logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}") + return False, "", "" + # 处理动作并获取结果(固定记录一次动作信息) - result = await action_handler.run() - success, action_text = result - command = "" - - return success, action_text, command - - except Exception as e: - logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") - traceback.print_exc() - return False, "", "" - - async def _send_response( - self, - reply_set: "ReplySetModel", - message_data: "DatabaseMessages", - selected_expressions: Optional[List[int]] = None, - ) -> str: - new_message_count = message_api.count_new_messages( - chat_id=self.chat_stream.stream_id, start_time=self.last_read_time, end_time=time.time() - ) - - need_reply = new_message_count >= random.randint(2, 4) - - if need_reply: - logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复") - - reply_text = "" - first_replied = False - for reply_content in reply_set.reply_data: - if reply_content.content_type != ReplyContentType.TEXT: - continue - data: str = reply_content.content # type: ignore - if not first_replied: - await send_api.text_to_stream( - text=data, - stream_id=self.chat_stream.stream_id, - reply_message=message_data, - set_reply=need_reply, - typing=False, - selected_expressions=selected_expressions, - ) - first_replied = True - else: - await send_api.text_to_stream( - text=data, - stream_id=self.chat_stream.stream_id, - reply_message=message_data, - set_reply=False, - typing=True, - selected_expressions=selected_expressions, - ) - reply_text += data - - return reply_text - - async def _execute_action( - self, - action_planner_info: ActionPlannerInfo, - chosen_action_plan_infos: List[ActionPlannerInfo], - thinking_id: str, - available_actions: Dict[str, ActionInfo], - cycle_timers: Dict[str, float], - ): - """执行单个动作的通用函数""" - try: - with Timer(f"动作{action_planner_info.action_type}", cycle_timers): - if action_planner_info.action_type == "no_reply": - # 直接处理no_reply逻辑,不再通过动作系统 - reason = action_planner_info.reasoning or "选择不回复" - # logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") - - # 存储no_reply信息到数据库 - await database_api.store_action_info( - chat_stream=self.chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id, - action_data={"reason": reason}, - action_name="no_reply", - ) - return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} - - elif action_planner_info.action_type == "reply": - try: - success, llm_response = await generator_api.generate_reply( - chat_stream=self.chat_stream, - reply_message=action_planner_info.action_message, - available_actions=available_actions, - chosen_actions=chosen_action_plan_infos, - reply_reason=action_planner_info.reasoning or "", - enable_tool=global_config.tool.enable_tool, - request_type="replyer", - from_plugin=False, - ) - - if not success or not llm_response or not llm_response.reply_set: - if action_planner_info.action_message: - logger.info( - f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败" - ) - else: - logger.info("回复生成失败") - return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} - - except asyncio.CancelledError: - logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") - return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} - response_set = llm_response.reply_set - selected_expressions = llm_response.selected_expressions - loop_info, reply_text, _ = await self._send_and_store_reply( - response_set=response_set, - action_message=action_planner_info.action_message, # type: ignore - cycle_timers=cycle_timers, - thinking_id=thinking_id, - actions=chosen_action_plan_infos, - selected_expressions=selected_expressions, - ) - return { - "action_type": "reply", - "success": True, - "reply_text": reply_text, - "loop_info": loop_info, - } - - # 其他动作 - else: - # 执行普通动作 - with Timer("动作执行", cycle_timers): - success, reply_text, command = await self._handle_action( - action_planner_info.action_type, - action_planner_info.reasoning or "", - action_planner_info.action_data or {}, - cycle_timers, - thinking_id, - action_planner_info.action_message, - ) - return { - "action_type": action_planner_info.action_type, - "success": success, - "reply_text": reply_text, - "command": command, - } - - except Exception as e: - logger.error(f"{self.log_prefix} 执行动作时出错: {e}") - logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") - return { - "action_type": action_planner_info.action_type, - "success": False, - "reply_text": "", - "loop_info": None, - "error": str(e), - } + # BaseAction 定义了异步方法 execute() 作为统一执行入口 + # 这里调用 execute() 以兼容所有 Action 实现 + result = await action_handler.execute() + success, action_text = result + command = "" + + return success, action_text, command + + except Exception as e: + logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") + traceback.print_exc() + return False, "", "" + + async def _send_response( + self, + reply_set: "ReplySetModel", + message_data: "DatabaseMessages", + selected_expressions: Optional[List[int]] = None, + ) -> str: + new_message_count = message_api.count_new_messages( + chat_id=self.chat_stream.stream_id, start_time=self.last_read_time, end_time=time.time() + ) + + need_reply = new_message_count >= random.randint(2, 4) + + if need_reply: + logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复") + + reply_text = "" + first_replied = False + for reply_content in reply_set.reply_data: + if reply_content.content_type != ReplyContentType.TEXT: + continue + data: str = reply_content.content # type: ignore + if not first_replied: + await send_api.text_to_stream( + text=data, + stream_id=self.chat_stream.stream_id, + reply_message=message_data, + set_reply=need_reply, + typing=False, + selected_expressions=selected_expressions, + ) + first_replied = True + else: + await send_api.text_to_stream( + text=data, + stream_id=self.chat_stream.stream_id, + reply_message=message_data, + set_reply=False, + typing=True, + selected_expressions=selected_expressions, + ) + reply_text += data + + return reply_text + + async def _execute_action( + self, + action_planner_info: ActionPlannerInfo, + chosen_action_plan_infos: List[ActionPlannerInfo], + thinking_id: str, + available_actions: Dict[str, ActionInfo], + cycle_timers: Dict[str, float], + ): + """执行单个动作的通用函数""" + try: + with Timer(f"动作{action_planner_info.action_type}", cycle_timers): + if action_planner_info.action_type == "no_reply": + # 直接处理no_reply逻辑,不再通过动作系统 + reason = action_planner_info.reasoning or "选择不回复" + # logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") + + # 存储no_reply信息到数据库 + await database_api.store_action_info( + chat_stream=self.chat_stream, + action_build_into_prompt=False, + action_prompt_display=reason, + action_done=True, + thinking_id=thinking_id, + action_data={"reason": reason}, + action_name="no_reply", + ) + return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "reply": + try: + success, llm_response = await generator_api.generate_reply( + chat_stream=self.chat_stream, + reply_message=action_planner_info.action_message, + available_actions=available_actions, + chosen_actions=chosen_action_plan_infos, + reply_reason=action_planner_info.reasoning or "", + enable_tool=global_config.tool.enable_tool, + request_type="replyer", + from_plugin=False, + ) + + if not success or not llm_response or not llm_response.reply_set: + if action_planner_info.action_message: + logger.info( + f"对 {action_planner_info.action_message.processed_plain_text} 的回复生成失败" + ) + else: + logger.info("回复生成失败") + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + + except asyncio.CancelledError: + logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} + response_set = llm_response.reply_set + selected_expressions = llm_response.selected_expressions + loop_info, reply_text, _ = await self._send_and_store_reply( + response_set=response_set, + action_message=action_planner_info.action_message, # type: ignore + cycle_timers=cycle_timers, + thinking_id=thinking_id, + actions=chosen_action_plan_infos, + selected_expressions=selected_expressions, + ) + return { + "action_type": "reply", + "success": True, + "reply_text": reply_text, + "loop_info": loop_info, + } + + # 其他动作 + else: + # 执行普通动作 + with Timer("动作执行", cycle_timers): + success, reply_text, command = await self._handle_action( + action_planner_info.action_type, + action_planner_info.reasoning or "", + action_planner_info.action_data or {}, + cycle_timers, + thinking_id, + action_planner_info.action_message, + ) + return { + "action_type": action_planner_info.action_type, + "success": success, + "reply_text": reply_text, + "command": command, + } + + except Exception as e: + logger.error(f"{self.log_prefix} 执行动作时出错: {e}") + logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") + return { + "action_type": action_planner_info.action_type, + "success": False, + "reply_text": "", + "loop_info": None, + "error": str(e), + } diff --git a/src/chat/brain_chat/brain_planner.py b/src/chat/brain_chat/brain_planner.py index 35cdf172..0cbaab36 100644 --- a/src/chat/brain_chat/brain_planner.py +++ b/src/chat/brain_chat/brain_planner.py @@ -249,6 +249,8 @@ class BrainPlanner: # 获取必要信息 is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info() + # 提及/被@ 的处理由心流或统一判定模块驱动;Planner 不再做硬编码强制回复 + # 应用激活类型过滤 filtered_actions = self._filter_actions_by_activation_type(available_actions, chat_content_block_short) diff --git a/src/chat/heart_flow/heartflow_message_processor.py b/src/chat/heart_flow/heartflow_message_processor.py index fdc13e2f..822d05de 100644 --- a/src/chat/heart_flow/heartflow_message_processor.py +++ b/src/chat/heart_flow/heartflow_message_processor.py @@ -1,7 +1,7 @@ import re import traceback -from typing import Tuple, TYPE_CHECKING +from typing import TYPE_CHECKING from src.chat.message_receive.message import MessageRecv from src.chat.message_receive.storage import MessageStorage @@ -17,31 +17,6 @@ if TYPE_CHECKING: logger = get_logger("chat") - -async def _calculate_interest(message: MessageRecv) -> Tuple[float, list[str]]: - """计算消息的兴趣度 - - Args: - message: 待处理的消息对象 - - Returns: - Tuple[float, bool, list[str]]: (兴趣度, 是否被提及, 关键词) - """ - if message.is_picid or message.is_emoji: - return 0.0, [] - - is_mentioned, is_at, reply_probability_boost = is_mentioned_bot_in_message(message) - # interested_rate = 0.0 - keywords = [] - - message.interest_value = 1 - message.is_mentioned = is_mentioned - message.is_at = is_at - message.reply_probability_boost = reply_probability_boost - - return 1, keywords - - class HeartFCMessageReceiver: """心流处理器,负责处理接收到的消息并计算兴趣度""" @@ -67,12 +42,16 @@ class HeartFCMessageReceiver: userinfo = message.message_info.user_info chat = message.chat_stream - # 2. 兴趣度计算与更新 - _, keywords = await _calculate_interest(message) + # 2. 计算at信息 + is_mentioned, is_at, reply_probability_boost = is_mentioned_bot_in_message(message) + print(f"is_mentioned: {is_mentioned}, is_at: {is_at}, reply_probability_boost: {reply_probability_boost}") + message.is_mentioned = is_mentioned + message.is_at = is_at + message.reply_probability_boost = reply_probability_boost await self.storage.store_message(message, chat) - _heartflow_chat: HeartFChatting = await heartflow.get_or_create_heartflow_chat(chat.stream_id) # type: ignore + await heartflow.get_or_create_heartflow_chat(chat.stream_id) # type: ignore # 3. 日志记录 mes_name = chat.group_info.group_name if chat.group_info else "私聊" diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 2eb0b4e3..43d2754a 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -221,6 +221,8 @@ class ChatBot: # 处理消息内容,生成纯文本 await message.process() + # 平台层的 @ 检测由底层 is_mentioned_bot_in_message 统一处理;此处不做用户名硬编码匹配 + # 过滤检查 if _check_ban_words( message.processed_plain_text, diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index aa992552..6c1ec1a7 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -130,6 +130,16 @@ class MessageRecv(Message): self.key_words = [] self.key_words_lite = [] + # 兼容适配器通过 additional_config 传入的 @ 标记 + try: + msg_info_dict = message_dict.get("message_info", {}) + add_cfg = msg_info_dict.get("additional_config") or {} + if isinstance(add_cfg, dict) and add_cfg.get("at_bot"): + # 标记为被提及,提高后续回复优先级 + self.is_mentioned = True # type: ignore + except Exception: + pass + def update_chat_stream(self, chat_stream: "ChatStream"): self.chat_stream = chat_stream diff --git a/src/chat/replyer/group_generator.py b/src/chat/replyer/group_generator.py index 88935da7..3a24f04f 100644 --- a/src/chat/replyer/group_generator.py +++ b/src/chat/replyer/group_generator.py @@ -529,7 +529,7 @@ class DefaultReplyer: show_actions=True, ) core_dialogue_prompt = f"""-------------------------------- -这是你和{sender}的对话,你们正在交流中: +这是上述中你和{sender}的对话摘要,内容从上面的对话中截取,便于你理解: {core_dialogue_prompt_str} -------------------------------- """ diff --git a/src/chat/utils/chat_message_builder.py b/src/chat/utils/chat_message_builder.py index 52559ecb..8915e810 100644 --- a/src/chat/utils/chat_message_builder.py +++ b/src/chat/utils/chat_message_builder.py @@ -43,9 +43,12 @@ def replace_user_references( if name_resolver is None: def default_resolver(platform: str, user_id: str) -> str: - # 检查是否是机器人自己 - if replace_bot_name and user_id == global_config.bot.qq_account: - return f"{global_config.bot.nickname}(你)" + # 检查是否是机器人自己(支持多平台) + if replace_bot_name: + if platform == "qq" and user_id == global_config.bot.qq_account: + return f"{global_config.bot.nickname}(你)" + if platform == "telegram" and user_id == getattr(global_config.bot, "telegram_account", ""): + return f"{global_config.bot.nickname}(你)" person = Person(platform=platform, user_id=user_id) return person.person_name or user_id # type: ignore @@ -92,6 +95,8 @@ def replace_user_references( new_content += content[last_end:] content = new_content + # Telegram 文本 @username 的显示映射交由适配器或平台层处理;此处不做硬编码替换 + return content @@ -432,7 +437,10 @@ def _build_readable_messages_internal( person_name = ( person.person_name or f"{user_nickname}" or (f"昵称:{user_cardname}" if user_cardname else "某人") ) - if replace_bot_name and user_id == global_config.bot.qq_account: + if replace_bot_name and ( + (platform == global_config.bot.platform and user_id == global_config.bot.qq_account) + or (platform == "telegram" and user_id == getattr(global_config.bot, "telegram_account", "")) + ): person_name = f"{global_config.bot.nickname}(你)" # 使用独立函数处理用户引用格式 @@ -866,7 +874,9 @@ async def build_anonymous_messages(messages: List[DatabaseMessages]) -> str: # print(f"get_anon_name: platform:{platform}, user_id:{user_id}") # print(f"global_config.bot.qq_account:{global_config.bot.qq_account}") - if user_id == global_config.bot.qq_account: + if (platform == "qq" and user_id == global_config.bot.qq_account) or ( + platform == "telegram" and user_id == getattr(global_config.bot, "telegram_account", "") + ): # print("SELF11111111111111") return "SELF" try: diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index 240ce609..ce3eab08 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -30,76 +30,146 @@ def is_english_letter(char: str) -> bool: return "a" <= char.lower() <= "z" -def db_message_to_str(message_dict: dict) -> str: - logger.debug(f"message_dict: {message_dict}") - time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(message_dict["time"])) - try: - name = f"[({message_dict['user_id']}){message_dict.get('user_nickname', '')}]{message_dict.get('user_cardname', '')}" - except Exception: - name = message_dict.get("user_nickname", "") or f"用户{message_dict['user_id']}" - content = message_dict.get("processed_plain_text", "") - result = f"[{time_str}] {name}: {content}\n" - logger.debug(f"result: {result}") +def parse_platform_accounts(platforms: list[str]) -> dict[str, str]: + """解析 platforms 列表,返回平台到账号的映射 + + Args: + platforms: 格式为 ["platform:account"] 的列表,如 ["tg:123456789", "wx:wxid123"] + + Returns: + 字典,键为平台名,值为账号 + """ + result = {} + for platform_entry in platforms: + if ":" in platform_entry: + platform_name, account = platform_entry.split(":", 1) + result[platform_name.strip()] = account.strip() return result +def get_current_platform_account(platform: str, platform_accounts: dict[str, str], qq_account: str) -> str: + """根据当前平台获取对应的账号 + + Args: + platform: 当前消息的平台 + platform_accounts: 从 platforms 列表解析的平台账号映射 + qq_account: QQ 账号(兼容旧配置) + + Returns: + 当前平台对应的账号 + """ + if platform == "qq": + return qq_account + elif platform == "telegram": + # 优先使用 tg,其次使用 telegram + return platform_accounts.get("tg", "") or platform_accounts.get("telegram", "") + else: + # 其他平台直接使用平台名作为键 + return platform_accounts.get(platform, "") + + def is_mentioned_bot_in_message(message: MessageRecv) -> tuple[bool, bool, float]: - """检查消息是否提到了机器人""" - keywords = [global_config.bot.nickname] + list(global_config.bot.alias_names) + """检查消息是否提到了机器人(统一多平台实现)""" + text = message.processed_plain_text or "" + platform = getattr(message.message_info, "platform", "") or "" + + # 获取各平台账号 + platforms_list = getattr(global_config.bot, "platforms", []) or [] + platform_accounts = parse_platform_accounts(platforms_list) + qq_account = str(getattr(global_config.bot, "qq_account", "") or "") + + # 获取当前平台对应的账号 + current_account = get_current_platform_account(platform, platform_accounts, qq_account) + + nickname = str(global_config.bot.nickname or "") + alias_names = list(getattr(global_config.bot, "alias_names", []) or []) + keywords = [nickname] + alias_names + reply_probability = 0.0 is_at = False is_mentioned = False - # 这部分怎么处理啊啊啊啊 - # 我觉得可以给消息加一个 reply_probability_boost字段 - if ( - message.message_info.additional_config is not None - and message.message_info.additional_config.get("is_mentioned") is not None - ): + # 1) 直接的 additional_config 标记 + add_cfg = getattr(message.message_info, "additional_config", None) or {} + if isinstance(add_cfg, dict): + if add_cfg.get("at_bot") or add_cfg.get("is_mentioned"): + is_mentioned = True + # 当提供数值型 is_mentioned 时,当作概率提升 + try: + if add_cfg.get("is_mentioned") not in (None, ""): + reply_probability = float(add_cfg.get("is_mentioned")) # type: ignore + except Exception: + pass + + # 2) 已经在上游设置过的 message.is_mentioned + if getattr(message, "is_mentioned", False): + is_mentioned = True + + # 3) 扫描分段:是否包含 mention_bot(适配器插入) + def _has_mention_bot(seg) -> bool: try: - reply_probability = float(message.message_info.additional_config.get("is_mentioned")) # type: ignore - is_mentioned = True - return is_mentioned, is_at, reply_probability - except Exception as e: - logger.warning(str(e)) - logger.warning( - f"消息中包含不合理的设置 is_mentioned: {message.message_info.additional_config.get('is_mentioned')}" - ) + if seg is None: + return False + if getattr(seg, "type", None) == "mention_bot": + return True + if getattr(seg, "type", None) == "seglist": + for s in getattr(seg, "data", []) or []: + if _has_mention_bot(s): + return True + return False + except Exception: + return False - for keyword in keywords: - if keyword in message.processed_plain_text: - is_mentioned = True - - # 判断是否被@ - if re.search(rf"@<(.+?):{global_config.bot.qq_account}>", message.processed_plain_text): + if _has_mention_bot(getattr(message, "message_segment", None)): is_at = True is_mentioned = True - if is_at and global_config.chat.at_bot_inevitable_reply: + # 4) 统一的 @ 检测逻辑 + if current_account and not is_at and not is_mentioned: + if platform == "qq": + # QQ 格式: @ + if re.search(rf"@<(.+?):{re.escape(current_account)}>", text): + is_at = True + is_mentioned = True + else: + # 其他平台格式: @username 或 @account + if re.search(rf"@{re.escape(current_account)}(\b|$)", text, flags=re.IGNORECASE): + is_at = True + is_mentioned = True + + # 5) 统一的回复检测逻辑 + if not is_mentioned: + # 通用回复格式:包含 "(你)" 或 "(你)" + if re.search(r"\[回复 .*?\(你\):", text) or re.search(r"\[回复 .*?(你):", text): + is_mentioned = True + # ID 形式的回复检测 + elif current_account: + if re.search(rf"\[回复 (.+?)\({re.escape(current_account)}\):(.+?)\],说:", text): + is_mentioned = True + elif re.search(rf"\[回复<(.+?)(?=:{re.escape(current_account)}>)\:{re.escape(current_account)}>:(.+?)\],说:", text): + is_mentioned = True + + # 6) 名称/别名 提及(去除 @/回复标记后再匹配) + if not is_mentioned and keywords: + msg_content = text + # 去除各种 @ 与 回复标记,避免误判 + msg_content = re.sub(r"@(.+?)((\d+))", "", msg_content) + msg_content = re.sub(r"@<(.+?)(?=:(\d+))\:(\d+)>", "", msg_content) + msg_content = re.sub(r"\[回复 (.+?)\(((\d+)|未知id|你)\):(.+?)\],说:", "", msg_content) + msg_content = re.sub(r"\[回复<(.+?)(?=:(\d+))\:(\d+)>:(.+?)\],说:", "", msg_content) + for kw in keywords: + if kw and kw in msg_content: + is_mentioned = True + break + + # 7) 概率设置 + if is_at and getattr(global_config.chat, "at_bot_inevitable_reply", 1): reply_probability = 1.0 logger.debug("被@,回复概率设置为100%") - else: - if not is_mentioned: - # 判断是否被回复 - if re.match( - rf"\[回复 (.+?)\({str(global_config.bot.qq_account)}\):(.+?)\],说:", message.processed_plain_text - ) or re.match( - rf"\[回复<(.+?)(?=:{str(global_config.bot.qq_account)}>)\:{str(global_config.bot.qq_account)}>:(.+?)\],说:", - message.processed_plain_text, - ): - is_mentioned = True - else: - # 判断内容中是否被提及 - message_content = re.sub(r"@(.+?)((\d+))", "", message.processed_plain_text) - message_content = re.sub(r"@<(.+?)(?=:(\d+))\:(\d+)>", "", message_content) - message_content = re.sub(r"\[回复 (.+?)\(((\d+)|未知id)\):(.+?)\],说:", "", message_content) - message_content = re.sub(r"\[回复<(.+?)(?=:(\d+))\:(\d+)>:(.+?)\],说:", "", message_content) - for keyword in keywords: - if keyword in message_content: - is_mentioned = True - if is_mentioned and global_config.chat.mentioned_bot_reply: - reply_probability = 1.0 - logger.debug("被提及,回复概率设置为100%") + elif is_mentioned and getattr(global_config.chat, "mentioned_bot_reply", 1): + reply_probability = max(reply_probability, 1.0) + logger.debug("被提及,回复概率设置为100%") + return is_mentioned, is_at, reply_probability @@ -115,45 +185,6 @@ async def get_embedding(text, request_type="embedding") -> Optional[List[float]] return embedding -def get_recent_group_speaker(chat_stream_id: str, sender, limit: int = 12) -> list: - # 获取当前群聊记录内发言的人 - filter_query = {"chat_id": chat_stream_id} - sort_order = [("time", -1)] - recent_messages = find_messages(message_filter=filter_query, sort=sort_order, limit=limit) - - if not recent_messages: - return [] - - who_chat_in_group = [] - for db_msg in recent_messages: - # user_info = UserInfo.from_dict( - # { - # "platform": msg_db_data["user_platform"], - # "user_id": msg_db_data["user_id"], - # "user_nickname": msg_db_data["user_nickname"], - # "user_cardname": msg_db_data.get("user_cardname", ""), - # } - # ) - # if ( - # (user_info.platform, user_info.user_id) != sender - # and user_info.user_id != global_config.bot.qq_account - # and (user_info.platform, user_info.user_id, user_info.user_nickname) not in who_chat_in_group - # and len(who_chat_in_group) < 5 - # ): # 排除重复,排除消息发送者,排除bot,限制加载的关系数目 - # who_chat_in_group.append((user_info.platform, user_info.user_id, user_info.user_nickname)) - if ( - (db_msg.user_info.platform, db_msg.user_info.user_id) != sender - and db_msg.user_info.user_id != global_config.bot.qq_account - and (db_msg.user_info.platform, db_msg.user_info.user_id, db_msg.user_info.user_nickname) - not in who_chat_in_group - and len(who_chat_in_group) < 5 - ): # 排除重复,排除消息发送者,排除bot,限制加载的关系数目 - who_chat_in_group.append( - (db_msg.user_info.platform, db_msg.user_info.user_id, db_msg.user_info.user_nickname) - ) - - return who_chat_in_group - def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: """将文本分割成句子,并根据概率合并 @@ -410,42 +441,6 @@ def calculate_typing_time( return total_time # 加上回车时间 -def cosine_similarity(v1, v2): - """计算余弦相似度""" - dot_product = np.dot(v1, v2) - norm1 = np.linalg.norm(v1) - norm2 = np.linalg.norm(v2) - return 0 if norm1 == 0 or norm2 == 0 else dot_product / (norm1 * norm2) - - -def text_to_vector(text): - """将文本转换为词频向量""" - # 分词 - words = jieba.lcut(text) - return Counter(words) - - -def find_similar_topics_simple(text: str, topics: list, top_k: int = 5) -> list: - """使用简单的余弦相似度计算文本相似度""" - # 将输入文本转换为词频向量 - text_vector = text_to_vector(text) - - # 计算每个主题的相似度 - similarities = [] - for topic in topics: - topic_vector = text_to_vector(topic) - # 获取所有唯一词 - all_words = set(text_vector.keys()) | set(topic_vector.keys()) - # 构建向量 - v1 = [text_vector.get(word, 0) for word in all_words] - v2 = [topic_vector.get(word, 0) for word in all_words] - # 计算相似度 - similarity = cosine_similarity(v1, v2) - similarities.append((topic, similarity)) - - # 按相似度降序排序并返回前k个 - return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k] - def truncate_message(message: str, max_length=20) -> str: """截断消息,使其不超过指定长度""" @@ -523,47 +518,6 @@ def get_western_ratio(paragraph): return western_count / len(alnum_chars) -def count_messages_between(start_time: float, end_time: float, stream_id: str) -> tuple[int, int]: - """计算两个时间点之间的消息数量和文本总长度 - - Args: - start_time (float): 起始时间戳 (不包含) - end_time (float): 结束时间戳 (包含) - stream_id (str): 聊天流ID - - Returns: - tuple[int, int]: (消息数量, 文本总长度) - """ - count = 0 - total_length = 0 - - # 参数校验 (可选但推荐) - if start_time >= end_time: - # logger.debug(f"开始时间 {start_time} 大于或等于结束时间 {end_time},返回 0, 0") - return 0, 0 - if not stream_id: - logger.error("stream_id 不能为空") - return 0, 0 - - # 使用message_repository中的count_messages和find_messages函数 - - # 构建查询条件 - filter_query = {"chat_id": stream_id, "time": {"$gt": start_time, "$lte": end_time}} - - try: - # 先获取消息数量 - count = count_messages(filter_query) - - # 获取消息内容计算总长度 - messages = find_messages(message_filter=filter_query) - total_length = sum(len(msg.processed_plain_text or "") for msg in messages) - - return count, total_length - - except Exception as e: - logger.error(f"计算消息数量时发生意外错误: {e}") - return 0, 0 - def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal") -> str: # sourcery skip: merge-comparisons, merge-duplicate-blocks, switch @@ -698,65 +652,6 @@ def assign_message_ids(messages: List[DatabaseMessages]) -> List[Tuple[str, Data return result -# def assign_message_ids_flexible( -# messages: list, prefix: str = "msg", id_length: int = 6, use_timestamp: bool = False -# ) -> list: -# """ -# 为消息列表中的每个消息分配唯一的简短随机ID(增强版) - -# Args: -# messages: 消息列表 -# prefix: ID前缀,默认为"msg" -# id_length: ID的总长度(不包括前缀),默认为6 -# use_timestamp: 是否在ID中包含时间戳,默认为False - -# Returns: -# 包含 {'id': str, 'message': any} 格式的字典列表 -# """ -# result = [] -# used_ids = set() - -# for i, message in enumerate(messages): -# # 生成唯一的ID -# while True: -# if use_timestamp: -# # 使用时间戳的后几位 + 随机字符 -# timestamp_suffix = str(int(time.time() * 1000))[-3:] -# remaining_length = id_length - 3 -# random_chars = "".join(random.choices(string.ascii_lowercase + string.digits, k=remaining_length)) -# message_id = f"{prefix}{timestamp_suffix}{random_chars}" -# else: -# # 使用索引 + 随机字符 -# index_str = str(i + 1) -# remaining_length = max(1, id_length - len(index_str)) -# random_chars = "".join(random.choices(string.ascii_lowercase + string.digits, k=remaining_length)) -# message_id = f"{prefix}{index_str}{random_chars}" - -# if message_id not in used_ids: -# used_ids.add(message_id) -# break - -# result.append({"id": message_id, "message": message}) - -# return result - - -# 使用示例: -# messages = ["Hello", "World", "Test message"] -# -# # 基础版本 -# result1 = assign_message_ids(messages) -# # 结果: [{'id': 'm1123', 'message': 'Hello'}, {'id': 'm2456', 'message': 'World'}, {'id': 'm3789', 'message': 'Test message'}] -# -# # 增强版本 - 自定义前缀和长度 -# result2 = assign_message_ids_flexible(messages, prefix="chat", id_length=8) -# # 结果: [{'id': 'chat1abc2', 'message': 'Hello'}, {'id': 'chat2def3', 'message': 'World'}, {'id': 'chat3ghi4', 'message': 'Test message'}] -# -# # 增强版本 - 使用时间戳 -# result3 = assign_message_ids_flexible(messages, prefix="ts", use_timestamp=True) -# # 结果: [{'id': 'ts123a1b', 'message': 'Hello'}, {'id': 'ts123c2d', 'message': 'World'}, {'id': 'ts123e3f', 'message': 'Test message'}] - - def parse_keywords_string(keywords_input) -> list[str]: # sourcery skip: use-contextlib-suppress """ diff --git a/src/config/official_configs.py b/src/config/official_configs.py index df616a64..641b287a 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -27,6 +27,9 @@ class BotConfig(ConfigBase): nickname: str """昵称""" + + platforms: list[str] = field(default_factory=lambda: []) + """其他平台列表""" alias_names: list[str] = field(default_factory=lambda: []) """别名列表""" diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index b88077ca..f51c5203 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "6.18.3" +version = "6.18.4" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -14,6 +14,9 @@ version = "6.18.3" [bot] platform = "qq" qq_account = "1145141919810" # 麦麦的QQ账号 + +platforms = ["wx:114514","xx:1919810"] # 麦麦的其他平台账号 + nickname = "麦麦" # 麦麦的昵称 alias_names = ["麦叠", "牢麦"] # 麦麦的别名