diff --git a/src/chat/brain_chat/brain_chat.py b/src/chat/brain_chat/brain_chat.py
index 3624646f..29840ff1 100644
--- a/src/chat/brain_chat/brain_chat.py
+++ b/src/chat/brain_chat/brain_chat.py
@@ -234,6 +234,27 @@ class BrainChatting:
         if recent_messages_list is None:
             recent_messages_list = []
         _reply_text = ""  # 初始化reply_text变量,避免UnboundLocalError
+
+        # -------------------------------------------------------------------------
+        # ReflectTracker Check
+        # 在每次回复前检查一次上下文,看是否有反思问题得到了解答
+        # -------------------------------------------------------------------------
+        from src.express.reflect_tracker import reflect_tracker_manager
+
+        tracker = reflect_tracker_manager.get_tracker(self.stream_id)
+        if tracker:
+            resolved = await tracker.trigger_tracker()
+            if resolved:
+                reflect_tracker_manager.remove_tracker(self.stream_id)
+                logger.info(f"{self.log_prefix} ReflectTracker resolved and removed.")
+
+        # -------------------------------------------------------------------------
+        # Expression Reflection Check
+        # 检查是否需要提问表达反思
+        # -------------------------------------------------------------------------
+        from src.express.expression_reflector import expression_reflector_manager
+        reflector = expression_reflector_manager.get_or_create_reflector(self.stream_id)
+        asyncio.create_task(reflector.check_and_ask())
 
         async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
             asyncio.create_task(self.expression_learner.trigger_learning_for_chat())
diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py
index bb37e29e..a6813426 100644
--- a/src/chat/heart_flow/heartFC_chat.py
+++ b/src/chat/heart_flow/heartFC_chat.py
@@ -393,6 +393,27 @@ class HeartFChatting:
             recent_messages_list = []
         _reply_text = ""  # 初始化reply_text变量,避免UnboundLocalError
 
+        # -------------------------------------------------------------------------
+        # ReflectTracker Check
+        # 在每次回复前检查一次上下文,看是否有反思问题得到了解答
+        # -------------------------------------------------------------------------
+        from src.express.reflect_tracker import reflect_tracker_manager
+
+        tracker = reflect_tracker_manager.get_tracker(self.stream_id)
+        if tracker:
+            resolved = await tracker.trigger_tracker()
+            if resolved:
+                reflect_tracker_manager.remove_tracker(self.stream_id)
+                logger.info(f"{self.log_prefix} ReflectTracker resolved and removed.")
+
+        # -------------------------------------------------------------------------
+        # Expression Reflection Check
+        # 检查是否需要提问表达反思
+        # -------------------------------------------------------------------------
+        from src.express.expression_reflector import expression_reflector_manager
+        reflector = expression_reflector_manager.get_or_create_reflector(self.stream_id)
+        asyncio.create_task(reflector.check_and_ask())
+
         start_time = time.time()
 
         async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
@@ -814,6 +835,7 @@
                     "result": f"你回复内容{reply_text}",
                     "loop_info": loop_info,
                 }
+
             else:
                 # 执行普通动作
                 with Timer("动作执行", cycle_timers):
diff --git a/src/common/database/database_model.py b/src/common/database/database_model.py
index c97c0b72..440004db 100644
--- a/src/common/database/database_model.py
+++ b/src/common/database/database_model.py
@@ -316,6 +316,8 @@ class Expression(BaseModel):
     last_active_time = FloatField()
     chat_id = TextField(index=True)
     create_date = FloatField(null=True)  # 创建日期,允许为空以兼容老数据
+    checked = BooleanField(default=False)  # 是否已检查
+    rejected = BooleanField(default=False)  # 是否被拒绝但未更新
 
     class Meta:
