feat:表达方式主动提问

pull/1385/head
SengokuCola 2025-11-23 23:02:02 +08:00
parent b0a22011a4
commit 477959f6b5
9 changed files with 518 additions and 5 deletions

View File

@ -234,6 +234,27 @@ class BrainChatting:
if recent_messages_list is None:
recent_messages_list = []
_reply_text = "" # 初始化reply_text变量避免UnboundLocalError
# -------------------------------------------------------------------------
# ReflectTracker Check
# 在每次回复前检查一次上下文,看是否有反思问题得到了解答
# -------------------------------------------------------------------------
from src.express.reflect_tracker import reflect_tracker_manager
tracker = reflect_tracker_manager.get_tracker(self.stream_id)
if tracker:
resolved = await tracker.trigger_tracker()
if resolved:
reflect_tracker_manager.remove_tracker(self.stream_id)
logger.info(f"{self.log_prefix} ReflectTracker resolved and removed.")
# -------------------------------------------------------------------------
# Expression Reflection Check
# 检查是否需要提问表达反思
# -------------------------------------------------------------------------
from src.express.expression_reflector import expression_reflector_manager
reflector = expression_reflector_manager.get_or_create_reflector(self.stream_id)
asyncio.create_task(reflector.check_and_ask())
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
asyncio.create_task(self.expression_learner.trigger_learning_for_chat())

View File

@ -393,6 +393,27 @@ class HeartFChatting:
recent_messages_list = []
_reply_text = "" # 初始化reply_text变量避免UnboundLocalError
# -------------------------------------------------------------------------
# ReflectTracker Check
# 在每次回复前检查一次上下文,看是否有反思问题得到了解答
# -------------------------------------------------------------------------
from src.express.reflect_tracker import reflect_tracker_manager
tracker = reflect_tracker_manager.get_tracker(self.stream_id)
if tracker:
resolved = await tracker.trigger_tracker()
if resolved:
reflect_tracker_manager.remove_tracker(self.stream_id)
logger.info(f"{self.log_prefix} ReflectTracker resolved and removed.")
# -------------------------------------------------------------------------
# Expression Reflection Check
# 检查是否需要提问表达反思
# -------------------------------------------------------------------------
from src.express.expression_reflector import expression_reflector_manager
reflector = expression_reflector_manager.get_or_create_reflector(self.stream_id)
asyncio.create_task(reflector.check_and_ask())
start_time = time.time()
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
@ -814,6 +835,7 @@ class HeartFChatting:
"result": f"你回复内容{reply_text}",
"loop_info": loop_info,
}
else:
# 执行普通动作
with Timer("动作执行", cycle_timers):

View File

@ -316,6 +316,8 @@ class Expression(BaseModel):
last_active_time = FloatField()
chat_id = TextField(index=True)
create_date = FloatField(null=True) # 创建日期,允许为空以兼容老数据
checked = BooleanField(default=False) # 是否已检查
rejected = BooleanField(default=False) # 是否被拒绝但未更新
class Meta:
table_name = "expression"

View File

@ -279,6 +279,12 @@ class ExpressionConfig(ConfigBase):
格式: [["qq:12345:group", "qq:67890:private"]]
"""
reflect: bool = False
"""是否启用表达反思"""
reflect_operator_id: str = ""
"""表达反思操作员ID"""
def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]:
"""
解析流配置字符串并生成对应的 chat_id

View File

