Merge branch 'PFC-test' of https://github.com/smartmita/MaiBot into PFC-test

pull/924/head
Bakadax 2025-05-03 20:44:16 +08:00
commit 25834f88e7
2 changed files with 42 additions and 470 deletions

View File

@ -4,7 +4,10 @@ from src.plugins.memory_system.Hippocampus import HippocampusManager
from src.plugins.knowledge.knowledge_lib import qa_manager
from src.common.database import db
from src.plugins.chat.utils import get_embedding
# --- NEW IMPORT ---
# 从 heartflow 导入知识检索和数据库查询函数/实例
from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder
# --- END NEW IMPORT ---
# import jieba # 如果需要旧版知识库的回退,可能需要
# import re # 如果需要旧版知识库的回退,可能需要
from src.common.logger_manager import get_logger
@ -35,7 +38,7 @@ PROMPT_INITIAL_REPLY = """{persona_text}。现在你在参与一场QQ私聊
上一次行动的详细情况和结果
{last_action_context}
时间和超时提示
{time_since_last_bot_message_info}{timeout_context}
{time_since_last_bot_message_info}{timeout_context}
最近的对话记录(包括你已成功发送的消息 新收到的消息)
{chat_history_text}
你的的回忆
@ -58,7 +61,7 @@ block_and_ignore: 更加极端的结束对话方式,直接结束对话并在
注意请严格按照JSON格式输出不要包含任何其他内容"""
# Prompt(2): 上一次成功回复后,决定继续发言时的决策 Prompt
PROMPT_FOLLOW_UP = """{persona_text}。现在你在参与一场QQ私聊刚刚你已经回复了对方请根据以下【所有信息】审慎且灵活的决策下一步行动可以继续发送新消息可以等待可以倾听可以调取知识甚至可以屏蔽对方
PROMPT_FOLLOW_UP = """{persona_text}。现在你在参与一场QQ私聊刚刚你已经回复了对方请根据以下【所有信息】审慎且灵活的决策下一步行动可以继续发送新消息可以等待可以倾听可以调取知识甚至可以屏蔽对方
当前对话目标
{goals_str}
@ -69,7 +72,7 @@ PROMPT_FOLLOW_UP = """{persona_text}。现在你在参与一场QQ私聊刚刚
上一次行动的详细情况和结果
{last_action_context}
时间和超时提示
{time_since_last_bot_message_info}{timeout_context}
{time_since_last_bot_message_info}{timeout_context}
最近的对话记录(包括你已成功发送的消息 新收到的消息)
{chat_history_text}
你的的回忆
@ -126,6 +129,7 @@ class ActionPlanner:
self.private_name = private_name
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
# _get_memory_info 保持不变
async def _get_memory_info(self, text: str) -> str:
"""根据文本自动检索相关记忆"""
memory_prompt = ""
@ -156,112 +160,9 @@ class ActionPlanner:
# memory_prompt = "检索记忆时出错。\n" # 可以选择是否提示错误
return memory_prompt
async def _get_prompt_info_old(self, message: str, threshold: float) -> str:
    """Look up knowledge for ``message`` in the legacy ``knowledges`` collection.

    (Ported, in simplified form, from heartflow_prompt_builder.py.)

    The whole stripped message is embedded and used as the query; no topic
    extraction is performed.

    Args:
        message: Text used as the retrieval query.
        threshold: Minimum similarity forwarded to ``get_info_from_db``.

    Returns:
        The distinct, non-empty ``content`` values of the matches joined by
        newlines, or an empty string when the message is blank, no embedding
        could be obtained, or nothing usable was found.
    """
    start_time = time.time()
    logger.debug(f"[私聊]决策层[{self.private_name}]开始使用旧版知识检索,消息: {message[:30]}...")
    # --- REMOVED _get_prompt_info_old ---

    # Simplified handling: query with the entire message instead of extracted topics.
    query_text = message.strip()
    if not query_text:
        logger.debug(f"[私聊]决策层[{self.private_name}]旧版知识检索:消息为空,跳过。")
        return ""

    try:
        embedding = await get_embedding(query_text, request_type="pfc_implicit_knowledge")
    except Exception as e:
        logger.error(f"[私聊]决策层[{self.private_name}]旧版知识检索:获取嵌入向量时出错: {str(e)}")
        embedding = None

    if not embedding:
        logger.error(f"[私聊]决策层[{self.private_name}]旧版知识检索:获取嵌入向量失败。")
        return ""

    # Ask the legacy store for at most 5 raw result documents.
    results = get_info_from_db(embedding, limit=5, threshold=threshold, return_raw=True)
    logger.info(
        f"[私聊][{self.private_name}]旧版知识库查询完成,耗时: {time.time() - start_time:.3f}秒,获取{len(results)}条结果"
    )

    # De-duplicate while keeping the first-seen order; blank contents are dropped.
    stripped_contents = (result.get("content", "").strip() for result in results)
    distinct_contents = list(dict.fromkeys(text for text in stripped_contents if text))

    if distinct_contents:
        related_info = "\n".join(distinct_contents)
        logger.debug(f"[私聊][{self.private_name}]旧版知识检索格式化后内容: {related_info[:100]}...")
    else:
        related_info = ""
        logger.debug(f"[私聊][{self.private_name}]旧版知识检索未找到合适结果或结果为空。")

    logger.info(f"[私聊][{self.private_name}]旧版知识检索总耗时: {time.time() - start_time:.3f}")
    return related_info
async def _get_prompt_info(self, message: str, threshold: float = 0.38) -> str:
    """Retrieve knowledge related to ``message``, LPMM first, legacy store second.

    (Ported from heartflow_prompt_builder.py.)

    Args:
        message: Text used as the retrieval query; it is stripped before use.
        threshold: Similarity threshold passed on to the legacy lookup.

    Returns:
        The stripped knowledge text from the first source that yields any,
        or an empty string when the message is blank or both sources fail
        or return nothing.
    """
    start_time = time.time()

    message = message.strip()
    if not message:
        logger.debug(f"[私聊][{self.private_name}]自动知识检索:输入消息为空。")
        return ""

    logger.debug(f"[私聊][{self.private_name}]开始自动知识检索,消息: {message[:30]}...")

    # Step 1: the LPMM knowledge base.
    try:
        lpmm_answer = qa_manager.get_knowledge(message)
        related_info = lpmm_answer.strip() if lpmm_answer else ""
        if related_info:
            logger.info(f"[私聊][{self.private_name}]从 LPMM 知识库获取到知识,长度: {len(related_info)}")
            logger.debug(f"[私聊][{self.private_name}]LPMM 知识内容: {related_info[:100]}...")
            # LPMM delivered something usable, so the legacy store is not consulted.
            logger.info(f"[私聊][{self.private_name}]自动知识检索(LPMM)耗时: {time.time() - start_time:.3f}")
            return related_info
        logger.debug(f"[私聊][{self.private_name}]LPMM 知识库未返回有效知识,尝试旧版数据库检索。")
    except Exception as e:
        logger.error(
            f"[私聊][{self.private_name}]调用 LPMM 知识库 (qa_manager.get_knowledge) 时发生异常: {str(e)},尝试旧版数据库检索。"
        )

    # Step 2: LPMM failed or was empty, fall back to the legacy database.
    try:
        legacy_answer = await self._get_prompt_info_old(message, threshold=threshold)
        related_info = legacy_answer.strip() if legacy_answer else ""
        if related_info:
            logger.info(f"[私聊][{self.private_name}]从旧版数据库检索到知识,长度: {len(related_info)}")
            logger.info(f"[私聊][{self.private_name}]自动知识检索(旧版)耗时: {time.time() - start_time:.3f}")
            return related_info
        logger.debug(f"[私聊][{self.private_name}]旧版数据库也未检索到有效知识。")
    except Exception as e2:
        logger.error(
            f"[私聊][{self.private_name}]调用旧版知识库检索 (_get_prompt_info_old) 时也发生异常: {str(e2)}"
        )

    # Neither source produced anything.
    logger.info(
        f"[私聊][{self.private_name}]自动知识检索总耗时: {time.time() - start_time:.3f}秒,未找到任何相关知识。"
    )
    return ""
# --- REMOVED _get_prompt_info ---
# 修改 plan 方法签名,增加 last_successful_reply_action 参数
async def plan(
@ -370,40 +271,6 @@ class ActionPlanner:
logger.error(f"[私聊][{self.private_name}]构建对话目标字符串时出错: {e}")
goals_str = "- 构建对话目标时出错。\n"
# --- 知识信息字符串构建开始 ---
# knowledge_info_str = "【已获取的相关知识和记忆】\n"
# try:
# 检查 conversation_info 是否有 knowledge_list 并且不为空
# if hasattr(conversation_info, "knowledge_list") and conversation_info.knowledge_list:
# 最多只显示最近的 5 条知识,防止 Prompt 过长
# recent_knowledge = conversation_info.knowledge_list[-5:]
# for i, knowledge_item in enumerate(recent_knowledge):
# if isinstance(knowledge_item, dict):
# query = knowledge_item.get("query", "未知查询")
# knowledge = knowledge_item.get("knowledge", "无知识内容")
# source = knowledge_item.get("source", "未知来源")
# 只取知识内容的前 2000 个字,避免太长
# knowledge_snippet = knowledge[:2000] + "..." if len(knowledge) > 2000 else knowledge
# knowledge_info_str += (
# f"{i + 1}. 关于 '{query}' 的知识 (来源: {source}):\n {knowledge_snippet}\n"
# )
# else:
# 处理列表里不是字典的异常情况
# knowledge_info_str += f"{i + 1}. 发现一条格式不正确的知识记录。\n"
# if not recent_knowledge: # 如果 knowledge_list 存在但为空
# knowledge_info_str += "- 暂无相关知识和记忆。\n"
# else:
# 如果 conversation_info 没有 knowledge_list 属性,或者列表为空
# knowledge_info_str += "- 暂无相关知识记忆。\n"
# except AttributeError:
# logger.warning(f"[私聊][{self.private_name}]ConversationInfo 对象可能缺少 knowledge_list 属性。")
# knowledge_info_str += "- 获取知识列表时出错。\n"
# except Exception as e:
# logger.error(f"[私聊][{self.private_name}]构建知识信息字符串时出错: {e}")
# knowledge_info_str += "- 处理知识列表时出错。\n"
# --- 知识信息字符串构建结束 ---
# 获取聊天历史记录 (chat_history_text)
try:
@ -512,22 +379,26 @@ class ActionPlanner:
last_action_context += f"- 该行动当前状态: {status}\n"
# self.last_successful_action_type = None # 非完成状态,清除记录
retrieved_memory_str_planner = ""
retrieved_memory_str_planner = ""
retrieved_knowledge_str_planner = ""
retrieval_context = chat_history_text # 使用聊天记录作为检索上下文
if retrieval_context and retrieval_context != "还没有聊天记录。" and retrieval_context != "[构建聊天记录出错]":
try:
# 调用本地的 _get_memory_info
logger.debug(f"[私聊][{self.private_name}] (ActionPlanner) 开始自动检索记忆...")
retrieved_memory_str_planner = await self._get_memory_info(text=retrieval_context)
logger.info(
f"[私聊][{self.private_name}] (ActionPlanner) 自动检索记忆 {'完成' if retrieved_memory_str_planner else '无结果'}"
)
logger.debug(f"[私聊][{self.private_name}] (ActionPlanner) 开始自动知识检索...")
retrieved_knowledge_str_planner = await self._get_prompt_info(message=retrieval_context)
logger.info(
f"[私聊][{self.private_name}] (ActionPlanner) 自动检索知识 {'完成' if retrieved_knowledge_str_planner else '无结果'}"
)
# --- MODIFIED KNOWLEDGE RETRIEVAL ---
# 调用导入的 prompt_builder.get_prompt_info
logger.debug(f"[私聊][{self.private_name}] (ActionPlanner) 开始自动检索知识 (使用导入函数)...")
# 使用导入的 prompt_builder 实例及其方法
retrieved_knowledge_str_planner = await prompt_builder.get_prompt_info(message=retrieval_context, threshold=0.38)
# --- END MODIFIED KNOWLEDGE RETRIEVAL ---
logger.info(f"[私聊][{self.private_name}] (ActionPlanner) 自动检索知识 {'完成' if retrieved_knowledge_str_planner else '无结果'}")
except Exception as retrieval_err:
logger.error(f"[私聊][{self.private_name}] (ActionPlanner) 自动检索时出错: {retrieval_err}")
retrieved_memory_str_planner = "检索记忆时出错。\n"
@ -554,7 +425,7 @@ class ActionPlanner:
time_since_last_bot_message_info=time_since_last_bot_message_info,
timeout_context=timeout_context,
chat_history_text=chat_history_text if chat_history_text.strip() else "还没有聊天记录。",
# knowledge_info_str=knowledge_info_str,
# knowledge_info_str=knowledge_info_str, # 移除了旧知识展示方式
retrieved_memory_str=retrieved_memory_str_planner if retrieved_memory_str_planner else "无相关记忆。",
retrieved_knowledge_str=retrieved_knowledge_str_planner
if retrieved_knowledge_str_planner
@ -660,90 +531,4 @@ class ActionPlanner:
except Exception as e:
# 外层异常处理保持不变
logger.error(f"[私聊][{self.private_name}]规划行动时调用 LLM 或处理结果出错: {str(e)}")
return "wait", f"行动规划处理中发生错误,暂时等待: {str(e)}"
def get_info_from_db(
    query_embedding: list, limit: int = 1, threshold: float = 0.5, return_raw: bool = False
) -> Union[str, list]:
    """Search the legacy ``knowledges`` collection by cosine similarity.

    (Ported from heartflow_prompt_builder.py.)

    Args:
        query_embedding: Embedding vector of the query text.
        limit: Maximum number of documents to return.
        threshold: Minimum cosine similarity a document must reach to be kept.
        return_raw: When true, return the aggregation result documents
            (carrying ``content`` and ``similarity``) instead of joined text.

    Returns:
        With ``return_raw`` a list of result documents, otherwise the
        ``content`` values joined by newlines. An empty list / empty string
        is returned when the embedding is empty, the query fails, or no
        document reaches the threshold.
    """
    if not query_embedding:
        return [] if return_raw else ""

    sum_of_squares = {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}
    pipeline = [
        # Skip documents without an array-valued embedding: `$size` raises on a
        # missing or non-array argument, which would abort the whole aggregation
        # because of a single malformed document.
        {"$match": {"embedding": {"$type": "array"}}},
        # Cosine similarity, part 1: dot product and both vector magnitudes.
        {
            "$addFields": {
                "dotProduct": {
                    "$reduce": {
                        "input": {"$range": [0, {"$size": "$embedding"}]},
                        "initialValue": 0,
                        "in": {
                            "$add": [
                                "$$value",
                                {
                                    "$multiply": [
                                        {"$arrayElemAt": ["$embedding", "$$this"]},
                                        {"$arrayElemAt": [query_embedding, "$$this"]},
                                    ]
                                },
                            ]
                        },
                    }
                },
                "magnitude1": {
                    "$sqrt": {"$reduce": {"input": "$embedding", "initialValue": 0, "in": sum_of_squares}}
                },
                "magnitude2": {
                    "$sqrt": {"$reduce": {"input": query_embedding, "initialValue": 0, "in": sum_of_squares}}
                },
            }
        },
        # Part 2: divide, clamping the denominator to a small epsilon so a
        # zero-length vector cannot cause a division by zero.
        {
            "$addFields": {
                "similarity": {
                    "$divide": ["$dotProduct", {"$max": [{"$multiply": ["$magnitude1", "$magnitude2"]}, 1e-9]}]
                }
            }
        },
        # Keep only documents whose similarity is at least the threshold.
        {"$match": {"similarity": {"$gte": threshold}}},
        {"$sort": {"similarity": -1}},
        {"$limit": limit},
        {"$project": {"content": 1, "similarity": 1}},
    ]

    try:
        results = list(db.knowledges.aggregate(pipeline))
    except Exception as e:
        # Best-effort lookup: a failed query yields "no knowledge", but it is
        # reported at error level so the failure is not hidden in debug logs.
        logger.error(f"执行旧知识库聚合查询时出错: {e}")
        results = []

    if not results:
        return [] if return_raw else ""
    if return_raw:
        return results

    # `$project` omits `content` for documents that never had it; skip those
    # instead of raising KeyError.
    return "\n".join(str(doc["content"]) for doc in results if "content" in doc)
return "wait", f"行动规划处理中发生错误,暂时等待: {str(e)}"

View File

@ -9,7 +9,10 @@ from src.common.database import db
# 用于获取文本的嵌入向量 (旧知识库需要)
from src.plugins.chat.utils import get_embedding
# --- NEW IMPORT ---
# 从 heartflow 导入知识检索和数据库查询函数/实例
from src.plugins.heartFC_chat.heartflow_prompt_builder import prompt_builder
# --- END NEW IMPORT ---
# 可能用于旧知识库提取主题 (如果需要回退到旧方法)
# import jieba # 如果报错说找不到 jieba可能需要安装: pip install jieba
# import re # 正则表达式库,通常 Python 自带
@ -59,7 +62,7 @@ PROMPT_DIRECT_REPLY = """{persona_text}。现在你在参与一场QQ私聊
请直接输出回复内容不需要任何额外格式"""
# Prompt for send_new_message (追问/补充)
PROMPT_SEND_NEW_MESSAGE = """{persona_text}。现在你在参与一场QQ私聊**刚刚你已经发送了一条或多条消息**,现在请根据以下信息再发一条新消息:
PROMPT_SEND_NEW_MESSAGE = """{persona_text}。现在你在参与一场QQ私聊**刚刚你已经发送了一条或多条消息**,现在请根据以下信息再发一条新消息:
当前对话目标{goals_str}
@ -72,7 +75,7 @@ PROMPT_SEND_NEW_MESSAGE = """{persona_text}。现在你在参与一场QQ私聊
{retrieved_memory_str}
请根据上述信息结合聊天记录继续发一条新消息例如对之前消息的补充深入话题或追问等等该消息应该
请根据上述信息结合聊天记录继续发一条新消息例如对之前消息的补充深入话题或追问等等该消息应该
1. 符合对话目标""的角度发言不要自己与自己对话
2. 符合你的性格特征和身份细节
3. 通俗易懂自然流畅像正常聊天一样简短通常20字以内除非特殊情况
@ -121,6 +124,7 @@ class ReplyGenerator:
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
self.reply_checker = ReplyChecker(stream_id, private_name)
# _get_memory_info 保持不变,因为它不是与 heartflow 重复的部分
async def _get_memory_info(self, text: str) -> str:
"""根据文本自动检索相关记忆"""
memory_prompt = ""
@ -149,111 +153,9 @@ class ReplyGenerator:
# memory_prompt = "检索记忆时出错。\n" # 可以选择是否提示错误
return memory_prompt
async def _get_prompt_info_old(self, message: str, threshold: float) -> str:
    """Look up knowledge for ``message`` in the legacy ``knowledges`` collection.

    (Ported, in simplified form, from heartflow_prompt_builder.py.)

    The whole stripped message is embedded and used as the query; no topic
    extraction is performed.

    Args:
        message: Text used as the retrieval query.
        threshold: Minimum similarity forwarded to ``get_info_from_db``.

    Returns:
        The distinct, non-empty ``content`` values of the matches joined by
        newlines, or an empty string when the message is blank, no embedding
        could be obtained, or nothing usable was found.
    """
    start_time = time.time()
    logger.debug(f"[私聊][{self.private_name}]开始使用旧版知识检索,消息: {message[:30]}...")
    # --- REMOVED _get_prompt_info_old ---

    # Simplified handling: query with the entire message instead of extracted topics.
    query_text = message.strip()
    if not query_text:
        logger.debug(f"[私聊][{self.private_name}]旧版知识检索:消息为空,跳过。")
        return ""

    try:
        embedding = await get_embedding(query_text, request_type="pfc_implicit_knowledge")
    except Exception as e:
        logger.error(f"[私聊][{self.private_name}]旧版知识检索:获取嵌入向量时出错: {str(e)}")
        embedding = None

    if not embedding:
        logger.error(f"[私聊][{self.private_name}]旧版知识检索:获取嵌入向量失败。")
        return ""

    # Ask the legacy store for at most 5 raw result documents.
    results = get_info_from_db(embedding, limit=5, threshold=threshold, return_raw=True)
    logger.info(
        f"[私聊][{self.private_name}]旧版知识库查询完成,耗时: {time.time() - start_time:.3f}秒,获取{len(results)}条结果"
    )

    # De-duplicate while keeping the first-seen order; blank contents are dropped.
    stripped_contents = (result.get("content", "").strip() for result in results)
    distinct_contents = list(dict.fromkeys(text for text in stripped_contents if text))

    if distinct_contents:
        related_info = "\n".join(distinct_contents)
        logger.debug(f"[私聊][{self.private_name}]旧版知识检索格式化后内容: {related_info[:100]}...")
    else:
        related_info = ""
        logger.debug(f"[私聊][{self.private_name}]旧版知识检索未找到合适结果或结果为空。")

    logger.info(f"[私聊][{self.private_name}]旧版知识检索总耗时: {time.time() - start_time:.3f}")
    return related_info
