feat: add reply logging

pull/1459/head
SengokuCola 2025-12-23 21:42:48 +08:00
parent 0f6ec45821
commit 839a42578c
6 changed files with 298 additions and 10 deletions

View File

@@ -11,6 +11,7 @@ from json_repair import repair_json
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.common.logger import get_logger
from src.chat.logger.plan_reply_logger import PlanReplyLogger
from src.common.data_models.info_data_model import ActionPlannerInfo
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.chat.utils.chat_message_builder import (
@@ -261,6 +262,7 @@ class BrainPlanner:
"""
Planner: uses the LLM to decide, from context, which actions to take (ReAct mode)
"""
plan_start = time.perf_counter()
# Fetch the chat context
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
@@ -298,6 +300,7 @@ class BrainPlanner:
logger.debug(f"{self.log_prefix}{len(filtered_actions)} actions available after filtering")
prompt_build_start = time.perf_counter()
# Build the prompt containing all actions: use the unified ReAct prompt
prompt_key = "brain_planner_prompt_react"
# No logging here, to avoid duplicate output; the caller controls log_prompt as needed
@@ -308,9 +311,10 @@ class BrainPlanner:
message_id_list=message_id_list,
prompt_key=prompt_key,
)
prompt_build_ms = (time.perf_counter() - prompt_build_start) * 1000
# Call the LLM for a decision
reasoning, actions = await self._execute_main_planner(
reasoning, actions, llm_raw_output, llm_reasoning, llm_duration_ms = await self._execute_main_planner(
prompt=prompt,
message_id_list=message_id_list,
filtered_actions=filtered_actions,
@@ -324,6 +328,25 @@ class BrainPlanner:
)
self.add_plan_log(reasoning, actions)
try:
PlanReplyLogger.log_plan(
chat_id=self.chat_id,
prompt=prompt,
reasoning=reasoning,
raw_output=llm_raw_output,
raw_reasoning=llm_reasoning,
actions=actions,
timing={
"prompt_build_ms": round(prompt_build_ms, 2),
"llm_duration_ms": round(llm_duration_ms, 2) if llm_duration_ms is not None else None,
"total_plan_ms": round((time.perf_counter() - plan_start) * 1000, 2),
"loop_start_time": loop_start_time,
},
extra=None,
)
except Exception:
logger.exception(f"{self.log_prefix}Failed to write plan log")
return actions
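
For illustration, the timing payload assembled above is recorded in the plan log roughly as follows (hypothetical values; a sketch only, actual numbers depend on the run):

timing = {
    "prompt_build_ms": 12.34,        # time spent building the planner prompt
    "llm_duration_ms": 845.1,        # LLM call duration, or None if the call never ran
    "total_plan_ms": 901.56,         # end-to-end duration of the plan() call
    "loop_start_time": 1734957768.4  # loop start timestamp, passed through unchanged
}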
async def build_planner_prompt(
@@ -479,15 +502,20 @@ class BrainPlanner:
filtered_actions: Dict[str, ActionInfo],
available_actions: Dict[str, ActionInfo],
loop_start_time: float,
) -> Tuple[str, List[ActionPlannerInfo]]:
) -> Tuple[str, List[ActionPlannerInfo], Optional[str], Optional[str], Optional[float]]:
"""执行主规划器"""
llm_content = None
actions: List[ActionPlannerInfo] = []
extracted_reasoning = ""
llm_reasoning = None
llm_duration_ms = None
try:
# Call the LLM
llm_start = time.perf_counter()
llm_content, (reasoning_content, _, _) = await self.planner_llm.generate_response_async(prompt=prompt)
llm_duration_ms = (time.perf_counter() - llm_start) * 1000
llm_reasoning = reasoning_content
logger.info(f"{self.log_prefix}Planner raw prompt: {prompt}")
logger.info(f"{self.log_prefix}Planner raw response: {llm_content}")
@@ -514,7 +542,7 @@
action_message=None,
available_actions=available_actions,
)
]
], llm_content, llm_reasoning, llm_duration_ms
# Parse the LLM response
if llm_content:
@@ -553,7 +581,7 @@
f"{self.log_prefix}规划器决定执行{len(actions)}个动作: {' '.join([a.action_type for a in actions])}"
)
return extracted_reasoning, actions
return extracted_reasoning, actions, llm_content, llm_reasoning, llm_duration_ms
def _create_complete_talk(
self, reasoning: str, available_actions: Dict[str, ActionInfo]

View File

@@ -0,0 +1,132 @@
import json
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import uuid4
class PlanReplyLogger:
"""独立的Plan/Reply日志记录器负责落盘和容量控制。"""
_BASE_DIR = Path("logs")
_PLAN_DIR = _BASE_DIR / "plan"
_REPLY_DIR = _BASE_DIR / "reply"
_MAX_PER_CHAT = 1000
_TRIM_COUNT = 100
@classmethod
def log_plan(
cls,
chat_id: str,
prompt: str,
reasoning: str,
raw_output: Optional[str],
raw_reasoning: Optional[str],
actions: List[Any],
timing: Optional[Dict[str, Any]] = None,
extra: Optional[Dict[str, Any]] = None,
) -> None:
payload = {
"type": "plan",
"chat_id": chat_id,
"timestamp": time.time(),
"prompt": prompt,
"reasoning": reasoning,
"raw_output": raw_output,
"raw_reasoning": raw_reasoning,
"actions": [cls._serialize_action(action) for action in actions],
"timing": timing or {},
"extra": cls._safe_data(extra),
}
cls._write_json(cls._PLAN_DIR, chat_id, payload)
@classmethod
def log_reply(
cls,
chat_id: str,
prompt: str,
output: Optional[str],
processed_output: Optional[List[Any]],
model: Optional[str],
timing: Optional[Dict[str, Any]] = None,
reasoning: Optional[str] = None,
think_level: Optional[int] = None,
error: Optional[str] = None,
success: bool = True,
) -> None:
payload = {
"type": "reply",
"chat_id": chat_id,
"timestamp": time.time(),
"prompt": prompt,
"output": output,
"processed_output": cls._safe_data(processed_output),
"model": model,
"reasoning": reasoning,
"think_level": think_level,
"timing": timing or {},
"error": error if not success else None,
"success": success,
}
cls._write_json(cls._REPLY_DIR, chat_id, payload)
@classmethod
def _write_json(cls, base_dir: Path, chat_id: str, payload: Dict[str, Any]) -> None:
chat_dir = base_dir / chat_id
chat_dir.mkdir(parents=True, exist_ok=True)
file_path = chat_dir / f"{int(time.time() * 1000)}_{uuid4().hex[:8]}.json"
try:
with file_path.open("w", encoding="utf-8") as f:
json.dump(cls._safe_data(payload), f, ensure_ascii=False, indent=2)
finally:
cls._trim_overflow(chat_dir)
@classmethod
def _trim_overflow(cls, chat_dir: Path) -> None:
"""超过阈值时删除最老的若干文件,避免目录无限增长。"""
files = sorted(chat_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
if len(files) <= cls._MAX_PER_CHAT:
return
# Delete the oldest _TRIM_COUNT entries
for old_file in files[: cls._TRIM_COUNT]:
try:
old_file.unlink()
except FileNotFoundError:
continue
@classmethod
def _serialize_action(cls, action: Any) -> Dict[str, Any]:
# Lightweight serialization of the ActionPlannerInfo structure; avoids referencing complex objects
message_info = None
action_message = getattr(action, "action_message", None)
if action_message:
user_info = getattr(action_message, "user_info", None)
message_info = {
"message_id": getattr(action_message, "message_id", None),
"user_id": getattr(user_info, "user_id", None) if user_info else None,
"platform": getattr(user_info, "platform", None) if user_info else None,
"text": getattr(action_message, "processed_plain_text", None),
}
return {
"action_type": getattr(action, "action_type", None),
"reasoning": getattr(action, "reasoning", None),
"action_data": cls._safe_data(getattr(action, "action_data", None)),
"action_message": message_info,
"available_actions": cls._safe_data(getattr(action, "available_actions", None)),
"action_reasoning": getattr(action, "action_reasoning", None),
}
@classmethod
def _safe_data(cls, value: Any) -> Any:
if isinstance(value, (str, int, float, bool)) or value is None:
return value
if isinstance(value, dict):
return {str(k): cls._safe_data(v) for k, v in value.items()}
if isinstance(value, (list, tuple, set)):
return [cls._safe_data(v) for v in value]
if isinstance(value, Path):
return str(value)
# Fallback to string for other complex types
return str(value)
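
As a usage sketch of the class above (chat id, model name, and values are hypothetical): each call writes one JSON file under logs/reply/<chat_id>/ named <epoch_ms>_<8-hex-chars>.json, and once a chat directory exceeds 1000 files the oldest 100 are deleted.

from src.chat.logger.plan_reply_logger import PlanReplyLogger

PlanReplyLogger.log_reply(
    chat_id="chat_42",               # hypothetical chat id
    prompt="Say hi to the user.",
    output="Hi there!",
    processed_output=["Hi there!"],
    model="example-model",           # hypothetical model name
    timing={"llm_ms": 845.1},
    success=True,
)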

View File

@@ -10,6 +10,7 @@ from json_repair import repair_json
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.common.logger import get_logger
from src.chat.logger.plan_reply_logger import PlanReplyLogger
from src.common.data_models.info_data_model import ActionPlannerInfo
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.chat.utils.chat_message_builder import (
@@ -310,6 +311,7 @@ class ActionPlanner:
"""
Planner: uses the LLM to decide, from context, which actions to take
"""
plan_start = time.perf_counter()
# Fetch the chat context
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
@@ -345,6 +347,7 @@ class ActionPlanner:
logger.debug(f"{self.log_prefix}{len(filtered_actions)} actions available after filtering")
prompt_build_start = time.perf_counter()
# Build the prompt containing all actions
prompt, message_id_list = await self.build_planner_prompt(
is_group_chat=is_group_chat,
@@ -353,9 +356,10 @@ class ActionPlanner:
chat_content_block=chat_content_block,
message_id_list=message_id_list,
)
prompt_build_ms = (time.perf_counter() - prompt_build_start) * 1000
# Call the LLM for a decision
reasoning, actions = await self._execute_main_planner(
reasoning, actions, llm_raw_output, llm_reasoning, llm_duration_ms = await self._execute_main_planner(
prompt=prompt,
message_id_list=message_id_list,
filtered_actions=filtered_actions,
@@ -397,6 +401,25 @@ class ActionPlanner:
self.add_plan_log(reasoning, actions)
try:
PlanReplyLogger.log_plan(
chat_id=self.chat_id,
prompt=prompt,
reasoning=reasoning,
raw_output=llm_raw_output,
raw_reasoning=llm_reasoning,
actions=actions,
timing={
"prompt_build_ms": round(prompt_build_ms, 2),
"llm_duration_ms": round(llm_duration_ms, 2) if llm_duration_ms is not None else None,
"total_plan_ms": round((time.perf_counter() - plan_start) * 1000, 2),
"loop_start_time": loop_start_time,
},
extra=None,
)
except Exception:
logger.exception(f"{self.log_prefix}Failed to write plan log")
return actions
def add_plan_log(self, reasoning: str, actions: List[ActionPlannerInfo]):
@@ -610,14 +633,19 @@ class ActionPlanner:
filtered_actions: Dict[str, ActionInfo],
available_actions: Dict[str, ActionInfo],
loop_start_time: float,
) -> Tuple[str, List[ActionPlannerInfo]]:
) -> Tuple[str, List[ActionPlannerInfo], Optional[str], Optional[str], Optional[float]]:
"""执行主规划器"""
llm_content = None
actions: List[ActionPlannerInfo] = []
llm_reasoning = None
llm_duration_ms = None
try:
# Call the LLM
llm_start = time.perf_counter()
llm_content, (reasoning_content, _, _) = await self.planner_llm.generate_response_async(prompt=prompt)
llm_duration_ms = (time.perf_counter() - llm_start) * 1000
llm_reasoning = reasoning_content
if global_config.debug.show_planner_prompt:
logger.info(f"{self.log_prefix}Planner raw prompt: {prompt}")
@@ -640,7 +668,7 @@
action_message=None,
available_actions=available_actions,
)
]
], llm_content, llm_reasoning, llm_duration_ms
# Parse the LLM response
extracted_reasoning = ""
@@ -685,7 +713,7 @@
logger.debug(f"{self.log_prefix}Planner selected {len(actions)} actions: {' '.join([a.action_type for a in actions])}")
return extracted_reasoning, actions
return extracted_reasoning, actions, llm_content, llm_reasoning, llm_duration_ms
def _create_no_reply(self, reasoning: str, available_actions: Dict[str, ActionInfo]) -> List[ActionPlannerInfo]:
"""创建no_reply"""

View File

@@ -31,6 +31,7 @@ from src.person_info.person_info import Person
from src.plugin_system.base.component_types import ActionInfo, EventType
from src.plugin_system.apis import llm_api
from src.chat.logger.plan_reply_logger import PlanReplyLogger
from src.chat.replyer.prompt.lpmm_prompt import init_lpmm_prompt
from src.chat.replyer.prompt.replyer_prompt import init_replyer_prompt
from src.chat.replyer.prompt.rewrite_prompt import init_rewrite_prompt
@@ -74,6 +75,7 @@ class DefaultReplyer:
reply_time_point: Optional[float] = time.time(),
think_level: int = 1,
unknown_words: Optional[List[str]] = None,
log_reply: bool = True,
) -> Tuple[bool, LLMGenerationDataModel]:
# sourcery skip: merge-nested-ifs
"""
@@ -92,6 +94,9 @@
Tuple[bool, LLMGenerationDataModel]: (whether generation succeeded, the generated response data)
"""
overall_start = time.perf_counter()
prompt_duration_ms: Optional[float] = None
llm_duration_ms: Optional[float] = None
prompt = None
selected_expressions: Optional[List[int]] = None
llm_response = LLMGenerationDataModel()
@@ -101,6 +106,7 @@
# 3. Build the prompt
timing_logs = []
almost_zero_str = ""
prompt_start = time.perf_counter()
with Timer("Build prompt", {}): # internal timer, optional to keep
prompt, selected_expressions, timing_logs, almost_zero_str = await self.build_prompt_reply_context(
extra_info=extra_info,
@@ -113,11 +119,37 @@
think_level=think_level,
unknown_words=unknown_words,
)
prompt_duration_ms = (time.perf_counter() - prompt_start) * 1000
llm_response.prompt = prompt
llm_response.selected_expressions = selected_expressions
llm_response.timing = {
"prompt_ms": round(prompt_duration_ms or 0.0, 2),
"overall_ms": None, # 占位,稍后写入
}
llm_response.timing_logs = timing_logs
llm_response.timing["timing_logs"] = timing_logs
if not prompt:
logger.warning("Prompt build failed; skipping reply generation")
llm_response.timing["overall_ms"] = round((time.perf_counter() - overall_start) * 1000, 2)
llm_response.timing["almost_zero"] = almost_zero_str
llm_response.timing["timing_logs"] = timing_logs
if log_reply:
try:
PlanReplyLogger.log_reply(
chat_id=self.chat_stream.stream_id,
prompt="",
output=None,
processed_output=None,
model=None,
timing=llm_response.timing,
reasoning=None,
think_level=think_level,
error="build_prompt_failed",
success=False,
)
except Exception:
logger.exception("Failed to write reply log")
return False, llm_response
from src.plugin_system.core.events_manager import events_manager
@@ -137,7 +169,9 @@
model_name = "unknown_model"
try:
llm_start = time.perf_counter()
content, reasoning_content, model_name, tool_call = await self.llm_generate_content(prompt)
llm_duration_ms = (time.perf_counter() - llm_start) * 1000
# logger.debug(f"Replyer generated content: {content}")
# Emit all log information in one place; use try-except so it is still emitted even if a step fails
@@ -161,6 +195,26 @@
llm_response.reasoning = reasoning_content
llm_response.model = model_name
llm_response.tool_calls = tool_call
llm_response.timing["llm_ms"] = round(llm_duration_ms or 0.0, 2)
llm_response.timing["overall_ms"] = round((time.perf_counter() - overall_start) * 1000, 2)
llm_response.timing_logs = timing_logs
llm_response.timing["timing_logs"] = timing_logs
llm_response.timing["almost_zero"] = almost_zero_str
try:
if log_reply:
PlanReplyLogger.log_reply(
chat_id=self.chat_stream.stream_id,
prompt=prompt,
output=content,
processed_output=None,
model=model_name,
timing=llm_response.timing,
reasoning=reasoning_content,
think_level=think_level,
success=True,
)
except Exception:
logger.exception("Failed to write reply log")
continue_flag, modified_message = await events_manager.handle_mai_events(
EventType.AFTER_LLM, None, prompt, llm_response, stream_id=stream_id
)
@@ -194,6 +248,27 @@
except Exception as log_e:
logger.warning(f"Error while emitting logs: {log_e}")
llm_response.timing["llm_ms"] = round(llm_duration_ms or 0.0, 2)
llm_response.timing["overall_ms"] = round((time.perf_counter() - overall_start) * 1000, 2)
llm_response.timing_logs = timing_logs
llm_response.timing["timing_logs"] = timing_logs
llm_response.timing["almost_zero"] = almost_zero_str
if log_reply:
try:
PlanReplyLogger.log_reply(
chat_id=self.chat_stream.stream_id,
prompt=prompt or "",
output=None,
processed_output=None,
model=model_name,
timing=llm_response.timing,
reasoning=None,
think_level=think_level,
error=str(llm_e),
success=False,
)
except Exception:
logger.exception("Failed to write reply log")
return False, llm_response # cannot generate a reply if the LLM call fails
return True, llm_response

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Optional, List, TYPE_CHECKING
from typing import Optional, List, TYPE_CHECKING, Dict, Any
from . import BaseDataModel
@@ -17,3 +17,6 @@ class LLMGenerationDataModel(BaseDataModel):
prompt: Optional[str] = None
selected_expressions: Optional[List[int]] = None
reply_set: Optional["ReplySetModel"] = None
timing: Optional[Dict[str, Any]] = None
processed_output: Optional[List[str]] = None
timing_logs: Optional[List[str]] = None
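
A minimal sketch of how the reply pipeline populates these new fields (hypothetical values, mirroring the replyer changes above):

response = LLMGenerationDataModel()
response.timing = {"prompt_ms": 12.3, "llm_ms": 845.1, "overall_ms": 900.2}  # per-stage durations
response.processed_output = ["part one", "part two"]  # reply text after splitting
response.timing_logs = ["Build prompt: 12.3ms"]       # human-readable timing entries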

View File

@@ -20,6 +20,7 @@ from src.chat.message_receive.chat_stream import ChatStream
from src.chat.utils.utils import process_llm_response
from src.chat.replyer.replyer_manager import replyer_manager
from src.plugin_system.base.component_types import ActionInfo
from src.chat.logger.plan_reply_logger import PlanReplyLogger
if TYPE_CHECKING:
from src.common.data_models.info_data_model import ActionPlannerInfo
@@ -162,16 +163,37 @@ async def generate_reply(
from_plugin=from_plugin,
stream_id=chat_stream.stream_id if chat_stream else chat_id,
reply_time_point=reply_time_point,
log_reply=False,
)
if not success:
logger.warning("[GeneratorAPI] Reply generation failed")
return False, None
reply_set: Optional[ReplySetModel] = None
if content := llm_response.content:
reply_set = process_human_text(content, enable_splitter, enable_chinese_typo)
processed_response = process_llm_response(content, enable_splitter, enable_chinese_typo)
llm_response.processed_output = processed_response
reply_set = ReplySetModel()
for text in processed_response:
reply_set.add_text_content(text)
llm_response.reply_set = reply_set
logger.debug(f"[GeneratorAPI] Reply generated successfully: {len(reply_set) if reply_set else 0} reply items")
# Record the final reply log here in one place (including the post-split processed_output)
try:
PlanReplyLogger.log_reply(
chat_id=chat_stream.stream_id if chat_stream else (chat_id or ""),
prompt=llm_response.prompt or "",
output=llm_response.content,
processed_output=llm_response.processed_output,
model=llm_response.model,
timing=llm_response.timing,
reasoning=llm_response.reasoning,
think_level=think_level,
success=True,
)
except Exception:
logger.exception("[GeneratorAPI] Failed to write reply log")
return success, llm_response
except ValueError as ve: