diff --git a/src/chat/brain_chat/brain_planner.py b/src/chat/brain_chat/brain_planner.py
index 14d28575..ed1e78c9 100644
--- a/src/chat/brain_chat/brain_planner.py
+++ b/src/chat/brain_chat/brain_planner.py
@@ -11,6 +11,7 @@ from json_repair import repair_json
 from src.llm_models.utils_model import LLMRequest
 from src.config.config import global_config, model_config
 from src.common.logger import get_logger
+from src.chat.logger.plan_reply_logger import PlanReplyLogger
 from src.common.data_models.info_data_model import ActionPlannerInfo
 from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
 from src.chat.utils.chat_message_builder import (
@@ -261,6 +262,7 @@ class BrainPlanner:
         """
         Planner: uses the LLM to decide which actions to take based on context (ReAct mode).
         """
+        plan_start = time.perf_counter()
 
         # Fetch the chat context
         message_list_before_now = get_raw_msg_before_timestamp_with_chat(
@@ -298,6 +300,7 @@ class BrainPlanner:
 
         logger.debug(f"{self.log_prefix}{len(filtered_actions)} usable actions after filtering")
 
+        prompt_build_start = time.perf_counter()
         # Build the prompt containing all actions, using the unified ReAct prompt
         prompt_key = "brain_planner_prompt_react"
         # No logging here, to avoid duplicate output; callers control log_prompt as needed
@@ -308,9 +311,10 @@ class BrainPlanner:
             message_id_list=message_id_list,
             prompt_key=prompt_key,
         )
+        prompt_build_ms = (time.perf_counter() - prompt_build_start) * 1000
 
         # Call the LLM for a decision
-        reasoning, actions = await self._execute_main_planner(
+        reasoning, actions, llm_raw_output, llm_reasoning, llm_duration_ms = await self._execute_main_planner(
             prompt=prompt,
             message_id_list=message_id_list,
             filtered_actions=filtered_actions,
@@ -324,6 +328,25 @@ class BrainPlanner:
         )
 
         self.add_plan_log(reasoning, actions)
 
+        try:
+            PlanReplyLogger.log_plan(
+                chat_id=self.chat_id,
+                prompt=prompt,
+                reasoning=reasoning,
+                raw_output=llm_raw_output,
+                raw_reasoning=llm_reasoning,
+                actions=actions,
+                timing={
+                    "prompt_build_ms": round(prompt_build_ms, 2),
+                    "llm_duration_ms": round(llm_duration_ms, 2) if llm_duration_ms is not None else None,
+                    "total_plan_ms": round((time.perf_counter() - plan_start) * 1000, 2),
+                    "loop_start_time": loop_start_time,
+                },
+                extra=None,
+            )
+        except Exception:
+            logger.exception(f"{self.log_prefix}Failed to write plan log")
+
         return actions
 
     async def build_planner_prompt(
@@ -479,15 +502,20 @@ class BrainPlanner:
         filtered_actions: Dict[str, ActionInfo],
         available_actions: Dict[str, ActionInfo],
         loop_start_time: float,
-    ) -> Tuple[str, List[ActionPlannerInfo]]:
+    ) -> Tuple[str, List[ActionPlannerInfo], Optional[str], Optional[str], Optional[float]]:
         """Run the main planner."""
         llm_content = None
         actions: List[ActionPlannerInfo] = []
         extracted_reasoning = ""
+        llm_reasoning = None
+        llm_duration_ms = None
 
         try:
             # Call the LLM
+            llm_start = time.perf_counter()
             llm_content, (reasoning_content, _, _) = await self.planner_llm.generate_response_async(prompt=prompt)
+            llm_duration_ms = (time.perf_counter() - llm_start) * 1000
+            llm_reasoning = reasoning_content
 
             logger.info(f"{self.log_prefix}Planner raw prompt: {prompt}")
             logger.info(f"{self.log_prefix}Planner raw response: {llm_content}")
@@ -514,7 +542,7 @@ class BrainPlanner:
                     action_message=None,
                     available_actions=available_actions,
                 )
-            ]
+            ], llm_content, llm_reasoning, llm_duration_ms
 
         # Parse the LLM response
         if llm_content:
@@ -553,7 +581,7 @@ class BrainPlanner:
             f"{self.log_prefix}Planner decided to run {len(actions)} action(s): {' '.join([a.action_type for a in actions])}"
         )
 
-        return extracted_reasoning, actions
+        return extracted_reasoning, actions, llm_content, llm_reasoning, llm_duration_ms
 
     def _create_complete_talk(
         self, reasoning: str, available_actions: Dict[str, ActionInfo]
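Note on the widened return type: _execute_main_planner now returns a 5-tuple, and every return site in both planners (including the early-return fallback above) has to keep the arity in sync by hand. A named result would make that contract explicit; this is a sketch of the idea only, not something this diff introduces:

from typing import List, NamedTuple, Optional

from src.common.data_models.info_data_model import ActionPlannerInfo


class PlannerResult(NamedTuple):
    # Hypothetical container; field names mirror the tuple order used above.
    reasoning: str
    actions: List[ActionPlannerInfo]
    raw_output: Optional[str]         # unparsed LLM text (llm_content)
    raw_reasoning: Optional[str]      # model reasoning channel, if exposed
    llm_duration_ms: Optional[float]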
diff --git a/src/chat/logger/plan_reply_logger.py b/src/chat/logger/plan_reply_logger.py
new file mode 100644
index 00000000..d1f720dc
--- /dev/null
+++ b/src/chat/logger/plan_reply_logger.py
@@ -0,0 +1,132 @@
+import json
+import time
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+from uuid import uuid4
+
+
+class PlanReplyLogger:
+    """Standalone plan/reply logger; handles persistence to disk and capacity control."""
+
+    _BASE_DIR = Path("logs")
+    _PLAN_DIR = _BASE_DIR / "plan"
+    _REPLY_DIR = _BASE_DIR / "reply"
+    _MAX_PER_CHAT = 1000
+    _TRIM_COUNT = 100
+
+    @classmethod
+    def log_plan(
+        cls,
+        chat_id: str,
+        prompt: str,
+        reasoning: str,
+        raw_output: Optional[str],
+        raw_reasoning: Optional[str],
+        actions: List[Any],
+        timing: Optional[Dict[str, Any]] = None,
+        extra: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        payload = {
+            "type": "plan",
+            "chat_id": chat_id,
+            "timestamp": time.time(),
+            "prompt": prompt,
+            "reasoning": reasoning,
+            "raw_output": raw_output,
+            "raw_reasoning": raw_reasoning,
+            "actions": [cls._serialize_action(action) for action in actions],
+            "timing": timing or {},
+            "extra": cls._safe_data(extra),
+        }
+        cls._write_json(cls._PLAN_DIR, chat_id, payload)
+
+    @classmethod
+    def log_reply(
+        cls,
+        chat_id: str,
+        prompt: str,
+        output: Optional[str],
+        processed_output: Optional[List[Any]],
+        model: Optional[str],
+        timing: Optional[Dict[str, Any]] = None,
+        reasoning: Optional[str] = None,
+        think_level: Optional[int] = None,
+        error: Optional[str] = None,
+        success: bool = True,
+    ) -> None:
+        payload = {
+            "type": "reply",
+            "chat_id": chat_id,
+            "timestamp": time.time(),
+            "prompt": prompt,
+            "output": output,
+            "processed_output": cls._safe_data(processed_output),
+            "model": model,
+            "reasoning": reasoning,
+            "think_level": think_level,
+            "timing": timing or {},
+            "error": error if not success else None,
+            "success": success,
+        }
+        cls._write_json(cls._REPLY_DIR, chat_id, payload)
+
+    @classmethod
+    def _write_json(cls, base_dir: Path, chat_id: str, payload: Dict[str, Any]) -> None:
+        chat_dir = base_dir / chat_id
+        chat_dir.mkdir(parents=True, exist_ok=True)
+        file_path = chat_dir / f"{int(time.time() * 1000)}_{uuid4().hex[:8]}.json"
+        try:
+            with file_path.open("w", encoding="utf-8") as f:
+                json.dump(cls._safe_data(payload), f, ensure_ascii=False, indent=2)
+        finally:
+            cls._trim_overflow(chat_dir)
+
+    @classmethod
+    def _trim_overflow(cls, chat_dir: Path) -> None:
+        """Delete the oldest files once the threshold is exceeded, so the directory cannot grow without bound."""
+        files = sorted(chat_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
+        if len(files) <= cls._MAX_PER_CHAT:
+            return
+        # Drop the oldest _TRIM_COUNT entries
+        for old_file in files[: cls._TRIM_COUNT]:
+            try:
+                old_file.unlink()
+            except FileNotFoundError:
+                continue
+
+    @classmethod
+    def _serialize_action(cls, action: Any) -> Dict[str, Any]:
+        # Lightweight serialization of the ActionPlannerInfo structure; avoids referencing complex objects
+        message_info = None
+        action_message = getattr(action, "action_message", None)
+        if action_message:
+            user_info = getattr(action_message, "user_info", None)
+            message_info = {
+                "message_id": getattr(action_message, "message_id", None),
+                "user_id": getattr(user_info, "user_id", None) if user_info else None,
+                "platform": getattr(user_info, "platform", None) if user_info else None,
+                "text": getattr(action_message, "processed_plain_text", None),
+            }
+
+        return {
+            "action_type": getattr(action, "action_type", None),
+            "reasoning": getattr(action, "reasoning", None),
+            "action_data": cls._safe_data(getattr(action, "action_data", None)),
+            "action_message": message_info,
+            "available_actions": cls._safe_data(getattr(action, "available_actions", None)),
+            "action_reasoning": getattr(action, "action_reasoning", None),
+        }
+
+    @classmethod
+    def _safe_data(cls, value: Any) -> Any:
+        if isinstance(value, (str, int, float, bool)) or value is None:
+            return value
+        if isinstance(value, dict):
+            return {str(k): cls._safe_data(v) for k, v in value.items()}
+        if isinstance(value, (list, tuple, set)):
+            return [cls._safe_data(v) for v in value]
+        if isinstance(value, Path):
+            return str(value)
+        # Fallback to string for other complex types
+        return str(value)
diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py
index 645ee003..121c7ebc 100644
--- a/src/chat/planner_actions/planner.py
+++ b/src/chat/planner_actions/planner.py
@@ -10,6 +10,7 @@ from json_repair import repair_json
 from src.llm_models.utils_model import LLMRequest
 from src.config.config import global_config, model_config
 from src.common.logger import get_logger
+from src.chat.logger.plan_reply_logger import PlanReplyLogger
 from src.common.data_models.info_data_model import ActionPlannerInfo
 from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
 from src.chat.utils.chat_message_builder import (
@@ -310,6 +311,7 @@ class ActionPlanner:
         """
         Planner: uses the LLM to decide which actions to take based on context.
         """
+        plan_start = time.perf_counter()
 
         # Fetch the chat context
         message_list_before_now = get_raw_msg_before_timestamp_with_chat(
@@ -345,6 +347,7 @@ class ActionPlanner:
 
         logger.debug(f"{self.log_prefix}{len(filtered_actions)} usable actions after filtering")
 
+        prompt_build_start = time.perf_counter()
         # Build the prompt containing all actions
         prompt, message_id_list = await self.build_planner_prompt(
             is_group_chat=is_group_chat,
@@ -353,9 +356,10 @@ class ActionPlanner:
             chat_content_block=chat_content_block,
             message_id_list=message_id_list,
         )
+        prompt_build_ms = (time.perf_counter() - prompt_build_start) * 1000
 
         # Call the LLM for a decision
-        reasoning, actions = await self._execute_main_planner(
+        reasoning, actions, llm_raw_output, llm_reasoning, llm_duration_ms = await self._execute_main_planner(
             prompt=prompt,
             message_id_list=message_id_list,
             filtered_actions=filtered_actions,
@@ -397,6 +401,25 @@ class ActionPlanner:
 
         self.add_plan_log(reasoning, actions)
 
+        try:
+            PlanReplyLogger.log_plan(
+                chat_id=self.chat_id,
+                prompt=prompt,
+                reasoning=reasoning,
+                raw_output=llm_raw_output,
+                raw_reasoning=llm_reasoning,
+                actions=actions,
+                timing={
+                    "prompt_build_ms": round(prompt_build_ms, 2),
+                    "llm_duration_ms": round(llm_duration_ms, 2) if llm_duration_ms is not None else None,
+                    "total_plan_ms": round((time.perf_counter() - plan_start) * 1000, 2),
+                    "loop_start_time": loop_start_time,
+                },
+                extra=None,
+            )
+        except Exception:
+            logger.exception(f"{self.log_prefix}Failed to write plan log")
+
         return actions
 
     def add_plan_log(self, reasoning: str, actions: List[ActionPlannerInfo]):
@@ -610,14 +633,19 @@ class ActionPlanner:
         filtered_actions: Dict[str, ActionInfo],
         available_actions: Dict[str, ActionInfo],
         loop_start_time: float,
-    ) -> Tuple[str, List[ActionPlannerInfo]]:
+    ) -> Tuple[str, List[ActionPlannerInfo], Optional[str], Optional[str], Optional[float]]:
         """Run the main planner."""
         llm_content = None
         actions: List[ActionPlannerInfo] = []
+        llm_reasoning = None
+        llm_duration_ms = None
 
         try:
             # Call the LLM
+            llm_start = time.perf_counter()
             llm_content, (reasoning_content, _, _) = await self.planner_llm.generate_response_async(prompt=prompt)
+            llm_duration_ms = (time.perf_counter() - llm_start) * 1000
+            llm_reasoning = reasoning_content
 
             if global_config.debug.show_planner_prompt:
                 logger.info(f"{self.log_prefix}Planner raw prompt: {prompt}")
@@ -640,7 +668,7 @@ class ActionPlanner:
                     action_message=None,
                     available_actions=available_actions,
                 )
-            ]
+            ], llm_content, llm_reasoning, llm_duration_ms
 
         # Parse the LLM response
         extracted_reasoning = ""
@@ -685,7 +713,7 @@ class ActionPlanner:
 
         logger.debug(f"{self.log_prefix}Planner selected {len(actions)} action(s): {' '.join([a.action_type for a in actions])}")
 
-        return extracted_reasoning, actions
+        return extracted_reasoning, actions, llm_content, llm_reasoning, llm_duration_ms
 
     def _create_no_reply(self, reasoning: str, available_actions: Dict[str, ActionInfo]) -> List[ActionPlannerInfo]:
         """Create a no_reply action."""
diff --git a/src/chat/replyer/group_generator.py b/src/chat/replyer/group_generator.py
index 1ec04556..bdd56c6b 100644
--- a/src/chat/replyer/group_generator.py
+++ b/src/chat/replyer/group_generator.py
@@ -31,6 +31,7 @@ from src.person_info.person_info import Person
 from src.plugin_system.base.component_types import ActionInfo, EventType
 from src.plugin_system.apis import llm_api
 
+from src.chat.logger.plan_reply_logger import PlanReplyLogger
 from src.chat.replyer.prompt.lpmm_prompt import init_lpmm_prompt
 from src.chat.replyer.prompt.replyer_prompt import init_replyer_prompt
 from src.chat.replyer.prompt.rewrite_prompt import init_rewrite_prompt
@@ -74,6 +75,7 @@ class DefaultReplyer:
         reply_time_point: Optional[float] = time.time(),
         think_level: int = 1,
         unknown_words: Optional[List[str]] = None,
+        log_reply: bool = True,
     ) -> Tuple[bool, LLMGenerationDataModel]:
         # sourcery skip: merge-nested-ifs
         """
@@ -92,6 +94,9 @@ class DefaultReplyer:
             Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: (success, generated reply, prompt used)
         """
 
+        overall_start = time.perf_counter()
+        prompt_duration_ms: Optional[float] = None
+        llm_duration_ms: Optional[float] = None
         prompt = None
         selected_expressions: Optional[List[int]] = None
         llm_response = LLMGenerationDataModel()
@@ -101,6 +106,7 @@ class DefaultReplyer:
         # 3. Build the prompt
         timing_logs = []
         almost_zero_str = ""
+        prompt_start = time.perf_counter()
         with Timer("构建Prompt", {}):  # internal timer; optional to keep
             prompt, selected_expressions, timing_logs, almost_zero_str = await self.build_prompt_reply_context(
                 extra_info=extra_info,
@@ -113,11 +119,37 @@ class DefaultReplyer:
                 think_level=think_level,
                 unknown_words=unknown_words,
             )
+        prompt_duration_ms = (time.perf_counter() - prompt_start) * 1000
 
         llm_response.prompt = prompt
         llm_response.selected_expressions = selected_expressions
+        llm_response.timing = {
+            "prompt_ms": round(prompt_duration_ms or 0.0, 2),
+            "overall_ms": None,  # placeholder, filled in later
+        }
+        llm_response.timing_logs = timing_logs
+        llm_response.timing["timing_logs"] = timing_logs
 
         if not prompt:
             logger.warning("Failed to build prompt; skipping reply generation")
+            llm_response.timing["overall_ms"] = round((time.perf_counter() - overall_start) * 1000, 2)
+            llm_response.timing["almost_zero"] = almost_zero_str
+            llm_response.timing["timing_logs"] = timing_logs
+            if log_reply:
+                try:
+                    PlanReplyLogger.log_reply(
+                        chat_id=self.chat_stream.stream_id,
+                        prompt="",
+                        output=None,
+                        processed_output=None,
+                        model=None,
+                        timing=llm_response.timing,
+                        reasoning=None,
+                        think_level=think_level,
+                        error="build_prompt_failed",
+                        success=False,
+                    )
+                except Exception:
+                    logger.exception("Failed to write reply log")
             return False, llm_response
 
         from src.plugin_system.core.events_manager import events_manager
@@ -137,7 +169,9 @@ class DefaultReplyer:
             model_name = "unknown_model"
             try:
+                llm_start = time.perf_counter()
                 content, reasoning_content, model_name, tool_call = await self.llm_generate_content(prompt)
+                llm_duration_ms = (time.perf_counter() - llm_start) * 1000
                 # logger.debug(f"replyer generated content: {content}")
 
                 # Emit all log output in one place; try-except ensures output even if one step fails
@@ -161,6 +195,26 @@ class DefaultReplyer:
                 llm_response.reasoning = reasoning_content
                 llm_response.model = model_name
                 llm_response.tool_calls = tool_call
+                llm_response.timing["llm_ms"] = round(llm_duration_ms or 0.0, 2)
+                llm_response.timing["overall_ms"] = round((time.perf_counter() - overall_start) * 1000, 2)
+                llm_response.timing_logs = timing_logs
+                llm_response.timing["timing_logs"] = timing_logs
+                llm_response.timing["almost_zero"] = almost_zero_str
+                try:
+                    if log_reply:
+                        PlanReplyLogger.log_reply(
+                            chat_id=self.chat_stream.stream_id,
+                            prompt=prompt,
+                            output=content,
+                            processed_output=None,
+                            model=model_name,
+                            timing=llm_response.timing,
+                            reasoning=reasoning_content,
+                            think_level=think_level,
+                            success=True,
+                        )
+                except Exception:
+                    logger.exception("Failed to write reply log")
 
                 continue_flag, modified_message = await events_manager.handle_mai_events(
                     EventType.AFTER_LLM, None, prompt, llm_response, stream_id=stream_id
                 )
@@ -194,6 +248,27 @@ class DefaultReplyer:
                 except Exception as log_e:
                     logger.warning(f"Error while emitting logs: {log_e}")
 
+                llm_response.timing["llm_ms"] = round(llm_duration_ms or 0.0, 2)
+                llm_response.timing["overall_ms"] = round((time.perf_counter() - overall_start) * 1000, 2)
+                llm_response.timing_logs = timing_logs
+                llm_response.timing["timing_logs"] = timing_logs
+                llm_response.timing["almost_zero"] = almost_zero_str
+                if log_reply:
+                    try:
+                        PlanReplyLogger.log_reply(
+                            chat_id=self.chat_stream.stream_id,
+                            prompt=prompt or "",
+                            output=None,
+                            processed_output=None,
+                            model=model_name,
+                            timing=llm_response.timing,
+                            reasoning=None,
+                            think_level=think_level,
+                            error=str(llm_e),
+                            success=False,
+                        )
+                    except Exception:
+                        logger.exception("Failed to write reply log")
                 return False, llm_response  # no reply can be generated if the LLM call fails
 
         return True, llm_response
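For consumers of these logs, the reply record mirrors the timing dict assembled above. On the success path in DefaultReplyer, one file under logs/reply/<chat_id>/ would look roughly like this (values are illustrative, not from a real run; processed_output stays null here because the split segments are only filled in by generator_api further below):

{
  "type": "reply",
  "chat_id": "stream_42",
  "timestamp": 1718000000.0,
  "prompt": "...",
  "output": "raw LLM text",
  "processed_output": null,
  "model": "some-model-name",
  "reasoning": null,
  "think_level": 1,
  "timing": {
    "prompt_ms": 34.5,
    "overall_ms": 912.0,
    "timing_logs": [],
    "llm_ms": 850.2,
    "almost_zero": ""
  },
  "error": null,
  "success": true
}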
diff --git a/src/common/data_models/llm_data_model.py b/src/common/data_models/llm_data_model.py
index 2d2ee0c3..68068cda 100644
--- a/src/common/data_models/llm_data_model.py
+++ b/src/common/data_models/llm_data_model.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import Optional, List, TYPE_CHECKING
+from typing import Optional, List, TYPE_CHECKING, Dict, Any
 
 from . import BaseDataModel
 
@@ -17,3 +17,6 @@ class LLMGenerationDataModel(BaseDataModel):
     prompt: Optional[str] = None
     selected_expressions: Optional[List[int]] = None
     reply_set: Optional["ReplySetModel"] = None
+    timing: Optional[Dict[str, Any]] = None
+    processed_output: Optional[List[str]] = None
+    timing_logs: Optional[List[str]] = None
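With plan records now persisted under logs/plan/<chat_id>/<epoch_ms>_<uuid8>.json by the PlanReplyLogger above, the newest one can be pulled back with the standard library alone. A throwaway inspection sketch, assuming the default logs base directory and a hypothetical chat id:

import json
from pathlib import Path

chat_dir = Path("logs") / "plan" / "example_chat"  # chat id is hypothetical

# File names start with a fixed-width millisecond timestamp, so the
# lexicographically largest name is also the most recent record.
latest = max(chat_dir.glob("*.json"), key=lambda p: p.name)
record = json.loads(latest.read_text(encoding="utf-8"))
print(record["reasoning"])
print(record["timing"].get("llm_duration_ms"))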
diff --git a/src/plugin_system/apis/generator_api.py b/src/plugin_system/apis/generator_api.py
index 55242b2f..deaccc4b 100644
--- a/src/plugin_system/apis/generator_api.py
+++ b/src/plugin_system/apis/generator_api.py
@@ -20,6 +20,7 @@ from src.chat.message_receive.chat_stream import ChatStream
 from src.chat.utils.utils import process_llm_response
 from src.chat.replyer.replyer_manager import replyer_manager
 from src.plugin_system.base.component_types import ActionInfo
+from src.chat.logger.plan_reply_logger import PlanReplyLogger
 
 if TYPE_CHECKING:
     from src.common.data_models.info_data_model import ActionPlannerInfo
@@ -162,16 +163,37 @@ async def generate_reply(
             from_plugin=from_plugin,
             stream_id=chat_stream.stream_id if chat_stream else chat_id,
             reply_time_point=reply_time_point,
+            log_reply=False,
         )
 
         if not success:
             logger.warning("[GeneratorAPI] Reply generation failed")
             return False, None
 
         reply_set: Optional[ReplySetModel] = None
         if content := llm_response.content:
-            reply_set = process_human_text(content, enable_splitter, enable_chinese_typo)
+            processed_response = process_llm_response(content, enable_splitter, enable_chinese_typo)
+            llm_response.processed_output = processed_response
+            reply_set = ReplySetModel()
+            for text in processed_response:
+                reply_set.add_text_content(text)
         llm_response.reply_set = reply_set
 
         logger.debug(f"[GeneratorAPI] Reply generated successfully with {len(reply_set) if reply_set else 0} reply item(s)")
 
+        # Record the final reply log here, in one place (including the split processed_output)
+        try:
+            PlanReplyLogger.log_reply(
+                chat_id=chat_stream.stream_id if chat_stream else (chat_id or ""),
+                prompt=llm_response.prompt or "",
+                output=llm_response.content,
+                processed_output=llm_response.processed_output,
+                model=llm_response.model,
+                timing=llm_response.timing,
+                reasoning=llm_response.reasoning,
+                think_level=think_level,
+                success=True,
+            )
+        except Exception:
+            logger.exception("[GeneratorAPI] Failed to write reply log")
+
         return success, llm_response
 
     except ValueError as ve:
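A minimal smoke-test sketch for the new logger, assuming pytest's tmp_path and monkeypatch fixtures. Note that _PLAN_DIR must be patched directly: it is derived from _BASE_DIR once at class-definition time, so patching _BASE_DIR alone would not redirect writes.

import json

from src.chat.logger.plan_reply_logger import PlanReplyLogger


def test_log_plan_roundtrip(tmp_path, monkeypatch):
    # Redirect plan logs into the test's temporary directory.
    monkeypatch.setattr(PlanReplyLogger, "_PLAN_DIR", tmp_path)

    PlanReplyLogger.log_plan(
        chat_id="chat-1",
        prompt="p",
        reasoning="r",
        raw_output="raw",
        raw_reasoning=None,
        actions=[],
        timing={"total_plan_ms": 1.0},
    )

    files = list((tmp_path / "chat-1").glob("*.json"))
    assert len(files) == 1
    payload = json.loads(files[0].read_text(encoding="utf-8"))
    assert payload["type"] == "plan"
    assert payload["raw_output"] == "raw"
    assert payload["timing"]["total_plan_ms"] == 1.0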