async def _get_prompt_info(self, message: str, threshold: float = 0.38) -> str:
    """Retrieve knowledge related to ``message``, LPMM first, legacy store second.

    (Ported from heartflow_prompt_builder.py.)

    Args:
        message: Text used as the retrieval query; it is stripped before use.
        threshold: Similarity threshold passed on to the legacy lookup.

    Returns:
        The stripped knowledge text from the first source that yields any,
        or an empty string when the message is blank or both sources fail
        or return nothing.
    """
    start_time = time.time()

    message = message.strip()
    if not message:
        logger.debug(f"[私聊][{self.private_name}]自动知识检索:输入消息为空。")
        return ""

    logger.debug(f"[私聊][{self.private_name}]开始自动知识检索,消息: {message[:30]}...")

    # Step 1: the LPMM knowledge base.
    try:
        lpmm_answer = qa_manager.get_knowledge(message)
        related_info = lpmm_answer.strip() if lpmm_answer else ""
        if related_info:
            logger.info(f"[私聊][{self.private_name}]从 LPMM 知识库获取到知识,长度: {len(related_info)}")
            logger.debug(f"[私聊][{self.private_name}]LPMM 知识内容: {related_info[:100]}...")
            # LPMM delivered something usable, so the legacy store is not consulted.
            logger.info(f"[私聊][{self.private_name}]自动知识检索(LPMM)耗时: {time.time() - start_time:.3f}")
            return related_info
        logger.debug(f"[私聊][{self.private_name}]LPMM 知识库未返回有效知识,尝试旧版数据库检索。")
    except Exception as e:
        logger.error(
            f"[私聊][{self.private_name}]调用 LPMM 知识库 (qa_manager.get_knowledge) 时发生异常: {str(e)},尝试旧版数据库检索。"
        )

    # Step 2: LPMM failed or was empty, fall back to the legacy database.
    try:
        legacy_answer = await self._get_prompt_info_old(message, threshold=threshold)
        related_info = legacy_answer.strip() if legacy_answer else ""
        if related_info:
            logger.info(f"[私聊][{self.private_name}]从旧版数据库检索到知识,长度: {len(related_info)}")
            logger.info(f"[私聊][{self.private_name}]自动知识检索(旧版)耗时: {time.time() - start_time:.3f}")
            return related_info
        logger.debug(f"[私聊][{self.private_name}]旧版数据库也未检索到有效知识。")
    except Exception as e2:
        logger.error(
            f"[私聊][{self.private_name}]调用旧版知识库检索 (_get_prompt_info_old) 时也发生异常: {str(e2)}"
        )

    # Neither source produced anything.
    logger.info(
        f"[私聊][{self.private_name}]自动知识检索总耗时: {time.time() - start_time:.3f}秒,未找到任何相关知识。"
    )
    return ""
# --- REMOVED _get_prompt_info ---
# 修改 generate 方法签名,增加 action_type 参数
async def generate(
@ -294,37 +196,6 @@ class ReplyGenerator:
else:
goals_str = "- 目前没有明确对话目标\n" # 简化无目标情况
# --- 新增:构建知识信息字符串 ---
# knowledge_info_str = "【供参考的相关知识和记忆】\n" # 稍微改下标题,表明是供参考
# try:
# 检查 conversation_info 是否有 knowledge_list 并且不为空
# if hasattr(conversation_info, "knowledge_list") and conversation_info.knowledge_list:
# 最多只显示最近的 5 条知识
# recent_knowledge = conversation_info.knowledge_list[-5:]
# for i, knowledge_item in enumerate(recent_knowledge):
# if isinstance(knowledge_item, dict):
# query = knowledge_item.get("query", "未知查询")
# knowledge = knowledge_item.get("knowledge", "无知识内容")
# source = knowledge_item.get("source", "未知来源")
# 只取知识内容的前 2000 个字
# knowledge_snippet = knowledge[:2000] + "..." if len(knowledge) > 2000 else knowledge
# knowledge_info_str += (
# f"{i + 1}. 关于 '{query}' (来源: {source}): {knowledge_snippet}\n" # 格式微调,更简洁
# )
# else:
# knowledge_info_str += f"{i + 1}. 发现一条格式不正确的知识记录。\n"
# if not recent_knowledge:
# knowledge_info_str += "- 暂无。\n" # 更简洁的提示
# else:
# knowledge_info_str += "- 暂无。\n"
# except AttributeError:
# logger.warning(f"[私聊][{self.private_name}]ConversationInfo 对象可能缺少 knowledge_list 属性。")
# knowledge_info_str += "- 获取知识列表时出错。\n"
# except Exception as e:
# logger.error(f"[私聊][{self.private_name}]构建知识信息字符串时出错: {e}")
# knowledge_info_str += "- 处理知识列表时出错。\n"
# 获取聊天历史记录 (chat_history_text)
chat_history_text = observation_info.chat_history_str
@ -349,7 +220,7 @@ class ReplyGenerator:
retrieval_context = chat_history_text
if retrieval_context and retrieval_context != "还没有聊天记录。" and retrieval_context != "[构建聊天记录出错]":
try:
# 提取记忆
# 提取记忆 (调用本地的 _get_memory_info)
logger.debug(f"[私聊][{self.private_name}]开始自动检索记忆...")
retrieved_memory_str = await self._get_memory_info(text=retrieval_context)
if retrieved_memory_str:
@ -357,9 +228,13 @@ class ReplyGenerator:
else:
logger.info(f"[私聊][{self.private_name}]未自动检索到相关记忆。")
# 提取知识
logger.debug(f"[私聊][{self.private_name}]开始自动检索知识...")
retrieved_knowledge_str = await self._get_prompt_info(message=retrieval_context)
# --- MODIFIED KNOWLEDGE RETRIEVAL ---
# 提取知识 (调用导入的 prompt_builder.get_prompt_info)
logger.debug(f"[私聊][{self.private_name}]开始自动检索知识 (使用导入函数)...")
# 使用导入的 prompt_builder 实例及其方法
retrieved_knowledge_str = await prompt_builder.get_prompt_info(message=retrieval_context, threshold=0.38)
# --- END MODIFIED KNOWLEDGE RETRIEVAL ---
if retrieved_knowledge_str:
logger.info(f"[私聊][{self.private_name}]自动检索到相关知识。")
else:
@ -390,11 +265,9 @@ class ReplyGenerator:
persona_text=persona_text,
goals_str=goals_str,
chat_history_text=chat_history_text,
# knowledge_info_str=knowledge_info_str,
retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。", # 如果为空则提示无
retrieved_knowledge_str=retrieved_knowledge_str
if retrieved_knowledge_str
else "无相关知识。", # 如果为空则提示无
# knowledge_info_str=knowledge_info_str, # 移除了这个旧的知识展示方式
retrieved_memory_str=retrieved_memory_str if retrieved_memory_str else "无相关记忆。", # 如果为空则提示无
retrieved_knowledge_str=retrieved_knowledge_str if retrieved_knowledge_str else "无相关知识。" # 如果为空则提示无
)
# --- 调用 LLM 生成 ---
@ -417,89 +290,3 @@ class ReplyGenerator:
(此方法逻辑保持不变)
"""
return await self.reply_checker.check(reply, goal, chat_history, chat_history_str, retry_count)
def get_info_from_db(
    query_embedding: list, limit: int = 1, threshold: float = 0.5, return_raw: bool = False
) -> Union[str, list]:
    """Search the legacy ``knowledges`` collection by cosine similarity.

    (Ported from heartflow_prompt_builder.py.)

    Args:
        query_embedding: Embedding vector of the query text.
        limit: Maximum number of documents to return.
        threshold: Minimum cosine similarity a document must reach to be kept.
        return_raw: When true, return the aggregation result documents
            (carrying ``content`` and ``similarity``) instead of joined text.

    Returns:
        With ``return_raw`` a list of result documents, otherwise the
        ``content`` values joined by newlines. An empty list / empty string
        is returned when the embedding is empty, the query fails, or no
        document reaches the threshold.
    """
    if not query_embedding:
        return [] if return_raw else ""

    sum_of_squares = {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}
    pipeline = [
        # Skip documents without an array-valued embedding: `$size` raises on a
        # missing or non-array argument, which would abort the whole aggregation
        # because of a single malformed document.
        {"$match": {"embedding": {"$type": "array"}}},
        # Cosine similarity, part 1: dot product and both vector magnitudes.
        {
            "$addFields": {
                "dotProduct": {
                    "$reduce": {
                        "input": {"$range": [0, {"$size": "$embedding"}]},
                        "initialValue": 0,
                        "in": {
                            "$add": [
                                "$$value",
                                {
                                    "$multiply": [
                                        {"$arrayElemAt": ["$embedding", "$$this"]},
                                        {"$arrayElemAt": [query_embedding, "$$this"]},
                                    ]
                                },
                            ]
                        },
                    }
                },
                "magnitude1": {
                    "$sqrt": {"$reduce": {"input": "$embedding", "initialValue": 0, "in": sum_of_squares}}
                },
                "magnitude2": {
                    "$sqrt": {"$reduce": {"input": query_embedding, "initialValue": 0, "in": sum_of_squares}}
                },
            }
        },
        # Part 2: divide, clamping the denominator to a small epsilon so a
        # zero-length vector cannot cause a division by zero.
        {
            "$addFields": {
                "similarity": {
                    "$divide": ["$dotProduct", {"$max": [{"$multiply": ["$magnitude1", "$magnitude2"]}, 1e-9]}]
                }
            }
        },
        # Keep only documents whose similarity is at least the threshold.
        {"$match": {"similarity": {"$gte": threshold}}},
        {"$sort": {"similarity": -1}},
        {"$limit": limit},
        {"$project": {"content": 1, "similarity": 1}},
    ]

    try:
        results = list(db.knowledges.aggregate(pipeline))
    except Exception as e:
        # Best-effort lookup: a failed query yields "no knowledge", but it is
        # reported at error level so the failure is not hidden in debug logs.
        logger.error(f"执行旧知识库聚合查询时出错: {e}")
        results = []

    if not results:
        return [] if return_raw else ""
    if return_raw:
        return results

    # `$project` omits `content` for documents that never had it; skip those
    # instead of raising KeyError.
    return "\n".join(str(doc["content"]) for doc in results if "content" in doc)