From a4285673aabd6e17bebf390abaeb3f88989e282e Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Thu, 11 Sep 2025 14:25:02 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E6=94=B9=E4=B8=BA=E5=8D=95planner?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E8=A7=A3=E6=9E=90=E5=A4=9A=E4=B8=AA=E5=8A=A8?= =?UTF-8?q?=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/test_interest_embedding.py | 613 ++++++++++ .../talk_frequency_control.py | 146 +++ src/chat/heart_flow/heartFC_chat.py | 383 +++---- src/chat/planner_actions/planner.py | 1000 ++++++----------- src/config/config.py | 2 +- src/llm_models/model_client/openai_client.py | 24 +- src/llm_models/utils_model.py | 2 + 7 files changed, 1284 insertions(+), 886 deletions(-) create mode 100644 scripts/test_interest_embedding.py diff --git a/scripts/test_interest_embedding.py b/scripts/test_interest_embedding.py new file mode 100644 index 00000000..f5fcd195 --- /dev/null +++ b/scripts/test_interest_embedding.py @@ -0,0 +1,613 @@ +#!/usr/bin/env python3 +""" +基于Embedding的兴趣度计算测试脚本 +使用MaiBot-Core的EmbeddingStore计算兴趣描述与目标文本的关联度 +""" + +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from typing import List, Dict, Tuple, Optional +import time +import json +import asyncio +from src.chat.knowledge.embedding_store import EmbeddingStore, cosine_similarity +from src.chat.knowledge.embedding_store import EMBEDDING_DATA_DIR_STR +from src.llm_models.utils_model import LLMRequest +from src.config.config import model_config + + +class InterestScorer: + """基于Embedding的兴趣度计算器""" + + def __init__(self, namespace: str = "interest_test"): + """初始化兴趣度计算器""" + self.embedding_store = EmbeddingStore(namespace, EMBEDDING_DATA_DIR_STR) + + async def get_embedding(self, text: str) -> Tuple[Optional[List[float]], float]: + """获取文本的嵌入向量""" + start_time = time.time() + try: + # 直接使用异步方式获取嵌入 + from src.llm_models.utils_model import LLMRequest + from src.config.config import model_config + + llm = LLMRequest(model_set=model_config.model_task_config.embedding, request_type="embedding") + embedding, _ = await llm.get_embedding(text) + + end_time = time.time() + elapsed = end_time - start_time + + if embedding and len(embedding) > 0: + return embedding, elapsed + return None, elapsed + except Exception as e: + print(f"获取嵌入向量失败: {e}") + return None, 0.0 + + async def calculate_similarity(self, text1: str, text2: str) -> Tuple[float, float, float]: + """计算两段文本的余弦相似度,返回(相似度, 文本1耗时, 文本2耗时)""" + emb1, time1 = await self.get_embedding(text1) + emb2, time2 = await self.get_embedding(text2) + + if emb1 is None or emb2 is None: + return 0.0, time1, time2 + + return cosine_similarity(emb1, emb2), time1, time2 + + async def calculate_interest_score(self, interest_text: str, target_text: str) -> Dict: + """ + 计算兴趣度分数 + + Args: + interest_text: 兴趣描述文本 + target_text: 目标文本 + + Returns: + 包含各种分数的字典 + """ + # 只计算语义相似度(嵌入分数) + semantic_score, interest_time, target_time = await self.calculate_similarity(interest_text, target_text) + + # 直接使用语义相似度作为最终分数 + final_score = semantic_score + + return { + "final_score": final_score, + "semantic_score": semantic_score, + "timing": { + "interest_embedding_time": interest_time, + "target_embedding_time": target_time, + "total_time": interest_time + target_time + } + } + + async def batch_calculate(self, interest_text: str, target_texts: List[str]) -> List[Dict]: + """批量计算兴趣度""" + results = [] + total_start_time = time.time() + + print(f"开始批量计算兴趣度...") + print(f"兴趣文本: {interest_text}") + print(f"目标文本数量: {len(target_texts)}") + + # 获取兴趣文本的嵌入向量(只需要一次) + interest_embedding, interest_time = await self.get_embedding(interest_text) + if interest_embedding is None: + print("无法获取兴趣文本的嵌入向量") + return [] + + print(f"兴趣文本嵌入计算耗时: {interest_time:.3f}秒") + + total_target_time = 0.0 + + for i, target_text in enumerate(target_texts): + print(f"处理第 {i+1}/{len(target_texts)} 个文本...") + + # 获取目标文本的嵌入向量 + target_embedding, target_time = await self.get_embedding(target_text) + total_target_time += target_time + + if target_embedding is None: + semantic_score = 0.0 + else: + semantic_score = cosine_similarity(interest_embedding, target_embedding) + + # 直接使用语义相似度作为最终分数 + final_score = semantic_score + + results.append({ + "target_text": target_text, + "final_score": final_score, + "semantic_score": semantic_score, + "timing": { + "target_embedding_time": target_time, + "item_total_time": target_time + } + }) + + # 按分数排序 + results.sort(key=lambda x: x["final_score"], reverse=True) + + total_time = time.time() - total_start_time + avg_target_time = total_target_time / len(target_texts) if target_texts else 0 + + print(f"\n=== 性能统计 ===") + print(f"兴趣文本嵌入计算耗时: {interest_time:.3f}秒") + print(f"目标文本嵌入计算总耗时: {total_target_time:.3f}秒") + print(f"目标文本嵌入计算平均耗时: {avg_target_time:.3f}秒") + print(f"总耗时: {total_time:.3f}秒") + print(f"平均每个目标文本处理耗时: {total_time / len(target_texts):.3f}秒") + + return results + + async def generate_paraphrases(self, original_text: str, num_sentences: int = 5) -> List[str]: + """ + 使用LLM生成近义句子 + + Args: + original_text: 原始文本 + num_sentences: 生成句子数量 + + Returns: + 近义句子列表 + """ + try: + # 创建LLM请求实例 + llm_request = LLMRequest( + model_set=model_config.model_task_config.replyer, + request_type="paraphrase_generator" + ) + + # 构建生成近义句子的提示词 + prompt = f"""请为以下兴趣描述生成{num_sentences}个意义相近但表达不同的句子: + +原始兴趣描述:{original_text} + +要求: +1. 保持原意不变,但尽量自由发挥,使用不同的表达方式,内容也可以有差异 +2. 句子结构要有所变化 +3. 可以适当调整语气和重点 +4. 每个句子都要完整且自然 +5. 只返回句子,不要编号,每行一个句子 + +生成的近义句子:""" + + print(f"正在生成近义句子...") + content, (reasoning, model_name, tool_calls) = await llm_request.generate_response_async(prompt) + + # 解析生成的句子 + sentences = [] + for line in content.strip().split('\n'): + line = line.strip() + if line and not line.startswith('生成') and not line.startswith('近义'): + sentences.append(line) + + # 确保返回指定数量的句子 + sentences = sentences[:num_sentences] + print(f"成功生成 {len(sentences)} 个近义句子") + print(f"使用的模型: {model_name}") + + return sentences + + except Exception as e: + print(f"生成近义句子失败: {e}") + return [] + + async def evaluate_all_paraphrases(self, original_text: str, target_texts: List[str], num_sentences: int = 5) -> Dict: + """ + 评估原始文本和所有近义句子的兴趣度 + + Args: + original_text: 原始兴趣描述文本 + target_texts: 目标文本列表 + num_sentences: 生成近义句子数量 + + Returns: + 包含所有评估结果的字典 + """ + print(f"\n=== 开始近义句子兴趣度评估 ===") + print(f"原始兴趣描述: {original_text}") + print(f"目标文本数量: {len(target_texts)}") + print(f"生成近义句子数量: {num_sentences}") + + # 生成近义句子 + paraphrases = await self.generate_paraphrases(original_text, num_sentences) + if not paraphrases: + print("生成近义句子失败,使用原始文本进行评估") + paraphrases = [] + + # 所有待评估的文本(原始文本 + 近义句子) + all_texts = [original_text] + paraphrases + + # 对每个文本进行兴趣度评估 + evaluation_results = {} + + for i, text in enumerate(all_texts): + text_type = "原始文本" if i == 0 else f"近义句子{i}" + print(f"\n--- 评估 {text_type} ---") + print(f"文本内容: {text}") + + # 计算兴趣度 + results = await self.batch_calculate(text, target_texts) + evaluation_results[text_type] = { + "text": text, + "results": results, + "top_score": results[0]["final_score"] if results else 0.0, + "average_score": sum(r["final_score"] for r in results) / len(results) if results else 0.0 + } + + return { + "original_text": original_text, + "paraphrases": paraphrases, + "evaluations": evaluation_results, + "summary": self._generate_summary(evaluation_results, target_texts) + } + + def _generate_summary(self, evaluation_results: Dict, target_texts: List[str]) -> Dict: + """生成评估摘要 - 关注目标句子的表现""" + summary = { + "best_performer": None, + "worst_performer": None, + "average_scores": {}, + "max_scores": {}, + "rankings": [], + "target_stats": {}, + "target_rankings": [] + } + + scores = [] + + for text_type, data in evaluation_results.items(): + scores.append({ + "text_type": text_type, + "text": data["text"], + "top_score": data["top_score"], + "average_score": data["average_score"] + }) + + # 按top_score排序 + scores.sort(key=lambda x: x["top_score"], reverse=True) + + summary["rankings"] = scores + summary["best_performer"] = scores[0] if scores else None + summary["worst_performer"] = scores[-1] if scores else None + + # 计算原始文本统计 + original_score = next((s for s in scores if s["text_type"] == "原始文本"), None) + if original_score: + summary["average_scores"]["original"] = original_score["average_score"] + summary["max_scores"]["original"] = original_score["top_score"] + + # 计算目标句子的统计信息 + target_stats = {} + for i, target_text in enumerate(target_texts): + target_key = f"目标{i+1}" + scores_for_target = [] + + # 收集所有兴趣描述对该目标文本的分数 + for text_type, data in evaluation_results.items(): + for result in data["results"]: + if result["target_text"] == target_text: + scores_for_target.append(result["final_score"]) + + if scores_for_target: + target_stats[target_key] = { + "target_text": target_text, + "scores": scores_for_target, + "average": sum(scores_for_target) / len(scores_for_target), + "max": max(scores_for_target), + "min": min(scores_for_target), + "std": (sum((x - sum(scores_for_target) / len(scores_for_target)) ** 2 for x in scores_for_target) / len(scores_for_target)) ** 0.5 + } + + summary["target_stats"] = target_stats + + # 按平均分对目标文本排序 + target_rankings = [] + for target_key, stats in target_stats.items(): + target_rankings.append({ + "target_key": target_key, + "target_text": stats["target_text"], + "average_score": stats["average"], + "max_score": stats["max"], + "min_score": stats["min"], + "std_score": stats["std"] + }) + + target_rankings.sort(key=lambda x: x["average_score"], reverse=True) + summary["target_rankings"] = target_rankings + + # 计算目标文本的整体统计 + if target_rankings: + all_target_averages = [t["average_score"] for t in target_rankings] + all_target_scores = [] + for stats in target_stats.values(): + all_target_scores.extend(stats["scores"]) + + summary["target_overall"] = { + "avg_of_averages": sum(all_target_averages) / len(all_target_averages), + "overall_max": max(all_target_scores), + "overall_min": min(all_target_scores), + "best_target": target_rankings[0]["target_text"], + "worst_target": target_rankings[-1]["target_text"] + } + + return summary + + +async def run_single_test(): + """运行单个测试""" + print("单个兴趣度测试") + print("=" * 40) + + # 输入兴趣文本 + # interest_text = input("请输入兴趣描述文本: ").strip() + # if not interest_text: + # print("兴趣描述不能为空") + # return + + interest_text ="对技术相关话题,游戏和动漫相关话题感兴趣,也对日常话题感兴趣,不喜欢太过沉重严肃的话题" + + # 输入目标文本 + print("请输入目标文本 (输入空行结束):") + import random + target_texts = [ + "AveMujica非常好看,你看了吗", + "明日方舟这个游戏挺好玩的", + "你能不能说点正经的", + "明日方舟挺好玩的", + "你的名字非常好看,你看了吗", + "《你的名字》非常好看,你看了吗", + "我们来聊聊苏联政治吧", + "轻音少女非常好看,你看了吗", + "我还挺喜欢打游戏的", + "我嘞个原神玩家啊", + "我心买了PlayStation5", + "直接Steam", + "有没有R" + ] + random.shuffle(target_texts) + # while True: + # line = input().strip() + # if not line: + # break + # target_texts.append(line) + + # if not target_texts: + # print("目标文本不能为空") + # return + + # 计算兴趣度 + scorer = InterestScorer() + results = await scorer.batch_calculate(interest_text, target_texts) + + # 显示结果 + print(f"\n兴趣度排序结果:") + print("-" * 80) + print(f"{'排名':<4} {'最终分数':<10} {'语义分数':<10} {'耗时(秒)':<10} {'目标文本'}") + print("-" * 80) + + for j, result in enumerate(results): + target_text = result['target_text'] + if len(target_text) > 40: + target_text = target_text[:37] + "..." + + timing = result.get('timing', {}) + item_time = timing.get('item_total_time', 0.0) + + print(f"{j+1:<4} {result['final_score']:<10.3f} {result['semantic_score']:<10.3f} " + f"{item_time:<10.3f} {target_text}") + + +async def run_paraphrase_test(): + """运行近义句子测试""" + print("近义句子兴趣度对比测试") + print("=" * 40) + + # 输入兴趣文本 + interest_text = "对技术相关话题,游戏和动漫相关话题感兴趣,比如明日方舟和原神,也对日常话题感兴趣,不喜欢太过沉重严肃的话题" + + # 输入目标文本 + print("请输入目标文本 (输入空行结束):") + # target_texts = [] + # while True: + # line = input().strip() + # if not line: + # break + # target_texts.append(line) + target_texts = [ + "AveMujica非常好看,你看了吗", + "明日方舟这个游戏挺好玩的", + "你能不能说点正经的", + "明日方舟挺好玩的", + "你的名字非常好看,你看了吗", + "《你的名字》非常好看,你看了吗", + "我们来聊聊苏联政治吧", + "轻音少女非常好看,你看了吗", + "我还挺喜欢打游戏的", + "刚加好友就视奸空间14条", + "可乐老大加我好友,我先日一遍空间", + "鸟一茬茬的", + "可乐可以是m,群友可以是s" + ] + + if not target_texts: + print("目标文本不能为空") + return + + # 创建评估器 + scorer = InterestScorer() + + # 运行评估 + result = await scorer.evaluate_all_paraphrases(interest_text, target_texts, num_sentences=5) + + # 显示结果 + display_paraphrase_results(result, target_texts) + + +def display_paraphrase_results(result: Dict, target_texts: List[str]): + """显示近义句子评估结果""" + print("\n" + "=" * 80) + print("近义句子兴趣度评估结果") + print("=" * 80) + + # 显示目标文本 + print(f"\n📋 目标文本列表:") + print("-" * 40) + for i, target in enumerate(target_texts): + print(f"{i+1}. {target}") + + # 显示生成的近义句子 + print(f"\n📝 生成的近义句子 (作为兴趣描述):") + print("-" * 40) + for i, paraphrase in enumerate(result["paraphrases"]): + print(f"{i+1}. {paraphrase}") + + # 显示摘要 + summary = result["summary"] + print(f"\n📊 评估摘要:") + print("-" * 40) + + if summary["best_performer"]: + print(f"最佳表现: {summary['best_performer']['text_type']} (最高分: {summary['best_performer']['top_score']:.3f})") + + if summary["worst_performer"]: + print(f"最差表现: {summary['worst_performer']['text_type']} (最高分: {summary['worst_performer']['top_score']:.3f})") + + print(f"原始文本平均分: {summary['average_scores'].get('original', 0):.3f}") + + # 显示目标文本的整体统计 + if "target_overall" in summary: + overall = summary["target_overall"] + print(f"\n📈 目标文本整体统计:") + print("-" * 40) + print(f"目标文本数量: {len(summary['target_rankings'])}") + print(f"平均分的平均值: {overall['avg_of_averages']:.3f}") + print(f"所有匹配中的最高分: {overall['overall_max']:.3f}") + print(f"所有匹配中的最低分: {overall['overall_min']:.3f}") + print(f"最佳匹配目标: {overall['best_target'][:50]}...") + print(f"最差匹配目标: {overall['worst_target'][:50]}...") + + # 显示目标文本排名 + if "target_rankings" in summary and summary["target_rankings"]: + print(f"\n🏆 目标文本排名 (按平均分):") + print("-" * 80) + print(f"{'排名':<4} {'平均分':<8} {'最高分':<8} {'最低分':<8} {'标准差':<8} {'目标文本'}") + print("-" * 80) + + for i, target in enumerate(summary["target_rankings"]): + target_text = target["target_text"][:40] + "..." if len(target["target_text"]) > 40 else target["target_text"] + print(f"{i+1:<4} {target['average_score']:<8.3f} {target['max_score']:<8.3f} {target['min_score']:<8.3f} {target['std_score']:<8.3f} {target_text}") + + # 显示每个目标文本的详细分数分布 + if "target_stats" in summary: + print(f"\n📊 目标文本详细分数分布:") + print("-" * 80) + + for target_key, stats in summary["target_stats"].items(): + print(f"\n{target_key}: {stats['target_text']}") + print(f" 平均分: {stats['average']:.3f}") + print(f" 最高分: {stats['max']:.3f}") + print(f" 最低分: {stats['min']:.3f}") + print(f" 标准差: {stats['std']:.3f}") + print(f" 所有分数: {[f'{s:.3f}' for s in stats['scores']]}") + + # 显示最佳和最差兴趣描述的目标表现对比 + if summary["best_performer"] and summary["worst_performer"]: + print(f"\n🔍 最佳 vs 最差兴趣描述对比:") + print("-" * 80) + + best_data = result["evaluations"][summary["best_performer"]["text_type"]] + worst_data = result["evaluations"][summary["worst_performer"]["text_type"]] + + print(f"最佳兴趣描述: {summary['best_performer']['text']}") + print(f"最差兴趣描述: {summary['worst_performer']['text']}") + print(f"") + print(f"{'目标文本':<30} {'最佳分数':<10} {'最差分数':<10} {'差值'}") + print("-" * 60) + + for best_result, worst_result in zip(best_data["results"], worst_data["results"]): + if best_result["target_text"] == worst_result["target_text"]: + diff = best_result["final_score"] - worst_result["final_score"] + target_text = best_result["target_text"][:27] + "..." if len(best_result["target_text"]) > 30 else best_result["target_text"] + print(f"{target_text:<30} {best_result['final_score']:<10.3f} {worst_result['final_score']:<10.3f} {diff:+.3f}") + + # 显示排名 + print(f"\n🏆 兴趣描述性能排名:") + print("-" * 80) + print(f"{'排名':<4} {'文本类型':<10} {'最高分':<8} {'平均分':<8} {'兴趣描述内容'}") + print("-" * 80) + + for i, item in enumerate(summary["rankings"]): + text_content = item["text"][:40] + "..." if len(item["text"]) > 40 else item["text"] + print(f"{i+1:<4} {item['text_type']:<10} {item['top_score']:<8.3f} {item['average_score']:<8.3f} {text_content}") + + # 显示每个兴趣描述的详细结果 + print(f"\n🔍 详细结果:") + print("-" * 80) + + for text_type, data in result["evaluations"].items(): + print(f"\n--- {text_type} ---") + print(f"兴趣描述: {data['text']}") + print(f"最高分: {data['top_score']:.3f}") + print(f"平均分: {data['average_score']:.3f}") + + # 显示前3个匹配结果 + top_results = data["results"][:3] + print(f"前3个匹配的目标文本:") + for j, result_item in enumerate(top_results): + print(f" {j+1}. 分数: {result_item['final_score']:.3f} - {result_item['target_text']}") + + # 显示对比表格 + print(f"\n📈 兴趣描述对比表格:") + print("-" * 100) + header = f"{'兴趣描述':<20}" + for i, target in enumerate(target_texts): + target_name = f"目标{i+1}" + header += f" {target_name:<12}" + print(header) + print("-" * 100) + + # 原始文本行 + original_line = f"{'原始文本':<20}" + original_data = result["evaluations"]["原始文本"]["results"] + for i in range(len(target_texts)): + if i < len(original_data): + original_line += f" {original_data[i]['final_score']:<12.3f}" + else: + original_line += f" {'-':<12}" + print(original_line) + + # 近义句子行 + for i, paraphrase in enumerate(result["paraphrases"]): + text_type = f"近义句子{i+1}" + line = f"{text_type:<20}" + paraphrase_data = result["evaluations"][text_type]["results"] + for j in range(len(target_texts)): + if j < len(paraphrase_data): + line += f" {paraphrase_data[j]['final_score']:<12.3f}" + else: + line += f" {'-':<12}" + print(line) + + +def main(): + """主函数""" + print("基于Embedding的兴趣度计算测试工具") + print("1. 单个兴趣度测试") + print("2. 近义句子兴趣度对比测试") + + choice = input("\n请选择 (1/2): ").strip() + + if choice == "1": + asyncio.run(run_single_test()) + elif choice == "2": + asyncio.run(run_paraphrase_test()) + else: + print("无效选择") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/chat/frequency_control/talk_frequency_control.py b/src/chat/frequency_control/talk_frequency_control.py index ccf7ed66..b0733bb3 100644 --- a/src/chat/frequency_control/talk_frequency_control.py +++ b/src/chat/frequency_control/talk_frequency_control.py @@ -1,6 +1,9 @@ from typing import Optional +from datetime import datetime, timedelta +import statistics from src.config.config import global_config from src.chat.frequency_control.utils import parse_stream_config_to_chat_id +from src.common.database.database_model import Messages def get_config_base_talk_frequency(chat_id: Optional[str] = None) -> float: @@ -124,3 +127,146 @@ def get_global_frequency() -> Optional[float]: return get_time_based_frequency(config_item[1:]) return None + + +def get_weekly_hourly_message_stats(chat_id: str): + """ + 计算指定聊天最近一周每个小时的消息数量和用户数量 + + Args: + chat_id: 聊天ID(对应 Messages 表的 chat_id 字段) + + Returns: + dict: 包含24个小时统计数据,格式为: + { + "0": {"message_count": [5, 8, 3, 12, 6, 9, 7], "message_std_dev": 2.1}, + "1": {"message_count": [10, 15, 8, 20, 12, 18, 14], "message_std_dev": 3.2}, + ... + } + """ + # 计算一周前的时间戳 + one_week_ago = datetime.now() - timedelta(days=7) + one_week_ago_timestamp = one_week_ago.timestamp() + + # 初始化数据结构:按小时存储每天的消息计数 + hourly_data = {} + for hour in range(24): + hourly_data[f"hour_{hour}"] = {"daily_counts": []} + + try: + # 查询指定聊天最近一周的消息 + messages = Messages.select().where( + (Messages.time >= one_week_ago_timestamp) & + (Messages.chat_id == chat_id) + ) + + # 统计每个小时的数据 + for message in messages: + # 将时间戳转换为datetime + msg_time = datetime.fromtimestamp(message.time) + hour = msg_time.hour + + # 记录每天的消息计数(按日期分组) + day_key = msg_time.strftime("%Y-%m-%d") + hour_key = f"{hour}" + + # 为该小时添加当天的消息计数 + found = False + for day_count in hourly_data[hour_key]["daily_counts"]: + if day_count["date"] == day_key: + day_count["count"] += 1 + found = True + break + + if not found: + hourly_data[hour_key]["daily_counts"].append({"date": day_key, "count": 1}) + + + except Exception as e: + # 如果查询失败,返回空的统计结果 + print(f"Error getting weekly hourly message stats for chat {chat_id}: {e}") + hourly_stats = {} + for hour in range(24): + hourly_stats[f"hour_{hour}"] = { + "message_count": [], + "message_std_dev": 0.0 + } + return hourly_stats + + # 计算每个小时的统计结果 + hourly_stats = {} + for hour in range(24): + hour_key = f"hour_{hour}" + daily_counts = [day["count"] for day in hourly_data[hour_key]["daily_counts"]] + + # 计算总消息数 + total_messages = sum(daily_counts) + + # 计算标准差 + message_std_dev = 0.0 + if len(daily_counts) > 1: + message_std_dev = statistics.stdev(daily_counts) + elif len(daily_counts) == 1: + message_std_dev = 0.0 + + # 按日期排序每日消息计数 + daily_counts_sorted = sorted(hourly_data[hour_key]["daily_counts"], key=lambda x: x["date"]) + + hourly_stats[hour_key] = { + "message_count": [day["count"] for day in daily_counts_sorted], + "message_std_dev": message_std_dev + } + + return hourly_stats + +def get_recent_15min_stats(chat_id: str): + """ + 获取最近15分钟指定聊天的消息数量和发言人数 + + Args: + chat_id: 聊天ID(对应 Messages 表的 chat_id 字段) + + Returns: + dict: 包含消息数量和发言人数,格式为: + { + "message_count": 25, + "user_count": 8, + "time_range": "2025-01-01 14:30:00 - 2025-01-01 14:45:00" + } + """ + # 计算15分钟前的时间戳 + fifteen_min_ago = datetime.now() - timedelta(minutes=15) + fifteen_min_ago_timestamp = fifteen_min_ago.timestamp() + current_time = datetime.now() + + # 初始化统计结果 + message_count = 0 + user_set = set() + + try: + # 查询最近15分钟的消息 + messages = Messages.select().where( + (Messages.time >= fifteen_min_ago_timestamp) & + (Messages.chat_id == chat_id) + ) + + # 统计消息数量和用户 + for message in messages: + message_count += 1 + if message.user_id: + user_set.add(message.user_id) + + except Exception as e: + # 如果查询失败,返回空结果 + print(f"Error getting recent 15min stats for chat {chat_id}: {e}") + return { + "message_count": 0, + "user_count": 0, + "time_range": f"{fifteen_min_ago.strftime('%Y-%m-%d %H:%M:%S')} - {current_time.strftime('%Y-%m-%d %H:%M:%S')}" + } + + return { + "message_count": message_count, + "user_count": len(user_set), + "time_range": f"{fifteen_min_ago.strftime('%Y-%m-%d %H:%M:%S')} - {current_time.strftime('%Y-%m-%d %H:%M:%S')}" + } diff --git a/src/chat/heart_flow/heartFC_chat.py b/src/chat/heart_flow/heartFC_chat.py index dda509f0..56db0d9a 100644 --- a/src/chat/heart_flow/heartFC_chat.py +++ b/src/chat/heart_flow/heartFC_chat.py @@ -18,7 +18,6 @@ from src.chat.planner_actions.action_modifier import ActionModifier from src.chat.planner_actions.action_manager import ActionManager from src.chat.heart_flow.hfc_utils import CycleDetail from src.chat.heart_flow.hfc_utils import send_typing, stop_typing -from src.chat.frequency_control.frequency_control import frequency_control_manager from src.chat.express.expression_learner import expression_learner_manager from src.person_info.person_info import Person from src.plugin_system.base.component_types import ChatMode, EventType, ActionInfo @@ -52,6 +51,16 @@ ERROR_LOOP_INFO = { }, } +# ?什么时候发言: +# 1.聊天频率较低:与过去该时段发言频率进行比较 +# 2.感兴趣的话题:暂时使用Emb计算 +# 3.感兴趣的人:认识次数 +# 4.直接提及 + +# 什么时候不发言: +# 1.敏感话题:判断较难 +# 2.发言频率太高:近时判断,例如发言频率> 1/人数 *2 +# 3.明确被拒绝:planner判断 install(extra_lines=3) @@ -85,8 +94,6 @@ class HeartFChatting: self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id) - self.frequency_control = frequency_control_manager.get_or_create_frequency_control(self.stream_id) - self.action_manager = ActionManager() self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager) self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) @@ -101,6 +108,10 @@ class HeartFChatting: self._current_cycle_detail: CycleDetail = None # type: ignore self.last_read_time = time.time() - 10 + + + + self.no_reply_until_call = False async def start(self): """检查是否需要启动主循环,如果未激活则启动。""" @@ -156,38 +167,12 @@ class HeartFChatting: formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" timer_strings.append(f"{name}: {formatted_time}") - # 获取动作类型,兼容新旧格式 - # 移除无用代码 - # action_type = "未知动作" - # if hasattr(self, "_current_cycle_detail") and self._current_cycle_detail: - # loop_plan_info = self._current_cycle_detail.loop_plan_info - # if isinstance(loop_plan_info, dict): - # action_result = loop_plan_info.get("action_result", {}) - # if isinstance(action_result, dict): - # # 旧格式:action_result是字典 - # action_type = action_result.get("action_type", "未知动作") - # elif isinstance(action_result, list) and action_result: - # # 新格式:action_result是actions列表 - # # TODO: 把这里写明白 - # action_type = action_result[0].action_type or "未知动作" - # elif isinstance(loop_plan_info, list) and loop_plan_info: - # # 直接是actions列表的情况 - # action_type = loop_plan_info[0].get("action_type", "未知动作") - logger.info( f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考," f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒" # type: ignore + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") ) - async def calculate_interest_value(self, recent_messages_list: List["DatabaseMessages"]) -> float: - total_interest = 0.0 - for msg in recent_messages_list: - interest_value = msg.interest_value - if interest_value is not None and msg.processed_plain_text: - total_interest += float(interest_value) - return total_interest / len(recent_messages_list) - async def _loopbody(self): recent_messages_list = message_api.get_messages_by_time_in_chat( chat_id=self.stream_id, @@ -200,9 +185,22 @@ class HeartFChatting: ) if recent_messages_list: + + # !处理no_reply_until_call逻辑 + if self.no_reply_until_call: + for message in recent_messages_list: + if message.is_mentioned or message.is_at: + self.no_reply_until_call = False + break + # 没有提到,继续保持沉默 + if self.no_reply_until_call: + logger.info(f"{self.log_prefix} 没有提到,继续保持沉默") + await asyncio.sleep(1) + return True + + self.last_read_time = time.time() await self._observe( - interest_value=await self.calculate_interest_value(recent_messages_list), recent_messages_list=recent_messages_list, ) else: @@ -262,190 +260,140 @@ class HeartFChatting: return loop_info, reply_text, cycle_timers async def _observe( - self, interest_value: float = 0.0, recent_messages_list: Optional[List["DatabaseMessages"]] = None + self, # interest_value: float = 0.0, + recent_messages_list: Optional[List["DatabaseMessages"]] = None ) -> bool: if recent_messages_list is None: recent_messages_list = [] reply_text = "" # 初始化reply_text变量,避免UnboundLocalError - # 使用sigmoid函数将interest_value转换为概率 - # 当interest_value为0时,概率接近0(使用Focus模式) - # 当interest_value很高时,概率接近1(使用Normal模式) - def calculate_normal_mode_probability(interest_val: float) -> float: - # 使用sigmoid函数,调整参数使概率分布更合理 - # 当interest_value = 0时,概率约为0.1 - # 当interest_value = 1时,概率约为0.5 - # 当interest_value = 2时,概率约为0.8 - # 当interest_value = 3时,概率约为0.95 - k = 2.0 # 控制曲线陡峭程度 - x0 = 1.0 # 控制曲线中心点 - return 1.0 / (1.0 + math.exp(-k * (interest_val - x0))) - - normal_mode_probability = ( - calculate_normal_mode_probability(interest_value) * 2 * self.frequency_control.get_final_talk_frequency() - ) - - # 对呼唤名字进行增幅 - for msg in recent_messages_list: - if msg.reply_probability_boost is not None and msg.reply_probability_boost > 0.0: - normal_mode_probability += msg.reply_probability_boost - if global_config.chat.mentioned_bot_reply and msg.is_mentioned: - normal_mode_probability += global_config.chat.mentioned_bot_reply - if global_config.chat.at_bot_inevitable_reply and msg.is_at: - normal_mode_probability += global_config.chat.at_bot_inevitable_reply - - # 根据概率决定使用直接回复 - interest_triggered = False - focus_triggered = False - - if random.random() < normal_mode_probability: - interest_triggered = True - - logger.info(f"{self.log_prefix} 有新消息,在{normal_mode_probability * 100:.0f}%概率下选择回复") - if s4u_config.enable_s4u: await send_typing() async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): await self.expression_learner.trigger_learning_for_chat() + + cycle_timers, thinking_id = self.start_cycle() + logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考") + # 第一步:动作检查 available_actions: Dict[str, ActionInfo] = {} + try: + await self.action_modifier.modify_actions() + available_actions = self.action_manager.get_using_actions() + except Exception as e: + logger.error(f"{self.log_prefix} 动作修改失败: {e}") - # 如果兴趣度不足以激活 - if not interest_triggered: - # 看看专注值够不够 - if random.random() < self.frequency_control.get_final_focus_value(): - # 专注值足够,仍然进入正式思考 - focus_triggered = True # 都没触发,路边 + # 执行planner + is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - # 任意一种触发都行 - if interest_triggered or focus_triggered: - # 进入正式思考模式 - cycle_timers, thinking_id = self.start_cycle() - logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考") + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.stream_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.6), + ) + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=message_list_before_now, + timestamp_mode="normal_no_YMD", + read_mark=self.action_planner.last_obs_time_mark, + truncate=True, + show_actions=True, + ) - # 第一步:动作检查 - try: - await self.action_modifier.modify_actions() - available_actions = self.action_manager.get_using_actions() - except Exception as e: - logger.error(f"{self.log_prefix} 动作修改失败: {e}") - - # 执行planner - is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info() - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=self.stream_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.6), - ) - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=message_list_before_now, - timestamp_mode="normal_no_YMD", - read_mark=self.action_planner.last_obs_time_mark, - truncate=True, - show_actions=True, + prompt_info = await self.action_planner.build_planner_prompt( + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + current_available_actions=available_actions, + chat_content_block=chat_content_block, + message_id_list=message_id_list, + interest=global_config.personality.interest, + ) + continue_flag, modified_message = await events_manager.handle_mai_events( + EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id + ) + if not continue_flag: + return False + if modified_message and modified_message._modify_flags.modify_llm_prompt: + prompt_info = (modified_message.llm_prompt, prompt_info[1]) + + + with Timer("规划器", cycle_timers): + action_to_use_info, _ = await self.action_planner.plan( + loop_start_time=self.last_read_time, + available_actions=available_actions, ) - prompt_info = await self.action_planner.build_planner_prompt( - is_group_chat=is_group_chat, - chat_target_info=chat_target_info, - # current_available_actions=planner_info[2], - chat_content_block=chat_content_block, - # actions_before_now_block=actions_before_now_block, - message_id_list=message_id_list, + # 3. 并行执行所有动作 + action_tasks = [ + asyncio.create_task( + self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) ) - continue_flag, modified_message = await events_manager.handle_mai_events( - EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id - ) - if not continue_flag: - return False - if modified_message and modified_message._modify_flags.modify_llm_prompt: - prompt_info = (modified_message.llm_prompt, prompt_info[1]) - with Timer("规划器", cycle_timers): - # 根据不同触发,进入不同plan - if focus_triggered: - mode = ChatMode.FOCUS + for action in action_to_use_info + ] + + # 并行执行所有任务 + results = await asyncio.gather(*action_tasks, return_exceptions=True) + + # 处理执行结果 + reply_loop_info = None + reply_text_from_reply = "" + action_success = False + action_reply_text = "" + action_command = "" + + for i, result in enumerate(results): + if isinstance(result, BaseException): + logger.error(f"{self.log_prefix} 动作执行异常: {result}") + continue + + _cur_action = action_to_use_info[i] + if result["action_type"] != "reply": + action_success = result["success"] + action_reply_text = result["reply_text"] + action_command = result.get("command", "") + elif result["action_type"] == "reply": + if result["success"]: + reply_loop_info = result["loop_info"] + reply_text_from_reply = result["reply_text"] else: - mode = ChatMode.NORMAL + logger.warning(f"{self.log_prefix} 回复动作执行失败") - action_to_use_info, _ = await self.action_planner.plan( - mode=mode, - loop_start_time=self.last_read_time, - available_actions=available_actions, - ) - - # 3. 并行执行所有动作 - action_tasks = [ - asyncio.create_task( - self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers) - ) - for action in action_to_use_info - ] - - # 并行执行所有任务 - results = await asyncio.gather(*action_tasks, return_exceptions=True) - - # 处理执行结果 - reply_loop_info = None - reply_text_from_reply = "" - action_success = False - action_reply_text = "" - action_command = "" - - for i, result in enumerate(results): - if isinstance(result, BaseException): - logger.error(f"{self.log_prefix} 动作执行异常: {result}") - continue - - _cur_action = action_to_use_info[i] - if result["action_type"] != "reply": - action_success = result["success"] - action_reply_text = result["reply_text"] - action_command = result.get("command", "") - elif result["action_type"] == "reply": - if result["success"]: - reply_loop_info = result["loop_info"] - reply_text_from_reply = result["reply_text"] - else: - logger.warning(f"{self.log_prefix} 回复动作执行失败") - - # 构建最终的循环信息 - if reply_loop_info: - # 如果有回复信息,使用回复的loop_info作为基础 - loop_info = reply_loop_info - # 更新动作执行信息 - loop_info["loop_action_info"].update( - { - "action_taken": action_success, - "command": action_command, - "taken_time": time.time(), - } - ) - reply_text = reply_text_from_reply - else: - # 没有回复信息,构建纯动作的loop_info - loop_info = { - "loop_plan_info": { - "action_result": action_to_use_info, - }, - "loop_action_info": { - "action_taken": action_success, - "reply_text": action_reply_text, - "command": action_command, - "taken_time": time.time(), - }, + # 构建最终的循环信息 + if reply_loop_info: + # 如果有回复信息,使用回复的loop_info作为基础 + loop_info = reply_loop_info + # 更新动作执行信息 + loop_info["loop_action_info"].update( + { + "action_taken": action_success, + "command": action_command, + "taken_time": time.time(), } - reply_text = action_reply_text + ) + reply_text = reply_text_from_reply + else: + # 没有回复信息,构建纯动作的loop_info + loop_info = { + "loop_plan_info": { + "action_result": action_to_use_info, + }, + "loop_action_info": { + "action_taken": action_success, + "reply_text": action_reply_text, + "command": action_command, + "taken_time": time.time(), + }, + } + reply_text = action_reply_text - self.end_cycle(loop_info, cycle_timers) - self.print_cycle_info(cycle_timers) + self.end_cycle(loop_info, cycle_timers) + self.print_cycle_info(cycle_timers) - """S4U内容,暂时保留""" - if s4u_config.enable_s4u: - await stop_typing() - await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text) - """S4U内容,暂时保留""" + """S4U内容,暂时保留""" + if s4u_config.enable_s4u: + await stop_typing() + await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text) + """S4U内容,暂时保留""" return True @@ -579,7 +527,7 @@ class HeartFChatting: ): """执行单个动作的通用函数""" try: - if action_planner_info.action_type == "no_action": + if action_planner_info.action_type == "no_reply": # 直接处理no_action逻辑,不再通过动作系统 reason = action_planner_info.reasoning or "选择不回复" logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") @@ -594,26 +542,19 @@ class HeartFChatting: action_data={"reason": reason}, action_name="no_action", ) - return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""} - elif action_planner_info.action_type != "reply": - # 执行普通动作 - with Timer("动作执行", cycle_timers): - success, reply_text, command = await self._handle_action( - action_planner_info.action_type, - action_planner_info.reasoning or "", - action_planner_info.action_data or {}, - cycle_timers, - thinking_id, - action_planner_info.action_message, - ) - return { - "action_type": action_planner_info.action_type, - "success": success, - "reply_text": reply_text, - "command": command, - } - else: + + elif action_planner_info.action_type == "wait_time": + logger.info(f"{self.log_prefix} 等待{action_planner_info.action_data['time']}秒后回复") + await asyncio.sleep(action_planner_info.action_data["time"]) + return {"action_type": "wait_time", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "no_reply_until_call": + logger.info(f"{self.log_prefix} 保持沉默,直到有人直接叫的名字") + self.no_reply_until_call = True + return {"action_type": "no_reply_until_call", "success": True, "reply_text": "", "command": ""} + + elif action_planner_info.action_type == "reply": try: success, llm_response = await generator_api.generate_reply( chat_stream=self.chat_stream, @@ -652,6 +593,26 @@ class HeartFChatting: "reply_text": reply_text, "loop_info": loop_info, } + + # 其他动作 + else: + # 执行普通动作 + with Timer("动作执行", cycle_timers): + success, reply_text, command = await self._handle_action( + action_planner_info.action_type, + action_planner_info.reasoning or "", + action_planner_info.action_data or {}, + cycle_timers, + thinking_id, + action_planner_info.action_message, + ) + return { + "action_type": action_planner_info.action_type, + "success": success, + "reply_text": reply_text, + "command": command, + } + except Exception as e: logger.error(f"{self.log_prefix} 执行动作时出错: {e}") logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index a4de0419..9bf86ecd 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -1,9 +1,8 @@ import json import time import traceback -import asyncio -import math import random +import re from typing import Dict, Optional, Tuple, List, TYPE_CHECKING from rich.traceback import install from datetime import datetime @@ -47,73 +46,69 @@ def init_prompt(): **动作记录** {actions_before_now_block} -**回复标准** -请你根据聊天内容和用户的最新消息选择合适回复或者沉默: +**要求** +请你根据聊天内容和用户的最新消息选择合适的动作: 1.你可以选择呼叫了你的名字,但是你没有做出回应的消息进行回复 2.你可以自然的顺着正在进行的聊天内容进行回复或自然的提出一个问题 3.你的兴趣是:{interest} 4.如果你刚刚进行了回复,不要对同一个话题重复回应 -5.请控制你的发言频率,不要太过频繁的发言,当你刚刚发送了消息,没有人回复时,选择no_action +5.请控制你的发言频率,不要太过频繁的发言 6.如果有人对你感到厌烦,请减少回复 7.如果有人对你进行攻击,或者情绪激动,请你以合适的方法应对 -8.最好不要选择图片和表情包作为回复对象 +8.如果相同的内容已经被执行,请不要重复执行 {moderation_prompt} -**动作** -保持沉默:no_action +**可用的action** +no_reply:保持沉默,不回复直到有新消息 {{ - "action": "no_action", - "reason":"不回复的原因" + "action": "no_reply", }} -进行回复:reply +reply:进行回复 {{ "action": "reply", "target_message_id":"想要回复的消息id", "reason":"回复的原因" }} -你必须从上面列出的可用action中选择一个,并说明触发action的消息id(不是消息原文)和选择该action的原因。消息id格式:m+数字 -请根据动作示例,以严格的 JSON 格式输出,且仅包含 JSON 内容: + +no_reply_until_call:保持沉默,直到有人直接叫的名字 +{{ + "action": "no_reply_until_call", +}} + +wait_time:沉默等待时间,等待一段时间后回复 +{{ + "action": "wait_time", + "time":"等待时间", +}} + +{action_options_text} + +请选择一个或多个合适的action,并说明触发action的消息id和选择该action的原因。消息id格式:m+数字 +请选择所有符合使用要求的action,动作用json格式输出,如果输出多个json,每个json都要单独用```json包裹,你可以重复使用同一个动作或不同动作: +**示例** +```json +{{ + "action":"动作名", + "target_message_id":"触发动作的消息id", + //对应参数 +}} +``` +```json +{{ + "action":"动作名", + "target_message_id":"触发动作的消息id", + //对应参数 +}} +``` + """, "planner_prompt", ) Prompt( """ -{time_block} -{name_block} - -{chat_context_description} -**聊天内容** -{chat_content_block} - -**动作记录** -{actions_before_now_block} - -**回复标准** -请你选择合适的消息进行回复: -1.你可以选择呼叫了你的名字,但是你没有做出回应的消息进行回复 -2.你可以自然的顺着正在进行的聊天内容进行回复,或者自然的提出一个问题 -3.你的兴趣是{interest} -4.如果有人对你感到厌烦,请你不要太积极的提问或是表达,可以进行顺从 -5.如果有人对你进行攻击,或者情绪激动,请你以合适的方法应对 -6.最好不要选择图片和表情包作为回复对象 -7.{moderation_prompt} - -请你从新消息中选出一条需要回复的消息并输出其id,输出格式如下: -{{ - "action": "reply", - "target_message_id":"想要回复的消息id,消息id格式:m+数字", - "reason":"回复的原因" -}} -请根据示例,以严格的 JSON 格式输出,且仅包含 JSON 内容: -""", - "planner_reply_prompt", - ) - - Prompt( - """ -动作:{action_name} +{action_name} 动作描述:{action_description} {action_require} {{ @@ -125,36 +120,6 @@ def init_prompt(): "action_prompt", ) - Prompt( - """ -{name_block} - -{chat_context_description},{time_block},现在请你根据以下聊天内容,选择一个或多个合适的action。如果没有合适的action,请选择no_action。, -{chat_content_block} - -**要求** -1.action必须符合使用条件,如果符合条件,就选择 -2.如果聊天内容不适合使用action,即使符合条件,也不要使用 -3.{moderation_prompt} -4.请注意如果相同的内容已经被执行,请不要重复执行 -这是你最近执行过的动作: -{actions_before_now_block} - -**可用的action** - -no_action:不选择任何动作 -{{ - "action": "no_action", - "reason":"不动作的原因" -}} - -{action_options_text} - -请选择,并说明触发action的消息id和选择该action的原因。消息id格式:m+数字 -请根据动作示例,以严格的 JSON 格式输出,且仅包含 JSON 内容: -""", - "sub_planner_prompt", - ) class ActionPlanner: @@ -166,9 +131,6 @@ class ActionPlanner: self.planner_llm = LLMRequest( model_set=model_config.model_task_config.planner, request_type="planner" ) # 用于动作规划 - self.planner_small_llm = LLMRequest( - model_set=model_config.model_task_config.planner_small, request_type="planner_small" - ) # 用于动作规划 self.last_obs_time_mark = 0.0 @@ -206,27 +168,31 @@ class ActionPlanner: action_data = {key: value for key, value in action_json.items() if key not in ["action", "reasoning"]} # 非no_action动作需要target_message_id target_message = None - if action != "no_action": - if target_message_id := action_json.get("target_message_id"): - # 根据target_message_id查找原始消息 - target_message = self.find_message_by_id(target_message_id, message_id_list) - if target_message is None: - logger.warning(f"{self.log_prefix}无法找到target_message_id '{target_message_id}' 对应的消息") - # 选择最新消息作为target_message - target_message = message_id_list[-1][1] - else: - logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id") + + if target_message_id := action_json.get("target_message_id"): + # 根据target_message_id查找原始消息 + target_message = self.find_message_by_id(target_message_id, message_id_list) + if target_message is None: + logger.warning(f"{self.log_prefix}无法找到target_message_id '{target_message_id}' 对应的消息") + # 选择最新消息作为target_message + target_message = message_id_list[-1][1] + else: + target_message = message_id_list[-1][1] + logger.info(f"{self.log_prefix}动作'{action}'缺少target_message_id,使用最新消息作为target_message") + # 验证action是否可用 available_action_names = [action_name for action_name, _ in current_available_actions] - if action != "no_action" and action != "reply" and action not in available_action_names: + internal_action_names = ["no_reply", "reply", "wait_time", "no_reply_until_call"] + + if action not in internal_action_names and action not in available_action_names: logger.warning( - f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_action'" + f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_reply'" ) reasoning = ( f"LLM 返回了当前不可用的动作 '{action}' (可用: {available_action_names})。原始理由: {reasoning}" ) - action = "no_action" + action = "no_reply" # 创建ActionPlannerInfo对象 # 将列表转换为字典格式 @@ -247,7 +213,7 @@ class ActionPlanner: available_actions_dict = dict(current_available_actions) action_planner_infos.append( ActionPlannerInfo( - action_type="no_action", + action_type="no_reply", reasoning=f"解析单个action时出错: {e}", action_data={}, action_message=None, @@ -257,83 +223,121 @@ class ActionPlanner: return action_planner_infos - async def sub_plan( + + async def plan( self, - action_list: List[Tuple[str, ActionInfo]], - chat_content_block: str, + available_actions: Dict[str, ActionInfo], + loop_start_time: float = 0.0, + ) -> Tuple[List[ActionPlannerInfo], Optional["DatabaseMessages"]]: + """ + 规划器 (Planner): 使用LLM根据上下文决定做出什么动作。 + """ + target_message: Optional["DatabaseMessages"] = None + + + # 获取聊天上下文 + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.chat_id, + timestamp=time.time(), + limit=int(global_config.chat.max_context_size * 0.6), + ) + message_id_list: list[Tuple[str, "DatabaseMessages"]] = [] + chat_content_block, message_id_list = build_readable_messages_with_id( + messages=message_list_before_now, + timestamp_mode="normal_no_YMD", + read_mark=self.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + + message_list_before_now_short = message_list_before_now[-int(global_config.chat.max_context_size * 0.3) :] + chat_content_block_short, message_id_list_short = build_readable_messages_with_id( + messages=message_list_before_now_short, + timestamp_mode="normal_no_YMD", + truncate=False, + show_actions=False, + ) + + self.last_obs_time_mark = time.time() + + # 获取必要信息 + is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info() + + # 应用激活类型过滤 + filtered_actions = self._filter_actions_by_activation_type( + available_actions, chat_content_block_short + ) + + logger.info(f"{self.log_prefix}过滤后有{len(filtered_actions)}个可用动作") + + # 构建包含所有动作的提示词 + prompt, message_id_list = await self.build_planner_prompt( + is_group_chat=is_group_chat, + chat_target_info=chat_target_info, + current_available_actions=filtered_actions, + chat_content_block=chat_content_block, + message_id_list=message_id_list, + interest=global_config.personality.interest, + ) + + # 调用LLM获取决策 + actions = await self._execute_main_planner( + prompt=prompt, + message_id_list=message_id_list, + filtered_actions=filtered_actions, + available_actions=available_actions, + loop_start_time=loop_start_time + ) + + # 获取target_message(如果有非no_action的动作) + non_no_actions = [a for a in actions if a.action_type != "no_reply"] + if non_no_actions: + target_message = non_no_actions[0].action_message + + return actions, target_message + + async def build_planner_prompt( + self, + is_group_chat: bool, + chat_target_info: Optional["TargetPersonInfo"], + current_available_actions: Dict[str, ActionInfo], message_id_list: List[Tuple[str, "DatabaseMessages"]], - is_group_chat: bool = False, - chat_target_info: Optional["TargetPersonInfo"] = None, - ) -> List[ActionPlannerInfo]: - # 构建副planner并执行(单个副planner) + chat_content_block: str = "", + interest: str = "", + ) -> tuple[str, List[Tuple[str, "DatabaseMessages"]]]: + """构建 Planner LLM 的提示词 (获取模板并填充数据)""" try: + # 获取最近执行过的动作 actions_before_now = get_actions_by_timestamp_with_chat( chat_id=self.chat_id, - timestamp_start=time.time() - 1200, + timestamp_start=time.time() - 600, timestamp_end=time.time(), - limit=20, - ) - - # 获取最近的actions - # 只保留action_type在action_list中的ActionPlannerInfo - action_names_in_list = [name for name, _ in action_list] - # actions_before_now是List[Dict[str, Any]]格式,需要提取action_type字段 - filtered_actions: List["DatabaseActionRecords"] = [] - for action_record in actions_before_now: - # print(action_record) - # print(action_record['action_name']) - # print(action_names_in_list) - action_type = action_record.action_name - if action_type in action_names_in_list: - filtered_actions.append(action_record) - - actions_before_now_block = build_readable_actions( - actions=filtered_actions, - mode="absolute", + limit=6, ) + actions_before_now_block = build_readable_actions(actions=actions_before_now) + if actions_before_now_block: + actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" + else: + actions_before_now_block = "" + # 构建聊天上下文描述 chat_context_description = "你现在正在一个群聊中" - chat_target_name = None if not is_group_chat and chat_target_info: chat_target_name = chat_target_info.person_name or chat_target_info.user_nickname or "对方" chat_context_description = f"你正在和 {chat_target_name} 私聊" - action_options_block = "" - - for using_actions_name, using_actions_info in action_list: - if using_actions_info.action_parameters: - param_text = "\n" - for param_name, param_description in using_actions_info.action_parameters.items(): - param_text += f' "{param_name}":"{param_description}"\n' - param_text = param_text.rstrip("\n") - else: - param_text = "" - - require_text = "" - for require_item in using_actions_info.action_require: - require_text += f"- {require_item}\n" - require_text = require_text.rstrip("\n") - - using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") - using_action_prompt = using_action_prompt.format( - action_name=using_actions_name, - action_description=using_actions_info.description, - action_parameters=param_text, - action_require=require_text, - ) - - action_options_block += using_action_prompt + # 构建动作选项块 + action_options_block = await self._build_action_options_block(current_available_actions) + # 其他信息 moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。" time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" bot_name = global_config.bot.nickname - if global_config.bot.alias_names: - bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" - else: - bot_nickname = "" + bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else "" name_block = f"你的名字是{bot_name}{bot_nickname},请注意哪些是你自己的发言。" - planner_prompt_template = await global_prompt_manager.get_prompt_async("sub_planner_prompt") + # 获取主规划器模板并填充 + planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") prompt = planner_prompt_template.format( time_block=time_block, chat_context_description=chat_context_description, @@ -342,514 +346,9 @@ class ActionPlanner: action_options_text=action_options_block, moderation_prompt=moderation_prompt_block, name_block=name_block, - ) - # return prompt, message_id_list - except Exception as e: - logger.error(f"构建 Planner 提示词时出错: {e}") - logger.error(traceback.format_exc()) - # 返回一个默认的no_action而不是字符串 - return [ - ActionPlannerInfo( - action_type="no_action", - reasoning=f"构建 Planner Prompt 时出错: {e}", - action_data={}, - action_message=None, - available_actions=None, - ) - ] - - # --- 调用 LLM (普通文本生成) --- - llm_content = None - action_planner_infos: List[ActionPlannerInfo] = [] # 存储多个ActionPlannerInfo对象 - - try: - llm_content, (reasoning_content, _, _) = await self.planner_small_llm.generate_response_async(prompt=prompt) - - if global_config.debug.show_prompt: - logger.info(f"{self.log_prefix}副规划器原始提示词: {prompt}") - logger.info(f"{self.log_prefix}副规划器原始响应: {llm_content}") - if reasoning_content: - logger.info(f"{self.log_prefix}副规划器推理: {reasoning_content}") - else: - logger.debug(f"{self.log_prefix}副规划器原始提示词: {prompt}") - logger.debug(f"{self.log_prefix}副规划器原始响应: {llm_content}") - if reasoning_content: - logger.debug(f"{self.log_prefix}副规划器推理: {reasoning_content}") - - except Exception as req_e: - logger.error(f"{self.log_prefix}副规划器LLM 请求执行失败: {req_e}") - # 返回一个默认的no_action - action_planner_infos.append( - ActionPlannerInfo( - action_type="no_action", - reasoning=f"副规划器LLM 请求失败,模型出现问题: {req_e}", - action_data={}, - action_message=None, - available_actions=None, - ) - ) - return action_planner_infos - - if llm_content: - try: - parsed_json = json.loads(repair_json(llm_content)) - - # 处理不同的JSON格式 - if isinstance(parsed_json, list): - # 如果是列表,处理每个action - if parsed_json: - logger.info(f"{self.log_prefix}LLM返回了{len(parsed_json)}个action") - for action_item in parsed_json: - if isinstance(action_item, dict): - action_planner_infos.extend( - self._parse_single_action(action_item, message_id_list, action_list) - ) - else: - logger.warning(f"{self.log_prefix}列表中的action项不是字典类型: {type(action_item)}") - else: - logger.warning(f"{self.log_prefix}LLM返回了空列表") - action_planner_infos.append( - ActionPlannerInfo( - action_type="no_action", - reasoning="LLM返回了空列表,选择no_action", - action_data={}, - action_message=None, - available_actions=None, - ) - ) - elif isinstance(parsed_json, dict): - # 如果是单个字典,处理单个action - action_planner_infos.extend(self._parse_single_action(parsed_json, message_id_list, action_list)) - else: - logger.error(f"{self.log_prefix}解析后的JSON不是字典或列表类型: {type(parsed_json)}") - action_planner_infos.append( - ActionPlannerInfo( - action_type="no_action", - reasoning=f"解析后的JSON类型错误: {type(parsed_json)}", - action_data={}, - action_message=None, - available_actions=None, - ) - ) - - except Exception as json_e: - logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'") - traceback.print_exc() - action_planner_infos.append( - ActionPlannerInfo( - action_type="no_action", - reasoning=f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_action'.", - action_data={}, - action_message=None, - available_actions=None, - ) - ) - else: - # 如果没有LLM内容,返回默认的no_action - action_planner_infos.append( - ActionPlannerInfo( - action_type="no_action", - reasoning="副规划器没有获得LLM响应", - action_data={}, - action_message=None, - available_actions=None, - ) + interest=interest, ) - # 如果没有解析到任何action,返回默认的no_action - if not action_planner_infos: - action_planner_infos.append( - ActionPlannerInfo( - action_type="no_action", - reasoning="副规划器没有解析到任何有效action", - action_data={}, - action_message=None, - available_actions=None, - ) - ) - - logger.debug(f"{self.log_prefix}副规划器返回了{len(action_planner_infos)}个action") - return action_planner_infos - - async def plan( - self, - available_actions: Dict[str, ActionInfo], - mode: ChatMode = ChatMode.FOCUS, - loop_start_time: float = 0.0, - ) -> Tuple[List[ActionPlannerInfo], Optional["DatabaseMessages"]]: - # sourcery skip: use-or-for-fallback - """ - 规划器 (Planner): 使用LLM根据上下文决定做出什么动作。 - """ - - action: str = "no_action" # 默认动作 - reasoning: str = "规划器初始化默认" - action_data = {} - current_available_actions: Dict[str, ActionInfo] = {} - target_message: Optional["DatabaseMessages"] = None # 初始化target_message变量 - prompt: str = "" - message_id_list: list[Tuple[str, "DatabaseMessages"]] = [] - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=self.chat_id, - timestamp=time.time(), - limit=int(global_config.chat.max_context_size * 0.6), - ) - chat_content_block, message_id_list = build_readable_messages_with_id( - messages=message_list_before_now, - timestamp_mode="normal_no_YMD", - read_mark=self.last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - message_list_before_now_short = message_list_before_now[-int(global_config.chat.max_context_size * 0.3) :] - - chat_content_block_short, message_id_list_short = build_readable_messages_with_id( - messages=message_list_before_now_short, - timestamp_mode="normal_no_YMD", - truncate=False, - show_actions=False, - ) - - self.last_obs_time_mark = time.time() - all_sub_planner_results: List[ActionPlannerInfo] = [] # 防止Unbound - try: - sub_planner_actions: Dict[str, ActionInfo] = {} - - for action_name, action_info in available_actions.items(): - if action_info.activation_type in [ActionActivationType.LLM_JUDGE, ActionActivationType.ALWAYS]: - sub_planner_actions[action_name] = action_info - elif action_info.activation_type == ActionActivationType.RANDOM: - if random.random() < action_info.random_activation_probability: - sub_planner_actions[action_name] = action_info - elif action_info.activation_type == ActionActivationType.KEYWORD: - if action_info.activation_keywords: - for keyword in action_info.activation_keywords: - if keyword in chat_content_block_short: - sub_planner_actions[action_name] = action_info - elif action_info.activation_type == ActionActivationType.NEVER: - logger.debug(f"{self.log_prefix}动作 {action_name} 设置为 NEVER 激活类型,跳过") - else: - logger.warning(f"{self.log_prefix}未知的激活类型: {action_info.activation_type},跳过处理") - - sub_planner_actions_num = len(sub_planner_actions) - sub_planner_size = int(global_config.chat.planner_size) - if random.random() < global_config.chat.planner_size - int(global_config.chat.planner_size): - sub_planner_size = int(global_config.chat.planner_size) + 1 - sub_planner_num = math.ceil(sub_planner_actions_num / sub_planner_size) - - logger.info(f"{self.log_prefix}使用{sub_planner_num}个小脑进行思考(尺寸:{sub_planner_size})") - - # 将sub_planner_actions随机分配到sub_planner_num个List中 - sub_planner_lists: List[List[Tuple[str, ActionInfo]]] = [] - if sub_planner_actions_num > 0: - # 将actions转换为列表并随机打乱 - action_items = list(sub_planner_actions.items()) - random.shuffle(action_items) - - # 初始化所有子列表 - for _ in range(sub_planner_num): - sub_planner_lists.append([]) - - # 分配actions到各个子列表 - for i, (action_name, action_info) in enumerate(action_items): - sub_planner_lists[i % sub_planner_num].append((action_name, action_info)) - - logger.debug( - f"{self.log_prefix}成功将{sub_planner_actions_num}个actions分配到{sub_planner_num}个子列表中" - ) - for i, action_list in enumerate(sub_planner_lists): - logger.debug(f"{self.log_prefix}子列表{i + 1}: {len(action_list)}个actions") - else: - logger.info(f"{self.log_prefix}没有可用的actions需要分配") - - # 先获取必要信息 - is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info() - - # 并行执行所有副规划器 - async def execute_sub_plan(action_list): - return await self.sub_plan( - action_list=action_list, - chat_content_block=chat_content_block_short, - message_id_list=message_id_list_short, - is_group_chat=is_group_chat, - chat_target_info=chat_target_info, - ) - - # 创建所有任务 - sub_plan_tasks = [execute_sub_plan(action_list) for action_list in sub_planner_lists] - - # 并行执行所有任务 - sub_plan_results = await asyncio.gather(*sub_plan_tasks) - - # 收集所有结果 - for sub_result in sub_plan_results: - all_sub_planner_results.extend(sub_result) - - logger.info(f"{self.log_prefix}小脑决定执行{len(all_sub_planner_results)}个动作") - - # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- - prompt, message_id_list = await self.build_planner_prompt( - is_group_chat=is_group_chat, # <-- Pass HFC state - chat_target_info=chat_target_info, # <-- 传递获取到的聊天目标信息 - # current_available_actions="", # <-- Pass determined actions - mode=mode, - chat_content_block=chat_content_block, - # actions_before_now_block=actions_before_now_block, - message_id_list=message_id_list, - interest=global_config.personality.interest, - ) - - # --- 调用 LLM (普通文本生成) --- - llm_content = None - try: - llm_content, (reasoning_content, _, _) = await self.planner_llm.generate_response_async(prompt=prompt) - - if global_config.debug.show_prompt: - logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}") - logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}") - if reasoning_content: - logger.info(f"{self.log_prefix}规划器推理: {reasoning_content}") - else: - logger.debug(f"{self.log_prefix}规划器原始提示词: {prompt}") - logger.debug(f"{self.log_prefix}规划器原始响应: {llm_content}") - if reasoning_content: - logger.debug(f"{self.log_prefix}规划器推理: {reasoning_content}") - - except Exception as req_e: - logger.error(f"{self.log_prefix}LLM 请求执行失败: {req_e}") - reasoning = f"LLM 请求失败,模型出现问题: {req_e}" - action = "no_action" - - if llm_content: - try: - parsed_json = json.loads(repair_json(llm_content)) - - # 处理不同的JSON格式,复用_parse_single_action函数 - if isinstance(parsed_json, list): - if parsed_json: - # 使用最后一个action(保持原有逻辑) - parsed_json = parsed_json[-1] - logger.warning(f"{self.log_prefix}LLM返回了多个JSON对象,使用最后一个: {parsed_json}") - else: - parsed_json = {} - - if isinstance(parsed_json, dict): - # 使用_parse_single_action函数解析单个action - # 将字典转换为列表格式 - current_available_actions_list = list(current_available_actions.items()) - action_planner_infos = self._parse_single_action( - parsed_json, message_id_list, current_available_actions_list - ) - - if action_planner_infos: - # 获取第一个(也是唯一一个)action的信息 - action_info = action_planner_infos[0] - action = action_info.action_type - reasoning = action_info.reasoning or "没有理由" - action_data.update(action_info.action_data or {}) - target_message = action_info.action_message - - # 处理target_message为None的情况(保持原有的重试逻辑) - if target_message is None and action != "no_action": - # 尝试获取最新消息作为target_message - target_message = message_id_list[-1][1] - if target_message is None: - logger.warning(f"{self.log_prefix}无法获取任何消息作为target_message") - else: - # 如果没有解析到action,使用默认值 - action = "no_action" - reasoning = "解析action失败" - target_message = None - else: - logger.error(f"{self.log_prefix}解析后的JSON不是字典类型: {type(parsed_json)}") - action = "no_action" - reasoning = f"解析后的JSON类型错误: {type(parsed_json)}" - target_message = None - - except Exception as json_e: - logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'") - traceback.print_exc() - action = "no_action" - reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_action'." - target_message = None - - except Exception as outer_e: - logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_action: {outer_e}") - traceback.print_exc() - action = "no_action" - reasoning = f"Planner 内部处理错误: {outer_e}" - - is_parallel = True - for action_planner_info in all_sub_planner_results: - if action_planner_info.action_type == "no_action": - continue - if not current_available_actions[action_planner_info.action_type].parallel_action: - is_parallel = False - break - - action_data["loop_start_time"] = loop_start_time - - # 根据is_parallel决定返回值 - if is_parallel: - # 如果为真,将主规划器的结果和副规划器的结果都返回 - main_actions = [] - - # 添加主规划器的action(如果不是no_action) - if action != "no_action": - main_actions.append( - ActionPlannerInfo( - action_type=action, - reasoning=reasoning, - action_data=action_data, - action_message=target_message, - available_actions=available_actions, - ) - ) - - # 先合并主副规划器的结果 - all_actions = main_actions + all_sub_planner_results - - # 然后统一过滤no_action - actions = self._filter_no_actions(all_actions) - - # 如果所有结果都是no_action,返回一个no_action - if not actions: - actions = [ - ActionPlannerInfo( - action_type="no_action", - reasoning="所有规划器都选择不执行动作", - action_data={}, - action_message=None, - available_actions=available_actions, - ) - ] - - action_str = "" - for action_planner_info in actions: - action_str += f"{action_planner_info.action_type} " - logger.info(f"{self.log_prefix}大脑小脑决定执行{len(actions)}个动作: {action_str}") - else: - # 如果为假,只返回副规划器的结果 - actions = self._filter_no_actions(all_sub_planner_results) - - # 如果所有结果都是no_action,返回一个no_action - if not actions: - actions = [ - ActionPlannerInfo( - action_type="no_action", - reasoning="副规划器都选择不执行动作", - action_data={}, - action_message=None, - available_actions=available_actions, - ) - ] - - logger.info(f"{self.log_prefix}跳过大脑,执行小脑的{len(actions)}个动作") - - return actions, target_message - - async def build_planner_prompt( - self, - is_group_chat: bool, # Now passed as argument - chat_target_info: Optional["TargetPersonInfo"], # Now passed as argument - # current_available_actions: Dict[str, ActionInfo], - message_id_list: List[Tuple[str, "DatabaseMessages"]], - mode: ChatMode = ChatMode.FOCUS, - # actions_before_now_block :str = "", - chat_content_block: str = "", - interest: str = "", - ) -> tuple[str, List[Tuple[str, "DatabaseMessages"]]]: # sourcery skip: use-join - """构建 Planner LLM 的提示词 (获取模板并填充数据)""" - try: - actions_before_now = get_actions_by_timestamp_with_chat( - chat_id=self.chat_id, - timestamp_start=time.time() - 600, - timestamp_end=time.time(), - limit=6, - ) - - actions_before_now_block = build_readable_actions( - actions=actions_before_now, - ) - - if actions_before_now_block: - actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" - else: - actions_before_now_block = "" - - chat_context_description = "你现在正在一个群聊中" - chat_target_name = None - if not is_group_chat and chat_target_info: - chat_target_name = chat_target_info.person_name or chat_target_info.user_nickname or "对方" - chat_context_description = f"你正在和 {chat_target_name} 私聊" - - # 别删,之后可能会允许主Planner扩展 - - # action_options_block = "" - - # if current_available_actions: - # for using_actions_name, using_actions_info in current_available_actions.items(): - # if using_actions_info.action_parameters: - # param_text = "\n" - # for param_name, param_description in using_actions_info.action_parameters.items(): - # param_text += f' "{param_name}":"{param_description}"\n' - # param_text = param_text.rstrip("\n") - # else: - # param_text = "" - - # require_text = "" - # for require_item in using_actions_info.action_require: - # require_text += f"- {require_item}\n" - # require_text = require_text.rstrip("\n") - - # using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") - # using_action_prompt = using_action_prompt.format( - # action_name=using_actions_name, - # action_description=using_actions_info.description, - # action_parameters=param_text, - # action_require=require_text, - # ) - - # action_options_block += using_action_prompt - # else: - # action_options_block = "" - - moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。" - - time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - - bot_name = global_config.bot.nickname - if global_config.bot.alias_names: - bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" - else: - bot_nickname = "" - name_block = f"你的名字是{bot_name}{bot_nickname},请注意哪些是你自己的发言。" - - if mode == ChatMode.FOCUS: - planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") - prompt = planner_prompt_template.format( - time_block=time_block, - chat_context_description=chat_context_description, - chat_content_block=chat_content_block, - actions_before_now_block=actions_before_now_block, - # action_options_text=action_options_block, - moderation_prompt=moderation_prompt_block, - name_block=name_block, - interest=interest, - ) - else: - planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_reply_prompt") - prompt = planner_prompt_template.format( - time_block=time_block, - chat_context_description=chat_context_description, - chat_content_block=chat_content_block, - moderation_prompt=moderation_prompt_block, - name_block=name_block, - actions_before_now_block=actions_before_now_block, - interest=interest, - ) return prompt, message_id_list except Exception as e: logger.error(f"构建 Planner 提示词时出错: {e}") @@ -879,14 +378,185 @@ class ActionPlanner: return is_group_chat, chat_target_info, current_available_actions - # 过滤掉no_action,除非所有结果都是no_action - def _filter_no_actions(self, action_list: List[ActionPlannerInfo]) -> List[ActionPlannerInfo]: - """过滤no_action,如果所有都是no_action则返回一个""" - if non_no_actions := [a for a in action_list if a.action_type != "no_action"]: - return non_no_actions + + def _filter_actions_by_activation_type( + self, + available_actions: Dict[str, ActionInfo], + chat_content_block: str + ) -> Dict[str, ActionInfo]: + """根据激活类型过滤动作""" + filtered_actions = {} + + for action_name, action_info in available_actions.items(): + if action_info.activation_type == ActionActivationType.NEVER: + logger.debug(f"{self.log_prefix}动作 {action_name} 设置为 NEVER 激活类型,跳过") + continue + elif action_info.activation_type in [ActionActivationType.LLM_JUDGE, ActionActivationType.ALWAYS]: + filtered_actions[action_name] = action_info + elif action_info.activation_type == ActionActivationType.RANDOM: + if random.random() < action_info.random_activation_probability: + filtered_actions[action_name] = action_info + elif action_info.activation_type == ActionActivationType.KEYWORD: + if action_info.activation_keywords: + for keyword in action_info.activation_keywords: + if keyword in chat_content_block: + filtered_actions[action_name] = action_info + break + else: + logger.warning(f"{self.log_prefix}未知的激活类型: {action_info.activation_type},跳过处理") + + return filtered_actions + + async def _build_action_options_block(self, current_available_actions: Dict[str, ActionInfo]) -> str: + """构建动作选项块""" + if not current_available_actions: + return "" + + action_options_block = "" + for action_name, action_info in current_available_actions.items(): + # 构建参数文本 + param_text = "" + if action_info.action_parameters: + param_text = "\n" + for param_name, param_description in action_info.action_parameters.items(): + param_text += f' "{param_name}":"{param_description}"\n' + param_text = param_text.rstrip("\n") + + # 构建要求文本 + require_text = "" + for require_item in action_info.action_require: + require_text += f"- {require_item}\n" + require_text = require_text.rstrip("\n") + + # 获取动作提示模板并填充 + using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") + using_action_prompt = using_action_prompt.format( + action_name=action_name, + action_description=action_info.description, + action_parameters=param_text, + action_require=require_text, + ) + + action_options_block += using_action_prompt + + return action_options_block + + async def _execute_main_planner( + self, + prompt: str, + message_id_list: List[Tuple[str, "DatabaseMessages"]], + filtered_actions: Dict[str, ActionInfo], + available_actions: Dict[str, ActionInfo], + loop_start_time: float + ) -> List[ActionPlannerInfo]: + """执行主规划器""" + llm_content = None + actions: List[ActionPlannerInfo] = [] + + try: + # 调用LLM + llm_content, (reasoning_content, _, _) = await self.planner_llm.generate_response_async(prompt=prompt) + + logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}") + logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}") + + if global_config.debug.show_prompt: + logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}") + logger.info(f"{self.log_prefix}规划器原始响应: {llm_content}") + if reasoning_content: + logger.info(f"{self.log_prefix}规划器推理: {reasoning_content}") + else: + logger.debug(f"{self.log_prefix}规划器原始提示词: {prompt}") + logger.debug(f"{self.log_prefix}规划器原始响应: {llm_content}") + if reasoning_content: + logger.debug(f"{self.log_prefix}规划器推理: {reasoning_content}") + + except Exception as req_e: + logger.error(f"{self.log_prefix}LLM 请求执行失败: {req_e}") + return [ + ActionPlannerInfo( + action_type="no_reply", + reasoning=f"LLM 请求失败,模型出现问题: {req_e}", + action_data={}, + action_message=None, + available_actions=available_actions, + ) + ] + + # 解析LLM响应 + if llm_content: + try: + # 处理新的格式:多个```json包裹的JSON对象 + json_objects = self._extract_json_from_markdown(llm_content) + + if json_objects: + logger.info(f"{self.log_prefix}从响应中提取到{len(json_objects)}个JSON对象") + filtered_actions_list = list(filtered_actions.items()) + for json_obj in json_objects: + actions.extend( + self._parse_single_action(json_obj, message_id_list, filtered_actions_list) + ) + else: + # 尝试解析为直接的JSON + logger.warning(f"{self.log_prefix}LLM没有返回可用动作: {llm_content}") + actions = self._create_no_reply("LLM没有返回可用动作", available_actions) + + except Exception as json_e: + logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'") + actions = self._create_no_reply(f"解析LLM响应JSON失败: {json_e}", available_actions) + traceback.print_exc() else: - # 如果所有都是no_action,返回第一个 - return [action_list[0]] if action_list else [] + actions = self._create_no_reply("规划器没有获得LLM响应", available_actions) + + + # 添加循环开始时间到所有非no_action动作 + for action in actions: + action.action_data["loop_start_time"] = loop_start_time + + logger.info(f"{self.log_prefix}规划器决定执行{len(actions)}个动作: {' '.join([a.action_type for a in actions])}") + + return actions + + def _create_no_reply(self, reasoning: str, available_actions: Dict[str, ActionInfo]) -> List[ActionPlannerInfo]: + """创建no_action""" + return [ + ActionPlannerInfo( + action_type="no_reply", + reasoning=reasoning, + action_data={}, + action_message=None, + available_actions=available_actions, + ) + ] + + def _extract_json_from_markdown(self, content: str) -> List[dict]: + """从Markdown格式的内容中提取JSON对象""" + json_objects = [] + + # 使用正则表达式查找```json包裹的JSON内容 + json_pattern = r'```json\s*(.*?)\s*```' + matches = re.findall(json_pattern, content, re.DOTALL) + + for match in matches: + try: + # 清理可能的注释和格式问题 + json_str = re.sub(r'//.*?\n', '\n', match) # 移除单行注释 + json_str = re.sub(r'/\*.*?\*/', '', json_str, flags=re.DOTALL) # 移除多行注释 + json_str = json_str.strip() + + if json_str: + json_obj = json.loads(repair_json(json_str)) + if isinstance(json_obj, dict): + json_objects.append(json_obj) + elif isinstance(json_obj, list): + for item in json_obj: + if isinstance(item, dict): + json_objects.append(item) + except Exception as e: + logger.warning(f"解析JSON块失败: {e}, 块内容: {match[:100]}...") + continue + + return json_objects init_prompt() diff --git a/src/config/config.py b/src/config/config.py index a35ba7b7..97c98737 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -56,7 +56,7 @@ TEMPLATE_DIR = os.path.join(PROJECT_ROOT, "template") # 考虑到,实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 # 对该字段的更新,请严格参照语义化版本规范:https://semver.org/lang/zh-CN/ -MMC_VERSION = "0.10.3-snapshot.1" +MMC_VERSION = "0.10.3-snapshot.2" def get_key_comment(toml_table, key): diff --git a/src/llm_models/model_client/openai_client.py b/src/llm_models/model_client/openai_client.py index 4901fd2e..c5146c5e 100644 --- a/src/llm_models/model_client/openai_client.py +++ b/src/llm_models/model_client/openai_client.py @@ -277,10 +277,12 @@ async def _default_stream_response_handler( # 空 choices / usage-only 帧的防御 if not hasattr(event, "choices") or not event.choices: if hasattr(event, "usage") and event.usage: + # 安全地获取usage属性,处理不同API版本的差异 + usage_obj = event.usage _usage_record = ( - event.usage.prompt_tokens or 0, - event.usage.completion_tokens or 0, - event.usage.total_tokens or 0, + getattr(usage_obj, 'prompt_tokens', 0) or 0, + getattr(usage_obj, 'completion_tokens', 0) or 0, + getattr(usage_obj, 'total_tokens', 0) or 0, ) continue # 跳过本帧,避免访问 choices[0] delta = event.choices[0].delta # 获取当前块的delta内容 @@ -300,10 +302,12 @@ async def _default_stream_response_handler( if event.usage: # 如果有使用情况,则将其存储在APIResponse对象中 + # 安全地获取usage属性,处理不同API版本的差异 + usage_obj = event.usage _usage_record = ( - event.usage.prompt_tokens or 0, - event.usage.completion_tokens or 0, - event.usage.total_tokens or 0, + getattr(usage_obj, 'prompt_tokens', 0) or 0, + getattr(usage_obj, 'completion_tokens', 0) or 0, + getattr(usage_obj, 'total_tokens', 0) or 0, ) try: @@ -370,10 +374,12 @@ def _default_normal_response_parser( # 提取Usage信息 if resp.usage: + # 安全地获取usage属性,处理不同API版本的差异 + usage_obj = resp.usage _usage_record = ( - resp.usage.prompt_tokens or 0, - resp.usage.completion_tokens or 0, - resp.usage.total_tokens or 0, + getattr(usage_obj, 'prompt_tokens', 0) or 0, + getattr(usage_obj, 'completion_tokens', 0) or 0, + getattr(usage_obj, 'total_tokens', 0) or 0, ) else: _usage_record = None diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py index 529c52b0..96dbd290 100644 --- a/src/llm_models/utils_model.py +++ b/src/llm_models/utils_model.py @@ -5,6 +5,7 @@ import time from enum import Enum from rich.traceback import install from typing import Tuple, List, Dict, Optional, Callable, Any +import traceback from src.common.logger import get_logger from src.config.config import model_config @@ -391,6 +392,7 @@ class LLMRequest: logger.debug(f"附加内容: {str(e.ext_info)}") return -1, None # 不再重试请求该模型 else: + print(traceback.format_exc()) logger.error(f"任务-'{task_name}' 模型-'{model_name}': 未知异常,错误信息-{str(e)}") return -1, None # 不再重试请求该模型