长期记忆 第一版

pull/937/head
114514 2025-05-09 16:58:57 +08:00
parent b9085f4b05
commit ed7e339907
3 changed files with 525 additions and 312 deletions

View File

@ -1,123 +1,171 @@
import traceback import traceback
import re
from maim_message import UserInfo from typing import Any, Dict
from datetime import datetime # 确保导入 datetime
from maim_message import UserInfo, MessageRecv # 从 maim_message 导入 MessageRecv
from src.config.config import global_config from src.config.config import global_config
from src.common.logger_manager import get_logger from src.common.logger_manager import get_logger
from ..chat.chat_stream import chat_manager from ..chat.chat_stream import chat_manager
from typing import Optional, Dict, Any from src.plugins.chat.utils import get_embedding
from src.common.database import db
from .pfc_manager import PFCManager from .pfc_manager import PFCManager
from src.plugins.chat.message import MessageRecv
from src.plugins.storage.storage import MessageStorage
from datetime import datetime
logger = get_logger("pfc_processor") logger = get_logger("pfc_processor")
async def _handle_error(error: Exception, context: str, message: Optional[MessageRecv] = None) -> None: async def _handle_error(error: Exception, context: str, message: MessageRecv | None = None) -> None: # 明确 message 类型
"""统一的错误处理函数 """统一的错误处理函数
# ... (方法注释不变) ...
Args:
error: 捕获到的异常
context: 错误发生的上下文描述
message: 可选的消息对象用于记录相关消息内容
""" """
logger.error(f"{context}: {error}") logger.error(f"{context}: {error}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
if message and hasattr(message, "raw_message"): # 检查 message 是否 None 以及是否有 raw_message 属性
if message and hasattr(message, 'message_info') and hasattr(message.message_info, 'raw_message'): # MessageRecv 结构可能没有直接的 raw_message
raw_msg_content = getattr(message.message_info, 'raw_message', None) # 安全获取
if raw_msg_content:
logger.error(f"相关消息原始内容: {raw_msg_content}")
elif message and hasattr(message, 'raw_message'): # 如果 MessageRecv 直接有 raw_message
logger.error(f"相关消息原始内容: {message.raw_message}") logger.error(f"相关消息原始内容: {message.raw_message}")
class PFCProcessor: class PFCProcessor:
"""PFC 处理器,负责处理接收到的信息并计数"""
def __init__(self): def __init__(self):
"""初始化 PFC 处理器,创建消息存储实例""" """初始化 PFC 处理器,创建消息存储实例"""
self.storage = MessageStorage() # MessageStorage() 的实例化位置和具体类是什么?
# 我们假设它来自 src.plugins.storage.storage
# 但由于我们不能修改那个文件,所以这里的 self.storage 将按原样使用
from src.plugins.storage.storage import MessageStorage # 明确导入,以便类型提示和理解
self.storage: MessageStorage = MessageStorage()
self.pfc_manager = PFCManager.get_instance() self.pfc_manager = PFCManager.get_instance()
async def process_message(self, message_data: Dict[str, Any]) -> None: async def process_message(self, message_data: dict[str, Any]) -> None: # 使用 dict[str, Any] 替代 Dict
"""处理接收到的原始消息数据 """处理接收到的原始消息数据
# ... (方法注释不变) ...
主要流程:
1. 消息解析与初始化
2. 过滤检查
3. 消息存储
4. 创建 PFC
5. 日志记录
Args:
message_data: 原始消息字符串
""" """
message = None message_obj: MessageRecv | None = None # 初始化为 None并明确类型
try: try:
# 1. 消息解析与初始化 # 1. 消息解析与初始化
message = MessageRecv(message_data) message_obj = MessageRecv(message_data) # 使用你提供的 message.py 中的 MessageRecv
groupinfo = message.message_info.group_info # 确保 message_obj.message_info 存在
userinfo = message.message_info.user_info if not hasattr(message_obj, 'message_info'):
messageinfo = message.message_info logger.error("MessageRecv 对象缺少 message_info 属性。跳过处理。")
return
groupinfo = getattr(message_obj.message_info, 'group_info', None)
userinfo = getattr(message_obj.message_info, 'user_info', None)
if userinfo is None: # 确保 userinfo 存在
logger.error("message_obj.message_info 中缺少 user_info。跳过处理。")
return
if not hasattr(userinfo, 'user_id'): # 确保 user_id 存在
logger.error("userinfo 对象中缺少 user_id。跳过处理。")
return
logger.trace(f"准备为{userinfo.user_id}创建/获取聊天流") logger.trace(f"准备为{userinfo.user_id}创建/获取聊天流")
chat = await chat_manager.get_or_create_stream( chat = await chat_manager.get_or_create_stream(
platform=messageinfo.platform, platform=message_obj.message_info.platform,
user_info=userinfo, user_info=userinfo,
group_info=groupinfo, group_info=groupinfo,
) )
message.update_chat_stream(chat) message_obj.update_chat_stream(chat) # message.py 中 MessageRecv 有此方法
# 2. 过滤检查 # 2. 过滤检查
# 处理消息 await message_obj.process() # 调用 MessageRecv 的异步 process 方法
await message.process() if self._check_ban_words(message_obj.processed_plain_text, userinfo) or \
# 过滤词/正则表达式过滤 self._check_ban_regex(message_obj.raw_message, userinfo): # MessageRecv 有 raw_message 属性
if self._check_ban_words(message.processed_plain_text, userinfo) or self._check_ban_regex(
message.raw_message, userinfo
):
return return
# 3. 消息存储 # 3. 消息存储 (保持原有调用)
await self.storage.store_message(message, chat) # 这里的 self.storage.store_message 来自 src/plugins/storage/storage.py
logger.trace(f"存储成功: {message.processed_plain_text}") # 它内部会将 message_obj 转换为字典并存储
await self.storage.store_message(message_obj, chat)
logger.trace(f"存储成功 (初步): {message_obj.processed_plain_text}")
# === 新增:为已存储的消息生成嵌入并更新数据库文档 ===
embedding_vector = None
text_for_embedding = message_obj.processed_plain_text # 使用处理后的纯文本
# 在 storage.py 中,会对 processed_plain_text 进行一次过滤
# 为了保持一致,我们也在这里应用相同的过滤逻辑
# 当然,更优的做法是 store_message 返回过滤后的文本,或在 message_obj 中增加一个 filtered_processed_plain_text 属性
# 这里为了简单,我们先重复一次过滤逻辑
pattern = r"<MainRule>.*?</MainRule>|<schedule>.*?</schedule>|<UserMessage>.*?</UserMessage>"
if text_for_embedding:
filtered_text_for_embedding = re.sub(pattern, "", text_for_embedding, flags=re.DOTALL)
else:
filtered_text_for_embedding = ""
if filtered_text_for_embedding and filtered_text_for_embedding.strip():
try:
# request_type 参数根据你的 get_embedding 函数实际需求来定
embedding_vector = await get_embedding(filtered_text_for_embedding, request_type="pfc_private_memory")
if embedding_vector:
logger.debug(f"成功为消息 ID '{message_obj.message_info.message_id}' 生成嵌入向量。")
# 更新数据库中的对应文档
# 确保你有权限访问和操作 db 对象
update_result = await db.messages.update_one(
{"message_id": message_obj.message_info.message_id, "chat_id": chat.stream_id},
{"$set": {"embedding_vector": embedding_vector}}
)
if update_result.modified_count > 0:
logger.info(f"成功为消息 ID '{message_obj.message_info.message_id}' 更新嵌入向量到数据库。")
elif update_result.matched_count > 0:
logger.warning(f"消息 ID '{message_obj.message_info.message_id}' 已存在嵌入向量或未作修改。")
else:
logger.error(f"未能找到消息 ID '{message_obj.message_info.message_id}' (chat_id: {chat.stream_id}) 来更新嵌入向量。可能是存储和更新之间存在延迟或问题。")
else:
logger.warning(f"未能为消息 ID '{message_obj.message_info.message_id}' 的文本 '{filtered_text_for_embedding[:30]}...' 生成嵌入向量。")
except Exception as e_embed_update:
logger.error(f"为消息 ID '{message_obj.message_info.message_id}' 生成嵌入或更新数据库时发生异常: {e_embed_update}", exc_info=True)
else:
logger.debug(f"消息 ID '{message_obj.message_info.message_id}' 的过滤后纯文本为空,不生成或更新嵌入。")
# === 新增结束 ===
# 4. 创建 PFC 聊天流 # 4. 创建 PFC 聊天流
await self._create_pfc_chat(message) await self._create_pfc_chat(message_obj)
# 5. 日志记录 # 5. 日志记录
# 将时间戳转换为datetime对象 # 确保 message_obj.message_info.time 是 float 类型的时间戳
current_time = datetime.fromtimestamp(message.message_info.time).strftime("%H:%M:%S") current_time_display = datetime.fromtimestamp(float(message_obj.message_info.time)).strftime("%H:%M:%S")
# 确保 userinfo.user_nickname 存在
user_nickname_display = getattr(userinfo, 'user_nickname', '未知用户')
logger.info( logger.info(
f"[{current_time}][私聊]{message.message_info.user_info.user_nickname}: {message.processed_plain_text}" f"[{current_time_display}][私聊]{user_nickname_display}: {message_obj.processed_plain_text}"
) )
except Exception as e: except Exception as e:
await _handle_error(e, "消息处理失败", message) await _handle_error(e, "消息处理失败", message_obj) # 传递 message_obj
async def _create_pfc_chat(self, message: MessageRecv): async def _create_pfc_chat(self, message: MessageRecv): # 明确 message 类型
try: try:
chat_id = str(message.chat_stream.stream_id) chat_id = str(message.chat_stream.stream_id)
private_name = str(message.message_info.user_info.user_nickname) private_name = str(message.message_info.user_info.user_nickname) # 假设 UserInfo 有 user_nickname
if global_config.enable_pfc_chatting: if global_config.enable_pfc_chatting:
await self.pfc_manager.get_or_create_conversation(chat_id, private_name) await self.pfc_manager.get_or_create_conversation(chat_id, private_name)
except Exception as e: except Exception as e:
logger.error(f"创建PFC聊天失败: {e}") logger.error(f"创建PFC聊天失败: {e}", exc_info=True) # 添加 exc_info=True
@staticmethod @staticmethod
def _check_ban_words(text: str, userinfo: UserInfo) -> bool: def _check_ban_words(text: str, userinfo: UserInfo) -> bool: # 明确 userinfo 类型
"""检查消息中是否包含过滤词""" """检查消息中是否包含过滤词"""
for word in global_config.ban_words: for word in global_config.ban_words:
if word in text: if word in text:
logger.info(f"[私聊]{userinfo.user_nickname}:{text}") logger.info(f"[私聊]{userinfo.user_nickname}:{text}") # 假设 UserInfo 有 user_nickname
logger.info(f"[过滤词识别]消息中含有{word}filtered") logger.info(f"[过滤词识别]消息中含有{word}filtered")
return True return True
return False return False
@staticmethod @staticmethod
def _check_ban_regex(text: str, userinfo: UserInfo) -> bool: def _check_ban_regex(text: str, userinfo: UserInfo) -> bool: # 明确 userinfo 类型
"""检查消息是否匹配过滤正则表达式""" """检查消息是否匹配过滤正则表达式"""
for pattern in global_config.ban_msgs_regex: for pattern in global_config.ban_msgs_regex:
if pattern.search(text): if pattern.search(text): # 假设 ban_msgs_regex 中的元素是已编译的正则对象
logger.info(f"[私聊]{userinfo.user_nickname}:{text}") logger.info(f"[私聊]{userinfo.user_nickname}:{text}") # _nickname
logger.info(f"[正则表达式过滤]消息匹配到{pattern}filtered") logger.info(f"[正则表达式过滤]消息匹配到{pattern.pattern}filtered") # .pattern 获取原始表达式字符串
return True return True
return False return False

View File

@ -1,88 +1,285 @@
import traceback import traceback
import json import json
import re import re
from typing import Dict, Any, Optional, Tuple, List, Union import asyncio # 确保导入 asyncio
from src.common.logger_manager import get_logger # 确认 logger 的导入路径 from typing import Dict, Any, Optional, Tuple, List, Union # 确保导入这些类型
from src.plugins.memory_system.Hippocampus import HippocampusManager
from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # 确认 prompt_builder 的导入路径 from src.common.logger_manager import get_logger
from src.plugins.chat.chat_stream import ChatStream
from ..person_info.person_info import person_info_manager
import math
from src.plugins.utils.chat_message_builder import build_readable_messages
from .observation_info import ObservationInfo
from src.config.config import global_config from src.config.config import global_config
from src.common.database import db # << 确认此路径
# --- 依赖于你项目结构的导入,请务必仔细检查并根据你的实际情况调整 ---
from src.plugins.memory_system.Hippocampus import HippocampusManager # << 确认此路径
from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder # << 确认此路径
from src.plugins.chat.utils import get_embedding # << 确认此路径
from src.plugins.utils.chat_message_builder import build_readable_messages # << 确认此路径
# --- 依赖导入结束 ---
from src.plugins.chat.chat_stream import ChatStream # 来自原始 pfc_utils.py
from ..person_info.person_info import person_info_manager # 来自原始 pfc_utils.py (相对导入)
import math # 来自原始 pfc_utils.py
from .observation_info import ObservationInfo # 来自原始 pfc_utils.py (相对导入)
logger = get_logger("pfc_utils") logger = get_logger("pfc_utils")
# ==============================================================================
async def retrieve_contextual_info(text: str, private_name: str) -> Tuple[str, str]: # 新增:专门用于检索 PFC 私聊历史对话上下文的函数
# ==============================================================================
async def find_most_relevant_historical_message(
chat_id: str,
query_text: str,
similarity_threshold: float = 0.3 # 相似度阈值,可以根据效果调整
) -> Optional[Dict[str, Any]]:
""" """
根据输入文本检索相关的记忆和知识 根据查询文本在指定 chat_id 的历史消息中查找最相关的消息
Args:
text: 用于检索的上下文文本 (例如聊天记录)
private_name: 私聊对象的名称用于日志记录
Returns:
Tuple[str, str]: (检索到的记忆字符串, 检索到的知识字符串)
""" """
retrieved_memory_str = "无相关记忆。" if not query_text or not query_text.strip():
logger.debug(f"[{chat_id}] (私聊历史)查询文本为空,跳过检索。")
return None
logger.debug(f"[{chat_id}] (私聊历史)开始为查询文本 '{query_text[:50]}...' 检索。")
# 使用你项目中已有的 get_embedding 函数
# request_type 参数需要根据 get_embedding 的实际需求调整
query_embedding = await get_embedding(query_text, request_type="pfc_historical_chat_query")
if not query_embedding:
logger.warning(f"[{chat_id}] (私聊历史)未能为查询文本 '{query_text[:50]}...' 生成嵌入向量。")
return None
pipeline = [
{
"$match": {
"chat_id": chat_id,
"embedding_vector": {"$exists": True, "$ne": None, "$not": {"$size": 0}}
}
},
{
"$addFields": {
"dotProduct": {"$reduce": {"input": {"$range": [0, {"$size": "$embedding_vector"}]}, "initialValue": 0, "in": {"$add": ["$$value", {"$multiply": [{"$arrayElemAt": ["$embedding_vector", "$$this"]}, {"$arrayElemAt": [query_embedding, "$$this"]}]}]}}},
"queryVecMagnitude": {"$sqrt": {"$reduce": {"input": query_embedding, "initialValue": 0, "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}}}},
"docVecMagnitude": {"$sqrt": {"$reduce": {"input": "$embedding_vector", "initialValue": 0, "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}}}}
}
},
{
"$addFields": {
"similarity": {
"$cond": [
{"$and": [{"$gt": ["$queryVecMagnitude", 0]}, {"$gt": ["$docVecMagnitude", 0]}]},
{"$divide": ["$dotProduct", {"$multiply": ["$queryVecMagnitude", "$docVecMagnitude"]}]},
0
]
}
}
},
{"$match": {"similarity": {"$gte": similarity_threshold}}},
{"$sort": {"similarity": -1}},
{"$limit": 1},
{"$project": {"_id": 0, "message_id": 1, "time": 1, "chat_id": 1, "user_info": 1, "processed_plain_text": 1, "similarity": 1}} # 可以不返回 embedding_vector 节省带宽
]
try:
# 假设 db.messages 是存储PFC私聊消息并带有embedding_vector的集合
results = await db.messages.aggregate(pipeline).to_list(length=1)
if results and len(results) > 0:
most_similar_message = results[0]
logger.info(f"[{chat_id}] (私聊历史)找到最相关消息 ID: {most_similar_message.get('message_id')}, 相似度: {most_similar_message.get('similarity'):.4f}")
return most_similar_message
else:
logger.debug(f"[{chat_id}] (私聊历史)未找到相似度超过 {similarity_threshold} 的相关消息。")
return None
except Exception as e:
logger.error(f"[{chat_id}] (私聊历史)在数据库中检索时出错: {e}", exc_info=True)
return None
async def retrieve_chat_context_window(
    chat_id: str,
    anchor_message_id: str,
    anchor_message_time: float,
    window_size_before: int = 7,
    window_size_after: int = 7
) -> List[Dict[str, Any]]:
    """Build a context window of chat messages around an anchor message.

    Looks up the anchor in ``db.messages`` by ``message_id`` and ``chat_id``,
    then fetches up to ``window_size_before`` messages whose ``time`` is
    strictly less than ``anchor_message_time`` and up to ``window_size_after``
    messages whose ``time`` is strictly greater, all within the same chat.

    Args:
        chat_id: Value matched against the ``chat_id`` field of each document.
        anchor_message_id: ``message_id`` of the message the window is centred on.
        anchor_message_time: Timestamp used to split "before" from "after".
        window_size_before: Maximum number of earlier messages to include.
            Values <= 0 include none.
        window_size_after: Maximum number of later messages to include.
            Values <= 0 include none.

    Returns:
        The window sorted by ascending ``time``, de-duplicated by
        ``message_id`` and with the ``_id`` field removed from every document.
        Returns an empty list when the anchor id/time is missing or when any
        database call raises.
    """
    if not anchor_message_id or anchor_message_time is None:
        return []

    logger.debug(f"[{chat_id}] (私聊历史)准备以消息 ID '{anchor_message_id}' (时间: {anchor_message_time}) 为锚点,获取上下文窗口...")

    try:
        # NOTE(review): the awaits assume `db` is an async (Motor-style) handle — confirm.
        anchor_message = await db.messages.find_one({"message_id": anchor_message_id, "chat_id": chat_id})

        # MongoDB treats limit(0) as "no limit", so a non-positive window size
        # must skip the query entirely instead of being passed to the cursor.
        messages_before: List[Dict[str, Any]] = []
        if window_size_before > 0:
            messages_before_cursor = db.messages.find(
                {"chat_id": chat_id, "time": {"$lt": anchor_message_time}}
            ).sort("time", -1).limit(window_size_before)
            messages_before = await messages_before_cursor.to_list(length=window_size_before)
            # Fetched newest-first to get the closest neighbours; flip to chronological order.
            messages_before.reverse()

        messages_after: List[Dict[str, Any]] = []
        if window_size_after > 0:
            messages_after_cursor = db.messages.find(
                {"chat_id": chat_id, "time": {"$gt": anchor_message_time}}
            ).sort("time", 1).limit(window_size_after)
            messages_after = await messages_after_cursor.to_list(length=window_size_after)

        context_messages: List[Dict[str, Any]] = list(messages_before)
        if anchor_message:
            context_messages.append(anchor_message)
        context_messages.extend(messages_after)

        final_window: List[Dict[str, Any]] = []
        seen_ids: set = set()
        for msg in context_messages:
            msg_id = msg.get("message_id")
            # Documents without a message_id cannot be de-duplicated and are dropped.
            if msg_id and msg_id not in seen_ids:
                # Strip the database-internal _id from every document, not only the anchor.
                msg.pop("_id", None)
                final_window.append(msg)
                seen_ids.add(msg_id)

        final_window.sort(key=lambda m: m.get("time", 0))
        logger.info(f"[{chat_id}] (私聊历史)为锚点 '{anchor_message_id}' 构建了包含 {len(final_window)} 条消息的上下文窗口。")
        return final_window
    except Exception as e:
        # Best-effort retrieval: log with traceback and fall back to an empty window.
        logger.error(f"[{chat_id}] (私聊历史)获取消息 ID '{anchor_message_id}' 的上下文窗口时出错: {e}", exc_info=True)
        return []
# ==============================================================================
# 修改后的 retrieve_contextual_info 函数
# ==============================================================================
async def retrieve_contextual_info(
text: str, # 用于全局记忆和知识检索的主查询文本 (通常是短期聊天记录)
private_name: str, # 用于日志
chat_id: str, # 用于特定私聊历史的检索
historical_chat_query_text: Optional[str] = None # 专门为私聊历史检索准备的查询文本 (例如最新的N条消息合并)
) -> Tuple[str, str, str]: # 返回: 全局记忆, 知识, 私聊历史回忆
"""
检索三种类型的上下文信息全局压缩记忆知识库知识当前私聊的特定历史对话
"""
# 初始化返回值
retrieved_global_memory_str = "无相关全局记忆。"
retrieved_knowledge_str = "无相关知识。" retrieved_knowledge_str = "无相关知识。"
memory_log_msg = "未自动检索到相关记忆。" retrieved_historical_chat_str = "无相关私聊历史回忆。"
knowledge_log_msg = "未自动检索到相关知识。"
if not text or text == "还没有聊天记录。" or text == "[构建聊天记录出错]": # --- 1. 全局压缩记忆检索 (来自 HippocampusManager) ---
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 无有效上下文,跳过检索。") # (保持你原始 pfc_utils.py 中这部分的逻辑基本不变)
return retrieved_memory_str, retrieved_knowledge_str global_memory_log_msg = f"开始全局压缩记忆检索 (基于文本: '{text[:30]}...')"
if text and text.strip() and text != "还没有聊天记录。" and text != "[构建聊天记录出错]":
# 1. 检索记忆 (逻辑来自原 _get_memory_info) try:
try: related_memory = await HippocampusManager.get_instance().get_memory_from_text(
related_memory = await HippocampusManager.get_instance().get_memory_from_text( text=text,
text=text, max_memory_num=2,
max_memory_num=2, max_memory_length=2, # 你原始代码中这里是2不是200
max_memory_length=2, max_depth=3,
max_depth=3, fast_retrieval=False, # 你原始代码中这里是False
fast_retrieval=False, )
) if related_memory:
if related_memory: temp_global_memory_info = ""
related_memory_info = "" for memory_item in related_memory:
for memory in related_memory: if isinstance(memory_item, (list, tuple)) and len(memory_item) > 1:
related_memory_info += memory[1] + "\n" temp_global_memory_info += str(memory_item[1]) + "\n"
if related_memory_info: elif isinstance(memory_item, str):
# 注意:原版提示信息可以根据需要调整 temp_global_memory_info += memory_item + "\n"
retrieved_memory_str = f"你回忆起:\n{related_memory_info.strip()}\n(以上是你的回忆,供参考)\n"
memory_log_msg = f"自动检索到记忆: {related_memory_info.strip()[:100]}..." if temp_global_memory_info.strip():
retrieved_global_memory_str = f"你回忆起一些相关的全局记忆:\n{temp_global_memory_info.strip()}\n(以上是你的全局记忆,供参考)\n"
global_memory_log_msg = f"自动检索到全局压缩记忆: {temp_global_memory_info.strip()[:100]}..."
else:
global_memory_log_msg = "全局压缩记忆检索返回为空或格式不符。"
else: else:
memory_log_msg = "自动检索记忆返回为空。" global_memory_log_msg = "全局压缩记忆检索返回为空列表。"
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 记忆检索: {memory_log_msg}") logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 全局压缩记忆检索: {global_memory_log_msg}")
except Exception as e:
except Exception as e: logger.error(
logger.error( f"[私聊][{private_name}] (retrieve_contextual_info) 检索全局压缩记忆时出错: {e}\n{traceback.format_exc()}"
f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索记忆时出错: {e}\n{traceback.format_exc()}" )
) retrieved_global_memory_str = "[检索全局压缩记忆时出错]\n"
retrieved_memory_str = "检索记忆时出错。\n" else:
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 无有效主查询文本,跳过全局压缩记忆检索。")
# 2. 检索知识 (逻辑来自原 action_planner 和 reply_generator)
try:
# 使用导入的 prompt_builder 实例及其方法
knowledge_result = await prompt_builder.get_prompt_info(
message=text,
threshold=0.38, # threshold 可以根据需要调整
)
if knowledge_result:
retrieved_knowledge_str = knowledge_result # 直接使用返回结果
knowledge_log_msg = "自动检索到相关知识。"
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 知识检索: {knowledge_log_msg}")
except Exception as e:
logger.error(
f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}"
)
retrieved_knowledge_str = "检索知识时出错。\n"
return retrieved_memory_str, retrieved_knowledge_str
# --- 2. 相关知识检索 (来自 prompt_builder) ---
# (保持你原始 pfc_utils.py 中这部分的逻辑基本不变)
knowledge_log_msg = f"开始知识检索 (基于文本: '{text[:30]}...')"
if text and text.strip() and text != "还没有聊天记录。" and text != "[构建聊天记录出错]":
try:
knowledge_result = await prompt_builder.get_prompt_info(
message=text,
threshold=0.38,
)
if knowledge_result and knowledge_result.strip(): # 确保结果不为空
retrieved_knowledge_str = knowledge_result # 直接使用返回结果,如果需要也可以包装
knowledge_log_msg = f"自动检索到相关知识: {knowledge_result[:100]}..."
else:
knowledge_log_msg = "知识检索返回为空。"
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 知识检索: {knowledge_log_msg}")
except Exception as e:
logger.error(
f"[私聊][{private_name}] (retrieve_contextual_info) 自动检索知识时出错: {e}\n{traceback.format_exc()}"
)
retrieved_knowledge_str = "[检索知识时出错]\n"
else:
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 无有效主查询文本,跳过知识检索。")
# --- 3. 当前私聊的特定历史对话上下文检索 (新增逻辑) ---
query_for_historical_chat = historical_chat_query_text if historical_chat_query_text and historical_chat_query_text.strip() else None
historical_chat_log_msg = f"开始私聊历史检索 (查询文本: '{str(query_for_historical_chat)[:30]}...')"
if query_for_historical_chat:
try:
most_relevant_message_doc = await find_most_relevant_historical_message(
chat_id=chat_id,
query_text=query_for_historical_chat,
similarity_threshold=0.5 # 你可以根据需要调整这个阈值
)
if most_relevant_message_doc:
anchor_id = most_relevant_message_doc.get("message_id")
anchor_time = most_relevant_message_doc.get("time")
if anchor_id and anchor_time is not None:
context_window_messages = await retrieve_chat_context_window(
chat_id=chat_id,
anchor_message_id=anchor_id,
anchor_message_time=anchor_time,
window_size_before=7, # 我们的目标上7条
window_size_after=7 # 我们的目标下7条 (共15条包括锚点)
)
if context_window_messages:
formatted_window_str = await build_readable_messages(
context_window_messages,
replace_bot_name=False, # 在回忆中,保留原始发送者名称
merge_messages=False,
timestamp_mode="relative", # 可以选择 'absolute' 或 'none'
read_mark=0.0
)
if formatted_window_str and formatted_window_str.strip():
retrieved_historical_chat_str = f"你回忆起一段与当前对话相关的历史聊天:\n------\n{formatted_window_str.strip()}\n------\n(以上是针对本次私聊的回忆,供参考)\n"
historical_chat_log_msg = f"自动检索到相关私聊历史片段 (锚点ID: {anchor_id}, 相似度: {most_relevant_message_doc.get('similarity'):.3f})"
else:
historical_chat_log_msg = "检索到的私聊历史对话窗口格式化后为空。"
else:
historical_chat_log_msg = f"找到了相关锚点消息 (ID: {anchor_id}),但未能构建其上下文窗口。"
else:
historical_chat_log_msg = "检索到的最相关私聊历史消息文档缺少 message_id 或 time。"
else:
historical_chat_log_msg = "未找到足够相关的私聊历史对话消息。"
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 私聊历史对话检索: {historical_chat_log_msg}")
except Exception as e:
logger.error(
f"[私聊][{private_name}] (retrieve_contextual_info) 检索私聊历史对话时出错: {e}\n{traceback.format_exc()}"
)
retrieved_historical_chat_str = "[检索私聊历史对话时出错]\n"
else:
logger.debug(f"[私聊][{private_name}] (retrieve_contextual_info) 无专门的私聊历史查询文本,跳过私聊历史对话检索。")
return retrieved_global_memory_str, retrieved_knowledge_str, retrieved_historical_chat_str
# ==============================================================================
# 你原始 pfc_utils.py 中的其他函数保持不变
# ==============================================================================
def get_items_from_json( def get_items_from_json(
content: str, content: str,
private_name: str, private_name: str,
@ -92,121 +289,66 @@ def get_items_from_json(
allow_array: bool = True, allow_array: bool = True,
) -> Tuple[bool, Union[Dict[str, Any], List[Dict[str, Any]]]]: ) -> Tuple[bool, Union[Dict[str, Any], List[Dict[str, Any]]]]:
"""从文本中提取JSON内容并获取指定字段 """从文本中提取JSON内容并获取指定字段
(保持你原始 pfc_utils.py 中的此函数代码不变)
Args:
content: 包含JSON的文本
private_name: 私聊名称
*items: 要提取的字段名
default_values: 字段的默认值格式为 {字段名: 默认值}
required_types: 字段的必需类型格式为 {字段名: 类型}
allow_array: 是否允许解析JSON数组
Returns:
Tuple[bool, Union[Dict[str, Any], List[Dict[str, Any]]]]: (是否成功, 提取的字段字典或字典列表)
""" """
cleaned_content = content.strip() cleaned_content = content.strip()
result: Union[Dict[str, Any], List[Dict[str, Any]]] = {} # 初始化类型 result: Union[Dict[str, Any], List[Dict[str, Any]]] = {}
# 匹配 ```json ... ``` 或 ``` ... ```
markdown_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", cleaned_content, re.IGNORECASE) markdown_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", cleaned_content, re.IGNORECASE)
if markdown_match: if markdown_match:
cleaned_content = markdown_match.group(1).strip() cleaned_content = markdown_match.group(1).strip()
logger.debug(f"[私聊][{private_name}] 已去除 Markdown 标记,剩余内容: {cleaned_content[:100]}...") logger.debug(f"[私聊][{private_name}] 已去除 Markdown 标记,剩余内容: {cleaned_content[:100]}...")
# --- 新增结束 --- default_result: Dict[str, Any] = {}
# 设置默认值
default_result: Dict[str, Any] = {} # 用于单对象时的默认值
if default_values: if default_values:
default_result.update(default_values) default_result.update(default_values)
result = default_result.copy() # 先用默认值初始化 result = default_result.copy()
# 首先尝试解析为JSON数组
if allow_array: if allow_array:
try: try:
# 尝试直接解析清理后的内容为列表
json_array = json.loads(cleaned_content) json_array = json.loads(cleaned_content)
if isinstance(json_array, list): if isinstance(json_array, list):
valid_items_list: List[Dict[str, Any]] = [] valid_items_list: List[Dict[str, Any]] = []
for item in json_array: for item_json in json_array: # Renamed item to item_json to avoid conflict
if not isinstance(item, dict): if not isinstance(item_json, dict):
logger.warning(f"[私聊][{private_name}] JSON数组中的元素不是字典: {item}") logger.warning(f"[私聊][{private_name}] JSON数组中的元素不是字典: {item_json}")
continue continue
current_item_result = default_result.copy()
current_item_result = default_result.copy() # 每个元素都用默认值初始化
valid_item = True valid_item = True
for field in items: # items is args from function signature
# 提取并验证字段 if field in item_json:
for field in items: current_item_result[field] = item_json[field]
if field in item: elif field not in default_result:
current_item_result[field] = item[field] logger.warning(f"[私聊][{private_name}] JSON数组元素缺少必要字段 '{field}': {item_json}")
elif field not in default_result: # 如果字段不存在且没有默认值 valid_item = False; break
logger.warning(f"[私聊][{private_name}] JSON数组元素缺少必要字段 '{field}': {item}") if not valid_item: continue
valid_item = False
break # 这个元素无效
if not valid_item:
continue
# 验证类型
if required_types: if required_types:
for field, expected_type in required_types.items(): for field, expected_type in required_types.items():
# 检查 current_item_result 中是否存在该字段 (可能来自 item 或 default_values) if field in current_item_result and not isinstance(current_item_result[field], expected_type):
if field in current_item_result and not isinstance( logger.warning(f"[私聊][{private_name}] JSON数组元素字段 '{field}' 类型错误 (应为 {expected_type.__name__}, 实际为 {type(current_item_result[field]).__name__}): {item_json}")
current_item_result[field], expected_type valid_item = False; break
): if not valid_item: continue
logger.warning(
f"[私聊][{private_name}] JSON数组元素字段 '{field}' 类型错误 (应为 {expected_type.__name__}, 实际为 {type(current_item_result[field]).__name__}): {item}"
)
valid_item = False
break
if not valid_item:
continue
# 验证字符串不为空 (只检查 items 中要求的字段)
for field in items: for field in items:
if ( if field in current_item_result and isinstance(current_item_result[field], str) and not current_item_result[field].strip():
field in current_item_result logger.warning(f"[私聊][{private_name}] JSON数组元素字段 '{field}' 不能为空字符串: {item_json}")
and isinstance(current_item_result[field], str) valid_item = False; break
and not current_item_result[field].strip() if valid_item: valid_items_list.append(current_item_result)
): if valid_items_list:
logger.warning(f"[私聊][{private_name}] JSON数组元素字段 '{field}' 不能为空字符串: {item}")
valid_item = False
break
if valid_item:
valid_items_list.append(current_item_result) # 只添加完全有效的项
if valid_items_list: # 只有当列表不为空时才认为是成功
logger.debug(f"[私聊][{private_name}] 成功解析JSON数组包含 {len(valid_items_list)} 个有效项目。") logger.debug(f"[私聊][{private_name}] 成功解析JSON数组包含 {len(valid_items_list)} 个有效项目。")
return True, valid_items_list return True, valid_items_list
else: else:
# 如果列表为空(可能所有项都无效),则继续尝试解析为单个对象
logger.debug(f"[私聊][{private_name}] 解析为JSON数组但未找到有效项目尝试解析单个JSON对象。") logger.debug(f"[私聊][{private_name}] 解析为JSON数组但未找到有效项目尝试解析单个JSON对象。")
# result 重置回单个对象的默认值
result = default_result.copy() result = default_result.copy()
except json.JSONDecodeError: except json.JSONDecodeError:
logger.debug(f"[私聊][{private_name}] JSON数组直接解析失败尝试解析单个JSON对象") logger.debug(f"[私聊][{private_name}] JSON数组直接解析失败尝试解析单个JSON对象")
# result 重置回单个对象的默认值
result = default_result.copy() result = default_result.copy()
except Exception as e: except Exception as e:
logger.error(f"[私聊][{private_name}] 尝试解析JSON数组时发生未知错误: {str(e)}") logger.error(f"[私聊][{private_name}] 尝试解析JSON数组时发生未知错误: {str(e)}")
# result 重置回单个对象的默认值
result = default_result.copy() result = default_result.copy()
# 尝试解析为单个JSON对象
try: try:
# 尝试直接解析清理后的内容
json_data = json.loads(cleaned_content) json_data = json.loads(cleaned_content)
if not isinstance(json_data, dict): if not isinstance(json_data, dict):
logger.error(f"[私聊][{private_name}] 解析为单个对象,但结果不是字典类型: {type(json_data)}") logger.error(f"[私聊][{private_name}] 解析为单个对象,但结果不是字典类型: {type(json_data)}")
return False, default_result # 返回失败和默认值 return False, default_result
except json.JSONDecodeError: except json.JSONDecodeError:
# 如果直接解析失败,尝试用正则表达式查找 JSON 对象部分 (作为后备) json_pattern = r"\{[\s\S]*?\}"
# 这个正则比较简单,可能无法处理嵌套或复杂的 JSON
json_pattern = r"\{[\s\S]*?\}" # 使用非贪婪匹配
json_match = re.search(json_pattern, cleaned_content) json_match = re.search(json_pattern, cleaned_content)
if json_match: if json_match:
try: try:
@ -220,133 +362,97 @@ def get_items_from_json(
logger.error(f"[私聊][{private_name}] 正则提取的部分 '{potential_json_str[:100]}...' 无法解析为JSON。") logger.error(f"[私聊][{private_name}] 正则提取的部分 '{potential_json_str[:100]}...' 无法解析为JSON。")
return False, default_result return False, default_result
else: else:
logger.error( logger.error(f"[私聊][{private_name}] 无法在返回内容中找到有效的JSON对象部分。原始内容: {cleaned_content[:100]}...")
f"[私聊][{private_name}] 无法在返回内容中找到有效的JSON对象部分。原始内容: {cleaned_content[:100]}..."
)
return False, default_result return False, default_result
if not isinstance(result, dict): result = default_result.copy()
# 提取并验证字段 (适用于单个JSON对象)
# 确保 result 是字典类型用于更新
if not isinstance(result, dict):
result = default_result.copy() # 如果之前是列表,重置为字典
valid_single_object = True valid_single_object = True
for item in items: for item_field in items: # Renamed item to item_field
if item in json_data: if item_field in json_data: result[item_field] = json_data[item_field]
result[item] = json_data[item] elif item_field not in default_result:
elif item not in default_result: # 如果字段不存在且没有默认值 logger.error(f"[私聊][{private_name}] JSON对象缺少必要字段 '{item_field}'。JSON内容: {json_data}")
logger.error(f"[私聊][{private_name}] JSON对象缺少必要字段 '{item}'。JSON内容: {json_data}") valid_single_object = False; break
valid_single_object = False if not valid_single_object: return False, default_result
break # 这个对象无效
if not valid_single_object:
return False, default_result
# 验证类型
if required_types: if required_types:
for field, expected_type in required_types.items(): for field, expected_type in required_types.items():
if field in result and not isinstance(result[field], expected_type): if field in result and not isinstance(result[field], expected_type):
logger.error( logger.error(f"[私聊][{private_name}] JSON对象字段 '{field}' 类型错误 (应为 {expected_type.__name__}, 实际为 {type(result[field]).__name__})")
f"[私聊][{private_name}] JSON对象字段 '{field}' 类型错误 (应为 {expected_type.__name__}, 实际为 {type(result[field]).__name__})" valid_single_object = False; break
) if not valid_single_object: return False, default_result
valid_single_object = False
break
if not valid_single_object:
return False, default_result
# 验证字符串不为空 (只检查 items 中要求的字段)
for field in items: for field in items:
if field in result and isinstance(result[field], str) and not result[field].strip(): if field in result and isinstance(result[field], str) and not result[field].strip():
logger.error(f"[私聊][{private_name}] JSON对象字段 '{field}' 不能为空字符串") logger.error(f"[私聊][{private_name}] JSON对象字段 '{field}' 不能为空字符串")
valid_single_object = False valid_single_object = False; break
break
if valid_single_object: if valid_single_object:
logger.debug(f"[私聊][{private_name}] 成功解析并验证了单个JSON对象。") logger.debug(f"[私聊][{private_name}] 成功解析并验证了单个JSON对象。")
return True, result # 返回提取并验证后的字典 return True, result
else: else:
return False, default_result # 验证失败 return False, default_result
async def get_person_id(private_name: str, chat_stream: ChatStream):
    """Resolve (creating if needed) the person record for a private-chat peer.

    Args:
        private_name: Name of the private-chat peer. It is passed as the
            ``nickname`` to ``person_info_manager.get_or_create_person`` and
            used as a tag in every log message.
        chat_stream: Stream whose ``user_info`` supplies the peer's
            ``user_id`` and ``platform``.

    Returns:
        A ``(person_id, platform, user_id_str)`` tuple on success, or ``None``
        when the stream carries no usable user info, the user id cannot be
        converted to an integer, or the lookup/creation fails. Callers must
        check for ``None`` before unpacking.
    """
    private_user_id_str: Optional[str] = None
    private_platform_str: Optional[str] = None

    if chat_stream.user_info:
        private_user_id_str = str(chat_stream.user_info.user_id)
        private_platform_str = chat_stream.user_info.platform
        logger.debug(
            f"[私聊][{private_name}] 从 ChatStream 获取到私聊对象信息: ID={private_user_id_str}, Platform={private_platform_str}, Name={private_name}"
        )

    # Without both the id and the platform there is nothing to look up.
    if not (private_user_id_str and private_platform_str):
        logger.warning(
            f"[私聊][{private_name}] 未能确定私聊对象的 user_id 或 platform无法获取 person_id。将在收到消息后尝试。"
        )
        return None

    try:
        private_user_id_int = int(private_user_id_str)
        # get_or_create_person (rather than a plain lookup) so that the
        # record is created when it does not exist yet.
        person_id = await person_info_manager.get_or_create_person(
            platform=private_platform_str,
            user_id=private_user_id_int,
            nickname=private_name,
        )
    except ValueError:
        logger.error(f"[私聊][{private_name}] 无法将 private_user_id_str ('{private_user_id_str}') 转换为整数。")
        return None
    except Exception as e_pid:
        # Best-effort boundary: any failure is logged and reported as None.
        logger.error(f"[私聊][{private_name}] 获取或创建 person_id 时出错: {e_pid}")
        return None

    if person_id is None:
        logger.error(f"[私聊][{private_name}] get_or_create_person 未能获取或创建 person_id。")
        return None

    return person_id, private_platform_str, private_user_id_str
async def adjust_relationship_value_nonlinear(old_value: float, raw_adjustment: float) -> float:
    """Scale a raw relationship adjustment according to the current value.

    ``old_value`` is first clamped to ``[-1000, 1000]``. The adjustment is
    then damped as follows:

    * ``old_value >= 0`` and adjustment ``>= 0``: multiplied by
      ``cos(pi * old_value / 2000)``. When ``old_value > 500`` it is further
      multiplied by ``3 / (n + 2)`` (``old_value > 700``) or ``3 / (n + 3)``
      (otherwise), where ``n`` is the number of entries returned by
      ``person_info_manager.get_specific_value_list`` for
      ``relationship_value`` values above 700.
    * ``old_value >= 0`` and adjustment ``< 0``: multiplied by
      ``exp(old_value / 2000)``.
    * ``old_value < 0`` and adjustment ``>= 0``: multiplied by
      ``exp(old_value / 2000)``.
    * ``old_value < 0`` and adjustment ``< 0``: multiplied by
      ``cos(pi * old_value / 2000)``.

    Args:
        old_value: Current relationship value (clamped before use).
        raw_adjustment: Unscaled change to apply.

    Returns:
        The scaled adjustment (not the new relationship value).
    """
    old_value = max(-1000, min(1000, old_value))
    value = raw_adjustment

    if old_value >= 0:
        if value >= 0:
            value = value * math.cos(math.pi * old_value / 2000)
            if old_value > 500:
                # The predicate guards against non-numeric stored values.
                rdict = await person_info_manager.get_specific_value_list(
                    "relationship_value",
                    lambda x: x > 700 if isinstance(x, (int, float)) else False,
                )
                high_value_count = len(rdict)
                if old_value > 700:
                    value *= 3 / (high_value_count + 2)
                else:
                    value *= 3 / (high_value_count + 3)
        elif value < 0:
            value = value * math.exp(old_value / 2000)
    else:  # old_value < 0
        if value >= 0:
            value = value * math.exp(old_value / 2000)
        elif value < 0:
            value = value * math.cos(math.pi * old_value / 2000)

    return value
async def build_chat_history_text(observation_info: ObservationInfo, private_name: str) -> str: async def build_chat_history_text(observation_info: ObservationInfo, private_name: str) -> str:
"""构建聊天历史记录文本 (包含未处理消息)""" """ (保持你原始 pfc_utils.py 中的此函数代码不变) """
chat_history_text = "" chat_history_text = ""
try: try:
if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str: if hasattr(observation_info, "chat_history_str") and observation_info.chat_history_str:
@ -358,27 +464,32 @@ async def build_chat_history_text(observation_info: ObservationInfo, private_nam
) )
else: else:
chat_history_text = "还没有聊天记录。\n" chat_history_text = "还没有聊天记录。\n"
unread_count = getattr(observation_info, "new_messages_count", 0) unread_count = getattr(observation_info, "new_messages_count", 0)
unread_messages = getattr(observation_info, "unprocessed_messages", []) unread_messages = getattr(observation_info, "unprocessed_messages", [])
if unread_count > 0 and unread_messages: if unread_count > 0 and unread_messages:
bot_qq_str = str(global_config.BOT_QQ) bot_qq_str = str(global_config.BOT_QQ) if global_config.BOT_QQ else None # 安全获取
other_unread_messages = [ if bot_qq_str: # 仅当 bot_qq_str 有效时进行过滤
msg for msg in unread_messages if msg.get("user_info", {}).get("user_id") != bot_qq_str other_unread_messages = [
] msg for msg in unread_messages if msg.get("user_info", {}).get("user_id") != bot_qq_str
other_unread_count = len(other_unread_messages) ]
if other_unread_count > 0: other_unread_count = len(other_unread_messages)
new_messages_str = await build_readable_messages( if other_unread_count > 0:
other_unread_messages, new_messages_str = await build_readable_messages(
replace_bot_name=True, other_unread_messages,
merge_messages=False, replace_bot_name=True, # 这里是未处理消息,可能不需要替换机器人名字
timestamp_mode="relative", merge_messages=False,
read_mark=0.0, timestamp_mode="relative",
) read_mark=0.0,
chat_history_text += f"\n{new_messages_str}\n------\n" )
chat_history_text += f"\n{new_messages_str}\n------\n" # 原始代码是加在末尾的
else:
logger.warning(f"[私聊][{private_name}] BOT_QQ 未配置,无法准确过滤未读消息中的机器人自身消息。")
except AttributeError as e: except AttributeError as e:
logger.warning(f"[私聊][{private_name}] 构建聊天记录文本时属性错误: {e}") logger.warning(f"[私聊][{private_name}] 构建聊天记录文本时属性错误: {e}")
chat_history_text = "[获取聊天记录时出错]\n" chat_history_text = "[获取聊天记录时出错]\n"
except Exception as e: except Exception as e:
logger.error(f"[私聊][{private_name}] 处理聊天记录时发生未知错误: {e}") logger.error(f"[私聊][{private_name}] 处理聊天记录时发生未知错误: {e}")
chat_history_text = "[处理聊天记录时出错]\n" chat_history_text = "[处理聊天记录时出错]\n"
return chat_history_text return chat_history_text

View File

@ -1,5 +1,5 @@
import random import random
import asyncio
from .pfc_utils import retrieve_contextual_info from .pfc_utils import retrieve_contextual_info
from src.common.logger_manager import get_logger from src.common.logger_manager import get_logger
@ -60,6 +60,9 @@ PROMPT_DIRECT_REPLY = """
{retrieved_knowledge_str} {retrieved_knowledge_str}
请你**记住上面的知识**在回复中有可能会用到 请你**记住上面的知识**在回复中有可能会用到
你还想到了一些你们之前的聊天记录
{retrieved_historical_chat_str}
最近的聊天记录 最近的聊天记录
{chat_history_text} {chat_history_text}
@ -68,6 +71,8 @@ PROMPT_DIRECT_REPLY = """
{last_rejection_info} {last_rejection_info}
请根据上述信息结合聊天记录回复对方该回复应该 请根据上述信息结合聊天记录回复对方该回复应该
1. 符合对话目标""的角度发言不要自己与自己对话 1. 符合对话目标""的角度发言不要自己与自己对话
2. 符合你的性格特征和身份细节 2. 符合你的性格特征和身份细节
@ -97,6 +102,9 @@ PROMPT_SEND_NEW_MESSAGE = """
{retrieved_knowledge_str} {retrieved_knowledge_str}
请你**记住上面的知识**在发消息时有可能会用到 请你**记住上面的知识**在发消息时有可能会用到
你还想到了一些你们之前的聊天记录
{retrieved_historical_chat_str}
最近的聊天记录 最近的聊天记录
{chat_history_text} {chat_history_text}
@ -223,12 +231,59 @@ class ReplyGenerator:
current_emotion_text_str = getattr(conversation_info, "current_emotion_text", "心情平静。") current_emotion_text_str = getattr(conversation_info, "current_emotion_text", "心情平静。")
persona_text = f"你的名字是{self.name}{self.personality_info}" persona_text = f"你的名字是{self.name}{self.personality_info}"
retrieval_context = chat_history_text historical_chat_query = ""
retrieved_memory_str, retrieved_knowledge_str = await retrieve_contextual_info( num_recent_messages_for_query = 3 # 例如取最近3条作为查询引子
retrieval_context, self.private_name if observation_info.chat_history and len(observation_info.chat_history) > 0:
) # 从 chat_history (已处理并存入 ObservationInfo 的历史) 中取最新N条
# 或者,如果 observation_info.unprocessed_messages 更能代表“当前上下文”,也可以考虑用它
# 我们先用 chat_history因为它包含了双方的对话历史可能更稳定
recent_messages_for_query_list = observation_info.chat_history[-num_recent_messages_for_query:]
# 将这些消息的文本内容合并
query_texts_list = []
for msg_dict in recent_messages_for_query_list:
text_content = msg_dict.get("processed_plain_text", "")
if text_content.strip(): # 只添加有内容的文本
# 可以选择是否添加发送者信息到查询文本中,例如:
# sender_nickname = msg_dict.get("user_info", {}).get("user_nickname", "用户")
# query_texts_list.append(f"{sender_nickname}: {text_content}")
query_texts_list.append(text_content) # 简单合并文本内容
if query_texts_list:
historical_chat_query = " ".join(query_texts_list).strip()
logger.debug(f"[私聊][{self.private_name}] (ReplyGenerator) 生成的私聊历史查询文本 (最近{num_recent_messages_for_query}条): '{historical_chat_query[:100]}...'")
else:
logger.debug(f"[私聊][{self.private_name}] (ReplyGenerator) 最近{num_recent_messages_for_query}条消息无有效文本内容,不进行私聊历史查询。")
else:
logger.debug(f"[私聊][{self.private_name}] (ReplyGenerator) 无聊天历史可用于生成私聊历史查询文本。")
current_chat_id = self.chat_observer.stream_id if self.chat_observer else None
if not current_chat_id:
logger.error(f"[私聊][{self.private_name}] (ReplyGenerator) 无法获取 current_chat_id跳过所有上下文检索")
retrieved_global_memory_str = "[获取全局记忆出错chat_id 未知]"
retrieved_knowledge_str = "[获取知识出错chat_id 未知]"
retrieved_historical_chat_str = "[获取私聊历史回忆出错chat_id 未知]"
else:
# retrieval_context 之前是用 chat_history_text现在也用它作为全局记忆和知识的检索上下文
retrieval_context_for_global_and_knowledge = chat_history_text
(
retrieved_global_memory_str,
retrieved_knowledge_str,
retrieved_historical_chat_str # << 新增接收私聊历史回忆
) = await retrieve_contextual_info(
text=retrieval_context_for_global_and_knowledge, # 用于全局记忆和知识
private_name=self.private_name,
chat_id=current_chat_id, # << 传递 chat_id
historical_chat_query_text=historical_chat_query # << 传递专门的查询文本
)
# === 调用修改结束 ===
logger.info( logger.info(
f"[私聊][{self.private_name}] (ReplyGenerator) 统一检索完成。记忆: {'' if '回忆起' in retrieved_memory_str else ''} / 知识: {'' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str else ''}" f"[私聊][{self.private_name}] (ReplyGenerator) 上下文检索完成。\n"
f" 全局记忆: {'有内容' if '回忆起' in retrieved_global_memory_str else '无或出错'}\n"
f" 知识: {'有内容' if '出错' not in retrieved_knowledge_str and '无相关知识' not in retrieved_knowledge_str and retrieved_knowledge_str.strip() else '无或出错'}\n"
f" 私聊历史回忆: {'有内容' if '回忆起一段相关的历史聊天' in retrieved_historical_chat_str else '无或出错'}"
) )
last_rejection_info_str = "" last_rejection_info_str = ""
@ -292,11 +347,10 @@ class ReplyGenerator:
base_format_params = { base_format_params = {
"persona_text": persona_text, "persona_text": persona_text,
"goals_str": goals_str, "goals_str": goals_str,
"chat_history_text": chat_history_text, "chat_history_text": chat_history_text if chat_history_text.strip() else "还没有聊天记录。", # 当前短期历史
"retrieved_memory_str": retrieved_memory_str if retrieved_memory_str else "无相关记忆。", # 确保已定义 "retrieved_global_memory_str": retrieved_global_memory_str if retrieved_global_memory_str.strip() else "无相关全局记忆。",
"retrieved_knowledge_str": retrieved_knowledge_str "retrieved_knowledge_str": retrieved_knowledge_str if retrieved_knowledge_str.strip() else "无相关知识。",
if retrieved_knowledge_str "retrieved_historical_chat_str": retrieved_historical_chat_str if retrieved_historical_chat_str.strip() else "无相关私聊历史回忆。", # << 新增
else "无相关知识。", # 确保已定义
"last_rejection_info": last_rejection_info_str, "last_rejection_info": last_rejection_info_str,
"current_time_str": current_time_value, "current_time_str": current_time_value,
"sender_name": sender_name_str, "sender_name": sender_name_str,