diff --git a/src/jargon/jargon_miner.py b/src/jargon/jargon_miner.py
index e41aac32..94dd0434 100644
--- a/src/jargon/jargon_miner.py
+++ b/src/jargon/jargon_miner.py
@@ -1,7 +1,7 @@
 import time
 import json
 import asyncio
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Any
 from json_repair import repair_json
 from peewee import fn
 
@@ -10,9 +10,12 @@
 from src.common.database.database_model import Jargon
 from src.llm_models.utils_model import LLMRequest
 from src.config.config import model_config, global_config
 from src.chat.message_receive.chat_stream import get_chat_manager
+from src.plugin_system.apis import llm_api
 from src.chat.utils.chat_message_builder import (
     build_anonymous_messages,
     get_raw_msg_by_timestamp_with_chat_inclusive,
+    get_raw_msg_before_timestamp_with_chat,
+    build_readable_messages_with_list,
 )
 from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
@@ -109,6 +112,97 @@ _init_prompt()
 _init_inference_prompts()
 
 
+async def _enrich_raw_content_if_needed(
+    content: str,
+    raw_content_list: List[str],
+    chat_id: str,
+    messages: List[Any],
+    extraction_start_time: float,
+    extraction_end_time: float,
+) -> List[str]:
+    """
+    检查raw_content是否只包含黑话本身,如果是,则获取该消息的前三条消息作为原始内容
+
+    Args:
+        content: 黑话内容
+        raw_content_list: 原始raw_content列表
+        chat_id: 聊天ID
+        messages: 当前时间窗口内的消息列表
+        extraction_start_time: 提取开始时间
+        extraction_end_time: 提取结束时间
+
+    Returns:
+        处理后的raw_content列表
+    """
+    enriched_list = []
+
+    for raw_content in raw_content_list:
+        # 检查raw_content是否只包含黑话本身(去除空白字符后比较)
+        raw_content_clean = raw_content.strip()
+        content_clean = content.strip()
+
+        # 如果raw_content只包含黑话本身(可能有一些标点或空白),则尝试获取上下文
+        # 去除所有空白字符后比较,确保只包含黑话本身
+        raw_content_normalized = raw_content_clean.replace(" ", "").replace("\n", "").replace("\t", "")
+        content_normalized = content_clean.replace(" ", "").replace("\n", "").replace("\t", "")
+
+        if raw_content_normalized == content_normalized:
+            # 在消息列表中查找只包含该黑话的消息(去除空白后比较)
+            target_message = None
+            for msg in messages:
+                msg_content = (msg.processed_plain_text or msg.display_message or "").strip()
+                msg_content_normalized = msg_content.replace(" ", "").replace("\n", "").replace("\t", "")
+                # 检查消息内容是否只包含黑话本身(去除空白后完全匹配)
+                if msg_content_normalized == content_normalized:
+                    target_message = msg
+                    break
+
+            if target_message and target_message.time:
+                # 获取该消息的前三条消息
+                try:
+                    previous_messages = get_raw_msg_before_timestamp_with_chat(
+                        chat_id=chat_id,
+                        timestamp=target_message.time,
+                        limit=3
+                    )
+
+                    if previous_messages:
+                        # 将前三条消息和当前消息一起格式化
+                        context_messages = previous_messages + [target_message]
+                        # 按时间排序
+                        context_messages.sort(key=lambda x: x.time or 0)
+
+                        # 格式化为可读消息
+                        formatted_context, _ = await build_readable_messages_with_list(
+                            context_messages,
+                            replace_bot_name=True,
+                            timestamp_mode="relative",
+                            truncate=False,
+                        )
+
+                        if formatted_context.strip():
+                            enriched_list.append(formatted_context.strip())
+                            logger.warning(f"为黑话 {content} 补充了上下文消息")
+                        else:
+                            # 如果格式化失败,使用原始raw_content
+                            enriched_list.append(raw_content)
+                    else:
+                        # 没有找到前三条消息,使用原始raw_content
+                        enriched_list.append(raw_content)
+                except Exception as e:
+                    logger.warning(f"获取黑话 {content} 的上下文消息失败: {e}")
+                    # 出错时使用原始raw_content
+                    enriched_list.append(raw_content)
+            else:
+                # 没有找到包含黑话的消息,使用原始raw_content
+                enriched_list.append(raw_content)
+        else:
+            # raw_content包含更多内容,直接使用
+            enriched_list.append(raw_content)
+
+    return enriched_list
+
+
 def _should_infer_meaning(jargon_obj: Jargon) -> bool:
     """
     判断是否需要进行含义推断
@@ -453,6 +547,17 @@ class JargonMiner:
         for entry in uniq_entries:
             content = entry["content"]
             raw_content_list = entry["raw_content"]  # 已经是列表
+
+            # 检查并补充raw_content:如果只包含黑话本身,则获取前三条消息作为上下文
+            raw_content_list = await _enrich_raw_content_if_needed(
+                content=content,
+                raw_content_list=raw_content_list,
+                chat_id=self.chat_id,
+                messages=messages,
+                extraction_start_time=extraction_start_time,
+                extraction_end_time=extraction_end_time,
+            )
+
             try:
                 # 根据all_global配置决定查询逻辑
                 if global_config.jargon.all_global:
@@ -650,3 +755,83 @@ def search_jargon(
     return results
 
 
+
+async def store_jargon_from_answer(jargon_keyword: str, answer: str, chat_id: str) -> None:
+    """将黑话存入jargon系统
+
+    Args:
+        jargon_keyword: 黑话关键词
+        answer: 答案内容(将概括为raw_content)
+        chat_id: 聊天ID
+    """
+    try:
+        # 概括答案为简短的raw_content
+        summary_prompt = f"""请将以下答案概括为一句简短的话(不超过50字),作为黑话"{jargon_keyword}"的使用示例:
+
+答案:{answer}
+
+只输出概括后的内容,不要输出其他内容:"""
+
+        success, summary, _, _ = await llm_api.generate_with_model(
+            summary_prompt,
+            model_config=model_config.model_task_config.utils_small,
+            request_type="memory.summarize_jargon",
+        )
+
+        logger.info(f"概括答案提示: {summary_prompt}")
+        logger.info(f"概括答案: {summary}")
+
+        if not success:
+            logger.warning(f"概括答案失败,使用原始答案: {summary}")
+            summary = answer[:100]  # 截取前100字符作为备用
+
+        raw_content = summary.strip()[:200]  # 限制长度
+
+        # 检查是否已存在
+        if global_config.jargon.all_global:
+            query = Jargon.select().where(Jargon.content == jargon_keyword)
+        else:
+            query = Jargon.select().where(
+                (Jargon.chat_id == chat_id) &
+                (Jargon.content == jargon_keyword)
+            )
+
+        if query.exists():
+            # 更新现有记录
+            obj = query.get()
+            obj.count = (obj.count or 0) + 1
+
+            # 合并raw_content列表
+            existing_raw_content = []
+            if obj.raw_content:
+                try:
+                    existing_raw_content = json.loads(obj.raw_content) if isinstance(obj.raw_content, str) else obj.raw_content
+                    if not isinstance(existing_raw_content, list):
+                        existing_raw_content = [existing_raw_content] if existing_raw_content else []
+                except (json.JSONDecodeError, TypeError):
+                    existing_raw_content = [obj.raw_content] if obj.raw_content else []
+
+            # 合并并去重
+            merged_list = list(dict.fromkeys(existing_raw_content + [raw_content]))
+            obj.raw_content = json.dumps(merged_list, ensure_ascii=False)
+
+            if global_config.jargon.all_global:
+                obj.is_global = True
+
+            obj.save()
+            logger.info(f"更新jargon记录: {jargon_keyword}")
+        else:
+            # 创建新记录
+            is_global_new = True if global_config.jargon.all_global else False
+            Jargon.create(
+                content=jargon_keyword,
+                raw_content=json.dumps([raw_content], ensure_ascii=False),
+                chat_id=chat_id,
+                is_global=is_global_new,
+                count=1
+            )
+            logger.info(f"创建新jargon记录: {jargon_keyword}")
+
+    except Exception as e:
+        logger.error(f"存储jargon失败: {e}")
+
diff --git a/src/memory_system/memory_retrieval.py b/src/memory_system/memory_retrieval.py
index 53489ff3..6bc42807 100644
--- a/src/memory_system/memory_retrieval.py
+++ b/src/memory_system/memory_retrieval.py
@@ -91,7 +91,7 @@ def init_memory_retrieval_prompt():
 你需要通过思考(Think)、行动(Action)、观察(Observation)的循环来回答问题。
 
 **重要限制:**
-- 最大查询轮数:5轮(当前第{current_iteration}轮,剩余{remaining_iterations}轮)
+- 最大查询轮数:{max_iterations}轮(当前第{current_iteration}轮,剩余{remaining_iterations}轮)
 - 必须尽快得出答案,避免不必要的查询
 - 思考要简短,直接切入要点
 - 必须严格使用检索到的信息回答问题,不要编造信息
@@ -127,7 +127,7 @@ def init_memory_retrieval_prompt():
 你需要通过思考(Think)、行动(Action)、观察(Observation)的循环来回答问题。
 
 **重要限制:**
-- 最大查询轮数:5轮(当前第{current_iteration}轮,剩余{remaining_iterations}轮)
+- 最大查询轮数:{max_iterations}轮(当前第{current_iteration}轮,剩余{remaining_iterations}轮)
 - 必须尽快得出答案,避免不必要的查询
 - 思考要简短,直接切入要点
 - 必须严格使用检索到的信息回答问题,不要编造信息
@@ -416,6 +416,7 @@ async def _react_agent_solve_question(
         collected_info=collected_info if collected_info else "暂无信息",
         current_iteration=current_iteration,
         remaining_iterations=remaining_iterations,
+        max_iterations=max_iterations,
     )
 
 
@@ -428,6 +429,7 @@ async def _react_agent_solve_question(
             question=question,
             current_iteration=current_iteration,
             remaining_iterations=remaining_iterations,
+            max_iterations=max_iterations,
         )
 
     def message_factory(
@@ -821,6 +823,91 @@ def _query_thinking_back(chat_id: str, question: str) -> Optional[Tuple[bool, str]]:
     return None
 
 
+async def _analyze_question_answer(question: str, answer: str, chat_id: str) -> None:
+    """异步分析问题和答案的类别,并存储到相应系统
+
+    Args:
+        question: 问题
+        answer: 答案
+        chat_id: 聊天ID
+    """
+    try:
+        # 使用LLM分析类别
+        analysis_prompt = f"""请分析以下问题和答案的类别:
+
+问题:{question}
+答案:{answer}
+
+类别说明:
+1. 人物信息:有关某个用户的个体信息(如某人的喜好、习惯、经历等)
+2. 黑话:对特定概念、缩写词、谐音词、自创词的解释(如"yyds"、"社死"等)
+3. 其他:除此之外的其他内容
+
+请输出JSON格式:
+{{
+    "category": "人物信息" | "黑话" | "其他",
+    "jargon_keyword": "如果是黑话,提取关键词(如'yyds'),否则为空字符串",
+    "person_name": "如果是人物信息,提取人物名称,否则为空字符串",
+    "memory_content": "如果是人物信息,提取要存储的记忆内容(简短概括),否则为空字符串"
+}}
+
+只输出JSON,不要输出其他内容:"""
+
+        success, response, _, _ = await llm_api.generate_with_model(
+            analysis_prompt,
+            model_config=model_config.model_task_config.utils,
+            request_type="memory.analyze_qa",
+        )
+
+        if not success:
+            logger.error(f"分析问题和答案失败: {response}")
+            return
+
+        # 解析JSON响应
+        try:
+            json_pattern = r"```json\s*(.*?)\s*```"
+            matches = re.findall(json_pattern, response, re.DOTALL)
+
+            if matches:
+                json_str = matches[0]
+            else:
+                json_str = response.strip()
+
+            repaired_json = repair_json(json_str)
+            analysis_result = json.loads(repaired_json)
+
+            category = analysis_result.get("category", "").strip()
+
+            if category == "黑话":
+                # 处理黑话
+                jargon_keyword = analysis_result.get("jargon_keyword", "").strip()
+                if jargon_keyword:
+                    from src.jargon.jargon_miner import store_jargon_from_answer
+                    await store_jargon_from_answer(jargon_keyword, answer, chat_id)
+                else:
+                    logger.warning(f"分析为黑话但未提取到关键词,问题: {question[:50]}...")
+
+            elif category == "人物信息":
+                # 处理人物信息
+                person_name = analysis_result.get("person_name", "").strip()
+                memory_content = analysis_result.get("memory_content", "").strip()
+                if person_name and memory_content:
+                    from src.person_info.person_info import store_person_memory_from_answer
+                    await store_person_memory_from_answer(person_name, memory_content, chat_id)
+                else:
+                    logger.warning(f"分析为人物信息但未提取到人物名称或记忆内容,问题: {question[:50]}...")
+
+            else:
+                logger.info(f"问题和答案类别为'其他',不进行存储,问题: {question[:50]}...")
+
+        except Exception as e:
+            logger.error(f"解析分析结果失败: {e}, 响应: {response[:200]}...")
+
+    except Exception as e:
+        logger.error(f"分析问题和答案时发生异常: {e}")
+
+
 def _store_thinking_back(
     chat_id: str,
     question: str,
@@ -937,7 +1024,7 @@ async def _process_single_question(
     found_answer, answer, thinking_steps, is_timeout = await _react_agent_solve_question(
         question=question,
         chat_id=chat_id,
-        max_iterations=5,
+        max_iterations=global_config.memory.max_agent_iterations,
         timeout=120.0,
        initial_info=initial_info
     )
@@ -956,6 +1043,8 @@ async def _process_single_question(
             logger.info(f"ReAct Agent超时,不存储到数据库,问题: {question[:50]}...")
 
     if found_answer and answer:
+        # 创建异步任务分析问题和答案
+        asyncio.create_task(_analyze_question_answer(question, answer, chat_id))
         return f"问题:{question}\n答案:{answer}"
 
     return None
@@ -1067,8 +1156,9 @@ async def build_memory_retrieval_prompt(
 
     logger.info(f"解析到 {len(questions)} 个问题: {questions}")
 
-    # 第二步:并行处理所有问题(固定使用5次迭代/120秒超时)
-    logger.info(f"问题数量: {len(questions)},固定设置最大迭代次数: 5,超时时间: 120秒")
+    # 第二步:并行处理所有问题(使用配置的最大迭代次数/120秒超时)
+    max_iterations = global_config.memory.max_agent_iterations
+    logger.info(f"问题数量: {len(questions)},设置最大迭代次数: {max_iterations},超时时间: 120秒")
 
     # 并行处理所有问题,将概念检索结果作为初始信息传递
     question_tasks = [
diff --git a/src/person_info/person_info.py b/src/person_info/person_info.py
index cd5ab1d4..bbc3fb99 100644
--- a/src/person_info/person_info.py
+++ b/src/person_info/person_info.py
@@ -13,6 +13,7 @@
 from src.common.database.database import db
 from src.common.database.database_model import PersonInfo
 from src.llm_models.utils_model import LLMRequest
 from src.config.config import global_config, model_config
+from src.chat.message_receive.chat_stream import get_chat_manager
 
 logger = get_logger("person_info")
 
@@ -722,3 +723,74 @@ class PersonInfoManager:
 
 
 person_info_manager = PersonInfoManager()
+
+
+async def store_person_memory_from_answer(person_name: str, memory_content: str, chat_id: str) -> None:
+    """将人物信息存入person_info的memory_points
+
+    Args:
+        person_name: 人物名称
+        memory_content: 记忆内容
+        chat_id: 聊天ID
+    """
+    try:
+        # 从chat_id获取chat_stream
+        chat_stream = get_chat_manager().get_stream(chat_id)
+        if not chat_stream:
+            logger.warning(f"无法获取chat_stream for chat_id: {chat_id}")
+            return
+
+        platform = chat_stream.platform
+
+        # 尝试从person_name查找person_id
+        # 首先尝试通过person_name查找
+        person_id = get_person_id_by_person_name(person_name)
+
+        if not person_id:
+            # 如果通过person_name找不到,尝试从chat_stream获取user_info
+            if chat_stream.user_info:
+                user_id = chat_stream.user_info.user_id
+                person_id = get_person_id(platform, user_id)
+            else:
+                logger.warning(f"无法确定person_id for person_name: {person_name}, chat_id: {chat_id}")
+                return
+
+        # 创建或获取Person对象
+        person = Person(person_id=person_id)
+
+        if not person.is_known:
+            logger.warning(f"用户 {person_name} (person_id: {person_id}) 尚未认识,无法存储记忆")
+            return
+
+        # 确定记忆分类(可以根据memory_content判断,这里使用通用分类)
+        category = "其他"  # 默认分类,可以根据需要调整
+
+        # 记忆点格式:category:content:weight
+        weight = "1.0"  # 默认权重
+        memory_point = f"{category}:{memory_content}:{weight}"
+
+        # 添加到memory_points
+        if not person.memory_points:
+            person.memory_points = []
+
+        # 检查是否已存在相似的记忆点(避免重复)
+        is_duplicate = False
+        for existing_point in person.memory_points:
+            if existing_point and isinstance(existing_point, str):
+                parts = existing_point.split(":", 2)
+                if len(parts) >= 2:
+                    existing_content = parts[1].strip()
+                    # 简单相似度检查(如果内容相同或非常相似,则跳过)
+                    if existing_content == memory_content or memory_content in existing_content or existing_content in memory_content:
+                        is_duplicate = True
+                        break
+
+        if not is_duplicate:
+            person.memory_points.append(memory_point)
+            person.sync_to_database()
+            logger.info(f"成功添加记忆点到 {person_name} (person_id: {person_id}): {memory_point}")
+        else:
+            logger.debug(f"记忆点已存在,跳过: {memory_point}")
+
+    except Exception as e:
+        logger.error(f"存储人物记忆失败: {e}")