table_name = "expression" diff --git a/src/config/official_configs.py b/src/config/official_configs.py index fc084ecb..c0a938a1 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -279,6 +279,12 @@ class ExpressionConfig(ConfigBase): 格式: [["qq:12345:group", "qq:67890:private"]] """ + reflect: bool = False + """是否启用表达反思""" + + reflect_operator_id: str = "" + """表达反思操作员ID""" + def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]: """ 解析流配置字符串并生成对应的 chat_id diff --git a/src/express/expression_reflector.py b/src/express/expression_reflector.py new file mode 100644 index 00000000..4402c18d --- /dev/null +++ b/src/express/expression_reflector.py @@ -0,0 +1,235 @@ +import random +import time +from typing import Optional, Dict + +from src.common.logger import get_logger +from src.common.database.database_model import Expression +from src.config.config import global_config +from src.chat.message_receive.chat_stream import get_chat_manager +from src.plugin_system.apis import send_api + +logger = get_logger("expression_reflector") + + +class ExpressionReflector: + """表达反思器,管理单个聊天流的表达反思提问""" + + def __init__(self, chat_id: str): + self.chat_id = chat_id + self.last_ask_time: float = 0.0 + + async def check_and_ask(self) -> bool: + """ + 检查是否需要提问表达反思,如果需要则提问 + + Returns: + bool: 是否执行了提问 + """ + try: + logger.info(f"[Expression Reflection] 开始检查是否需要提问 (stream_id: {self.chat_id})") + + if not global_config.expression.reflect: + logger.info(f"[Expression Reflection] 表达反思功能未启用,跳过") + return False + + operator_config = global_config.expression.reflect_operator_id + if not operator_config: + logger.info(f"[Expression Reflection] Operator ID 未配置,跳过") + return False + + # 检查上一次提问时间 + current_time = time.time() + time_since_last_ask = current_time - self.last_ask_time + + # 5-10分钟间隔,随机选择 + min_interval = 10 * 60 # 5分钟 + max_interval = 15 * 60 # 10分钟 + interval = random.uniform(min_interval, max_interval) + + logger.info(f"[Expression Reflection] 上次提问时间: {self.last_ask_time:.2f}, 当前时间: {current_time:.2f}, 已过时间: {time_since_last_ask:.2f}秒 ({time_since_last_ask/60:.2f}分钟), 需要间隔: {interval:.2f}秒 ({interval/60:.2f}分钟)") + + if time_since_last_ask < interval: + remaining_time = interval - time_since_last_ask + logger.info(f"[Expression Reflection] 距离上次提问时间不足,还需等待 {remaining_time:.2f}秒 ({remaining_time/60:.2f}分钟),跳过") + return False + + # 检查是否已经有针对该 Operator 的 Tracker 在运行 + logger.info(f"[Expression Reflection] 检查 Operator {operator_config} 是否已有活跃的 Tracker") + if await _check_tracker_exists(operator_config): + logger.info(f"[Expression Reflection] Operator {operator_config} 已有活跃的 Tracker,跳过本次提问") + return False + + # 获取未检查的表达 + try: + logger.info(f"[Expression Reflection] 查询未检查且未拒绝的表达") + expressions = (Expression + .select() + .where((Expression.checked == False) & (Expression.rejected == False)) + .limit(50)) + + expr_list = list(expressions) + logger.info(f"[Expression Reflection] 找到 {len(expr_list)} 个候选表达") + + if not expr_list: + logger.info(f"[Expression Reflection] 没有可用的表达,跳过") + return False + + target_expr: Expression = random.choice(expr_list) + logger.info(f"[Expression Reflection] 随机选择了表达 ID: {target_expr.id}, Situation: {target_expr.situation}, Style: {target_expr.style}") + + # 生成询问文本 + ask_text = _generate_ask_text(target_expr) + if not ask_text: + logger.warning(f"[Expression Reflection] 生成询问文本失败,跳过") + return False + + logger.info(f"[Expression Reflection] 准备向 Operator {operator_config} 发送提问") + # 发送给 Operator + await 
_send_to_operator(operator_config, ask_text, target_expr) + + # 更新上一次提问时间 + self.last_ask_time = current_time + logger.info(f"[Expression Reflection] 提问成功,已更新上次提问时间为 {current_time:.2f}") + + return True + + except Exception as e: + logger.error(f"[Expression Reflection] 检查或提问过程中出错: {e}") + import traceback + logger.error(traceback.format_exc()) + return False + except Exception as e: + logger.error(f"[Expression Reflection] 检查或提问过程中出错: {e}") + import traceback + logger.error(traceback.format_exc()) + return False + + +class ExpressionReflectorManager: + """表达反思管理器,管理多个聊天流的表达反思实例""" + + def __init__(self): + self.reflectors: Dict[str, ExpressionReflector] = {} + + def get_or_create_reflector(self, chat_id: str) -> ExpressionReflector: + """获取或创建指定聊天流的表达反思实例""" + if chat_id not in self.reflectors: + self.reflectors[chat_id] = ExpressionReflector(chat_id) + return self.reflectors[chat_id] + + +# 创建全局实例 +expression_reflector_manager = ExpressionReflectorManager() + + +async def _check_tracker_exists(operator_config: str) -> bool: + """检查指定 Operator 是否已有活跃的 Tracker""" + from src.express.reflect_tracker import reflect_tracker_manager + chat_manager = get_chat_manager() + chat_stream = None + + # 尝试解析配置字符串 "platform:id:type" + parts = operator_config.split(":") + if len(parts) == 3: + platform = parts[0] + id_str = parts[1] + stream_type = parts[2] + + user_info = None + group_info = None + + from maim_message import UserInfo, GroupInfo + + if stream_type == "group": + group_info = GroupInfo(group_id=id_str, platform=platform) + user_info = UserInfo(user_id="system", user_nickname="System", platform=platform) + elif stream_type == "private": + user_info = UserInfo(user_id=id_str, platform=platform, user_nickname="Operator") + else: + return False + + if user_info: + try: + chat_stream = await chat_manager.get_or_create_stream(platform, user_info, group_info) + except Exception as e: + logger.error(f"Failed to get or create chat stream for checking tracker: {e}") + return False + else: + chat_stream = chat_manager.get_stream(operator_config) + + if not chat_stream: + return False + + return reflect_tracker_manager.get_tracker(chat_stream.stream_id) is not None + + +def _generate_ask_text(expr: Expression) -> Optional[str]: + try: + ask_text = ( + f"我正在学习新的表达方式,请帮我看看这个是否合适?\n\n" + f"**学习到的表达信息**\n" + f"- 情景 (Situation): {expr.situation}\n" + f"- 风格 (Style): {expr.style}\n" + ) + return ask_text + except Exception as e: + logger.error(f"Failed to generate ask text: {e}") + return None + + +async def _send_to_operator(operator_config: str, text: str, expr: Expression): + chat_manager = get_chat_manager() + chat_stream = None + + # 尝试解析配置字符串 "platform:id:type" + parts = operator_config.split(":") + if len(parts) == 3: + platform = parts[0] + id_str = parts[1] + stream_type = parts[2] + + user_info = None + group_info = None + + from maim_message import UserInfo, GroupInfo + + if stream_type == "group": + group_info = GroupInfo(group_id=id_str, platform=platform) + user_info = UserInfo(user_id="system", user_nickname="System", platform=platform) + elif stream_type == "private": + user_info = UserInfo(user_id=id_str, platform=platform, user_nickname="Operator") + else: + logger.warning(f"Unknown stream type in operator config: {stream_type}") + return + + if user_info: + try: + chat_stream = await chat_manager.get_or_create_stream(platform, user_info, group_info) + except Exception as e: + logger.error(f"Failed to get or create chat stream for operator {operator_config}: {e}") + return + else: + 
+        chat_stream = chat_manager.get_stream(operator_config)
+
+    if not chat_stream:
+        logger.warning(f"Could not find or create chat stream for operator: {operator_config}")
+        return
+
+    stream_id = chat_stream.stream_id
+
+    # 注册 Tracker
+    from src.express.reflect_tracker import ReflectTracker, reflect_tracker_manager
+
+    tracker = ReflectTracker(chat_stream=chat_stream, expression=expr, created_time=time.time())
+    reflect_tracker_manager.add_tracker(stream_id, tracker)
+
+    # 发送消息
+    await send_api.text_to_stream(
+        text=text,
+        stream_id=stream_id,
+        typing=True
+    )
+    logger.info(f"Sent expression reflect query to operator {operator_config} for expr {expr.id}")
+
+
diff --git a/src/express/expression_selector.py b/src/express/expression_selector.py
index b4e25f36..d543025d 100644
--- a/src/express/expression_selector.py
+++ b/src/express/expression_selector.py
@@ -126,8 +126,10 @@
         # 支持多chat_id合并抽选
         related_chat_ids = self.get_related_chat_ids(chat_id)
 
