feat:查询结果可以建构进jargon和person_info

pull/1359/head
SengokuCola 2025-11-15 19:18:39 +08:00
parent 04d1aa6763
commit d18d77cf4b
3 changed files with 353 additions and 6 deletions

View File

@ -1,7 +1,7 @@
import time
import json
import asyncio
from typing import List, Dict, Optional
from typing import List, Dict, Optional, Any
from json_repair import repair_json
from peewee import fn
@ -10,9 +10,12 @@ from src.common.database.database_model import Jargon
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config, global_config
from src.chat.message_receive.chat_stream import get_chat_manager
from src.plugin_system.apis import llm_api
from src.chat.utils.chat_message_builder import (
build_anonymous_messages,
get_raw_msg_by_timestamp_with_chat_inclusive,
get_raw_msg_before_timestamp_with_chat,
build_readable_messages_with_list,
)
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
@ -109,6 +112,97 @@ _init_prompt()
_init_inference_prompts()
async def _enrich_raw_content_if_needed(
content: str,
raw_content_list: List[str],
chat_id: str,
messages: List[Any],
extraction_start_time: float,
extraction_end_time: float,
) -> List[str]:
"""
检查raw_content是否只包含黑话本身如果是则获取该消息的前三条消息作为原始内容
Args:
content: 黑话内容
raw_content_list: 原始raw_content列表
chat_id: 聊天ID
messages: 当前时间窗口内的消息列表
extraction_start_time: 提取开始时间
extraction_end_time: 提取结束时间
Returns:
处理后的raw_content列表
"""
enriched_list = []
for raw_content in raw_content_list:
# 检查raw_content是否只包含黑话本身去除空白字符后比较
raw_content_clean = raw_content.strip()
content_clean = content.strip()
# 如果raw_content只包含黑话本身可能有一些标点或空白则尝试获取上下文
# 去除所有空白字符后比较,确保只包含黑话本身
raw_content_normalized = raw_content_clean.replace(" ", "").replace("\n", "").replace("\t", "")
content_normalized = content_clean.replace(" ", "").replace("\n", "").replace("\t", "")
if raw_content_normalized == content_normalized:
# 在消息列表中查找只包含该黑话的消息(去除空白后比较)
target_message = None
for msg in messages:
msg_content = (msg.processed_plain_text or msg.display_message or "").strip()
msg_content_normalized = msg_content.replace(" ", "").replace("\n", "").replace("\t", "")
# 检查消息内容是否只包含黑话本身(去除空白后完全匹配)
if msg_content_normalized == content_normalized:
target_message = msg
break
if target_message and target_message.time:
# 获取该消息的前三条消息
try:
previous_messages = get_raw_msg_before_timestamp_with_chat(
chat_id=chat_id,
timestamp=target_message.time,
limit=3
)
if previous_messages:
# 将前三条消息和当前消息一起格式化
context_messages = previous_messages + [target_message]
# 按时间排序
context_messages.sort(key=lambda x: x.time or 0)
# 格式化为可读消息
formatted_context, _ = await build_readable_messages_with_list(
context_messages,
replace_bot_name=True,
timestamp_mode="relative",
truncate=False,
)
if formatted_context.strip():
enriched_list.append(formatted_context.strip())
logger.warning(f"为黑话 {content} 补充了上下文消息")
else:
# 如果格式化失败使用原始raw_content
enriched_list.append(raw_content)
else:
# 没有找到前三条消息使用原始raw_content
enriched_list.append(raw_content)
except Exception as e:
logger.warning(f"获取黑话 {content} 的上下文消息失败: {e}")
# 出错时使用原始raw_content
enriched_list.append(raw_content)
else:
# 没有找到包含黑话的消息使用原始raw_content
enriched_list.append(raw_content)
else:
# raw_content包含更多内容直接使用
enriched_list.append(raw_content)
return enriched_list
def _should_infer_meaning(jargon_obj: Jargon) -> bool:
"""
判断是否需要进行含义推断
@ -453,6 +547,17 @@ class JargonMiner:
for entry in uniq_entries:
content = entry["content"]
raw_content_list = entry["raw_content"] # 已经是列表
# 检查并补充raw_content如果只包含黑话本身则获取前三条消息作为上下文
raw_content_list = await _enrich_raw_content_if_needed(
content=content,
raw_content_list=raw_content_list,
chat_id=self.chat_id,
messages=messages,
extraction_start_time=extraction_start_time,
extraction_end_time=extraction_end_time,
)
try:
# 根据all_global配置决定查询逻辑
if global_config.jargon.all_global:
@ -650,3 +755,83 @@ def search_jargon(
return results
async def store_jargon_from_answer(jargon_keyword: str, answer: str, chat_id: str) -> None:
"""将黑话存入jargon系统
Args:
jargon_keyword: 黑话关键词
answer: 答案内容将概括为raw_content
chat_id: 聊天ID
"""
try:
# 概括答案为简短的raw_content
summary_prompt = f"""请将以下答案概括为一句简短的话不超过50字作为黑话"{jargon_keyword}"的使用示例:
答案{answer}
只输出概括后的内容不要输出其他内容"""
success, summary, _, _ = await llm_api.generate_with_model(
summary_prompt,
model_config=model_config.model_task_config.utils_small,
request_type="memory.summarize_jargon",
)
logger.info(f"概括答案提示: {summary_prompt}")
logger.info(f"概括答案: {summary}")
if not success:
logger.warning(f"概括答案失败,使用原始答案: {summary}")
summary = answer[:100] # 截取前100字符作为备用
raw_content = summary.strip()[:200] # 限制长度
# 检查是否已存在
if global_config.jargon.all_global:
query = Jargon.select().where(Jargon.content == jargon_keyword)
else:
query = Jargon.select().where(
(Jargon.chat_id == chat_id) &
(Jargon.content == jargon_keyword)
)
if query.exists():
# 更新现有记录
obj = query.get()
obj.count = (obj.count or 0) + 1
# 合并raw_content列表
existing_raw_content = []
if obj.raw_content:
try:
existing_raw_content = json.loads(obj.raw_content) if isinstance(obj.raw_content, str) else obj.raw_content
if not isinstance(existing_raw_content, list):
existing_raw_content = [existing_raw_content] if existing_raw_content else []
except (json.JSONDecodeError, TypeError):
existing_raw_content = [obj.raw_content] if obj.raw_content else []
# 合并并去重
merged_list = list(dict.fromkeys(existing_raw_content + [raw_content]))
obj.raw_content = json.dumps(merged_list, ensure_ascii=False)
if global_config.jargon.all_global:
obj.is_global = True
obj.save()
logger.info(f"更新jargon记录: {jargon_keyword}")
else:
# 创建新记录
is_global_new = True if global_config.jargon.all_global else False
Jargon.create(
content=jargon_keyword,
raw_content=json.dumps([raw_content], ensure_ascii=False),
chat_id=chat_id,
is_global=is_global_new,
count=1
)
logger.info(f"创建新jargon记录: {jargon_keyword}")
except Exception as e:
logger.error(f"存储jargon失败: {e}")

View File

@ -91,7 +91,7 @@ def init_memory_retrieval_prompt():
你需要通过思考(Think)行动(Action)观察(Observation)的循环来回答问题
**重要限制**
- 最大查询轮数5当前第{current_iteration}剩余{remaining_iterations}
- 最大查询轮数{max_iterations}当前第{current_iteration}剩余{remaining_iterations}
- 必须尽快得出答案避免不必要的查询
- 思考要简短直接切入要点
- 必须严格使用检索到的信息回答问题不要编造信息
@ -127,7 +127,7 @@ def init_memory_retrieval_prompt():
你需要通过思考(Think)行动(Action)观察(Observation)的循环来回答问题
**重要限制**
- 最大查询轮数5当前第{current_iteration}剩余{remaining_iterations}
- 最大查询轮数{max_iterations}当前第{current_iteration}剩余{remaining_iterations}
- 必须尽快得出答案避免不必要的查询
- 思考要简短直接切入要点
- 必须严格使用检索到的信息回答问题不要编造信息
@ -416,6 +416,7 @@ async def _react_agent_solve_question(
collected_info=collected_info if collected_info else "暂无信息",
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
@ -428,6 +429,7 @@ async def _react_agent_solve_question(
question=question,
current_iteration=current_iteration,
remaining_iterations=remaining_iterations,
max_iterations=max_iterations,
)
def message_factory(
@ -821,6 +823,91 @@ def _query_thinking_back(chat_id: str, question: str) -> Optional[Tuple[bool, st
return None
async def _analyze_question_answer(question: str, answer: str, chat_id: str) -> None:
"""异步分析问题和答案的类别,并存储到相应系统
Args:
question: 问题
answer: 答案
chat_id: 聊天ID
"""
try:
# 使用LLM分析类别
analysis_prompt = f"""请分析以下问题和答案的类别:
问题{question}
答案{answer}
类别说明
1. 人物信息有关某个用户的个体信息如某人的喜好习惯经历等
2. 黑话对特定概念缩写词谐音词自创词的解释"yyds""社死"
3. 其他除此之外的其他内容
请输出JSON格式
{{
"category": "人物信息" | "黑话" | "其他",
"jargon_keyword": "如果是黑话,提取关键词(如'yyds'),否则为空字符串",
"person_name": "如果是人物信息,提取人物名称,否则为空字符串",
"memory_content": "如果是人物信息,提取要存储的记忆内容(简短概括),否则为空字符串"
}}
只输出JSON不要输出其他内容"""
success, response, _, _ = await llm_api.generate_with_model(
analysis_prompt,
model_config=model_config.model_task_config.utils,
request_type="memory.analyze_qa",
)
if not success:
logger.error(f"分析问题和答案失败: {response}")
return
# 解析JSON响应
try:
json_pattern = r"```json\s*(.*?)\s*```"
matches = re.findall(json_pattern, response, re.DOTALL)
if matches:
json_str = matches[0]
else:
json_str = response.strip()
repaired_json = repair_json(json_str)
analysis_result = json.loads(repaired_json)
category = analysis_result.get("category", "").strip()
if category == "黑话":
# 处理黑话
jargon_keyword = analysis_result.get("jargon_keyword", "").strip()
if jargon_keyword:
from src.jargon.jargon_miner import store_jargon_from_answer
await store_jargon_from_answer(jargon_keyword, answer, chat_id)
else:
logger.warning(f"分析为黑话但未提取到关键词,问题: {question[:50]}...")
elif category == "人物信息":
# 处理人物信息
person_name = analysis_result.get("person_name", "").strip()
memory_content = analysis_result.get("memory_content", "").strip()
if person_name and memory_content:
from src.person_info.person_info import store_person_memory_from_answer
await store_person_memory_from_answer(person_name, memory_content, chat_id)
else:
logger.warning(f"分析为人物信息但未提取到人物名称或记忆内容,问题: {question[:50]}...")
else:
logger.info(f"问题和答案类别为'其他',不进行存储,问题: {question[:50]}...")
except Exception as e:
logger.error(f"解析分析结果失败: {e}, 响应: {response[:200]}...")
except Exception as e:
logger.error(f"分析问题和答案时发生异常: {e}")
def _store_thinking_back(
chat_id: str,
question: str,
@ -937,7 +1024,7 @@ async def _process_single_question(
found_answer, answer, thinking_steps, is_timeout = await _react_agent_solve_question(
question=question,
chat_id=chat_id,
max_iterations=5,
max_iterations=global_config.memory.max_agent_iterations,
timeout=120.0,
initial_info=initial_info
)
@ -956,6 +1043,8 @@ async def _process_single_question(
logger.info(f"ReAct Agent超时不存储到数据库问题: {question[:50]}...")
if found_answer and answer:
# 创建异步任务分析问题和答案
asyncio.create_task(_analyze_question_answer(question, answer, chat_id))
return f"问题:{question}\n答案:{answer}"
return None
@ -1067,8 +1156,9 @@ async def build_memory_retrieval_prompt(
logger.info(f"解析到 {len(questions)} 个问题: {questions}")
# 第二步并行处理所有问题固定使用5次迭代/120秒超时
logger.info(f"问题数量: {len(questions)},固定设置最大迭代次数: 5超时时间: 120秒")
# 第二步:并行处理所有问题(使用配置的最大迭代次数/120秒超时
max_iterations = global_config.memory.max_agent_iterations
logger.info(f"问题数量: {len(questions)},设置最大迭代次数: {max_iterations},超时时间: 120秒")
# 并行处理所有问题,将概念检索结果作为初始信息传递
question_tasks = [

View File

@ -13,6 +13,7 @@ from src.common.database.database import db
from src.common.database.database_model import PersonInfo
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.chat.message_receive.chat_stream import get_chat_manager
logger = get_logger("person_info")
@ -722,3 +723,74 @@ class PersonInfoManager:
person_info_manager = PersonInfoManager()
async def store_person_memory_from_answer(person_name: str, memory_content: str, chat_id: str) -> None:
"""将人物信息存入person_info的memory_points
Args:
person_name: 人物名称
memory_content: 记忆内容
chat_id: 聊天ID
"""
try:
# 从chat_id获取chat_stream
chat_stream = get_chat_manager().get_stream(chat_id)
if not chat_stream:
logger.warning(f"无法获取chat_stream for chat_id: {chat_id}")
return
platform = chat_stream.platform
# 尝试从person_name查找person_id
# 首先尝试通过person_name查找
person_id = get_person_id_by_person_name(person_name)
if not person_id:
# 如果通过person_name找不到尝试从chat_stream获取user_info
if chat_stream.user_info:
user_id = chat_stream.user_info.user_id
person_id = get_person_id(platform, user_id)
else:
logger.warning(f"无法确定person_id for person_name: {person_name}, chat_id: {chat_id}")
return
# 创建或获取Person对象
person = Person(person_id=person_id)
if not person.is_known:
logger.warning(f"用户 {person_name} (person_id: {person_id}) 尚未认识,无法存储记忆")
return
# 确定记忆分类可以根据memory_content判断这里使用通用分类
category = "其他" # 默认分类,可以根据需要调整
# 记忆点格式category:content:weight
weight = "1.0" # 默认权重
memory_point = f"{category}:{memory_content}:{weight}"
# 添加到memory_points
if not person.memory_points:
person.memory_points = []
# 检查是否已存在相似的记忆点(避免重复)
is_duplicate = False
for existing_point in person.memory_points:
if existing_point and isinstance(existing_point, str):
parts = existing_point.split(":", 2)
if len(parts) >= 2:
existing_content = parts[1].strip()
# 简单相似度检查(如果内容相同或非常相似,则跳过)
if existing_content == memory_content or memory_content in existing_content or existing_content in memory_content:
is_duplicate = True
break
if not is_duplicate:
person.memory_points.append(memory_point)
person.sync_to_database()
logger.info(f"成功添加记忆点到 {person_name} (person_id: {person_id}): {memory_point}")
else:
logger.debug(f"记忆点已存在,跳过: {memory_point}")
except Exception as e:
logger.error(f"存储人物记忆失败: {e}")