@ -0,0 +1,235 @@
import random
import time
from typing import Optional, Dict
from src.common.logger import get_logger
from src.common.database.database_model import Expression
from src.config.config import global_config
from src.chat.message_receive.chat_stream import get_chat_manager
from src.plugin_system.apis import send_api
logger = get_logger("expression_reflector")
class ExpressionReflector:
"""表达反思器,管理单个聊天流的表达反思提问"""
def __init__(self, chat_id: str):
self.chat_id = chat_id
self.last_ask_time: float = 0.0
async def check_and_ask(self) -> bool:
"""
检查是否需要提问表达反思如果需要则提问
Returns:
bool: 是否执行了提问
"""
try:
logger.info(f"[Expression Reflection] 开始检查是否需要提问 (stream_id: {self.chat_id})")
if not global_config.expression.reflect:
logger.info(f"[Expression Reflection] 表达反思功能未启用,跳过")
return False
operator_config = global_config.expression.reflect_operator_id
if not operator_config:
logger.info(f"[Expression Reflection] Operator ID 未配置,跳过")
return False
# 检查上一次提问时间
current_time = time.time()
time_since_last_ask = current_time - self.last_ask_time
# 5-10分钟间隔随机选择
min_interval = 10 * 60 # 5分钟
max_interval = 15 * 60 # 10分钟
interval = random.uniform(min_interval, max_interval)
logger.info(f"[Expression Reflection] 上次提问时间: {self.last_ask_time:.2f}, 当前时间: {current_time:.2f}, 已过时间: {time_since_last_ask:.2f}秒 ({time_since_last_ask/60:.2f}分钟), 需要间隔: {interval:.2f}秒 ({interval/60:.2f}分钟)")
if time_since_last_ask < interval:
remaining_time = interval - time_since_last_ask
logger.info(f"[Expression Reflection] 距离上次提问时间不足,还需等待 {remaining_time:.2f}秒 ({remaining_time/60:.2f}分钟),跳过")
return False
# 检查是否已经有针对该 Operator 的 Tracker 在运行
logger.info(f"[Expression Reflection] 检查 Operator {operator_config} 是否已有活跃的 Tracker")
if await _check_tracker_exists(operator_config):
logger.info(f"[Expression Reflection] Operator {operator_config} 已有活跃的 Tracker跳过本次提问")
return False
# 获取未检查的表达
try:
logger.info(f"[Expression Reflection] 查询未检查且未拒绝的表达")
expressions = (Expression
.select()
.where((Expression.checked == False) & (Expression.rejected == False))
.limit(50))
expr_list = list(expressions)
logger.info(f"[Expression Reflection] 找到 {len(expr_list)} 个候选表达")
if not expr_list:
logger.info(f"[Expression Reflection] 没有可用的表达,跳过")
return False
target_expr: Expression = random.choice(expr_list)
logger.info(f"[Expression Reflection] 随机选择了表达 ID: {target_expr.id}, Situation: {target_expr.situation}, Style: {target_expr.style}")
# 生成询问文本
ask_text = _generate_ask_text(target_expr)
if not ask_text:
logger.warning(f"[Expression Reflection] 生成询问文本失败,跳过")
return False
logger.info(f"[Expression Reflection] 准备向 Operator {operator_config} 发送提问")
# 发送给 Operator
await _send_to_operator(operator_config, ask_text, target_expr)
# 更新上一次提问时间
self.last_ask_time = current_time
logger.info(f"[Expression Reflection] 提问成功,已更新上次提问时间为 {current_time:.2f}")
return True
except Exception as e:
logger.error(f"[Expression Reflection] 检查或提问过程中出错: {e}")
import traceback
logger.error(traceback.format_exc())
return False
except Exception as e:
logger.error(f"[Expression Reflection] 检查或提问过程中出错: {e}")
import traceback
logger.error(traceback.format_exc())
return False
class ExpressionReflectorManager:
"""表达反思管理器,管理多个聊天流的表达反思实例"""
def __init__(self):
self.reflectors: Dict[str, ExpressionReflector] = {}
def get_or_create_reflector(self, chat_id: str) -> ExpressionReflector:
"""获取或创建指定聊天流的表达反思实例"""
if chat_id not in self.reflectors:
self.reflectors[chat_id] = ExpressionReflector(chat_id)
return self.reflectors[chat_id]
# 创建全局实例
expression_reflector_manager = ExpressionReflectorManager()
async def _check_tracker_exists(operator_config: str) -> bool:
"""检查指定 Operator 是否已有活跃的 Tracker"""
from src.express.reflect_tracker import reflect_tracker_manager
chat_manager = get_chat_manager()
chat_stream = None
# 尝试解析配置字符串 "platform:id:type"
parts = operator_config.split(":")
if len(parts) == 3:
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
user_info = None
group_info = None
from maim_message import UserInfo, GroupInfo
if stream_type == "group":
group_info = GroupInfo(group_id=id_str, platform=platform)
user_info = UserInfo(user_id="system", user_nickname="System", platform=platform)
elif stream_type == "private":
user_info = UserInfo(user_id=id_str, platform=platform, user_nickname="Operator")
else:
return False
if user_info:
try:
chat_stream = await chat_manager.get_or_create_stream(platform, user_info, group_info)
except Exception as e:
logger.error(f"Failed to get or create chat stream for checking tracker: {e}")
return False
else:
chat_stream = chat_manager.get_stream(operator_config)
if not chat_stream:
return False
return reflect_tracker_manager.get_tracker(chat_stream.stream_id) is not None
def _generate_ask_text(expr: Expression) -> Optional[str]:
try:
ask_text = (
f"我正在学习新的表达方式,请帮我看看这个是否合适?\n\n"
f"**学习到的表达信息**\n"
f"- 情景 (Situation): {expr.situation}\n"
f"- 风格 (Style): {expr.style}\n"
)
return ask_text
except Exception as e:
logger.error(f"Failed to generate ask text: {e}")
return None
async def _send_to_operator(operator_config: str, text: str, expr: Expression):
chat_manager = get_chat_manager()
chat_stream = None
# 尝试解析配置字符串 "platform:id:type"
parts = operator_config.split(":")
if len(parts) == 3:
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
user_info = None
group_info = None
from maim_message import UserInfo, GroupInfo
if stream_type == "group":
group_info = GroupInfo(group_id=id_str, platform=platform)
user_info = UserInfo(user_id="system", user_nickname="System", platform=platform)
elif stream_type == "private":
user_info = UserInfo(user_id=id_str, platform=platform, user_nickname="Operator")
else:
logger.warning(f"Unknown stream type in operator config: {stream_type}")
return
if user_info:
try:
chat_stream = await chat_manager.get_or_create_stream(platform, user_info, group_info)
except Exception as e:
logger.error(f"Failed to get or create chat stream for operator {operator_config}: {e}")
return
else:
chat_stream = chat_manager.get_stream(operator_config)
if not chat_stream:
logger.warning(f"Could not find or create chat stream for operator: {operator_config}")
return
stream_id = chat_stream.stream_id
# 注册 Tracker
from src.express.reflect_tracker import ReflectTracker, reflect_tracker_manager
tracker = ReflectTracker(chat_stream=chat_stream, expression=expr, created_time=time.time())
reflect_tracker_manager.add_tracker(stream_id, tracker)
# 发送消息
await send_api.text_to_stream(
text=text,
stream_id=stream_id,
typing=True
)
logger.info(f"Sent expression reflect query to operator {operator_config} for expr {expr.id}")

