pull/1394/head
墨梓柒 2025-11-28 02:09:45 +08:00
commit 51bf81f1e2
No known key found for this signature in database
GPG Key ID: 4A65B9DBA35F7635
5 changed files with 927 additions and 495 deletions

View File

@ -29,7 +29,7 @@ from src.chat.utils.chat_message_builder import (
build_readable_messages_with_id,
get_raw_msg_before_timestamp_with_chat,
)
from src.chat.utils.chat_history_summarizer import ChatHistorySummarizer
from src.hippo_memorizer.chat_history_summarizer import ChatHistorySummarizer
if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages

View File

@ -222,7 +222,8 @@ class ActionPlanner:
# 非no_reply动作需要target_message_id
target_message = None
if target_message_id := action_json.get("target_message_id"):
target_message_id = action_json.get("target_message_id")
if target_message_id:
# 根据target_message_id查找原始消息
target_message = self.find_message_by_id(target_message_id, message_id_list)
if target_message is None:
@ -233,6 +234,20 @@ class ActionPlanner:
target_message = message_id_list[-1][1]
logger.debug(f"{self.log_prefix}动作'{action}'缺少target_message_id使用最新消息作为target_message")
if (
action != "no_reply"
and target_message is not None
and self._is_message_from_self(target_message)
):
logger.info(
f"{self.log_prefix}Planner选择了自己的消息 {target_message_id or target_message.message_id} 作为目标,强制使用 no_reply"
)
reasoning = (
f"目标消息 {target_message_id or target_message.message_id} 来自机器人自身,违反不回复自身消息规则。原始理由: {reasoning}"
)
action = "no_reply"
target_message = None
# 验证action是否可用
available_action_names = [action_name for action_name, _ in current_available_actions]
internal_action_names = ["no_reply", "reply", "wait_time", "no_reply_until_call"]
@ -277,6 +292,17 @@ class ActionPlanner:
return action_planner_infos
def _is_message_from_self(self, message: "DatabaseMessages") -> bool:
"""判断消息是否由机器人自身发送"""
try:
return (
str(message.user_info.user_id) == str(global_config.bot.qq_account)
and (message.user_info.platform or "") == (global_config.bot.platform or "")
)
except AttributeError:
logger.warning(f"{self.log_prefix}检测消息发送者失败,缺少必要字段")
return False
async def plan(
self,
available_actions: Dict[str, ActionInfo],

View File

@ -1,493 +0,0 @@
"""
聊天内容概括器
用于累积打包和压缩聊天记录
"""
import asyncio
import json
import time
from typing import List, Optional, Set
from dataclasses import dataclass
from src.common.logger import get_logger
from src.common.data_models.database_data_model import DatabaseMessages
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
from src.plugin_system.apis import message_api
from src.chat.utils.chat_message_builder import build_readable_messages
from src.person_info.person_info import Person
from src.chat.message_receive.chat_stream import get_chat_manager
logger = get_logger("chat_history_summarizer")
@dataclass
class MessageBatch:
    """A batch of accumulated chat messages awaiting packaging."""

    messages: List[DatabaseMessages]  # raw messages collected for this batch
    start_time: float  # timestamp of the first message in the batch
    end_time: float  # timestamp of the latest update to the batch
    is_preparing: bool = False  # whether the batch is in "preparing to close" mode
class ChatHistorySummarizer:
"""聊天内容概括器"""
def __init__(self, chat_id: str, check_interval: int = 60):
"""
初始化聊天内容概括器
Args:
chat_id: 聊天ID
check_interval: 定期检查间隔默认60秒
"""
self.chat_id = chat_id
self._chat_display_name = self._get_chat_display_name()
self.log_prefix = f"[{self._chat_display_name}]"
# 记录时间点,用于计算新消息
self.last_check_time = time.time()
# 当前累积的消息批次
self.current_batch: Optional[MessageBatch] = None
# LLM请求器用于压缩聊天内容
self.summarizer_llm = LLMRequest(
model_set=model_config.model_task_config.utils, request_type="chat_history_summarizer"
)
# 后台循环相关
self.check_interval = check_interval # 检查间隔(秒)
self._periodic_task: Optional[asyncio.Task] = None
self._running = False
def _get_chat_display_name(self) -> str:
"""获取聊天显示名称"""
try:
chat_name = get_chat_manager().get_stream_name(self.chat_id)
if chat_name:
return chat_name
# 如果获取失败使用简化的chat_id显示
if len(self.chat_id) > 20:
return f"{self.chat_id[:8]}..."
return self.chat_id
except Exception:
# 如果获取失败使用简化的chat_id显示
if len(self.chat_id) > 20:
return f"{self.chat_id[:8]}..."
return self.chat_id
async def process(self, current_time: Optional[float] = None):
"""
处理聊天内容概括
Args:
current_time: 当前时间戳如果为None则使用time.time()
"""
if current_time is None:
current_time = time.time()
try:
# 获取从上次检查时间到当前时间的新消息
new_messages = message_api.get_messages_by_time_in_chat(
chat_id=self.chat_id,
start_time=self.last_check_time,
end_time=current_time,
limit=0,
limit_mode="latest",
filter_mai=False, # 不过滤bot消息因为需要检查bot是否发言
filter_command=False,
)
if not new_messages:
# 没有新消息,检查是否需要打包
if self.current_batch and self.current_batch.messages:
await self._check_and_package(current_time)
self.last_check_time = current_time
return
logger.debug(
f"{self.log_prefix} 开始处理聊天概括,时间窗口: {self.last_check_time:.2f} -> {current_time:.2f}"
)
# 有新消息,更新最后检查时间
self.last_check_time = current_time
# 如果有当前批次,添加新消息
if self.current_batch:
before_count = len(self.current_batch.messages)
self.current_batch.messages.extend(new_messages)
self.current_batch.end_time = current_time
logger.info(f"{self.log_prefix} 更新聊天话题: {before_count} -> {len(self.current_batch.messages)} 条消息")
else:
# 创建新批次
self.current_batch = MessageBatch(
messages=new_messages,
start_time=new_messages[0].time if new_messages else current_time,
end_time=current_time,
)
logger.info(f"{self.log_prefix} 新建聊天话题: {len(new_messages)} 条消息")
# 检查是否需要打包
await self._check_and_package(current_time)
except Exception as e:
logger.error(f"{self.log_prefix} 处理聊天内容概括时出错: {e}")
import traceback
traceback.print_exc()
async def _check_and_package(self, current_time: float):
"""检查是否需要打包"""
if not self.current_batch or not self.current_batch.messages:
return
messages = self.current_batch.messages
message_count = len(messages)
last_message_time = messages[-1].time if messages else current_time
time_since_last_message = current_time - last_message_time
# 格式化时间差显示
if time_since_last_message < 60:
time_str = f"{time_since_last_message:.1f}"
elif time_since_last_message < 3600:
time_str = f"{time_since_last_message / 60:.1f}分钟"
else:
time_str = f"{time_since_last_message / 3600:.1f}小时"
preparing_status = "" if self.current_batch.is_preparing else ""
logger.info(
f"{self.log_prefix} 批次状态检查 | 消息数: {message_count} | 距最后消息: {time_str} | 准备结束模式: {preparing_status}"
)
# 检查打包条件
should_package = False
# 条件1: 消息长度超过120直接打包
if message_count >= 120:
should_package = True
logger.info(f"{self.log_prefix} 触发打包条件: 消息数量达到 {message_count} 条(阈值: 120条")
# 条件2: 最后一条消息的时间和当前时间差>600秒直接打包
elif time_since_last_message > 600:
should_package = True
logger.info(f"{self.log_prefix} 触发打包条件: 距最后消息 {time_str}(阈值: 10分钟")
# 条件3: 消息长度超过100进入准备结束模式
elif message_count > 100:
if not self.current_batch.is_preparing:
self.current_batch.is_preparing = True
logger.info(f"{self.log_prefix} 消息数量 {message_count} 条超过阈值100条进入准备结束模式")
# 在准备结束模式下,如果最后一条消息的时间和当前时间差>10秒就打包
if time_since_last_message > 10:
should_package = True
logger.info(f"{self.log_prefix} 触发打包条件: 准备结束模式下,距最后消息 {time_str}(阈值: 10秒")
if should_package:
await self._package_and_store()
async def _package_and_store(self):
"""打包并存储聊天记录"""
if not self.current_batch or not self.current_batch.messages:
return
messages = self.current_batch.messages
start_time = self.current_batch.start_time
end_time = self.current_batch.end_time
logger.info(
f"{self.log_prefix} 开始打包批次 | 消息数: {len(messages)} | 时间范围: {start_time:.2f} - {end_time:.2f}"
)
# 检查是否有bot发言
# 第一条消息前推600s到最后一条消息的时间内
check_start_time = max(start_time - 600, 0)
check_end_time = end_time
# 使用包含边界的时间范围查询
bot_messages = message_api.get_messages_by_time_in_chat_inclusive(
chat_id=self.chat_id,
start_time=check_start_time,
end_time=check_end_time,
limit=0,
limit_mode="latest",
filter_mai=False,
filter_command=False,
)
# 检查是否有bot的发言
has_bot_message = False
bot_user_id = str(global_config.bot.qq_account)
for msg in bot_messages:
if msg.user_info.user_id == bot_user_id:
has_bot_message = True
break
if not has_bot_message:
logger.info(
f"{self.log_prefix} 批次内无Bot发言丢弃批次 | 检查时间范围: {check_start_time:.2f} - {check_end_time:.2f}"
)
self.current_batch = None
return
# 有bot发言进行压缩和存储
try:
# 构建对话原文
original_text = build_readable_messages(
messages=messages,
replace_bot_name=True,
timestamp_mode="normal_no_YMD",
read_mark=0.0,
truncate=False,
show_actions=False,
)
# 获取参与的所有人的昵称
participants_set: Set[str] = set()
for msg in messages:
# 使用 msg.user_platform扁平化字段或 msg.user_info.platform
platform = (
getattr(msg, "user_platform", None)
or (msg.user_info.platform if msg.user_info else None)
or msg.chat_info.platform
)
person = Person(platform=platform, user_id=msg.user_info.user_id)
person_name = person.person_name
if person_name:
participants_set.add(person_name)
participants = list(participants_set)
logger.info(f"{self.log_prefix} 批次参与者: {', '.join(participants) if participants else '未知'}")
# 使用LLM压缩聊天内容
success, theme, keywords, summary = await self._compress_with_llm(original_text)
if not success:
logger.warning(f"{self.log_prefix} LLM压缩失败不存储到数据库 | 消息数: {len(messages)}")
# 清空当前批次,避免重复处理
self.current_batch = None
return
logger.info(
f"{self.log_prefix} LLM压缩完成 | 主题: {theme} | 关键词数: {len(keywords)} | 概括长度: {len(summary)}"
)
# 存储到数据库
await self._store_to_database(
start_time=start_time,
end_time=end_time,
original_text=original_text,
participants=participants,
theme=theme,
keywords=keywords,
summary=summary,
)
logger.info(f"{self.log_prefix} 成功打包并存储聊天记录 | 消息数: {len(messages)} | 主题: {theme}")
# 清空当前批次
self.current_batch = None
except Exception as e:
logger.error(f"{self.log_prefix} 打包和存储聊天记录时出错: {e}")
import traceback
traceback.print_exc()
# 出错时也清空批次,避免重复处理
self.current_batch = None
async def _compress_with_llm(self, original_text: str) -> tuple[bool, str, List[str], str]:
"""
使用LLM压缩聊天内容
Returns:
tuple[bool, str, List[str], str]: (是否成功, 主题, 关键词列表, 概括)
"""
prompt = f"""请对以下聊天记录进行概括,提取以下信息:
1. 主题这段对话的主要内容一个简短的标题不超过20字
2. 关键词这段对话的关键词用列表形式返回3-10个关键词
3. 概括对这段话的平文本概括50-200
请以JSON格式返回格式如下
{{
"theme": "主题",
"keywords": ["关键词1", "关键词2", ...],
"summary": "概括内容"
}}
聊天记录
{original_text}
请直接返回JSON不要包含其他内容"""
try:
response, _ = await self.summarizer_llm.generate_response_async(
prompt=prompt,
temperature=0.3,
max_tokens=500,
)
# 解析JSON响应
import re
# 移除可能的markdown代码块标记
json_str = response.strip()
json_str = re.sub(r"^```json\s*", "", json_str, flags=re.MULTILINE)
json_str = re.sub(r"^```\s*", "", json_str, flags=re.MULTILINE)
json_str = json_str.strip()
# 尝试找到JSON对象的开始和结束位置
# 查找第一个 { 和最后一个匹配的 }
start_idx = json_str.find("{")
if start_idx == -1:
raise ValueError("未找到JSON对象开始标记")
# 从后往前查找最后一个 }
end_idx = json_str.rfind("}")
if end_idx == -1 or end_idx <= start_idx:
raise ValueError("未找到JSON对象结束标记")
# 提取JSON字符串
json_str = json_str[start_idx : end_idx + 1]
# 尝试解析JSON
try:
result = json.loads(json_str)
except json.JSONDecodeError:
# 如果解析失败,尝试修复字符串值中的中文引号
# 简单方法:将字符串值中的中文引号替换为转义的英文引号
# 使用状态机方法:遍历字符串,在字符串值内部替换中文引号
fixed_chars = []
in_string = False
escape_next = False
i = 0
while i < len(json_str):
char = json_str[i]
if escape_next:
fixed_chars.append(char)
escape_next = False
elif char == "\\":
fixed_chars.append(char)
escape_next = True
elif char == '"' and not escape_next:
fixed_chars.append(char)
in_string = not in_string
elif in_string and (char == '"' or char == '"'):
# 在字符串值内部,将中文引号替换为转义的英文引号
fixed_chars.append('\\"')
else:
fixed_chars.append(char)
i += 1
json_str = "".join(fixed_chars)
# 再次尝试解析
result = json.loads(json_str)
theme = result.get("theme", "未命名对话")
keywords = result.get("keywords", [])
summary = result.get("summary", "无概括")
# 确保keywords是列表
if isinstance(keywords, str):
keywords = [keywords]
return True, theme, keywords, summary
except Exception as e:
logger.error(f"{self.log_prefix} LLM压缩聊天内容时出错: {e}")
logger.error(f"{self.log_prefix} LLM响应: {response if 'response' in locals() else 'N/A'}")
# 返回失败标志和默认值
return False, "未命名对话", [], "压缩失败,无法生成概括"
async def _store_to_database(
self,
start_time: float,
end_time: float,
original_text: str,
participants: List[str],
theme: str,
keywords: List[str],
summary: str,
):
"""存储到数据库"""
try:
from src.common.database.database_model import ChatHistory
from src.plugin_system.apis import database_api
# 准备数据
data = {
"chat_id": self.chat_id,
"start_time": start_time,
"end_time": end_time,
"original_text": original_text,
"participants": json.dumps(participants, ensure_ascii=False),
"theme": theme,
"keywords": json.dumps(keywords, ensure_ascii=False),
"summary": summary,
"count": 0,
}
# 使用db_save存储使用start_time和chat_id作为唯一标识
# 由于可能有多条记录我们使用组合键但peewee不支持所以使用start_time作为唯一标识
# 但为了避免冲突我们使用组合键chat_id + start_time
# 由于peewee不支持组合键我们直接创建新记录不提供key_field和key_value
saved_record = await database_api.db_save(
ChatHistory,
data=data,
)
if saved_record:
logger.debug(f"{self.log_prefix} 成功存储聊天历史记录到数据库")
else:
logger.warning(f"{self.log_prefix} 存储聊天历史记录到数据库失败")
except Exception as e:
logger.error(f"{self.log_prefix} 存储到数据库时出错: {e}")
import traceback
traceback.print_exc()
raise
async def start(self):
"""启动后台定期检查循环"""
if self._running:
logger.warning(f"{self.log_prefix} 后台循环已在运行,无需重复启动")
return
self._running = True
self._periodic_task = asyncio.create_task(self._periodic_check_loop())
logger.info(f"{self.log_prefix} 已启动后台定期检查循环 | 检查间隔: {self.check_interval}")
async def stop(self):
"""停止后台定期检查循环"""
self._running = False
if self._periodic_task:
self._periodic_task.cancel()
try:
await self._periodic_task
except asyncio.CancelledError:
pass
self._periodic_task = None
logger.info(f"{self.log_prefix} 已停止后台定期检查循环")
async def _periodic_check_loop(self):
"""后台定期检查循环"""
try:
while self._running:
# 执行一次检查
await self.process()
# 等待指定间隔后再次检查
await asyncio.sleep(self.check_interval)
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 后台检查循环被取消")
raise
except Exception as e:
logger.error(f"{self.log_prefix} 后台检查循环出错: {e}")
import traceback
traceback.print_exc()
self._running = False

View File

@ -372,6 +372,7 @@ class ChatHistory(BaseModel):
theme = TextField() # 主题:这段对话的主要内容,一个简短的标题
keywords = TextField() # 关键词这段对话的关键词JSON格式存储
summary = TextField() # 概括:对这段话的平文本概括
key_point = TextField(null=True) # 关键信息话题中的关键信息点JSON格式存储
count = IntegerField(default=0) # 被检索次数
forget_times = IntegerField(default=0) # 被遗忘检查的次数

View File

@ -0,0 +1,898 @@
"""
聊天内容概括器
用于累积打包和压缩聊天记录
"""
import asyncio
import json
import time
import re
from pathlib import Path
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from src.common.logger import get_logger
from src.common.data_models.database_data_model import DatabaseMessages
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
from src.plugin_system.apis import message_api
from src.chat.utils.chat_message_builder import build_readable_messages
from src.person_info.person_info import Person
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
logger = get_logger("chat_history_summarizer")
HIPPO_CACHE_DIR = Path(__file__).resolve().parents[2] / "data" / "hippo_memorizer"
def init_prompt():
"""初始化提示词模板"""
topic_analysis_prompt = """
历史话题标题列表仅标题不含具体内容
{history_topics_block}
本次聊天记录每条消息前有编号用于后续引用
{messages_block}
请完成以下任务
**识别话题**
1. 识别本次聊天记录中正在进行的一个或多个话题
2. 判断历史话题标题列表中的话题是否在本次聊天记录中出现如果出现则直接使用该历史话题标题字符串
**选取消息**
1. 对于每个话题新话题或历史话题从上述带编号的消息中选出与该话题强相关的消息编号列表
2. 每个话题用一句话清晰地描述正在发生的事件必须包含时间大致即可人物主要事件和主题保证精准且有区分度
请先输出一段简短思考说明有什么话题哪些是不包含在历史话题中的哪些是包含在历史话题中的并说明为什么
然后严格以 JSON 格式输出本次聊天记录中涉及的话题格式如下
[
{{
"topic": "话题",
"message_indices": [1, 2, 5]
}},
...
]
"""
Prompt(topic_analysis_prompt, "hippo_topic_analysis_prompt")
topic_summary_prompt = """
请基于以下话题对聊天记录片段进行概括提取以下信息
**话题**{topic}
**要求**
1. 关键词提取与话题相关的关键词用列表形式返回3-10个关键词
2. 概括对这段话的平文本概括50-200要求
- 仔细地转述发生的事件和聊天内容
- 可以适当摘取聊天记录中的原文
- 重点突出事件的发展过程和结果
- 围绕话题这个中心进行概括
3. 关键信息提取话题中的关键信息点用列表形式返回3-8个关键信息点每个关键信息点应该简洁明了
请以JSON格式返回格式如下
{{
"keywords": ["关键词1", "关键词2", ...],
"summary": "概括内容",
"key_point": ["关键信息1", "关键信息2", ...]
}}
聊天记录
{original_text}
请直接返回JSON不要包含其他内容
"""
Prompt(topic_summary_prompt, "hippo_topic_summary_prompt")
@dataclass
class MessageBatch:
    """Raw-message accumulation batch used to trigger topic checks."""

    messages: List[DatabaseMessages]  # messages collected since the last topic check
    start_time: float  # timestamp of the first message in the batch
    end_time: float  # timestamp of the latest update to the batch
@dataclass
class TopicCacheItem:
    """Cache entry for one tracked conversation topic.

    Attributes:
        topic: Topic title - a one-sentence description covering time,
            people, main event and theme.
        messages: Readable text chunks (already rendered via the build
            helper) belonging to this topic; one entry per topic check.
        participants: Nicknames of everyone who spoke in this topic.
        no_update_checks: Number of consecutive topic checks in which the
            topic received no new content.
    """

    topic: str
    messages: List[str] = field(default_factory=list)
    participants: Set[str] = field(default_factory=set)
    no_update_checks: int = 0
class ChatHistorySummarizer:
"""聊天内容概括器"""
def __init__(self, chat_id: str, check_interval: int = 60):
"""
初始化聊天内容概括器
Args:
chat_id: 聊天ID
check_interval: 定期检查间隔默认60秒
"""
self.chat_id = chat_id
self._chat_display_name = self._get_chat_display_name()
self.log_prefix = f"[{self._chat_display_name}]"
# 记录时间点,用于计算新消息
self.last_check_time = time.time()
# 记录上一次话题检查的时间,用于判断是否需要触发检查
self.last_topic_check_time = time.time()
# 当前累积的消息批次
self.current_batch: Optional[MessageBatch] = None
# 话题缓存topic_str -> TopicCacheItem
# 在内存中维护,并通过本地文件实时持久化
self.topic_cache: Dict[str, TopicCacheItem] = {}
self._safe_chat_id = self._sanitize_chat_id(self.chat_id)
self._topic_cache_file = HIPPO_CACHE_DIR / f"{self._safe_chat_id}.json"
# 注意:批次加载需要异步查询消息,所以在 start() 中调用
# LLM请求器用于压缩聊天内容
self.summarizer_llm = LLMRequest(
model_set=model_config.model_task_config.utils, request_type="chat_history_summarizer"
)
# 后台循环相关
self.check_interval = check_interval # 检查间隔(秒)
self._periodic_task: Optional[asyncio.Task] = None
self._running = False
def _get_chat_display_name(self) -> str:
"""获取聊天显示名称"""
try:
chat_name = get_chat_manager().get_stream_name(self.chat_id)
if chat_name:
return chat_name
# 如果获取失败使用简化的chat_id显示
if len(self.chat_id) > 20:
return f"{self.chat_id[:8]}..."
return self.chat_id
except Exception:
# 如果获取失败使用简化的chat_id显示
if len(self.chat_id) > 20:
return f"{self.chat_id[:8]}..."
return self.chat_id
def _sanitize_chat_id(self, chat_id: str) -> str:
"""用于生成可作为文件名的 chat_id"""
return re.sub(r"[^a-zA-Z0-9_.-]", "_", chat_id)
def _load_topic_cache_from_disk(self):
"""在启动时加载本地话题缓存(同步部分),支持重启后继续"""
try:
if not self._topic_cache_file.exists():
return
with self._topic_cache_file.open("r", encoding="utf-8") as f:
data = json.load(f)
self.last_topic_check_time = data.get("last_topic_check_time", self.last_topic_check_time)
topics_data = data.get("topics", {})
loaded_count = 0
for topic, payload in topics_data.items():
self.topic_cache[topic] = TopicCacheItem(
topic=topic,
messages=payload.get("messages", []),
participants=set(payload.get("participants", [])),
no_update_checks=payload.get("no_update_checks", 0),
)
loaded_count += 1
if loaded_count:
logger.info(f"{self.log_prefix} 已加载 {loaded_count} 个话题缓存,继续追踪")
except Exception as e:
logger.error(f"{self.log_prefix} 加载话题缓存失败: {e}")
async def _load_batch_from_disk(self):
"""在启动时加载聊天批次,支持重启后继续"""
try:
if not self._topic_cache_file.exists():
return
with self._topic_cache_file.open("r", encoding="utf-8") as f:
data = json.load(f)
batch_data = data.get("current_batch")
if not batch_data:
return
start_time = batch_data.get("start_time")
end_time = batch_data.get("end_time")
if not start_time or not end_time:
return
# 根据时间范围重新查询消息
messages = message_api.get_messages_by_time_in_chat(
chat_id=self.chat_id,
start_time=start_time,
end_time=end_time,
limit=0,
limit_mode="latest",
filter_mai=False,
filter_command=False,
)
if messages:
self.current_batch = MessageBatch(
messages=messages,
start_time=start_time,
end_time=end_time,
)
logger.info(f"{self.log_prefix} 已恢复聊天批次,包含 {len(messages)} 条消息")
except Exception as e:
logger.error(f"{self.log_prefix} 加载聊天批次失败: {e}")
def _persist_topic_cache(self):
"""实时持久化话题缓存和聊天批次,避免重启后丢失"""
try:
# 如果既没有话题缓存也没有批次,删除缓存文件
if not self.topic_cache and not self.current_batch:
if self._topic_cache_file.exists():
self._topic_cache_file.unlink()
return
HIPPO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
data = {
"chat_id": self.chat_id,
"last_topic_check_time": self.last_topic_check_time,
"topics": {
topic: {
"messages": item.messages,
"participants": list(item.participants),
"no_update_checks": item.no_update_checks,
}
for topic, item in self.topic_cache.items()
},
}
# 保存当前批次的时间范围(如果有)
if self.current_batch:
data["current_batch"] = {
"start_time": self.current_batch.start_time,
"end_time": self.current_batch.end_time,
}
with self._topic_cache_file.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"{self.log_prefix} 持久化话题缓存失败: {e}")
async def process(self, current_time: Optional[float] = None):
"""
处理聊天内容概括
Args:
current_time: 当前时间戳如果为None则使用time.time()
"""
if current_time is None:
current_time = time.time()
try:
# 获取从上次检查时间到当前时间的新消息
new_messages = message_api.get_messages_by_time_in_chat(
chat_id=self.chat_id,
start_time=self.last_check_time,
end_time=current_time,
limit=0,
limit_mode="latest",
filter_mai=False, # 不过滤bot消息因为需要检查bot是否发言
filter_command=False,
)
if not new_messages:
# 没有新消息,检查是否需要进行“话题检查”
if self.current_batch and self.current_batch.messages:
await self._check_and_run_topic_check(current_time)
self.last_check_time = current_time
return
logger.debug(
f"{self.log_prefix} 开始处理聊天概括,时间窗口: {self.last_check_time:.2f} -> {current_time:.2f}"
)
# 有新消息,更新最后检查时间
self.last_check_time = current_time
# 如果有当前批次,添加新消息
if self.current_batch:
before_count = len(self.current_batch.messages)
self.current_batch.messages.extend(new_messages)
self.current_batch.end_time = current_time
logger.info(f"{self.log_prefix} 更新聊天检查批次: {before_count} -> {len(self.current_batch.messages)} 条消息")
# 更新批次后持久化
self._persist_topic_cache()
else:
# 创建新批次
self.current_batch = MessageBatch(
messages=new_messages,
start_time=new_messages[0].time if new_messages else current_time,
end_time=current_time,
)
logger.info(f"{self.log_prefix} 新建聊天检查批次: {len(new_messages)} 条消息")
# 创建批次后持久化
self._persist_topic_cache()
# 检查是否需要触发“话题检查”
await self._check_and_run_topic_check(current_time)
except Exception as e:
logger.error(f"{self.log_prefix} 处理聊天内容概括时出错: {e}")
import traceback
traceback.print_exc()
async def _check_and_run_topic_check(self, current_time: float):
"""
检查是否需要进行一次话题检查
触发条件
- 当前批次消息数 >= 100或者
- 距离上一次检查的时间 > 3600 1小时
"""
if not self.current_batch or not self.current_batch.messages:
return
messages = self.current_batch.messages
message_count = len(messages)
time_since_last_check = current_time - self.last_topic_check_time
# 格式化时间差显示
if time_since_last_check < 60:
time_str = f"{time_since_last_check:.1f}"
elif time_since_last_check < 3600:
time_str = f"{time_since_last_check / 60:.1f}分钟"
else:
time_str = f"{time_since_last_check / 3600:.1f}小时"
logger.info(
f"{self.log_prefix} 批次状态检查 | 消息数: {message_count} | 距上次检查: {time_str}"
)
# 检查“话题检查”触发条件
should_check = False
# 条件1: 消息数量 >= 100触发一次检查
if message_count >= 50:
should_check = True
logger.info(f"{self.log_prefix} 触发检查条件: 消息数量达到 {message_count} 条(阈值: 100条")
# 条件2: 距离上一次检查 > 3600 秒1小时触发一次检查
elif time_since_last_check > 1200:
should_check = True
logger.info(f"{self.log_prefix} 触发检查条件: 距上次检查 {time_str}(阈值: 1小时")
if should_check:
await self._run_topic_check_and_update_cache(messages)
# 本批次已经被处理为话题信息,可以清空
self.current_batch = None
# 更新上一次检查时间,并持久化
self.last_topic_check_time = current_time
self._persist_topic_cache()
async def _run_topic_check_and_update_cache(self, messages: List[DatabaseMessages]):
"""
执行一次话题检查
1. 首先确认这段消息里是否有 Bot 发言没有则直接丢弃本次批次
2. 将消息编号并转成字符串构造 LLM Prompt
3. 把历史话题标题列表放入 Prompt要求 LLM
- 识别当前聊天中的话题1 个或多个
- 为每个话题选出相关消息编号
- 若话题属于历史话题则沿用原话题标题
4. LLM 返回 JSON多个 {topic, message_indices}
5. 更新本地话题缓存并根据规则触发话题打包存储
"""
if not messages:
return
start_time = messages[0].time
end_time = messages[-1].time
logger.info(
f"{self.log_prefix} 开始话题检查 | 消息数: {len(messages)} | 时间范围: {start_time:.2f} - {end_time:.2f}"
)
# 1. 检查当前批次内是否有 bot 发言(只检查当前批次,不往前推)
# 原因:我们要记录的是 bot 参与过的对话片段,如果当前批次内 bot 没有发言,
# 说明 bot 没有参与这段对话,不应该记录
bot_user_id = str(global_config.bot.qq_account)
has_bot_message = False
for msg in messages:
if msg.user_info.user_id == bot_user_id:
has_bot_message = True
break
if not has_bot_message:
logger.info(
f"{self.log_prefix} 当前批次内无 Bot 发言,丢弃本次检查 | 时间范围: {start_time:.2f} - {end_time:.2f}"
)
return
# 2. 构造编号后的消息字符串和参与者信息
numbered_lines, index_to_msg_str, index_to_msg_text, index_to_participants = self._build_numbered_messages_for_llm(messages)
# 3. 调用 LLM 识别话题,并得到 topic -> indices
existing_topics = list(self.topic_cache.keys())
success, topic_to_indices = await self._analyze_topics_with_llm(
numbered_lines=numbered_lines,
existing_topics=existing_topics,
)
if not success or not topic_to_indices:
logger.warning(f"{self.log_prefix} 话题识别失败或无有效话题,本次检查忽略")
# 即使识别失败,也认为是一次“检查”,但不更新 no_update_checks保持原状
return
# 4. 统计哪些话题在本次检查中有新增内容
updated_topics: Set[str] = set()
for topic, indices in topic_to_indices.items():
if not indices:
continue
item = self.topic_cache.get(topic)
if not item:
# 新话题
item = TopicCacheItem(topic=topic)
self.topic_cache[topic] = item
# 收集属于该话题的消息文本(不带编号)
topic_msg_texts: List[str] = []
new_participants: Set[str] = set()
for idx in indices:
msg_text = index_to_msg_text.get(idx)
if not msg_text:
continue
topic_msg_texts.append(msg_text)
new_participants.update(index_to_participants.get(idx, set()))
if not topic_msg_texts:
continue
# 将本次检查中属于该话题的所有消息合并为一个字符串(不带编号)
merged_text = "\n".join(topic_msg_texts)
item.messages.append(merged_text)
item.participants.update(new_participants)
# 本次检查中该话题有更新,重置计数
item.no_update_checks = 0
updated_topics.add(topic)
# 5. 对于本次没有更新的历史话题no_update_checks + 1
for topic, item in list(self.topic_cache.items()):
if topic not in updated_topics:
item.no_update_checks += 1
# 6. 检查是否有话题需要打包存储
topics_to_finalize: List[str] = []
for topic, item in self.topic_cache.items():
if item.no_update_checks >= 3:
logger.info(f"{self.log_prefix} 话题[{topic}] 连续 5 次检查无新增内容,触发打包存储")
topics_to_finalize.append(topic)
continue
if len(item.messages) > 8:
logger.info(f"{self.log_prefix} 话题[{topic}] 消息条数超过 30触发打包存储")
topics_to_finalize.append(topic)
for topic in topics_to_finalize:
item = self.topic_cache.get(topic)
if not item:
continue
try:
await self._finalize_and_store_topic(
topic=topic,
item=item,
# 这里的时间范围尽量覆盖最近一次检查的区间
start_time=start_time,
end_time=end_time,
)
finally:
# 无论成功与否,都从缓存中删除,避免重复
self.topic_cache.pop(topic, None)
def _build_numbered_messages_for_llm(
self, messages: List[DatabaseMessages]
) -> tuple[List[str], Dict[int, str], Dict[int, str], Dict[int, Set[str]]]:
"""
将消息转为带编号的字符串 LLM 选择使用
返回:
numbered_lines: ["1. xxx", "2. yyy", ...] # 带编号,用于 LLM 选择
index_to_msg_str: idx -> "idx. xxx" # 带编号,用于 LLM 选择
index_to_msg_text: idx -> "xxx" # 不带编号,用于最终存储
index_to_participants: idx -> {nickname1, nickname2, ...}
"""
numbered_lines: List[str] = []
index_to_msg_str: Dict[int, str] = {}
index_to_msg_text: Dict[int, str] = {} # 不带编号的消息文本
index_to_participants: Dict[int, Set[str]] = {}
for idx, msg in enumerate(messages, start=1):
# 使用 build_readable_messages 生成可读文本
try:
text = build_readable_messages(
messages=[msg],
replace_bot_name=True,
timestamp_mode="normal_no_YMD",
read_mark=0.0,
truncate=False,
show_actions=False,
).strip()
except Exception:
# 回退到简单文本
text = getattr(msg, "processed_plain_text", "") or ""
# 获取发言人昵称
participants: Set[str] = set()
try:
platform = (
getattr(msg, "user_platform", None)
or (msg.user_info.platform if msg.user_info else None)
or msg.chat_info.platform
)
user_id = msg.user_info.user_id if msg.user_info else None
if platform and user_id:
person = Person(platform=platform, user_id=user_id)
if person.person_name:
participants.add(person.person_name)
except Exception:
pass
# 带编号的字符串(用于 LLM 选择)
line = f"{idx}. {text}"
numbered_lines.append(line)
index_to_msg_str[idx] = line
# 不带编号的文本(用于最终存储)
index_to_msg_text[idx] = text
index_to_participants[idx] = participants
return numbered_lines, index_to_msg_str, index_to_msg_text, index_to_participants
async def _analyze_topics_with_llm(
self,
numbered_lines: List[str],
existing_topics: List[str],
) -> tuple[bool, Dict[str, List[int]]]:
"""
使用 LLM 识别本次检查中的话题并为每个话题选择相关消息编号
要求
- 话题用一句话清晰描述正在发生的事件包括时间人物主要事件和主题
- 可以有 1 个或多个话题
- 若某个话题与历史话题列表中的某个话题是同一件事请直接使用历史话题的字符串
- 输出 JSON格式
[
{
"topic": "话题标题字符串",
"message_indices": [1, 2, 5]
},
...
]
"""
if not numbered_lines:
return False, {}
history_topics_block = (
"\n".join(f"- {t}" for t in existing_topics) if existing_topics else "(当前无历史话题)"
)
messages_block = "\n".join(numbered_lines)
prompt = await global_prompt_manager.format_prompt(
"hippo_topic_analysis_prompt",
history_topics_block=history_topics_block,
messages_block=messages_block,
)
try:
response, _ = await self.summarizer_llm.generate_response_async(
prompt=prompt,
temperature=0.2,
max_tokens=800,
)
import re
logger.info(f"{self.log_prefix} 话题识别LLM Prompt: {prompt}")
logger.info(f"{self.log_prefix} 话题识别LLM Response: {response}")
json_str = response.strip()
# 移除可能的 markdown 代码块标记
json_str = re.sub(r"^```json\s*", "", json_str, flags=re.MULTILINE)
json_str = re.sub(r"^```\s*", "", json_str, flags=re.MULTILINE)
json_str = json_str.strip()
# 尝试直接解析为 JSON 数组
result = json.loads(json_str)
if not isinstance(result, list):
logger.error(f"{self.log_prefix} 话题识别返回的 JSON 不是列表: {result}")
return False, {}
topic_to_indices: Dict[str, List[int]] = {}
for item in result:
if not isinstance(item, dict):
continue
topic = item.get("topic")
indices = item.get("message_indices") or item.get("messages") or []
if not topic or not isinstance(topic, str):
continue
if isinstance(indices, list):
valid_indices: List[int] = []
for v in indices:
try:
iv = int(v)
if iv > 0:
valid_indices.append(iv)
except (TypeError, ValueError):
continue
if valid_indices:
topic_to_indices[topic] = valid_indices
return True, topic_to_indices
except Exception as e:
logger.error(f"{self.log_prefix} 话题识别 LLM 调用或解析失败: {e}")
logger.error(f"{self.log_prefix} LLM响应: {response if 'response' in locals() else 'N/A'}")
return False, {}
async def _finalize_and_store_topic(
self,
topic: str,
item: TopicCacheItem,
start_time: float,
end_time: float,
):
"""
对某个话题进行最终打包存储
1. messages(list[str]) 拼接为 original_text
2. 使用 LLM original_text 进行总结得到 summary keywordstheme 直接使用话题字符串
3. 写入数据库 ChatHistory
4. 完成后调用方会从缓存中删除该话题
"""
if not item.messages:
logger.info(f"{self.log_prefix} 话题[{topic}] 无消息内容,跳过打包")
return
original_text = "\n".join(item.messages)
logger.info(
f"{self.log_prefix} 开始打包话题[{topic}] | 消息数: {len(item.messages)} | 时间范围: {start_time:.2f} - {end_time:.2f}"
)
# 使用 LLM 进行总结(基于话题名)
success, keywords, summary, key_point = await self._compress_with_llm(original_text, topic)
if not success:
logger.warning(f"{self.log_prefix} 话题[{topic}] LLM 概括失败,不写入数据库")
return
participants = list(item.participants)
await self._store_to_database(
start_time=start_time,
end_time=end_time,
original_text=original_text,
participants=participants,
theme=topic, # 主题直接使用话题名
keywords=keywords,
summary=summary,
key_point=key_point,
)
logger.info(
f"{self.log_prefix} 话题[{topic}] 成功打包并存储 | 消息数: {len(item.messages)} | 参与者数: {len(participants)}"
)
async def _compress_with_llm(self, original_text: str, topic: str) -> tuple[bool, List[str], str, List[str]]:
"""
使用LLM压缩聊天内容用于单个话题的最终总结
Args:
original_text: 聊天记录原文
topic: 话题名称
Returns:
tuple[bool, List[str], str, List[str]]: (是否成功, 关键词列表, 概括, 关键信息列表)
"""
prompt = await global_prompt_manager.format_prompt(
"hippo_topic_summary_prompt",
topic=topic,
original_text=original_text,
)
try:
response, _ = await self.summarizer_llm.generate_response_async(
prompt=prompt,
temperature=0.3,
max_tokens=500,
)
# 解析JSON响应
import re
# 移除可能的markdown代码块标记
json_str = response.strip()
json_str = re.sub(r"^```json\s*", "", json_str, flags=re.MULTILINE)
json_str = re.sub(r"^```\s*", "", json_str, flags=re.MULTILINE)
json_str = json_str.strip()
# 尝试找到JSON对象的开始和结束位置
# 查找第一个 { 和最后一个匹配的 }
start_idx = json_str.find("{")
if start_idx == -1:
raise ValueError("未找到JSON对象开始标记")
# 从后往前查找最后一个 }
end_idx = json_str.rfind("}")
if end_idx == -1 or end_idx <= start_idx:
raise ValueError("未找到JSON对象结束标记")
# 提取JSON字符串
json_str = json_str[start_idx : end_idx + 1]
# 尝试解析JSON
try:
result = json.loads(json_str)
except json.JSONDecodeError:
# 如果解析失败,尝试修复字符串值中的中文引号
# 简单方法:将字符串值中的中文引号替换为转义的英文引号
# 使用状态机方法:遍历字符串,在字符串值内部替换中文引号
fixed_chars = []
in_string = False
escape_next = False
i = 0
while i < len(json_str):
char = json_str[i]
if escape_next:
fixed_chars.append(char)
escape_next = False
elif char == "\\":
fixed_chars.append(char)
escape_next = True
elif char == '"' and not escape_next:
fixed_chars.append(char)
in_string = not in_string
elif in_string and (char == '"' or char == '"'):
# 在字符串值内部,将中文引号替换为转义的英文引号
fixed_chars.append('\\"')
else:
fixed_chars.append(char)
i += 1
json_str = "".join(fixed_chars)
# 再次尝试解析
result = json.loads(json_str)
keywords = result.get("keywords", [])
summary = result.get("summary", "无概括")
key_point = result.get("key_point", [])
# 确保keywords和key_point是列表
if isinstance(keywords, str):
keywords = [keywords]
if isinstance(key_point, str):
key_point = [key_point]
return True, keywords, summary, key_point
except Exception as e:
logger.error(f"{self.log_prefix} LLM压缩聊天内容时出错: {e}")
logger.error(f"{self.log_prefix} LLM响应: {response if 'response' in locals() else 'N/A'}")
# 返回失败标志和默认值
return False, [], "压缩失败,无法生成概括", []
async def _store_to_database(
self,
start_time: float,
end_time: float,
original_text: str,
participants: List[str],
theme: str,
keywords: List[str],
summary: str,
key_point: Optional[List[str]] = None,
):
"""存储到数据库"""
try:
from src.common.database.database_model import ChatHistory
from src.plugin_system.apis import database_api
# 准备数据
data = {
"chat_id": self.chat_id,
"start_time": start_time,
"end_time": end_time,
"original_text": original_text,
"participants": json.dumps(participants, ensure_ascii=False),
"theme": theme,
"keywords": json.dumps(keywords, ensure_ascii=False),
"summary": summary,
"count": 0,
}
# 存储 key_point如果存在
if key_point is not None:
data["key_point"] = json.dumps(key_point, ensure_ascii=False)
# 使用db_save存储使用start_time和chat_id作为唯一标识
# 由于可能有多条记录我们使用组合键但peewee不支持所以使用start_time作为唯一标识
# 但为了避免冲突我们使用组合键chat_id + start_time
# 由于peewee不支持组合键我们直接创建新记录不提供key_field和key_value
saved_record = await database_api.db_save(
ChatHistory,
data=data,
)
if saved_record:
logger.debug(f"{self.log_prefix} 成功存储聊天历史记录到数据库")
else:
logger.warning(f"{self.log_prefix} 存储聊天历史记录到数据库失败")
except Exception as e:
logger.error(f"{self.log_prefix} 存储到数据库时出错: {e}")
import traceback
traceback.print_exc()
raise
async def start(self):
"""启动后台定期检查循环"""
if self._running:
logger.warning(f"{self.log_prefix} 后台循环已在运行,无需重复启动")
return
# 加载聊天批次(如果有)
await self._load_batch_from_disk()
self._running = True
self._periodic_task = asyncio.create_task(self._periodic_check_loop())
logger.info(f"{self.log_prefix} 已启动后台定期检查循环 | 检查间隔: {self.check_interval}")
async def stop(self):
"""停止后台定期检查循环"""
self._running = False
if self._periodic_task:
self._periodic_task.cancel()
try:
await self._periodic_task
except asyncio.CancelledError:
pass
self._periodic_task = None
logger.info(f"{self.log_prefix} 已停止后台定期检查循环")
async def _periodic_check_loop(self):
"""后台定期检查循环"""
try:
while self._running:
# 执行一次检查
await self.process()
# 等待指定间隔后再次检查
await asyncio.sleep(self.check_interval)
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} 后台检查循环被取消")
raise
except Exception as e:
logger.error(f"{self.log_prefix} 后台检查循环出错: {e}")
import traceback
traceback.print_exc()
self._running = False
init_prompt()