import random import time from typing import Optional, Union from ....common.database import db from ...chat.utils import get_embedding, get_recent_group_detailed_plain_text, get_recent_group_speaker from ...chat.chat_stream import chat_manager from ...moods.moods import MoodManager from ....individuality.individuality import Individuality from ...memory_system.Hippocampus import HippocampusManager from ...schedule.schedule_generator import bot_schedule from ...config.config import global_config from ...person_info.relationship_manager import relationship_manager from src.common.logger import get_module_logger from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager logger = get_module_logger("prompt") def init_prompt(): Prompt( """ {relation_prompt_all} {memory_prompt} {prompt_info} {schedule_prompt} {chat_target} {chat_talking_prompt} 现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n 你的网名叫{bot_name},有人也叫你{bot_other_names},{prompt_personality}。 你正在{chat_target_2},现在请你读读之前的聊天记录,{mood_prompt},然后给出日常且口语化的回复,平淡一些, 尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger} 请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话 请注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。 {moderation_prompt}不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""", "reasoning_prompt_main", ) Prompt( "{relation_prompt}关系等级越大,关系越好,请分析聊天记录,根据你和说话者{sender_name}的关系和态度进行回复,明确你的立场和情感。", "relationship_prompt", ) Prompt( "你想起你之前见过的事情:{related_memory_info}。\n以上是你的回忆,不一定是目前聊天里的人说的,也不一定是现在发生的事情,请记住。\n", "memory_prompt", ) Prompt("你现在正在做的事情是:{schedule_info}", "schedule_prompt") Prompt("\n你有以下这些**知识**:\n{prompt_info}\n请你**记住上面的知识**,之后可能会用到。\n", "knowledge_prompt") class PromptBuilder: def __init__(self): self.prompt_built = "" self.activate_messages = "" async def _build_prompt( self, chat_stream, message_txt: str, sender_name: str = "某人", stream_id: Optional[int] = None ) -> tuple[str, str]: # 开始构建prompt prompt_personality = "你" # person individuality = Individuality.get_instance() personality_core = individuality.personality.personality_core prompt_personality += personality_core personality_sides = individuality.personality.personality_sides random.shuffle(personality_sides) prompt_personality += f",{personality_sides[0]}" identity_detail = individuality.identity.identity_detail random.shuffle(identity_detail) prompt_personality += f",{identity_detail[0]}" # 关系 who_chat_in_group = [ (chat_stream.user_info.platform, chat_stream.user_info.user_id, chat_stream.user_info.user_nickname) ] who_chat_in_group += get_recent_group_speaker( stream_id, (chat_stream.user_info.platform, chat_stream.user_info.user_id), limit=global_config.MAX_CONTEXT_SIZE, ) relation_prompt = "" for person in who_chat_in_group: relation_prompt += await relationship_manager.build_relationship_info(person) # relation_prompt_all = ( # f"{relation_prompt}关系等级越大,关系越好,请分析聊天记录," # f"根据你和说话者{sender_name}的关系和态度进行回复,明确你的立场和情感。" # ) # 心情 mood_manager = MoodManager.get_instance() mood_prompt = mood_manager.get_prompt() # logger.info(f"心情prompt: {mood_prompt}") # 调取记忆 memory_prompt = "" related_memory = await HippocampusManager.get_instance().get_memory_from_text( text=message_txt, max_memory_num=2, max_memory_length=2, max_depth=3, fast_retrieval=False ) if related_memory: related_memory_info = "" for memory in related_memory: related_memory_info += memory[1] # memory_prompt = f"你想起你之前见过的事情:{related_memory_info}。\n以上是你的回忆,不一定是目前聊天里的人说的,也不一定是现在发生的事情,请记住。\n" memory_prompt = await global_prompt_manager.format_prompt( "memory_prompt", related_memory_info=related_memory_info ) else: related_memory_info = "" # print(f"相关记忆:{related_memory_info}") # 日程构建 # schedule_prompt = f"""你现在正在做的事情是:{bot_schedule.get_current_num_task(num=1, time_info=False)}""" # 获取聊天上下文 chat_in_group = True chat_talking_prompt = "" if stream_id: chat_talking_prompt = get_recent_group_detailed_plain_text( stream_id, limit=global_config.MAX_CONTEXT_SIZE, combine=True ) chat_stream = chat_manager.get_stream(stream_id) if chat_stream.group_info: chat_talking_prompt = chat_talking_prompt else: chat_in_group = False chat_talking_prompt = chat_talking_prompt # print(f"\033[1;34m[调试]\033[0m 已从数据库获取群 {group_id} 的消息记录:{chat_talking_prompt}") # 关键词检测与反应 keywords_reaction_prompt = "" for rule in global_config.keywords_reaction_rules: if rule.get("enable", False): if any(keyword in message_txt.lower() for keyword in rule.get("keywords", [])): logger.info( f"检测到以下关键词之一:{rule.get('keywords', [])},触发反应:{rule.get('reaction', '')}" ) keywords_reaction_prompt += rule.get("reaction", "") + "," else: for pattern in rule.get("regex", []): result = pattern.search(message_txt) if result: reaction = rule.get("reaction", "") for name, content in result.groupdict().items(): reaction = reaction.replace(f"[{name}]", content) logger.info(f"匹配到以下正则表达式:{pattern},触发反应:{reaction}") keywords_reaction_prompt += reaction + "," break # 中文高手(新加的好玩功能) prompt_ger = "" if random.random() < 0.04: prompt_ger += "你喜欢用倒装句" if random.random() < 0.02: prompt_ger += "你喜欢用反问句" if random.random() < 0.01: prompt_ger += "你喜欢用文言文" # 知识构建 start_time = time.time() prompt_info = "" prompt_info = await self.get_prompt_info(message_txt, threshold=0.38) if prompt_info: # prompt_info = f"""\n你有以下这些**知识**:\n{prompt_info}\n请你**记住上面的知识**,之后可能会用到。\n""" prompt_info = await global_prompt_manager.format_prompt("knowledge_prompt", prompt_info=prompt_info) end_time = time.time() logger.debug(f"知识检索耗时: {(end_time - start_time):.3f}秒") # moderation_prompt = "" # moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。 # 涉及政治敏感以及违法违规的内容请规避。""" logger.debug("开始构建prompt") # prompt = f""" # {relation_prompt_all} # {memory_prompt} # {prompt_info} # {schedule_prompt} # {chat_target} # {chat_talking_prompt} # 现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n # 你的网名叫{global_config.BOT_NICKNAME},有人也叫你{"/".join(global_config.BOT_ALIAS_NAMES)},{prompt_personality}。 # 你正在{chat_target_2},现在请你读读之前的聊天记录,{mood_prompt},然后给出日常且口语化的回复,平淡一些, # 尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger} # 请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话 # 请注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。 # {moderation_prompt}不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""" prompt = await global_prompt_manager.format_prompt( "reasoning_prompt_main", relation_prompt_all=await global_prompt_manager.get_prompt_async("relationship_prompt"), relation_prompt=relation_prompt, sender_name=sender_name, memory_prompt=memory_prompt, prompt_info=prompt_info, schedule_prompt=await global_prompt_manager.format_prompt( "schedule_prompt", schedule_info=bot_schedule.get_current_num_task(num=1, time_info=False) ), chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1") if chat_in_group else await global_prompt_manager.get_prompt_async("chat_target_private1"), chat_target_2=await global_prompt_manager.get_prompt_async("chat_target_group2") if chat_in_group else await global_prompt_manager.get_prompt_async("chat_target_private2"), chat_talking_prompt=chat_talking_prompt, message_txt=message_txt, bot_name=global_config.BOT_NICKNAME, bot_other_names="/".join( global_config.BOT_ALIAS_NAMES, ), prompt_personality=prompt_personality, mood_prompt=mood_prompt, keywords_reaction_prompt=keywords_reaction_prompt, prompt_ger=prompt_ger, moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), ) return prompt async def get_prompt_info(self, message: str, threshold: float): start_time = time.time() related_info = "" logger.debug(f"获取知识库内容,元消息:{message[:30]}...,消息长度: {len(message)}") # 1. 先从LLM获取主题,类似于记忆系统的做法 topics = [] # try: # # 先尝试使用记忆系统的方法获取主题 # hippocampus = HippocampusManager.get_instance()._hippocampus # topic_num = min(5, max(1, int(len(message) * 0.1))) # topics_response = await hippocampus.llm_topic_judge.generate_response(hippocampus.find_topic_llm(message, topic_num)) # # 提取关键词 # topics = re.findall(r"<([^>]+)>", topics_response[0]) # if not topics: # topics = [] # else: # topics = [ # topic.strip() # for topic in ",".join(topics).replace(",", ",").replace("、", ",").replace(" ", ",").split(",") # if topic.strip() # ] # logger.info(f"从LLM提取的主题: {', '.join(topics)}") # except Exception as e: # logger.error(f"从LLM提取主题失败: {str(e)}") # # 如果LLM提取失败,使用jieba分词提取关键词作为备选 # words = jieba.cut(message) # topics = [word for word in words if len(word) > 1][:5] # logger.info(f"使用jieba提取的主题: {', '.join(topics)}") # 如果无法提取到主题,直接使用整个消息 if not topics: logger.info("未能提取到任何主题,使用整个消息进行查询") embedding = await get_embedding(message, request_type="prompt_build") if not embedding: logger.error("获取消息嵌入向量失败") return "" related_info = self.get_info_from_db(embedding, limit=3, threshold=threshold) logger.info(f"知识库检索完成,总耗时: {time.time() - start_time:.3f}秒") return related_info # 2. 对每个主题进行知识库查询 logger.info(f"开始处理{len(topics)}个主题的知识库查询") # 优化:批量获取嵌入向量,减少API调用 embeddings = {} topics_batch = [topic for topic in topics if len(topic) > 0] if message: # 确保消息非空 topics_batch.append(message) # 批量获取嵌入向量 embed_start_time = time.time() for text in topics_batch: if not text or len(text.strip()) == 0: continue try: embedding = await get_embedding(text, request_type="prompt_build") if embedding: embeddings[text] = embedding else: logger.warning(f"获取'{text}'的嵌入向量失败") except Exception as e: logger.error(f"获取'{text}'的嵌入向量时发生错误: {str(e)}") logger.info(f"批量获取嵌入向量完成,耗时: {time.time() - embed_start_time:.3f}秒") if not embeddings: logger.error("所有嵌入向量获取失败") return "" # 3. 对每个主题进行知识库查询 all_results = [] query_start_time = time.time() # 首先添加原始消息的查询结果 if message in embeddings: original_results = self.get_info_from_db(embeddings[message], limit=3, threshold=threshold, return_raw=True) if original_results: for result in original_results: result["topic"] = "原始消息" all_results.extend(original_results) logger.info(f"原始消息查询到{len(original_results)}条结果") # 然后添加每个主题的查询结果 for topic in topics: if not topic or topic not in embeddings: continue try: topic_results = self.get_info_from_db(embeddings[topic], limit=3, threshold=threshold, return_raw=True) if topic_results: # 添加主题标记 for result in topic_results: result["topic"] = topic all_results.extend(topic_results) logger.info(f"主题'{topic}'查询到{len(topic_results)}条结果") except Exception as e: logger.error(f"查询主题'{topic}'时发生错误: {str(e)}") logger.info(f"知识库查询完成,耗时: {time.time() - query_start_time:.3f}秒,共获取{len(all_results)}条结果") # 4. 去重和过滤 process_start_time = time.time() unique_contents = set() filtered_results = [] for result in all_results: content = result["content"] if content not in unique_contents: unique_contents.add(content) filtered_results.append(result) # 5. 按相似度排序 filtered_results.sort(key=lambda x: x["similarity"], reverse=True) # 6. 限制总数量(最多10条) filtered_results = filtered_results[:10] logger.info( f"结果处理完成,耗时: {time.time() - process_start_time:.3f}秒,过滤后剩余{len(filtered_results)}条结果" ) # 7. 格式化输出 if filtered_results: format_start_time = time.time() grouped_results = {} for result in filtered_results: topic = result["topic"] if topic not in grouped_results: grouped_results[topic] = [] grouped_results[topic].append(result) # 按主题组织输出 for topic, results in grouped_results.items(): related_info += f"【主题: {topic}】\n" for _i, result in enumerate(results, 1): _similarity = result["similarity"] content = result["content"].strip() # 调试:为内容添加序号和相似度信息 # related_info += f"{i}. [{similarity:.2f}] {content}\n" related_info += f"{content}\n" related_info += "\n" logger.info(f"格式化输出完成,耗时: {time.time() - format_start_time:.3f}秒") logger.info(f"知识库检索总耗时: {time.time() - start_time:.3f}秒") return related_info def get_info_from_db( self, query_embedding: list, limit: int = 1, threshold: float = 0.5, return_raw: bool = False ) -> Union[str, list]: if not query_embedding: return "" if not return_raw else [] # 使用余弦相似度计算 pipeline = [ { "$addFields": { "dotProduct": { "$reduce": { "input": {"$range": [0, {"$size": "$embedding"}]}, "initialValue": 0, "in": { "$add": [ "$$value", { "$multiply": [ {"$arrayElemAt": ["$embedding", "$$this"]}, {"$arrayElemAt": [query_embedding, "$$this"]}, ] }, ] }, } }, "magnitude1": { "$sqrt": { "$reduce": { "input": "$embedding", "initialValue": 0, "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}, } } }, "magnitude2": { "$sqrt": { "$reduce": { "input": query_embedding, "initialValue": 0, "in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}, } } }, } }, {"$addFields": {"similarity": {"$divide": ["$dotProduct", {"$multiply": ["$magnitude1", "$magnitude2"]}]}}}, { "$match": { "similarity": {"$gte": threshold} # 只保留相似度大于等于阈值的结果 } }, {"$sort": {"similarity": -1}}, {"$limit": limit}, {"$project": {"content": 1, "similarity": 1}}, ] results = list(db.knowledges.aggregate(pipeline)) logger.debug(f"知识库查询结果数量: {len(results)}") if not results: return "" if not return_raw else [] if return_raw: return results else: # 返回所有找到的内容,用换行分隔 return "\n".join(str(result["content"]) for result in results) init_prompt() prompt_builder = PromptBuilder()