View File

@ -126,8 +126,10 @@ class ExpressionSelector:
# 支持多chat_id合并抽选
related_chat_ids = self.get_related_chat_ids(chat_id)
# 优化一次性查询所有相关chat_id的表达方式
style_query = Expression.select().where((Expression.chat_id.in_(related_chat_ids)))
# 优化一次性查询所有相关chat_id的表达方式排除 rejected=1 的表达
style_query = Expression.select().where(
(Expression.chat_id.in_(related_chat_ids)) & (Expression.rejected == False)
)
style_exprs = [
{

View File

@ -0,0 +1,196 @@
import time
from typing import Optional, Dict, TYPE_CHECKING
from src.common.logger import get_logger
from src.common.database.database_model import Expression
from src.llm_models.utils_model import LLMRequest
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.config.config import model_config, global_config
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.utils.chat_message_builder import (
get_raw_msg_by_timestamp_with_chat,
build_readable_messages,
)
from datetime import datetime
if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages
logger = get_logger("reflect_tracker")
class ReflectTracker:
def __init__(self, chat_stream: ChatStream, expression: Expression, created_time: float):
self.chat_stream = chat_stream
self.expression = expression
self.created_time = created_time
# self.message_count = 0 # Replaced by checking message list length
self.last_check_msg_count = 0
self.max_message_count = 30
self.max_duration = 15 * 60 # 15 minutes
# LLM for judging response
self.judge_model = LLMRequest(
model_set=model_config.model_task_config.utils, request_type="reflect.tracker"
)
self._init_prompts()
def _init_prompts(self):
judge_prompt = """
你是一个表达反思助手Bot之前询问了表达方式是否合适
你需要根据提供的上下文对话判断是否对该表达方式做出了肯定或否定的评价
**询问内容**
情景: {situation}
风格: {style}
**上下文对话**
{context_block}
**判断要求**
1. 判断对话中是否包含对上述询问的回答
2. 如果是判断是肯定Approve还是否定Reject或者是提供了修改意见
3. 如果不是回答或者是无关内容请返回 "Ignore"
4. 如果是否定并提供了修改意见请提取修正后的情景和风格
请输出JSON格式
```json
{{
"judgment": "Approve" | "Reject" | "Ignore",
"corrected_situation": "...", // 如果有修改意见提取修正后的情景否则留空
"corrected_style": "..." // 如果有修改意见提取修正后的风格否则留空
}}
```
"""
Prompt(judge_prompt, "reflect_judge_prompt")
async def trigger_tracker(self) -> bool:
"""
触发追踪检查
Returns: True if resolved (should destroy tracker), False otherwise
"""
# Check timeout
if time.time() - self.created_time > self.max_duration:
logger.info(f"ReflectTracker for expr {self.expression.id} timed out (duration).")
return True
# Fetch messages since creation
msg_list = get_raw_msg_by_timestamp_with_chat(
chat_id=self.chat_stream.stream_id,
timestamp_start=self.created_time,
timestamp_end=time.time(),
)
current_msg_count = len(msg_list)
# Check message limit
if current_msg_count > self.max_message_count:
logger.info(f"ReflectTracker for expr {self.expression.id} timed out (message count).")
return True
# If no new messages since last check, skip
if current_msg_count <= self.last_check_msg_count:
return False
self.last_check_msg_count = current_msg_count
# Build context block
# Use simple readable format
context_block = build_readable_messages(
msg_list,
replace_bot_name=True,
timestamp_mode="relative",
read_mark=0.0,
show_actions=False,
)
# LLM Judge
try:
prompt = await global_prompt_manager.format_prompt(
"reflect_judge_prompt",
situation=self.expression.situation,
style=self.expression.style,
context_block=context_block
)
logger.info(f"ReflectTracker LLM Prompt: {prompt}")
response, _ = await self.judge_model.generate_response_async(prompt, temperature=0.1)
logger.info(f"ReflectTracker LLM Response: {response}")
# Parse JSON
import json
import re
from json_repair import repair_json
json_pattern = r"```json\s*(.*?)\s*```"
matches = re.findall(json_pattern, response, re.DOTALL)
if not matches:
# Try to parse raw response if no code block
matches = [response]
json_obj = json.loads(repair_json(matches[0]))
judgment = json_obj.get("judgment")
if judgment == "Approve":
self.expression.checked = True
self.expression.rejected = False
self.expression.save()
logger.info(f"Expression {self.expression.id} approved by operator.")
return True
elif judgment == "Reject":
self.expression.checked = True
corrected_situation = json_obj.get("corrected_situation")
corrected_style = json_obj.get("corrected_style")
# 检查是否有更新
has_update = bool(corrected_situation or corrected_style)
if corrected_situation:
self.expression.situation = corrected_situation
if corrected_style:
self.expression.style = corrected_style
# 如果拒绝但未更新,标记为 rejected=1
if not has_update:
self.expression.rejected = True
else:
self.expression.rejected = False
self.expression.save()
if has_update:
logger.info(f"Expression {self.expression.id} rejected and updated by operator. New situation: {corrected_situation}, New style: {corrected_style}")
else:
logger.info(f"Expression {self.expression.id} rejected but no correction provided, marked as rejected=1.")
return True
elif judgment == "Ignore":
logger.info(f"ReflectTracker for expr {self.expression.id} judged as Ignore.")
return False
except Exception as e:
logger.error(f"Error in ReflectTracker check: {e}")
return False
return False
# Global manager for trackers
class ReflectTrackerManager:
def __init__(self):
self.trackers: Dict[str, ReflectTracker] = {} # chat_id -> tracker
def add_tracker(self, chat_id: str, tracker: ReflectTracker):
self.trackers[chat_id] = tracker
def get_tracker(self, chat_id: str) -> Optional[ReflectTracker]:
return self.trackers.get(chat_id)
def remove_tracker(self, chat_id: str):
if chat_id in self.trackers:
del self.trackers[chat_id]
reflect_tracker_manager = ReflectTrackerManager()

View File

@ -1,5 +1,5 @@
[inner]
version = "6.21.8"
version = "6.23.0"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请递增version的值
@ -80,6 +80,9 @@ expression_groups = [
# 注意如果为群聊则需要设置为group如果设置为私聊则需要设置为private
]
reflect = false # 是否启用表达反思Bot主动向管理员询问表达方式是否合适
reflect_operator_id = "" # 表达反思操作员ID格式platform:id:type (例如 "qq:123456:private" 或 "qq:654321:group")
[chat] #麦麦的聊天设置
talk_value = 1 #聊天频率越小越沉默范围0-1

View File

@ -1,5 +1,5 @@
[inner]
version = "1.7.8"
version = "1.8.0"
# 配置文件版本号迭代规则同bot_config.toml
@ -66,6 +66,32 @@ price_out = 3.0
[models.extra_params] # 可选的额外参数配置
enable_thinking = true # 不启用思考
[[models]]
model_identifier = "Qwen/Qwen3-Next-80B-A3B-Instruct"
name = "qwen3-next-80b"
api_provider = "SiliconFlow"
price_in = 1.0
price_out = 4.0
[[models]]
model_identifier = "zai-org/GLM-4.6"
name = "glm-4.6"
api_provider = "SiliconFlow"
price_in = 3.5
price_out = 14.0
[models.extra_params] # 可选的额外参数配置
enable_thinking = false # 不启用思考
[[models]]
model_identifier = "zai-org/GLM-4.6"
name = "glm-4.6-think"
api_provider = "SiliconFlow"
price_in = 3.5
price_out = 14.0
[models.extra_params] # 可选的额外参数配置
enable_thinking = true # 不启用思考
[[models]]
model_identifier = "deepseek-ai/DeepSeek-R1"
name = "siliconflow-deepseek-r1"
@ -120,7 +146,7 @@ temperature = 0.7
max_tokens = 800
[model_task_config.replyer] # 首要回复模型,还用于表达器和表达方式学习
model_list = ["siliconflow-deepseek-v3.2-think","siliconflow-glm-4.6-think","siliconflow-glm-4.6"]
model_list = ["siliconflow-deepseek-v3.2","siliconflow-deepseek-v3.2-think","siliconflow-glm-4.6","siliconflow-glm-4.6-think"]
temperature = 0.3 # 模型温度新V3建议0.1-0.3
max_tokens = 2048