feat:添加聊天记录总结模块

pull/1348/head
SengokuCola 2025-11-09 14:00:27 +08:00
parent fb1b520e68
commit d761d42dd7
7 changed files with 467 additions and 11 deletions

View File

@ -0,0 +1,422 @@
"""
聊天内容概括器
用于累积打包和压缩聊天记录
"""
import asyncio
import json
import time
from typing import List, Optional, Set
from dataclasses import dataclass
from src.common.logger import get_logger
from src.common.data_models.database_data_model import DatabaseMessages
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
from src.plugin_system.apis import message_api
from src.chat.utils.chat_message_builder import build_readable_messages
from src.person_info.person_info import Person
logger = get_logger("chat_history_summarizer")
@dataclass
class MessageBatch:
"""消息批次"""
messages: List[DatabaseMessages]
start_time: float
end_time: float
is_preparing: bool = False # 是否处于准备结束模式
class ChatHistorySummarizer:
"""聊天内容概括器"""
def __init__(self, chat_id: str, check_interval: int = 60):
"""
初始化聊天内容概括器
Args:
chat_id: 聊天ID
check_interval: 定期检查间隔默认60秒
"""
self.chat_id = chat_id
self.log_prefix = f"[ChatHistorySummarizer-{chat_id}]"
# 记录时间点,用于计算新消息
self.last_check_time = time.time()
# 当前累积的消息批次
self.current_batch: Optional[MessageBatch] = None
# LLM请求器用于压缩聊天内容
self.summarizer_llm = LLMRequest(
model_set=model_config.model_task_config.utils,
request_type="chat_history_summarizer"
)
# 后台循环相关
self.check_interval = check_interval # 检查间隔(秒)
self._periodic_task: Optional[asyncio.Task] = None
self._running = False
async def process(self, current_time: Optional[float] = None):
"""
处理聊天内容概括
Args:
current_time: 当前时间戳如果为None则使用time.time()
"""
if current_time is None:
current_time = time.time()
try:
logger.info(
f"{self.log_prefix} 开始处理聊天概括,窗口: {self.last_check_time:.2f} -> {current_time:.2f}"
)
# 获取从上次检查时间到当前时间的新消息
new_messages = message_api.get_messages_by_time_in_chat(
chat_id=self.chat_id,
start_time=self.last_check_time,
end_time=current_time,
limit=0,
limit_mode="latest",
filter_mai=False, # 不过滤bot消息因为需要检查bot是否发言
filter_command=False,
)
if not new_messages:
# 没有新消息,检查是否需要打包
logger.info(f"{self.log_prefix} 无新增消息,尝试对现有批次执行打包检查")
if self.current_batch and self.current_batch.messages:
await self._check_and_package(current_time)
self.last_check_time = current_time
return
# 有新消息,更新最后检查时间
self.last_check_time = current_time
logger.info(
f"{self.log_prefix} 获取到 {len(new_messages)} 条新消息,最新消息时间: {new_messages[-1].time if new_messages else 'N/A'}"
)
# 如果有当前批次,添加新消息
if self.current_batch:
before_count = len(self.current_batch.messages)
self.current_batch.messages.extend(new_messages)
self.current_batch.end_time = current_time
logger.info(
f"{self.log_prefix} 扩展现有批次: {before_count} -> {len(self.current_batch.messages)} 条消息,时间范围 {self.current_batch.start_time:.2f}-{self.current_batch.end_time:.2f}"
)
else:
# 创建新批次
self.current_batch = MessageBatch(
messages=new_messages,
start_time=new_messages[0].time if new_messages else current_time,
end_time=current_time,
)
logger.info(
f"{self.log_prefix} 创建新批次: 消息数 {len(new_messages)},时间范围 {self.current_batch.start_time:.2f}-{self.current_batch.end_time:.2f}"
)
# 检查是否需要打包
await self._check_and_package(current_time)
except Exception as e:
logger.error(f"{self.log_prefix} 处理聊天内容概括时出错: {e}")
import traceback
traceback.print_exc()
async def _check_and_package(self, current_time: float):
"""检查是否需要打包"""
if not self.current_batch or not self.current_batch.messages:
return
messages = self.current_batch.messages
message_count = len(messages)
last_message_time = messages[-1].time if messages else current_time
time_since_last_message = current_time - last_message_time
logger.info(
f"{self.log_prefix} 批次检查: 消息数={message_count}, 距离最后消息时间={time_since_last_message:.2f}s, 准备模式={self.current_batch.is_preparing}"
)
# 检查打包条件
should_package = False
# 条件1: 消息长度超过120直接打包
if message_count >= 120:
should_package = True
logger.info(f"{self.log_prefix} 消息数量达到120条开始打包")
# 条件2: 最后一条消息的时间和当前时间差>600秒直接打包
elif time_since_last_message > 600:
should_package = True
logger.info(f"{self.log_prefix} 最后一条消息超过600秒开始打包")
# 条件3: 消息长度超过100进入准备结束模式
elif message_count > 100:
if not self.current_batch.is_preparing:
self.current_batch.is_preparing = True
logger.info(f"{self.log_prefix} 消息数量超过100条进入准备结束模式")
# 在准备结束模式下,如果最后一条消息的时间和当前时间差>10秒就打包
if time_since_last_message > 10:
should_package = True
logger.info(f"{self.log_prefix} 准备结束模式下最后一条消息超过10秒开始打包")
if should_package:
await self._package_and_store()
async def _package_and_store(self):
"""打包并存储聊天记录"""
if not self.current_batch or not self.current_batch.messages:
return
messages = self.current_batch.messages
start_time = self.current_batch.start_time
end_time = self.current_batch.end_time
logger.info(
f"{self.log_prefix} 开始打包批次: 消息数={len(messages)}, 时间范围={start_time:.2f}-{end_time:.2f}"
)
# 检查是否有bot发言
# 第一条消息前推600s到最后一条消息的时间内
check_start_time = max(start_time - 600, 0)
check_end_time = end_time
# 使用包含边界的时间范围查询
bot_messages = message_api.get_messages_by_time_in_chat_inclusive(
chat_id=self.chat_id,
start_time=check_start_time,
end_time=check_end_time,
limit=0,
limit_mode="latest",
filter_mai=False,
filter_command=False,
)
# 检查是否有bot的发言
has_bot_message = False
bot_user_id = str(global_config.bot.qq_account)
for msg in bot_messages:
if msg.user_info.user_id == bot_user_id:
has_bot_message = True
break
if not has_bot_message:
logger.info(
f"{self.log_prefix} 打包内没有bot发言丢弃。检查范围: {check_start_time:.2f}-{check_end_time:.2f}"
)
self.current_batch = None
return
# 有bot发言进行压缩和存储
try:
# 构建对话原文
original_text = build_readable_messages(
messages=messages,
replace_bot_name=True,
timestamp_mode="normal_no_YMD",
read_mark=0.0,
truncate=False,
show_actions=False,
)
# 获取参与的所有人的昵称
participants_set: Set[str] = set()
for msg in messages:
# 使用 msg.user_platform扁平化字段或 msg.user_info.platform
platform = getattr(msg, 'user_platform', None) or (msg.user_info.platform if msg.user_info else None) or msg.chat_info.platform
person = Person(
platform=platform,
user_id=msg.user_info.user_id
)
person_name = person.person_name
if person_name:
participants_set.add(person_name)
participants = list(participants_set)
logger.info(
f"{self.log_prefix} 批次参与者: {participants if participants else '未知'}"
)
# 使用LLM压缩聊天内容
theme, keywords, summary = await self._compress_with_llm(original_text)
logger.info(
f"{self.log_prefix} LLM 压缩完成,主题: {theme}, 关键词数量: {len(keywords)}, 概括长度: {len(summary)}"
)
# 存储到数据库
await self._store_to_database(
start_time=start_time,
end_time=end_time,
original_text=original_text,
participants=participants,
theme=theme,
keywords=keywords,
summary=summary,
)
logger.info(f"{self.log_prefix} 成功打包并存储聊天记录,消息数: {len(messages)}, 主题: {theme}")
# 清空当前批次
self.current_batch = None
except Exception as e:
logger.error(f"{self.log_prefix} 打包和存储聊天记录时出错: {e}")
import traceback
traceback.print_exc()
# 出错时也清空批次,避免重复处理
self.current_batch = None
async def _compress_with_llm(self, original_text: str) -> tuple[str, List[str], str]:
"""
使用LLM压缩聊天内容
Returns:
tuple[str, List[str], str]: (主题, 关键词列表, 概括)
"""
prompt = f"""请对以下聊天记录进行概括,提取以下信息:
1. 主题这段对话的主要内容一个简短的标题不超过20字
2. 关键词这段对话的关键词用列表形式返回3-10个关键词
3. 概括对这段话的平文本概括50-200
请以JSON格式返回格式如下
{{
"theme": "主题",
"keywords": ["关键词1", "关键词2", ...],
"summary": "概括内容"
}}
聊天记录
{original_text}
请直接返回JSON不要包含其他内容"""
try:
response, _ = await self.summarizer_llm.generate_response_async(
prompt=prompt,
temperature=0.3,
max_tokens=500,
)
# 解析JSON响应
# 尝试提取JSON部分
import re
json_match = re.search(r'\{[^{}]*"theme"[^{}]*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
else:
json_str = response.strip()
# 移除可能的markdown代码块标记
json_str = re.sub(r'```json\s*', '', json_str)
json_str = re.sub(r'```\s*', '', json_str)
json_str = json_str.strip()
result = json.loads(json_str)
theme = result.get("theme", "未命名对话")
keywords = result.get("keywords", [])
summary = result.get("summary", "无概括")
# 确保keywords是列表
if isinstance(keywords, str):
keywords = [keywords]
return theme, keywords, summary
except Exception as e:
logger.error(f"{self.log_prefix} LLM压缩聊天内容时出错: {e}")
logger.error(f"{self.log_prefix} LLM响应: {response if 'response' in locals() else 'N/A'}")
# 返回默认值
return "未命名对话", [], "压缩失败,无法生成概括"
async def _store_to_database(
self,
start_time: float,
end_time: float,
original_text: str,
participants: List[str],
theme: str,
keywords: List[str],
summary: str,
):
"""存储到数据库"""
try:
from src.common.database.database_model import ChatHistory
from src.plugin_system.apis import database_api
# 准备数据
data = {
"chat_id": self.chat_id,
"start_time": start_time,
"end_time": end_time,
"original_text": original_text,
"participants": json.dumps(participants, ensure_ascii=False),
"theme": theme,
"keywords": json.dumps(keywords, ensure_ascii=False),
"summary": summary,
}
# 使用db_save存储使用start_time和chat_id作为唯一标识
# 由于可能有多条记录我们使用组合键但peewee不支持所以使用start_time作为唯一标识
# 但为了避免冲突我们使用组合键chat_id + start_time
# 由于peewee不支持组合键我们直接创建新记录不提供key_field和key_value
saved_record = await database_api.db_save(
ChatHistory,
data=data,
)
if saved_record:
logger.debug(f"{self.log_prefix} 成功存储聊天历史记录到数据库")
else:
logger.warning(f"{self.log_prefix} 存储聊天历史记录到数据库失败")
except Exception as e:
logger.error(f"{self.log_prefix} 存储到数据库时出错: {e}")
import traceback
traceback.print_exc()
raise
async def start(self):
"""启动后台定期检查循环"""
if self._running:
logger.warning(f"{self.log_prefix} 后台循环已在运行,无需重复启动")
return
self._running = True
self._periodic_task = asyncio.create_task(self._periodic_check_loop())
logger.info(f"{self.log_prefix} 已启动后台定期检查循环,检查间隔: {self.check_interval}")
async def stop(self):
"""停止后台定期检查循环"""
self._running = False
if self._periodic_task:
self._periodic_task.cancel()
try:
await self._periodic_task
except asyncio.CancelledError:
pass
self._periodic_task = None
logger.info(f"{self.log_prefix} 已停止后台定期检查循环")
async def _periodic_check_loop(self):
"""后台定期检查循环"""
try:
while self._running:
# 执行一次检查
await self.process()
# 等待指定间隔后再次检查
await asyncio.sleep(self.check_interval)
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 后台检查循环被取消")
raise
except Exception as e:
logger.error(f"{self.log_prefix} 后台检查循环出错: {e}")
import traceback
traceback.print_exc()
self._running = False

View File

@ -674,7 +674,7 @@ def build_readable_messages(
messages: 消息列表
replace_bot_name: 是否替换机器人名称为""
merge_messages: 是否合并连续消息
timestamp_mode: 时间戳显示模式
timestamp_mode: 时间戳显示模式"normal""normal_no_YMD""relative"
read_mark: 已读标记时间戳
truncate: 是否截断长消息
show_actions: 是否显示动作记录

View File

@ -365,6 +365,39 @@ class Jargon(BaseModel):
class Meta:
table_name = "jargon"
class ChatHistory(BaseModel):
"""
用于存储聊天历史概括的模型
"""
chat_id = TextField(index=True) # 聊天ID
start_time = DoubleField() # 起始时间
end_time = DoubleField() # 结束时间
original_text = TextField() # 对话原文
participants = TextField() # 参与的所有人的昵称JSON格式存储
theme = TextField() # 主题:这段对话的主要内容,一个简短的标题
keywords = TextField() # 关键词这段对话的关键词JSON格式存储
summary = TextField() # 概括:对这段话的平文本概括
class Meta:
table_name = "chat_history"
class ThinkingBack(BaseModel):
"""
用于存储记忆检索思考过程的模型
"""
chat_id = TextField(index=True) # 聊天ID
question = TextField() # 提出的问题
context = TextField(null=True) # 上下文信息
found_answer = BooleanField(default=False) # 是否找到答案
answer = TextField(null=True) # 答案内容
thinking_steps = TextField(null=True) # 思考步骤JSON格式
create_time = DoubleField() # 创建时间
update_time = DoubleField() # 更新时间
class Meta:
table_name = "thinking_back"
MODELS = [
ChatStreams,
LLMUsage,
@ -379,6 +412,8 @@ MODELS = [
MemoryChest,
MemoryConflict,
Jargon,
ChatHistory,
ThinkingBack,
]
def create_tables():

View File

@ -351,6 +351,7 @@ MODULE_COLORS = {
# 核心模块
"main": "\033[1;97m", # 亮白色+粗体 (主程序)
"memory": "\033[38;5;34m", # 天蓝色
"memory_retrieval": "\033[38;5;34m", # 天蓝色
"config": "\033[93m", # 亮黄色
"common": "\033[95m", # 亮紫色
"tools": "\033[96m", # 亮青色
@ -372,6 +373,8 @@ MODULE_COLORS = {
"chat_stream": "\033[38;5;51m", # 亮青色
"message_storage": "\033[38;5;33m", # 深蓝色
"expressor": "\033[38;5;166m", # 橙色
# jargon相关
"jargon": "\033[38;5;220m", # 金黄色,突出显示
# 插件系统
"plugins": "\033[31m", # 红色
"plugin_api": "\033[33m", # 黄色
@ -440,6 +443,7 @@ MODULE_ALIASES = {
"database_model": "数据库",
"mood": "情绪",
"memory": "记忆",
"memory_retrieval": "回忆",
"tool_executor": "工具",
"hfc": "聊天节奏",
"plugin_manager": "插件",

View File

@ -647,6 +647,9 @@ class DebugConfig(ConfigBase):
show_replyer_reasoning: bool = True
"""是否显示回复器推理"""
show_jargon_prompt: bool = False
"""是否显示jargon相关提示词"""
@dataclass

View File

@ -13,7 +13,6 @@ from src.common.logger import get_logger
from src.common.server import get_global_server, Server
from src.mood.mood_manager import mood_manager
from src.chat.knowledge import lpmm_start_up
from src.memory_system.memory_management_task import MemoryManagementTask, MemoryConflictCleanupTask
from rich.traceback import install
# from src.api.main import start_api_server
@ -92,14 +91,6 @@ class MainSystem:
asyncio.create_task(get_chat_manager()._auto_save_task())
logger.info("聊天管理器初始化成功")
# 添加记忆管理任务
await async_task_manager.add_task(MemoryManagementTask())
logger.info("记忆管理任务已启动")
# 添加记忆冲突清理任务
await async_task_manager.add_task(MemoryConflictCleanupTask())
logger.info("记忆冲突清理任务已启动")
# await asyncio.sleep(0.5) #防止logger输出飞了

View File

@ -1,5 +1,5 @@
[inner]
version = "6.20.1"
version = "6.20.2"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请递增version的值
@ -223,6 +223,7 @@ library_log_levels = { aiohttp = "WARNING"} # 设置特定库的日志级别
show_prompt = false # 是否显示prompt
show_replyer_prompt = false # 是否显示回复器prompt
show_replyer_reasoning = false # 是否显示回复器推理
show_jargon_prompt = false # 是否显示jargon相关提示词
[maim_message]
auth_token = [] # 认证令牌用于API验证为空则不启用验证