MaiBot/src/plugins/chat_module/reasoning_chat/reasoning_chat.py

327 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import traceback
from random import random
from typing import List, Optional
from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from .reasoning_generator import ResponseGenerator
from ...chat.chat_stream import chat_manager
from ...chat.emoji_manager import emoji_manager
from ...chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet
from ...chat.message_buffer import message_buffer
from ...chat.messagesender import message_manager
from ...chat.utils import is_mentioned_bot_in_message
from ...chat.utils_image import image_path_to_base64
from ...memory_system.Hippocampus import HippocampusManager
from ...message import UserInfo, Seg
from ...moods.moods import MoodManager
from ...person_info.relationship_manager import relationship_manager
from ...storage.storage import MessageStorage
from ...utils.timer_calculater import Timer
from ...willing.willing_manager import willing_manager
from ....config.config import global_config
# 定义日志配置
chat_config = LogConfig(
console_format=CHAT_STYLE_CONFIG["console_format"],
file_format=CHAT_STYLE_CONFIG["file_format"],
)
logger = get_module_logger("reasoning_chat", config=chat_config)
class ReasoningChat:
def __init__(self):
self.storage = MessageStorage()
self.gpt = ResponseGenerator()
self.mood_manager = MoodManager.get_instance()
@staticmethod
async def _create_thinking_message(message, chat, userinfo, messageinfo):
"""创建思考消息"""
bot_user_info = UserInfo(
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
platform=messageinfo.platform,
)
thinking_time_point = round(time.time(), 2)
thinking_id = "mt" + str(thinking_time_point)
thinking_message = MessageThinking(
message_id=thinking_id,
chat_stream=chat,
bot_user_info=bot_user_info,
reply=message,
thinking_start_time=thinking_time_point,
)
message_manager.add_message(thinking_message)
return thinking_id
@staticmethod
async def _send_response_messages(message, chat, response_set: List[str], thinking_id) -> Optional[MessageSending]:
"""发送回复消息"""
container = message_manager.get_container(chat.stream_id)
thinking_message = None
for msg in container.messages:
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
thinking_message = msg
container.messages.remove(msg)
break
if not thinking_message:
logger.warning("未找到对应的思考消息,可能已超时被移除")
return None
thinking_start_time = thinking_message.thinking_start_time
message_set = MessageSet(chat, thinking_id)
mark_head = False
first_bot_msg = None
for msg in response_set:
message_segment = Seg(type="text", data=msg)
bot_message = MessageSending(
message_id=thinking_id,
chat_stream=chat,
bot_user_info=UserInfo(
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
platform=message.message_info.platform,
),
sender_info=message.message_info.user_info,
message_segment=message_segment,
reply=message,
is_head=not mark_head,
is_emoji=False,
thinking_start_time=thinking_start_time,
)
if not mark_head:
mark_head = True
first_bot_msg = bot_message
message_set.add_message(bot_message)
message_manager.add_message(message_set)
return first_bot_msg
@staticmethod
async def _handle_emoji(message, chat, response):
"""处理表情包"""
if random() < global_config.emoji_chance:
emoji_raw = await emoji_manager.get_emoji_for_text(response)
if emoji_raw:
emoji_path, description = emoji_raw
emoji_cq = image_path_to_base64(emoji_path)
thinking_time_point = round(message.message_info.time, 2)
message_segment = Seg(type="emoji", data=emoji_cq)
bot_message = MessageSending(
message_id="mt" + str(thinking_time_point),
chat_stream=chat,
bot_user_info=UserInfo(
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
platform=message.message_info.platform,
),
sender_info=message.message_info.user_info,
message_segment=message_segment,
reply=message,
is_head=False,
is_emoji=True,
)
message_manager.add_message(bot_message)
async def _update_relationship(self, message: MessageRecv, response_set):
"""更新关系情绪"""
ori_response = ",".join(response_set)
stance, emotion = await self.gpt._get_emotion_tags(ori_response, message.processed_plain_text)
await relationship_manager.calculate_update_relationship_value(
chat_stream=message.chat_stream, label=emotion, stance=stance
)
self.mood_manager.update_mood_from_emotion(emotion, global_config.mood_intensity_factor)
async def process_message(self, message_data: str) -> None:
"""处理消息并生成回复"""
timing_results = {}
response_set = None
message = MessageRecv(message_data)
groupinfo = message.message_info.group_info
userinfo = message.message_info.user_info
messageinfo = message.message_info
# 消息加入缓冲池
await message_buffer.start_caching_messages(message)
# 创建聊天流
chat = await chat_manager.get_or_create_stream(
platform=messageinfo.platform,
user_info=userinfo,
group_info=groupinfo,
)
message.update_chat_stream(chat)
await message.process()
logger.trace(f"消息处理成功: {message.processed_plain_text}")
# 过滤词/正则表达式过滤
if self._check_ban_words(message.processed_plain_text, chat, userinfo) or self._check_ban_regex(
message.raw_message, chat, userinfo
):
return
# 查询缓冲器结果会整合前面跳过的消息改变processed_plain_text
buffer_result = await message_buffer.query_buffer_result(message)
# 处理缓冲器结果
if not buffer_result:
# await willing_manager.bombing_buffer_message_handle(message.message_info.message_id)
# willing_manager.delete(message.message_info.message_id)
f_type = "seglist"
if message.message_segment.type != "seglist":
f_type = message.message_segment.type
else:
if (
isinstance(message.message_segment.data, list)
and all(isinstance(x, Seg) for x in message.message_segment.data)
and len(message.message_segment.data) == 1
):
f_type = message.message_segment.data[0].type
if f_type == "text":
logger.info(f"触发缓冲,已炸飞消息:{message.processed_plain_text}")
elif f_type == "image":
logger.info("触发缓冲,已炸飞表情包/图片")
elif f_type == "seglist":
logger.info("触发缓冲,已炸飞消息列")
return
try:
await self.storage.store_message(message, chat)
logger.trace(f"存储成功 (通过缓冲后): {message.processed_plain_text}")
except Exception as e:
logger.error(f"存储消息失败: {e}")
logger.error(traceback.format_exc())
# 存储失败可能仍需考虑是否继续,暂时返回
return
is_mentioned, reply_probability = is_mentioned_bot_in_message(message)
# 记忆激活
with Timer("记忆激活", timing_results):
interested_rate = await HippocampusManager.get_instance().get_activate_from_text(
message.processed_plain_text, fast_retrieval=True
)
# 处理提及
# 意愿管理器设置当前message信息
willing_manager.setup(message, chat, is_mentioned, interested_rate)
# 获取回复概率
is_willing = False
if reply_probability != 1:
is_willing = True
reply_probability = await willing_manager.get_reply_probability(message.message_info.message_id)
if message.message_info.additional_config:
if "maimcore_reply_probability_gain" in message.message_info.additional_config.keys():
reply_probability += message.message_info.additional_config["maimcore_reply_probability_gain"]
# 打印消息信息
mes_name = chat.group_info.group_name if chat.group_info else "私聊"
current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time))
willing_log = f"[回复意愿:{await willing_manager.get_willing(chat.stream_id):.2f}]" if is_willing else ""
logger.info(
f"[{current_time}][{mes_name}]"
f"{message.message_info.user_info.user_nickname}:"
f"{message.processed_plain_text}{willing_log}[概率:{reply_probability * 100:.1f}%]"
)
do_reply = False
if random() < reply_probability:
do_reply = True
# 回复前处理
await willing_manager.before_generate_reply_handle(message.message_info.message_id)
# 创建思考消息
with Timer("创建思考消息", timing_results):
thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo)
logger.debug(f"创建捕捉器thinking_id:{thinking_id}")
info_catcher = info_catcher_manager.get_info_catcher(thinking_id)
info_catcher.catch_decide_to_response(message)
# 生成回复
try:
with Timer("生成回复", timing_results):
response_set = await self.gpt.generate_response(message, thinking_id)
info_catcher.catch_after_generate_response(timing_results["生成回复"])
except Exception as e:
logger.error(f"回复生成出现错误:{str(e)} {traceback.format_exc()}")
response_set = None
if not response_set:
logger.info("为什么生成回复失败?")
return
# 发送消息
with Timer("发送消息", timing_results):
first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id)
info_catcher.catch_after_response(timing_results["发送消息"], response_set, first_bot_msg)
info_catcher.done_catch()
# 处理表情包
with Timer("处理表情包", timing_results):
await self._handle_emoji(message, chat, response_set)
# 更新关系情绪
with Timer("更新关系情绪", timing_results):
await self._update_relationship(message, response_set)
# 回复后处理
await willing_manager.after_generate_reply_handle(message.message_info.message_id)
# 输出性能计时结果
if do_reply:
timing_str = " | ".join([f"{step}: {duration:.2f}" for step, duration in timing_results.items()])
trigger_msg = message.processed_plain_text
response_msg = " ".join(response_set) if response_set else "无回复"
logger.info(f"触发消息: {trigger_msg[:20]}... | 推理消息: {response_msg[:20]}... | 性能计时: {timing_str}")
else:
# 不回复处理
await willing_manager.not_reply_handle(message.message_info.message_id)
# 意愿管理器注销当前message信息
willing_manager.delete(message.message_info.message_id)
@staticmethod
def _check_ban_words(text: str, chat, userinfo) -> bool:
"""检查消息中是否包含过滤词"""
for word in global_config.ban_words:
if word in text:
logger.info(
f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}"
)
logger.info(f"[过滤词识别]消息中含有{word}filtered")
return True
return False
@staticmethod
def _check_ban_regex(text: str, chat, userinfo) -> bool:
"""检查消息是否匹配过滤正则表达式"""
for pattern in global_config.ban_msgs_regex:
if pattern.search(text):
logger.info(
f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}"
)
logger.info(f"[正则表达式过滤]消息匹配到{pattern}filtered")
return True
return False