-        # 优化:一次性查询所有相关chat_id的表达方式
-        style_query = Expression.select().where((Expression.chat_id.in_(related_chat_ids)))
+        # 优化:一次性查询所有相关chat_id的表达方式,排除 rejected=1 的表达
+        style_query = Expression.select().where(
+            (Expression.chat_id.in_(related_chat_ids)) & (Expression.rejected == False)
+        )
 
         style_exprs = [
             {
diff --git a/src/express/reflect_tracker.py b/src/express/reflect_tracker.py
new file mode 100644
index 00000000..7984e299
--- /dev/null
+++ b/src/express/reflect_tracker.py
@@ -0,0 +1,196 @@
+import time
+from typing import Optional, Dict, TYPE_CHECKING
+from src.common.logger import get_logger
+from src.common.database.database_model import Expression
+from src.llm_models.utils_model import LLMRequest
+from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
+from src.config.config import model_config, global_config
+from src.chat.message_receive.chat_stream import ChatStream
+from src.chat.utils.chat_message_builder import (
+    get_raw_msg_by_timestamp_with_chat,
+    build_readable_messages,
+)
+from datetime import datetime
+
+if TYPE_CHECKING:
+    from src.common.data_models.database_data_model import DatabaseMessages
+
+logger = get_logger("reflect_tracker")
+
+class ReflectTracker:
+    def __init__(self, chat_stream: ChatStream, expression: Expression, created_time: float):
+        self.chat_stream = chat_stream
+        self.expression = expression
+        self.created_time = created_time
+        # self.message_count = 0  # Replaced by checking message list length
+        self.last_check_msg_count = 0
+        self.max_message_count = 30
+        self.max_duration = 15 * 60  # 15 minutes
+
+        # LLM for judging response
+        self.judge_model = LLMRequest(
+            model_set=model_config.model_task_config.utils, request_type="reflect.tracker"
+        )
+
+        self._init_prompts()
+
+    def _init_prompts(self):
+        judge_prompt = """
+你是一个表达反思助手。Bot之前询问了表达方式是否合适。
+你需要根据提供的上下文对话,判断是否对该表达方式做出了肯定或否定的评价。
+
+**询问内容**
+情景: {situation}
+风格: {style}
+
+**上下文对话**
+{context_block}
+
+**判断要求**
+1. 判断对话中是否包含对上述询问的回答。
+2. 如果是,判断是肯定(Approve)还是否定(Reject),或者是提供了修改意见。
+3. 如果不是回答,或者是无关内容,请返回 "Ignore"。
+4. 如果是否定并提供了修改意见,请提取修正后的情景和风格。
+
+请输出JSON格式:
+```json
+{{
+    "judgment": "Approve" | "Reject" | "Ignore",
+    "corrected_situation": "...",  // 如果有修改意见,提取修正后的情景,否则留空
+    "corrected_style": "..."  // 如果有修改意见,提取修正后的风格,否则留空
+}}
+```
+"""
+        Prompt(judge_prompt, "reflect_judge_prompt")
+
+    async def trigger_tracker(self) -> bool:
+        """
+        触发追踪检查
+        Returns: True if resolved (should destroy tracker), False otherwise
+        """
+        # Check timeout
+        if time.time() - self.created_time > self.max_duration:
+            logger.info(f"ReflectTracker for expr {self.expression.id} timed out (duration).")
+            return True
+
+        # Fetch messages since creation
+        msg_list = get_raw_msg_by_timestamp_with_chat(
+            chat_id=self.chat_stream.stream_id,
+            timestamp_start=self.created_time,
+            timestamp_end=time.time(),
+        )
+
+        current_msg_count = len(msg_list)
+
+        # Check message limit
+        if current_msg_count > self.max_message_count:
+            logger.info(f"ReflectTracker for expr {self.expression.id} timed out (message count).")
+            return True
+
+        # If no new messages since last check, skip
+        if current_msg_count <= self.last_check_msg_count:
+            return False
+
+        self.last_check_msg_count = current_msg_count
+
+        # Build context block
+        # Use simple readable format
+        context_block = build_readable_messages(
+            msg_list,
+            replace_bot_name=True,
+            timestamp_mode="relative",
+            read_mark=0.0,
+            show_actions=False,
+        )
+
+        # LLM Judge
+        try:
+            prompt = await global_prompt_manager.format_prompt(
+                "reflect_judge_prompt",
+                situation=self.expression.situation,
+                style=self.expression.style,
+                context_block=context_block
+            )
+
+            logger.info(f"ReflectTracker LLM Prompt: {prompt}")
+
+            response, _ = await self.judge_model.generate_response_async(prompt, temperature=0.1)
+
+            logger.info(f"ReflectTracker LLM Response: {response}")
+
+            # Parse JSON
+            import json
+            import re
+            from json_repair import repair_json
+
+            json_pattern = r"```json\s*(.*?)\s*```"
+            matches = re.findall(json_pattern, response, re.DOTALL)
+            if not matches:
+                # Try to parse raw response if no code block
+                matches = [response]
+
+            json_obj = json.loads(repair_json(matches[0]))
+
+            judgment = json_obj.get("judgment")
+
+            if judgment == "Approve":
+                self.expression.checked = True
+                self.expression.rejected = False
+                self.expression.save()
+                logger.info(f"Expression {self.expression.id} approved by operator.")
+                return True
+
+            elif judgment == "Reject":
+                self.expression.checked = True
+                corrected_situation = json_obj.get("corrected_situation")
+                corrected_style = json_obj.get("corrected_style")
+
+                # 检查是否有更新
+                has_update = bool(corrected_situation or corrected_style)
+
+                if corrected_situation:
+                    self.expression.situation = corrected_situation
+                if corrected_style:
+                    self.expression.style = corrected_style
+
+                # 如果拒绝但未更新,标记为 rejected=1
+                if not has_update:
+                    self.expression.rejected = True
+                else:
+                    self.expression.rejected = False
+
+                self.expression.save()
+
+                if has_update:
+                    logger.info(f"Expression {self.expression.id} rejected and updated by operator. New situation: {corrected_situation}, New style: {corrected_style}")
+                else:
+                    logger.info(f"Expression {self.expression.id} rejected but no correction provided, marked as rejected=1.")
+                return True
+
+            elif judgment == "Ignore":
+                logger.info(f"ReflectTracker for expr {self.expression.id} judged as Ignore.")
+                return False
+
+        except Exception as e:
+            logger.error(f"Error in ReflectTracker check: {e}")
+            return False
+
+        return False
+
+# Global manager for trackers
+class ReflectTrackerManager:
+    def __init__(self):
+        self.trackers: Dict[str, ReflectTracker] = {}  # chat_id -> tracker
+
+    def add_tracker(self, chat_id: str, tracker: ReflectTracker):
+        self.trackers[chat_id] = tracker
+
+    def get_tracker(self, chat_id: str) -> Optional[ReflectTracker]:
+        return self.trackers.get(chat_id)
+
+    def remove_tracker(self, chat_id: str):
+        if chat_id in self.trackers:
+            del self.trackers[chat_id]
+
+reflect_tracker_manager = ReflectTrackerManager()
+
diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml
index 2c1da1be..97ce96ad 100644
--- a/template/bot_config_template.toml
+++ b/template/bot_config_template.toml
@@ -1,5 +1,5 @@
 [inner]
-version = "6.21.8"
+version = "6.23.0"
 
 #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
 #如果你想要修改配置文件,请递增version的值
@@ -80,6 +80,9 @@ expression_groups = [
 # 注意:如果为群聊,则需要设置为group,如果设置为私聊,则需要设置为private
 ]
 
+reflect = false # 是否启用表达反思(Bot主动向管理员询问表达方式是否合适)
+reflect_operator_id = "" # 表达反思操作员ID,格式:platform:id:type (例如 "qq:123456:private" 或 "qq:654321:group")
+
 [chat] #麦麦的聊天设置
 talk_value = 1 #聊天频率,越小越沉默,范围0-1
diff --git a/template/model_config_template.toml b/template/model_config_template.toml
index 6d956ace..7c78df6a 100644
--- a/template/model_config_template.toml
+++ b/template/model_config_template.toml
@@ -1,5 +1,5 @@
 [inner]
-version = "1.7.8"
+version = "1.8.0"
 
 # 配置文件版本号迭代规则同bot_config.toml
 
@@ -66,6 +66,32 @@ price_out = 3.0
 [models.extra_params] # 可选的额外参数配置
 enable_thinking = true # 不启用思考
 
+[[models]]
+model_identifier = "Qwen/Qwen3-Next-80B-A3B-Instruct"
+name = "qwen3-next-80b"
+api_provider = "SiliconFlow"
+price_in = 1.0
+price_out = 4.0
+
+[[models]]
+model_identifier = "zai-org/GLM-4.6"
+name = "glm-4.6"
+api_provider = "SiliconFlow"
+price_in = 3.5
+price_out = 14.0
+[models.extra_params] # 可选的额外参数配置
+enable_thinking = false # 不启用思考
+
+[[models]]
+model_identifier = "zai-org/GLM-4.6"
+name = "glm-4.6-think"
+api_provider = "SiliconFlow"
+price_in = 3.5
+price_out = 14.0
+[models.extra_params] # 可选的额外参数配置
+enable_thinking = true # 启用思考
+
+
 [[models]]
 model_identifier = "deepseek-ai/DeepSeek-R1"
 name = "siliconflow-deepseek-r1"
@@ -120,7 +146,7 @@ temperature = 0.7
 max_tokens = 800
 
 [model_task_config.replyer] # 首要回复模型,还用于表达器和表达方式学习
-model_list = ["siliconflow-deepseek-v3.2-think","siliconflow-glm-4.6-think","siliconflow-glm-4.6"]
+model_list = ["siliconflow-deepseek-v3.2","siliconflow-deepseek-v3.2-think","siliconflow-glm-4.6","siliconflow-glm-4.6-think"]
 temperature = 0.3 # 模型温度,新V3建议0.1-0.3
 max_tokens = 2048