MaiBot/scripts/analyze_evaluation_stats.py

323 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
评估结果统计脚本
功能:
1. 扫描temp目录下所有JSON文件
2. 分析每个文件的统计信息
3. 输出详细的统计报告
"""
import json
import os
import sys
import glob
from collections import Counter
from datetime import datetime
from typing import Dict, List, Set, Tuple
# Add the project root (the parent of this script's directory) to sys.path
# before the project-local import below is executed.
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, project_root)
from src.common.logger import get_logger
logger = get_logger("evaluation_stats_analyzer")
# Directory that holds the evaluation result JSON files: a `temp` folder
# located next to this script.
TEMP_DIR = os.path.join(os.path.dirname(__file__), "temp")
def parse_datetime(dt_str: str) -> datetime | None:
"""解析ISO格式的日期时间字符串"""
try:
return datetime.fromisoformat(dt_str)
except Exception:
return None
def analyze_single_file(file_path: str) -> Dict:
    """Collect statistics for one evaluation-result JSON file.

    The file is read as a JSON object from which the keys ``last_updated``,
    ``total_count`` and ``manual_results`` (a list of per-record objects) are
    taken. Any failure while reading or analysing the file is stored in the
    returned dictionary and logged instead of being raised.

    Args:
        file_path: Path of the JSON file to analyse.

    Returns:
        A statistics dictionary. ``error`` is ``None`` on success and holds
        the exception text otherwise. The remaining keys carry the record
        counts, the suitable rate (in percent), the number of unique
        ``(situation, style)`` pairs, a ``Counter`` of evaluators, the parsed
        evaluation timestamps with their range, and field-presence flags.
    """
    file_name = os.path.basename(file_path)
    stats = {
        "file_name": file_name,
        "file_path": file_path,
        "file_size": 0,
        "error": None,
        "last_updated": None,
        "total_count": 0,
        "actual_count": 0,
        "suitable_count": 0,
        "unsuitable_count": 0,
        "suitable_rate": 0.0,
        "unique_pairs": 0,
        "evaluators": Counter(),
        "evaluation_dates": [],
        "date_range": None,
        "has_expression_id": False,
        "has_reason": False,
        "reason_count": 0,
    }
    try:
        # Queried inside the ``try`` so that a vanished or unreadable file is
        # reported through ``stats["error"]`` like every other failure
        # instead of aborting the whole run.
        stats["file_size"] = os.path.getsize(file_path)
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Basic information
        stats["last_updated"] = data.get("last_updated")
        stats["total_count"] = data.get("total_count", 0)
        results = data.get("manual_results", [])
        stats["actual_count"] = len(results)
        if not results:
            return stats

        # Passed / not passed. Identity checks are used so that only real
        # booleans count; records with a missing or null flag fall in neither.
        suitable_count = sum(1 for r in results if r.get("suitable") is True)
        unsuitable_count = sum(1 for r in results if r.get("suitable") is False)
        stats["suitable_count"] = suitable_count
        stats["unsuitable_count"] = unsuitable_count
        stats["suitable_rate"] = suitable_count / len(results) * 100

        # Unique (situation, style) pairs
        pairs: Set[Tuple[str, str]] = {
            (r["situation"], r["style"])
            for r in results
            if "situation" in r and "style" in r
        }
        stats["unique_pairs"] = len(pairs)

        # Evaluators; records without the field are counted as "unknown"
        stats["evaluators"] = Counter(r.get("evaluator", "unknown") for r in results)

        # Evaluation timestamps; unparseable values are skipped
        parsed_dates = (
            parse_datetime(r["evaluated_at"]) for r in results if r.get("evaluated_at")
        )
        evaluation_dates = [dt for dt in parsed_dates if dt is not None]
        stats["evaluation_dates"] = evaluation_dates
        if evaluation_dates:
            min_date = min(evaluation_dates)
            max_date = max(evaluation_dates)
            stats["date_range"] = {
                "start": min_date.isoformat(),
                "end": max_date.isoformat(),
                # +1 so that evaluations within a single day span one day
                "duration_days": (max_date - min_date).days + 1,
            }

        # Field presence
        stats["has_expression_id"] = any("expression_id" in r for r in results)
        reason_count = sum(1 for r in results if r.get("reason"))
        stats["reason_count"] = reason_count
        stats["has_reason"] = reason_count > 0
    except Exception as e:
        # Per-file boundary: record and log the failure so the remaining
        # files can still be analysed.
        stats["error"] = str(e)
        logger.error(f"分析文件 {file_name} 时出错: {e}")
    return stats
def print_file_stats(stats: Dict, index: int = None):
"""打印单个文件的统计信息"""
prefix = f"[{index}] " if index is not None else ""
print(f"\n{'=' * 80}")
print(f"{prefix}文件: {stats['file_name']}")
print(f"{'=' * 80}")
if stats["error"]:
print(f"✗ 错误: {stats['error']}")
return
print(f"文件路径: {stats['file_path']}")
print(f"文件大小: {stats['file_size']:,} 字节 ({stats['file_size'] / 1024:.2f} KB)")
if stats["last_updated"]:
print(f"最后更新: {stats['last_updated']}")
print("\n【记录统计】")
print(f" 文件中的 total_count: {stats['total_count']}")
print(f" 实际记录数: {stats['actual_count']}")
if stats['total_count'] != stats['actual_count']:
diff = stats['total_count'] - stats['actual_count']
print(f" ⚠️ 数量不一致,差值: {diff:+d}")
print("\n【评估结果统计】")
print(f" 通过 (suitable=True): {stats['suitable_count']} 条 ({stats['suitable_rate']:.2f}%)")
print(f" 不通过 (suitable=False): {stats['unsuitable_count']} 条 ({100 - stats['suitable_rate']:.2f}%)")
print("\n【唯一性统计】")
print(f" 唯一 (situation, style) 对: {stats['unique_pairs']}")
if stats['actual_count'] > 0:
duplicate_count = stats['actual_count'] - stats['unique_pairs']
duplicate_rate = (duplicate_count / stats['actual_count'] * 100) if stats['actual_count'] > 0 else 0
print(f" 重复记录: {duplicate_count} 条 ({duplicate_rate:.2f}%)")
print("\n【评估者统计】")
if stats['evaluators']:
for evaluator, count in stats['evaluators'].most_common():
rate = (count / stats['actual_count'] * 100) if stats['actual_count'] > 0 else 0
print(f" {evaluator}: {count} 条 ({rate:.2f}%)")
else:
print(" 无评估者信息")
print("\n【时间统计】")
if stats['date_range']:
print(f" 最早评估时间: {stats['date_range']['start']}")
print(f" 最晚评估时间: {stats['date_range']['end']}")
print(f" 评估时间跨度: {stats['date_range']['duration_days']}")
else:
print(" 无时间信息")
print("\n【字段统计】")
print(f" 包含 expression_id: {'' if stats['has_expression_id'] else ''}")
print(f" 包含 reason: {'' if stats['has_reason'] else ''}")
if stats['has_reason']:
rate = (stats['reason_count'] / stats['actual_count'] * 100) if stats['actual_count'] > 0 else 0
print(f" 有理由的记录: {stats['reason_count']} 条 ({rate:.2f}%)")
def _collect_unique_pairs(
    valid_files: List[Dict],
) -> Tuple[Set[Tuple[str, str]], List[Tuple[str, str]]]:
    """Re-read the given files and gather their distinct (situation, style) pairs.

    Returns the set of pairs together with ``(file_name, error_text)`` entries
    for every file that could not be read again.
    """
    pairs: Set[Tuple[str, str]] = set()
    unreadable: List[Tuple[str, str]] = []
    for stats in valid_files:
        try:
            with open(stats["file_path"], "r", encoding="utf-8") as f:
                data = json.load(f)
            for r in data.get("manual_results", []):
                if "situation" in r and "style" in r:
                    pairs.add((r["situation"], r["style"]))
        except (OSError, ValueError, TypeError, AttributeError) as exc:
            # OSError: file vanished/unreadable; ValueError: invalid JSON or
            # encoding; TypeError/AttributeError: unexpected JSON structure.
            # Remember the failure so the report can flag an incomplete count.
            unreadable.append((stats["file_name"], str(exc)))
    return pairs, unreadable


def print_summary(all_stats: List[Dict]):
    """Print the statistics aggregated over all analysed files.

    Files whose statistics carry an ``error`` are listed as failures and are
    excluded from every aggregate. The distinct ``(situation, style)`` pairs
    are gathered by reading the successfully parsed files again; a file that
    cannot be re-read is reported in the output instead of being skipped
    silently.

    Args:
        all_stats: One statistics dictionary per file, as produced by
            ``analyze_single_file``.
    """
    print(f"\n{'=' * 80}")
    print("汇总统计")
    print(f"{'=' * 80}")

    total_files = len(all_stats)
    valid_files = [s for s in all_stats if not s.get("error")]
    error_files = [s for s in all_stats if s.get("error")]

    print("\n【文件统计】")
    print(f" 总文件数: {total_files}")
    print(f" 成功解析: {len(valid_files)}")
    print(f" 解析失败: {len(error_files)}")
    if error_files:
        print("\n 失败文件列表:")
        for stats in error_files:
            print(f" - {stats['file_name']}: {stats['error']}")
    if not valid_files:
        print("\n没有成功解析的文件")
        return

    # Aggregate record counts
    total_records = sum(s["actual_count"] for s in valid_files)
    total_suitable = sum(s["suitable_count"] for s in valid_files)
    total_unsuitable = sum(s["unsuitable_count"] for s in valid_files)
    total_unique_pairs, unreadable = _collect_unique_pairs(valid_files)

    print("\n【记录汇总】")
    print(f" 总记录数: {total_records:,}")
    if total_records > 0:
        print(f" 通过: {total_suitable:,} 条 ({total_suitable / total_records * 100:.2f}%)")
        print(f" 不通过: {total_unsuitable:,} 条 ({total_unsuitable / total_records * 100:.2f}%)")
    else:
        print(" 通过: 0 条")
        print(" 不通过: 0 条")
    print(f" 唯一 (situation, style) 对: {len(total_unique_pairs):,}")
    for file_name, reason in unreadable:
        print(f" ⚠️ 无法重新读取 {file_name} (未计入唯一对统计): {reason}")
    if total_records > 0:
        duplicate_count = total_records - len(total_unique_pairs)
        duplicate_rate = duplicate_count / total_records * 100
        print(f" 重复记录: {duplicate_count:,} 条 ({duplicate_rate:.2f}%)")

    # Aggregate evaluators
    all_evaluators = Counter()
    for stats in valid_files:
        all_evaluators.update(stats["evaluators"])
    print("\n【评估者汇总】")
    if all_evaluators:
        for evaluator, count in all_evaluators.most_common():
            rate = (count / total_records * 100) if total_records > 0 else 0
            print(f" {evaluator}: {count:,} 条 ({rate:.2f}%)")
    else:
        print(" 无评估者信息")

    # Aggregate time range
    all_dates = []
    for stats in valid_files:
        all_dates.extend(stats["evaluation_dates"])
    if all_dates:
        min_date = min(all_dates)
        max_date = max(all_dates)
        print("\n【时间汇总】")
        print(f" 最早评估时间: {min_date.isoformat()}")
        print(f" 最晚评估时间: {max_date.isoformat()}")
        print(f" 总时间跨度: {(max_date - min_date).days + 1}")

    # Aggregate file sizes (valid_files is known to be non-empty here)
    total_size = sum(s["file_size"] for s in valid_files)
    avg_size = total_size / len(valid_files)
    print("\n【文件大小汇总】")
    print(f" 总大小: {total_size:,} 字节 ({total_size / 1024 / 1024:.2f} MB)")
    print(f" 平均大小: {avg_size:,.0f} 字节 ({avg_size / 1024:.2f} KB)")
def main():
    """Script entry point.

    Looks for ``*.json`` files directly inside ``TEMP_DIR``, analyses them in
    file-name order, prints one report per file and finally the aggregated
    summary. When the directory or the files are missing, an error is printed
    and logged and the function returns early.
    """
    for banner_line in ("=" * 80, "开始分析评估结果统计信息", "=" * 80):
        logger.info(banner_line)

    if not os.path.exists(TEMP_DIR):
        print(f"\n✗ 错误未找到temp目录: {TEMP_DIR}")
        logger.error(f"未找到temp目录: {TEMP_DIR}")
        return

    # Collect every JSON file, ordered by file name
    pattern = os.path.join(TEMP_DIR, "*.json")
    json_paths = sorted(glob.glob(pattern))
    if not json_paths:
        print(f"\n✗ 错误temp目录下未找到JSON文件: {TEMP_DIR}")
        logger.error(f"temp目录下未找到JSON文件: {TEMP_DIR}")
        return

    print(f"\n找到 {len(json_paths)} 个JSON文件")
    print("=" * 80)

    # Analyse and report each file, remembering the result for the summary
    collected = []
    for position, path in enumerate(json_paths, start=1):
        file_stats = analyze_single_file(path)
        collected.append(file_stats)
        print_file_stats(file_stats, index=position)

    # Aggregated report over all files
    print_summary(collected)

    print(f"\n{'=' * 80}")
    print("分析完成")
    print(f"{'=' * 80}")
# Run the analysis only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()