feat: fix parsing of partially broken JSON

pull/1385/head
SengokuCola 2025-11-26 13:11:34 +08:00
parent 644d470558
commit ac51f0c41d
4 changed files with 111 additions and 14 deletions

View File

@@ -13,7 +13,7 @@ from typing import Optional, Tuple, List, Any
from PIL import Image
from rich.traceback import install
from src.common.database.database_model import Emoji
from src.common.database.database_model import Emoji, EmojiDescriptionCache
from src.common.database.database import db as peewee_db
from src.common.logger import get_logger
from src.config.config import global_config, model_config
@@ -398,6 +398,7 @@ class EmojiManager:
raise RuntimeError("数据库连接失败")
_ensure_emoji_dir()
Emoji.create_table(safe=True) # Ensures table exists
EmojiDescriptionCache.create_table(safe=True)
self._initialized = True
def _ensure_db(self) -> None:
@@ -918,17 +919,15 @@ class EmojiManager:
image_hash = hashlib.md5(image_bytes).hexdigest()
image_format = Image.open(io.BytesIO(image_bytes)).format.lower() # type: ignore
# Try to fetch an existing detailed description from the Images table (it may have been generated when the emoji was received)
# Try to fetch an existing detailed description from the EmojiDescriptionCache table
existing_description = None
try:
from src.common.database.database_model import Images
existing_image = Images.get_or_none((Images.emoji_hash == image_hash) & (Images.type == "emoji"))
if existing_image and existing_image.description:
existing_description = existing_image.description
logger.info(f"[复用描述] 找到已有详细描述: {existing_description[:50]}...")
cache_record = EmojiDescriptionCache.get_or_none(EmojiDescriptionCache.emoji_hash == image_hash)
if cache_record and cache_record.description:
existing_description = cache_record.description
logger.info(f"[复用描述] 表情描述缓存命中: {existing_description[:50]}...")
except Exception as e:
logger.debug(f"查询已有描述时出错: {e}")
logger.debug(f"查询表情描述缓存时出错: {e}")
# Step 1: VLM visual analysis (only called when there is no existing description)
if existing_description:
@@ -950,6 +949,21 @@
prompt, image_base64, image_format, temperature=0.5
)
# If the description was newly generated, write it to the cache table (emotion tags are not available yet and will be updated later)
if not existing_description:
try:
cache_record, created = EmojiDescriptionCache.get_or_create(
emoji_hash=image_hash,
defaults={"description": description, "timestamp": time.time()},
)
if not created:
# Update the description but keep any existing emotion tags
cache_record.description = description
cache_record.timestamp = time.time()
cache_record.save()
except Exception as cache_error:
logger.debug(f"写入表情描述缓存失败: {cache_error}")
# Content-filter the emoji
if global_config.emoji.content_filtration:
prompt = f'''
@@ -989,6 +1003,30 @@
logger.info(f"[注册分析] 详细描述: {description[:50]}... -> 情感标签: {emotions}")
# 将情感标签列表转换为逗号分隔的字符串
emotion_tags_str = ",".join(emotions)
# 更新EmojiDescriptionCache保存情感标签
try:
cache_record = EmojiDescriptionCache.get_or_none(EmojiDescriptionCache.emoji_hash == image_hash)
if cache_record:
# Update the emotion tags of the existing record
cache_record.emotion_tags = emotion_tags_str
cache_record.timestamp = time.time()
cache_record.save()
logger.info(f"[缓存更新] 表情包情感标签已更新到EmojiDescriptionCache: {image_hash[:8]}...")
else:
# If no cache record exists, create a new one (with both the description and emotion tags)
EmojiDescriptionCache.create(
emoji_hash=image_hash,
description=description,
emotion_tags=emotion_tags_str,
timestamp=time.time(),
)
logger.info(f"[缓存创建] 表情包描述和情感标签已保存到EmojiDescriptionCache: {image_hash[:8]}...")
except Exception as cache_error:
logger.debug(f"更新表情包情感标签缓存失败: {cache_error}")
return f"[表情包:{description}]", emotions
except Exception as e:

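For context, the new cache table imported above is defined in src/common/database/database_model.py; that definition is not part of this diff, so the following is only a minimal sketch of what such a Peewee model might look like, with field names taken from the calls in this hunk and everything else assumed:

```python
# Hypothetical sketch of the cache model referenced above; the real definition
# lives in src/common/database/database_model.py and may differ.
from peewee import Model, CharField, TextField, DoubleField

from src.common.database.database import db as peewee_db


class EmojiDescriptionCache(Model):
    emoji_hash = CharField(unique=True, index=True)  # md5 of the raw image bytes
    description = TextField(null=True)               # detailed VLM description
    emotion_tags = TextField(null=True)              # comma-separated emotion tags
    timestamp = DoubleField(null=True)               # time.time() of the last update

    class Meta:
        database = peewee_db
```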
View File

@@ -852,7 +852,7 @@ class DefaultReplyer:
memory_retrieval: str = results_dict["memory_retrieval"]
keywords_reaction_prompt = await self.build_keywords_reaction_prompt(target)
mood_state_prompt: str = results_dict["mood_state_prompt"]
jargon_explanation: Optional[str] = results_dict.get("jargon_explanation")
jargon_explanation: str = results_dict.get("jargon_explanation") or ""
# Extract the planner's overall reasoning from chosen_actions
planner_reasoning = ""

View File

@@ -773,7 +773,7 @@ class PrivateReplyer:
mood_state_prompt: str = results_dict["mood_state_prompt"]
memory_retrieval: str = results_dict["memory_retrieval"]
keywords_reaction_prompt = await self.build_keywords_reaction_prompt(target)
jargon_explanation: Optional[str] = results_dict.get("jargon_explanation")
jargon_explanation: str = results_dict.get("jargon_explanation") or ""
# Extract the planner's overall reasoning from chosen_actions
planner_reasoning = ""

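Both replyers now coerce jargon_explanation to an empty string instead of typing it as Optional[str]. A small illustration of why `.get(...) or ""` matters when the stored value can be None (dictionary contents here are illustrative):

```python
results_dict = {"jargon_explanation": None}  # key present, but the value is None

# .get() with a default does not help when the stored value is None:
broken = results_dict.get("jargon_explanation", "")   # -> None
fixed = results_dict.get("jargon_explanation") or ""  # -> ""

prompt = f"Jargon notes: {fixed}"  # no literal "None" leaking into the prompt
```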
View File

@@ -1,6 +1,7 @@
import time
import json
import os
import re
from typing import List, Optional, Tuple
import traceback
from src.common.logger import get_logger
@@ -225,6 +226,19 @@ class ExpressionLearner:
match_responses = []
try:
response = response.strip()
# Try to extract a JSON code block if one is present
json_pattern = r"```json\s*(.*?)\s*```"
matches = re.findall(json_pattern, response, re.DOTALL)
if matches:
response = matches[0].strip()
# Remove possible markdown code-fence markers (no ```json block was found, but bare ``` fences may still be present)
if not matches:
response = re.sub(r"^```\s*", "", response, flags=re.MULTILINE)
response = re.sub(r"```\s*$", "", response, flags=re.MULTILINE)
response = response.strip()
# Check whether it is already a standard JSON array
if response.startswith("[") and response.endswith("]"):
match_responses = json.loads(response)
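The fence handling added above (prefer a ```json block, otherwise strip bare ``` markers, then parse) can be exercised on its own; the following is a simplified standalone sketch of the same idea, not the actual ExpressionLearner method:

```python
import json
import re


def extract_json_array(response: str):
    """Best-effort extraction of a JSON array from an LLM response."""
    response = response.strip()

    # Prefer an explicit ```json ... ``` block if one is present.
    fenced = re.findall(r"```json\s*(.*?)\s*```", response, re.DOTALL)
    if fenced:
        response = fenced[0].strip()
    else:
        # Otherwise drop any bare ``` fences and keep the payload.
        response = re.sub(r"^```\s*", "", response, flags=re.MULTILINE)
        response = re.sub(r"```\s*$", "", response, flags=re.MULTILINE)
        response = response.strip()

    if response.startswith("[") and response.endswith("]"):
        return json.loads(response)
    return []


print(extract_json_array('```json\n[{"id": 1}]\n```'))  # [{'id': 1}]
print(extract_json_array('```\n[{"id": 2}]\n```'))      # [{'id': 2}]
```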
@@ -280,15 +294,60 @@
logger.error(f"match_responses is not a list or dict: {type(match_responses)}, content: {match_responses}")
return []
# Clean up and normalize the elements of match_responses
normalized_responses = []
for item in match_responses:
if isinstance(item, dict):
# Already a dict; add it directly
normalized_responses.append(item)
elif isinstance(item, str):
# If it is a string, try to parse it as JSON
try:
parsed = json.loads(item)
if isinstance(parsed, dict):
normalized_responses.append(parsed)
elif isinstance(parsed, list):
# If the parsed result is a list, process it recursively
for sub_item in parsed:
if isinstance(sub_item, dict):
normalized_responses.append(sub_item)
else:
logger.debug(f"跳过非字典类型的子元素: {type(sub_item)}, 内容: {sub_item}")
else:
logger.debug(f"跳过无法转换为字典的字符串元素: {item}")
except (json.JSONDecodeError, TypeError):
logger.debug(f"跳过无法解析为JSON的字符串元素: {item}")
elif isinstance(item, list):
# If it is a list, flatten it and handle the dicts inside
for sub_item in item:
if isinstance(sub_item, dict):
normalized_responses.append(sub_item)
elif isinstance(sub_item, str):
# Try to parse the string
try:
parsed = json.loads(sub_item)
if isinstance(parsed, dict):
normalized_responses.append(parsed)
else:
logger.debug(f"跳过非字典类型的解析结果: {type(parsed)}, 内容: {parsed}")
except (json.JSONDecodeError, TypeError):
logger.debug(f"跳过无法解析为JSON的字符串子元素: {sub_item}")
else:
logger.debug(f"跳过非字典类型的列表元素: {type(sub_item)}, 内容: {sub_item}")
else:
logger.debug(f"跳过无法处理的元素类型: {type(item)}, 内容: {item}")
match_responses = normalized_responses
matched_expressions = []
used_pair_indices = set()  # Track the indices of expression_pairs that have already been used
logger.debug(f"match_responses 类型: {type(match_responses)}, 长度: {len(match_responses)}")
logger.debug(f"match_responses 内容: {match_responses}")
logger.debug(f"规范化后的 match_responses 类型: {type(match_responses)}, 长度: {len(match_responses)}")
logger.debug(f"规范化后的 match_responses 内容: {match_responses}")
for match_response in match_responses:
try:
# Check the type of match_response
# Check the type of match_response (all items should be dicts at this point)
if not isinstance(match_response, dict):
logger.error(f"match_response 不是字典类型: {type(match_response)}, 内容: {match_response}")
continue
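As a usage note, the normalization pass added in this file flattens nested lists and parses stringified items so that the matching loop only ever sees dicts. A minimal illustration of the input shapes it is meant to absorb (simplified logic, illustrative key names):

```python
import json

# Shapes that broken LLM output can produce: plain dicts,
# JSON-encoded strings, and nested lists mixing both.
raw = [
    {"expression_pair": 1, "reason": "ok"},
    '{"expression_pair": 2, "reason": "stringified dict"}',
    [{"expression_pair": 3}, '{"expression_pair": 4}'],
    "not json at all",
]

normalized = []
for item in raw:
    candidates = item if isinstance(item, list) else [item]
    for sub in candidates:
        if isinstance(sub, str):
            try:
                sub = json.loads(sub)
            except json.JSONDecodeError:
                continue  # skipped; the real code logs and moves on
        if isinstance(sub, dict):
            normalized.append(sub)

print(len(normalized))  # 4 -> only dicts survive
```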