diff --git a/README.md b/README.md
index f07e7d57..17a8da37 100644
--- a/README.md
+++ b/README.md
@@ -1,18 +1,18 @@
# 麦麦!MaiCore-MaiMBot (编辑中)
-
+
+
一款专注于 群组聊天 的赛博网友
探索本项目的文档 »
diff --git a/src/chat/focus_chat/cycle_analyzer.py b/src/chat/focus_chat/cycle_analyzer.py
deleted file mode 100644
index 23374ced..00000000
--- a/src/chat/focus_chat/cycle_analyzer.py
+++ /dev/null
@@ -1,216 +0,0 @@
-import os
-import time
-from typing import List, Dict, Any, Tuple
-from src.chat.focus_chat.heartFC_Cycleinfo import CycleInfo
-from src.common.logger_manager import get_logger
-
-logger = get_logger("cycle_analyzer")
-
-
-class CycleAnalyzer:
- """循环信息分析类,提供查询和分析CycleInfo的工具"""
-
- def __init__(self, base_dir: str = "log_debug"):
- """
- 初始化分析器
-
- 参数:
- base_dir: 存储CycleInfo的基础目录,默认为log_debug
- """
- self.base_dir = base_dir
-
- def list_streams(self) -> List[str]:
- """
- 获取所有聊天流ID列表
-
- 返回:
- List[str]: 聊天流ID列表
- """
- try:
- if not os.path.exists(self.base_dir):
- return []
-
- return [d for d in os.listdir(self.base_dir) if os.path.isdir(os.path.join(self.base_dir, d))]
- except Exception as e:
- logger.error(f"获取聊天流列表时出错: {e}")
- return []
-
- def get_stream_cycle_count(self, stream_id: str) -> int:
- """
- 获取指定聊天流的循环数量
-
- 参数:
- stream_id: 聊天流ID
-
- 返回:
- int: 循环数量
- """
- try:
- files = CycleInfo.list_cycles(stream_id, self.base_dir)
- return len(files)
- except Exception as e:
- logger.error(f"获取聊天流循环数量时出错: {e}")
- return 0
-
- def get_stream_cycles(self, stream_id: str, start: int = 0, limit: int = -1) -> List[str]:
- """
- 获取指定聊天流的循环文件列表
-
- 参数:
- stream_id: 聊天流ID
- start: 起始索引,默认为0
- limit: 返回的最大数量,默认为-1(全部)
-
- 返回:
- List[str]: 循环文件路径列表
- """
- try:
- files = CycleInfo.list_cycles(stream_id, self.base_dir)
- if limit < 0:
- return files[start:]
- else:
- return files[start : start + limit]
- except Exception as e:
- logger.error(f"获取聊天流循环文件列表时出错: {e}")
- return []
-
- def get_cycle_content(self, filepath: str) -> str:
- """
- 获取循环文件的内容
-
- 参数:
- filepath: 文件路径
-
- 返回:
- str: 文件内容
- """
- try:
- if not os.path.exists(filepath):
- return f"文件不存在: {filepath}"
-
- with open(filepath, "r", encoding="utf-8") as f:
- return f.read()
- except Exception as e:
- logger.error(f"读取循环文件内容时出错: {e}")
- return f"读取文件出错: {e}"
-
- def analyze_stream_cycles(self, stream_id: str) -> Dict[str, Any]:
- """
- 分析指定聊天流的所有循环,生成统计信息
-
- 参数:
- stream_id: 聊天流ID
-
- 返回:
- Dict[str, Any]: 统计信息
- """
- try:
- files = CycleInfo.list_cycles(stream_id, self.base_dir)
- if not files:
- return {"error": "没有找到循环记录"}
-
- total_cycles = len(files)
- action_counts = {"text_reply": 0, "emoji_reply": 0, "no_reply": 0, "unknown": 0}
- total_duration = 0
- tool_usage = {}
-
- for filepath in files:
- with open(filepath, "r", encoding="utf-8") as f:
- content = f.read()
-
- # 解析动作类型
- for line in content.split("\n"):
- if line.startswith("动作:"):
- action = line[3:].strip()
- action_counts[action] = action_counts.get(action, 0) + 1
-
- # 解析耗时
- elif line.startswith("耗时:"):
- try:
- duration = float(line[3:].strip().split("秒")[0])
- total_duration += duration
- except Exception as e:
- logger.error(f"解析耗时时出错: {e}")
- pass
-
- # 解析工具使用
- elif line.startswith("使用的工具:"):
- tools = line[6:].strip().split(", ")
- for tool in tools:
- tool_usage[tool] = tool_usage.get(tool, 0) + 1
-
- avg_duration = total_duration / total_cycles if total_cycles > 0 else 0
-
- return {
- "总循环数": total_cycles,
- "动作统计": action_counts,
- "平均耗时": f"{avg_duration:.2f}秒",
- "总耗时": f"{total_duration:.2f}秒",
- "工具使用次数": tool_usage,
- }
- except Exception as e:
- logger.error(f"分析聊天流循环时出错: {e}")
- return {"error": f"分析出错: {e}"}
-
- def get_latest_cycles(self, count: int = 10) -> List[Tuple[str, str]]:
- """
- 获取所有聊天流中最新的几个循环
-
- 参数:
- count: 获取的数量,默认为10
-
- 返回:
- List[Tuple[str, str]]: 聊天流ID和文件路径的元组列表
- """
- try:
- all_cycles = []
- streams = self.list_streams()
-
- for stream_id in streams:
- files = CycleInfo.list_cycles(stream_id, self.base_dir)
- for filepath in files:
- try:
- # 从文件名中提取时间戳
- filename = os.path.basename(filepath)
- timestamp_str = filename.split("_", 2)[2].split(".")[0]
- timestamp = time.mktime(time.strptime(timestamp_str, "%Y%m%d_%H%M%S"))
- all_cycles.append((timestamp, stream_id, filepath))
- except Exception as e:
- logger.error(f"从文件名中提取时间戳时出错: {e}")
- continue
-
- # 按时间戳排序,取最新的count个
- all_cycles.sort(reverse=True)
- return [(item[1], item[2]) for item in all_cycles[:count]]
- except Exception as e:
- logger.error(f"获取最新循环时出错: {e}")
- return []
-
-
-# 使用示例
-if __name__ == "__main__":
- analyzer = CycleAnalyzer()
-
- # 列出所有聊天流
- streams = analyzer.list_streams()
- print(f"找到 {len(streams)} 个聊天流: {streams}")
-
- # 分析第一个聊天流的循环
- if streams:
- stream_id = streams[0]
- stats = analyzer.analyze_stream_cycles(stream_id)
- print(f"\n聊天流 {stream_id} 的统计信息:")
- for key, value in stats.items():
- print(f" {key}: {value}")
-
- # 获取最新的循环
- cycles = analyzer.get_stream_cycles(stream_id, limit=1)
- if cycles:
- print("\n最新循环内容:")
- print(analyzer.get_cycle_content(cycles[0]))
-
- # 获取所有聊天流中最新的3个循环
- latest_cycles = analyzer.get_latest_cycles(3)
- print(f"\n所有聊天流中最新的 {len(latest_cycles)} 个循环:")
- for stream_id, filepath in latest_cycles:
- print(f" 聊天流 {stream_id}: {os.path.basename(filepath)}")
diff --git a/src/chat/focus_chat/expressors/default_expressor.py b/src/chat/focus_chat/expressors/default_expressor.py
index 4b253c6c..12750e9c 100644
--- a/src/chat/focus_chat/expressors/default_expressor.py
+++ b/src/chat/focus_chat/expressors/default_expressor.py
@@ -260,7 +260,7 @@ class DefaultExpressor:
return None
mark_head = False
- _first_bot_msg: Optional[MessageSending] = None
+ # first_bot_msg: Optional[MessageSending] = None
reply_message_ids = [] # 记录实际发送的消息ID
sent_msg_list = []
@@ -291,7 +291,7 @@ class DefaultExpressor:
try:
if not mark_head:
mark_head = True
- _first_bot_msg = bot_message # 保存第一个成功发送的消息对象
+ # first_bot_msg = bot_message # 保存第一个成功发送的消息对象
typing = False
else:
typing = True
diff --git a/src/chat/focus_chat/expressors/exprssion_learner.py b/src/chat/focus_chat/expressors/exprssion_learner.py
index 55222714..d908e939 100644
--- a/src/chat/focus_chat/expressors/exprssion_learner.py
+++ b/src/chat/focus_chat/expressors/exprssion_learner.py
@@ -10,7 +10,7 @@ import os
import json
-MAX_EXPRESSION_COUNT = 300
+MAX_EXPRESSION_COUNT = 100
logger = get_logger("expressor")
@@ -65,9 +65,9 @@ def init_prompt() -> None:
当"xxx"时,可以"xxx"
例如:
-当"表达观点较复杂"时,使用"省略主语"的句法
+当"表达观点较复杂"时,使用"省略主语(3-6个字)"的句法
当"不用详细说明的一般表达"时,使用"非常简洁的句子"的句法
-当"需要单纯简单的确认"时,使用"单字或几个字的肯定"的句法
+当"需要单纯简单的确认"时,使用"单字或几个字的肯定(1-2个字)"的句法
注意不要总结你自己的发言
现在请你概括
@@ -122,11 +122,11 @@ class ExpressionLearner:
"""
学习并存储表达方式,分别学习语言风格和句法特点
"""
- learnt_style: Optional[List[Tuple[str, str, str]]] = await self.learn_and_store(type="style", num=3)
+ learnt_style: Optional[List[Tuple[str, str, str]]] = await self.learn_and_store(type="style", num=15)
if not learnt_style:
return []
- learnt_grammar: Optional[List[Tuple[str, str, str]]] = await self.learn_and_store(type="grammar", num=2)
+ learnt_grammar: Optional[List[Tuple[str, str, str]]] = await self.learn_and_store(type="grammar", num=15)
if not learnt_grammar:
return []
@@ -233,7 +233,7 @@ class ExpressionLearner:
chat_str=random_msg_str,
)
- logger.info(f"学习{type_str}的prompt: {prompt}")
+ # logger.info(f"学习{type_str}的prompt: {prompt}")
try:
response, _ = await self.express_learn_model.generate_response_async(prompt)
@@ -291,7 +291,7 @@ class ExpressionLearner:
"personality_expression_prompt",
personality=global_config.expression_style,
)
- logger.info(f"个性表达方式提取prompt: {prompt}")
+ # logger.info(f"个性表达方式提取prompt: {prompt}")
try:
response, _ = await self.express_learn_model.generate_response_async(prompt)
diff --git a/src/chat/focus_chat/heartFC_Cycleinfo.py b/src/chat/focus_chat/heartFC_Cycleinfo.py
index 80864e83..f1accecd 100644
--- a/src/chat/focus_chat/heartFC_Cycleinfo.py
+++ b/src/chat/focus_chat/heartFC_Cycleinfo.py
@@ -1,6 +1,5 @@
import time
import os
-import json
from typing import List, Optional, Dict, Any
@@ -9,45 +8,16 @@ class CycleDetail:
def __init__(self, cycle_id: int):
self.cycle_id = cycle_id
+ self.thinking_id = ""
self.start_time = time.time()
self.end_time: Optional[float] = None
- self.action_taken = False
- self.action_type = "unknown"
- self.reasoning = ""
self.timers: Dict[str, float] = {}
- self.thinking_id = ""
- self.replanned = False
- # 添加响应信息相关字段
- self.response_info: Dict[str, Any] = {
- "response_text": [], # 回复的文本列表
- "emoji_info": "", # 表情信息
- "anchor_message_id": "", # 锚点消息ID
- "reply_message_ids": [], # 回复消息ID列表
- "sub_mind_thinking": "", # 子思维思考内容
- "in_mind_reply": [], # 子思维思考内容
- }
-
- # 添加SubMind相关信息
- self.submind_info: Dict[str, Any] = {
- "prompt": "", # SubMind输入的prompt
- "structured_info": "", # 结构化信息
- "result": "", # SubMind的思考结果
- }
-
- # 添加ToolUse相关信息
- self.tooluse_info: Dict[str, Any] = {
- "prompt": "", # 工具使用的prompt
- "tools_used": [], # 使用了哪些工具
- "tool_results": [], # 工具获得的信息
- }
-
- # 添加Planner相关信息
- self.planner_info: Dict[str, Any] = {
- "prompt": "", # 规划器的prompt
- "response": "", # 规划器的原始回复
- "parsed_result": {}, # 解析后的结果
- }
+ # 新字段
+ self.loop_observation_info: Dict[str, Any] = {}
+ self.loop_process_info: Dict[str, Any] = {}
+ self.loop_plan_info: Dict[str, Any] = {}
+ self.loop_action_info: Dict[str, Any] = {}
def to_dict(self) -> Dict[str, Any]:
"""将循环信息转换为字典格式"""
@@ -55,229 +25,28 @@ class CycleDetail:
"cycle_id": self.cycle_id,
"start_time": self.start_time,
"end_time": self.end_time,
- "action_taken": self.action_taken,
- "action_type": self.action_type,
- "reasoning": self.reasoning,
"timers": self.timers,
"thinking_id": self.thinking_id,
- "response_info": self.response_info,
- "submind_info": self.submind_info,
- "tooluse_info": self.tooluse_info,
- "planner_info": self.planner_info,
+ "loop_observation_info": self.loop_observation_info,
+ "loop_process_info": self.loop_process_info,
+ "loop_plan_info": self.loop_plan_info,
+ "loop_action_info": self.loop_action_info,
}
def complete_cycle(self):
"""完成循环,记录结束时间"""
self.end_time = time.time()
- def set_action_info(
- self, action_type: str, reasoning: str, action_taken: bool, action_data: Optional[Dict[str, Any]] = None
- ):
- """设置动作信息"""
- self.action_type = action_type
- self.action_data = action_data
- self.reasoning = reasoning
- self.action_taken = action_taken
-
def set_thinking_id(self, thinking_id: str):
"""设置思考消息ID"""
self.thinking_id = thinking_id
- def set_response_info(
- self,
- response_text: Optional[List[str]] = None,
- emoji_info: Optional[str] = None,
- anchor_message_id: Optional[str] = None,
- reply_message_ids: Optional[List[str]] = None,
- sub_mind_thinking: Optional[str] = None,
- ):
- """设置响应信息"""
- if response_text is not None:
- self.response_info["response_text"] = response_text
- if emoji_info is not None:
- self.response_info["emoji_info"] = emoji_info
- if anchor_message_id is not None:
- self.response_info["anchor_message_id"] = anchor_message_id
- if reply_message_ids is not None:
- self.response_info["reply_message_ids"] = reply_message_ids
- if sub_mind_thinking is not None:
- self.response_info["sub_mind_thinking"] = sub_mind_thinking
-
- def set_submind_info(
- self,
- prompt: Optional[str] = None,
- structured_info: Optional[str] = None,
- result: Optional[str] = None,
- ):
- """设置SubMind信息"""
- if prompt is not None:
- self.submind_info["prompt"] = prompt
- if structured_info is not None:
- self.submind_info["structured_info"] = structured_info
- if result is not None:
- self.submind_info["result"] = result
-
- def set_tooluse_info(
- self,
- prompt: Optional[str] = None,
- tools_used: Optional[List[str]] = None,
- tool_results: Optional[List[Dict[str, Any]]] = None,
- ):
- """设置ToolUse信息"""
- if prompt is not None:
- self.tooluse_info["prompt"] = prompt
- if tools_used is not None:
- self.tooluse_info["tools_used"] = tools_used
- if tool_results is not None:
- self.tooluse_info["tool_results"] = tool_results
-
- def set_planner_info(
- self,
- prompt: Optional[str] = None,
- response: Optional[str] = None,
- parsed_result: Optional[Dict[str, Any]] = None,
- ):
- """设置Planner信息"""
- if prompt is not None:
- self.planner_info["prompt"] = prompt
- if response is not None:
- self.planner_info["response"] = response
- if parsed_result is not None:
- self.planner_info["parsed_result"] = parsed_result
-
- @staticmethod
- def save_to_file(cycle_info: "CycleDetail", stream_id: str, base_dir: str = "log_debug") -> str:
- """
- 将CycleInfo保存到文件
-
- 参数:
- cycle_info: CycleInfo对象
- stream_id: 聊天流ID
- base_dir: 基础目录,默认为log_debug
-
- 返回:
- str: 保存的文件路径
- """
- try:
- # 创建目录结构
- stream_dir = os.path.join(base_dir, stream_id)
- os.makedirs(stream_dir, exist_ok=True)
-
- # 生成文件名和路径
- timestamp = time.strftime("%Y%m%d_%H%M%S", time.localtime(cycle_info.start_time))
- filename = f"cycle_{cycle_info.cycle_id}_{timestamp}.txt"
- filepath = os.path.join(stream_dir, filename)
-
- # 格式化输出成易读的格式
- with open(filepath, "w", encoding="utf-8") as f:
- # 写入基本信息
- f.write(f"循环ID: {cycle_info.cycle_id}\n")
- f.write(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(cycle_info.start_time))}\n")
- if cycle_info.end_time:
- f.write(f"结束时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(cycle_info.end_time))}\n")
- duration = cycle_info.end_time - cycle_info.start_time
- f.write(f"耗时: {duration:.2f}秒\n")
- f.write(f"动作: {cycle_info.action_type}\n")
- f.write(f"原因: {cycle_info.reasoning}\n")
- f.write(f"执行状态: {'已执行' if cycle_info.action_taken else '未执行'}\n")
- f.write(f"思考ID: {cycle_info.thinking_id}\n")
- f.write(f"是否为重新规划: {'是' if cycle_info.replanned else '否'}\n\n")
-
- # 写入计时器信息
- if cycle_info.timers:
- f.write("== 计时器信息 ==\n")
- for name, elapsed in cycle_info.timers.items():
- formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
- f.write(f"{name}: {formatted_time}\n")
- f.write("\n")
-
- # 写入响应信息
- f.write("== 响应信息 ==\n")
- f.write(f"锚点消息ID: {cycle_info.response_info['anchor_message_id']}\n")
- if cycle_info.response_info["response_text"]:
- f.write("回复文本:\n")
- for i, text in enumerate(cycle_info.response_info["response_text"]):
- f.write(f" [{i + 1}] {text}\n")
- if cycle_info.response_info["emoji_info"]:
- f.write(f"表情信息: {cycle_info.response_info['emoji_info']}\n")
- if cycle_info.response_info["reply_message_ids"]:
- f.write(f"回复消息ID: {', '.join(cycle_info.response_info['reply_message_ids'])}\n")
- f.write("\n")
-
- # 写入SubMind信息
- f.write("== SubMind信息 ==\n")
- f.write(f"结构化信息:\n{cycle_info.submind_info['structured_info']}\n\n")
- f.write(f"思考结果:\n{cycle_info.submind_info['result']}\n\n")
- f.write("SubMind Prompt:\n")
- f.write(f"{cycle_info.submind_info['prompt']}\n\n")
-
- # 写入ToolUse信息
- f.write("== 工具使用信息 ==\n")
- if cycle_info.tooluse_info["tools_used"]:
- f.write(f"使用的工具: {', '.join(cycle_info.tooluse_info['tools_used'])}\n")
- else:
- f.write("未使用工具\n")
-
- if cycle_info.tooluse_info["tool_results"]:
- f.write("工具结果:\n")
- for i, result in enumerate(cycle_info.tooluse_info["tool_results"]):
- f.write(f" [{i + 1}] 类型: {result.get('type', '未知')}, 内容: {result.get('content', '')}\n")
- f.write("\n")
- f.write("工具执行 Prompt:\n")
- f.write(f"{cycle_info.tooluse_info['prompt']}\n\n")
-
- # 写入Planner信息
- f.write("== Planner信息 ==\n")
- f.write("Planner Prompt:\n")
- f.write(f"{cycle_info.planner_info['prompt']}\n\n")
- f.write("原始回复:\n")
- f.write(f"{cycle_info.planner_info['response']}\n\n")
- f.write("解析结果:\n")
- f.write(f"{json.dumps(cycle_info.planner_info['parsed_result'], ensure_ascii=False, indent=2)}\n")
-
- return filepath
- except Exception as e:
- print(f"保存CycleInfo到文件时出错: {e}")
- return ""
-
- @staticmethod
- def load_from_file(filepath: str) -> Optional[Dict[str, Any]]:
- """
- 从文件加载CycleInfo信息(只加载JSON格式的数据,不解析文本格式)
-
- 参数:
- filepath: 文件路径
-
- 返回:
- Optional[Dict[str, Any]]: 加载的CycleInfo数据,失败则返回None
- """
- try:
- if not os.path.exists(filepath):
- print(f"文件不存在: {filepath}")
- return None
-
- # 尝试从文件末尾读取JSON数据
- with open(filepath, "r", encoding="utf-8") as f:
- lines = f.readlines()
-
- # 查找"解析结果:"后的JSON数据
- for i, line in enumerate(lines):
- if "解析结果:" in line and i + 1 < len(lines):
- # 尝试解析后面的行
- json_data = ""
- for j in range(i + 1, len(lines)):
- json_data += lines[j]
-
- try:
- return json.loads(json_data)
- except json.JSONDecodeError:
- continue
-
- # 如果没有找到JSON数据,则返回None
- return None
- except Exception as e:
- print(f"从文件加载CycleInfo时出错: {e}")
- return None
+ def set_loop_info(self, loop_info: Dict[str, Any]):
+ """设置循环信息"""
+ self.loop_observation_info = loop_info["loop_observation_info"]
+ self.loop_processor_info = loop_info["loop_processor_info"]
+ self.loop_plan_info = loop_info["loop_plan_info"]
+ self.loop_action_info = loop_info["loop_action_info"]
@staticmethod
def list_cycles(stream_id: str, base_dir: str = "log_debug") -> List[str]:
diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py
index 8c1dd293..9dd2529b 100644
--- a/src/chat/focus_chat/heartFC_chat.py
+++ b/src/chat/focus_chat/heartFC_chat.py
@@ -1,7 +1,5 @@
import asyncio
import contextlib
-import json # <--- 确保导入 json
-import random # <--- 添加导入
import time
import traceback
from collections import deque
@@ -10,19 +8,10 @@ from src.chat.message_receive.chat_stream import ChatStream
from src.chat.message_receive.chat_stream import chat_manager
from rich.traceback import install
from src.common.logger_manager import get_logger
-from src.chat.models.utils_model import LLMRequest
-from src.config.config import global_config
from src.chat.utils.timer_calculator import Timer
from src.chat.heart_flow.observation.observation import Observation
-from src.chat.focus_chat.heartflow_prompt_builder import prompt_builder
from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
-from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
-from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
from src.chat.focus_chat.info.info_base import InfoBase
-from src.chat.focus_chat.info.obs_info import ObsInfo
-from src.chat.focus_chat.info.cycle_info import CycleInfo
-from src.chat.focus_chat.info.mind_info import MindInfo
-from src.chat.focus_chat.info.structured_info import StructuredInfo
from src.chat.focus_chat.info_processors.chattinginfo_processor import ChattingInfoProcessor
from src.chat.focus_chat.info_processors.mind_processor import MindProcessor
from src.chat.heart_flow.observation.memory_observation import MemoryObservation
@@ -30,10 +19,12 @@ from src.chat.heart_flow.observation.hfcloop_observation import HFCloopObservati
from src.chat.heart_flow.observation.working_observation import WorkingObservation
from src.chat.focus_chat.info_processors.tool_processor import ToolProcessor
from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
-from src.chat.focus_chat.hfc_utils import create_empty_anchor_message, parse_thinking_id_to_timestamp
from src.chat.focus_chat.memory_activator import MemoryActivator
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
from src.plugins.group_nickname.nickname_manager import nickname_manager
+from src.chat.focus_chat.info_processors.base_processor import BaseProcessor
+from src.chat.focus_chat.planners.planner import ActionPlanner
+from src.chat.focus_chat.planners.action_factory import ActionManager
install(extra_lines=3)
@@ -47,78 +38,6 @@ CONSECUTIVE_NO_REPLY_THRESHOLD = 3 # 连续不回复的阈值
logger = get_logger("hfc") # Logger Name Changed
-# 默认动作定义
-DEFAULT_ACTIONS = {"no_reply": "不操作,继续浏览", "reply": "表达想法,可以只包含文本、表情或两者都有"}
-
-
-class ActionManager:
- """动作管理器:控制每次决策可以使用的动作"""
-
- def __init__(self):
- # 初始化为新的默认动作集
- self._available_actions: Dict[str, str] = DEFAULT_ACTIONS.copy()
- self._original_actions_backup: Optional[Dict[str, str]] = None
-
- def get_available_actions(self) -> Dict[str, str]:
- """获取当前可用的动作集"""
- return self._available_actions.copy() # 返回副本以防外部修改
-
- def add_action(self, action_name: str, description: str) -> bool:
- """
- 添加新的动作
-
- 参数:
- action_name: 动作名称
- description: 动作描述
-
- 返回:
- bool: 是否添加成功
- """
- if action_name in self._available_actions:
- return False
- self._available_actions[action_name] = description
- return True
-
- def remove_action(self, action_name: str) -> bool:
- """
- 移除指定动作
-
- 参数:
- action_name: 动作名称
-
- 返回:
- bool: 是否移除成功
- """
- if action_name not in self._available_actions:
- return False
- del self._available_actions[action_name]
- return True
-
- def temporarily_remove_actions(self, actions_to_remove: List[str]):
- """
- 临时移除指定的动作,备份原始动作集。
- 如果已经有备份,则不重复备份。
- """
- if self._original_actions_backup is None:
- self._original_actions_backup = self._available_actions.copy()
-
- actions_actually_removed = []
- for action_name in actions_to_remove:
- if action_name in self._available_actions:
- del self._available_actions[action_name]
- actions_actually_removed.append(action_name)
- # logger.debug(f"临时移除了动作: {actions_actually_removed}") # 可选日志
-
- def restore_actions(self):
- """
- 恢复之前备份的原始动作集。
- """
- if self._original_actions_backup is not None:
- self._available_actions = self._original_actions_backup.copy()
- self._original_actions_backup = None
- # logger.debug("恢复了原始动作集") # 可选日志
-
-
async def _handle_cycle_delay(action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str):
"""处理循环延迟"""
cycle_duration = time.monotonic() - cycle_start_time
@@ -164,39 +83,25 @@ class HeartFChatting:
self.chat_stream: Optional[ChatStream] = None # 关联的聊天流
self.observations: List[Observation] = observations # 关联的观察列表,用于监控聊天流状态
self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback
-
- self.chatting_info_processor = ChattingInfoProcessor()
- self.mind_processor = MindProcessor(subheartflow_id=self.stream_id)
+ self.log_prefix: str = str(chat_id) # Initial default, will be updated
self.memory_observation = MemoryObservation(observe_id=self.stream_id)
self.hfcloop_observation = HFCloopObservation(observe_id=self.stream_id)
- self.tool_processor = ToolProcessor(subheartflow_id=self.stream_id)
self.working_observation = WorkingObservation(observe_id=self.stream_id)
self.memory_activator = MemoryActivator()
-
- # 日志前缀
- self.log_prefix: str = str(chat_id) # Initial default, will be updated
-
- # --- Initialize attributes (defaults) ---
- self.is_group_chat: bool = False
- self.chat_target_info: Optional[dict] = None
- # --- End Initialization ---
self.expressor = DefaultExpressor(chat_id=self.stream_id)
-
- # 动作管理器
self.action_manager = ActionManager()
+ self.action_planner = ActionPlanner(log_prefix=self.log_prefix, action_manager=self.action_manager, stream_id=self.stream_id, chat_stream=self.chat_stream)
+
+
+ # --- 处理器列表 ---
+ self.processors: List[BaseProcessor] = []
+ self._register_default_processors()
# 初始化状态控制
self._initialized = False
self._processing_lock = asyncio.Lock()
- # LLM规划器配置
- self.planner_llm = LLMRequest(
- model=global_config.llm_plan,
- max_tokens=1000,
- request_type="action_planning", # 用于动作规划
- )
-
# 循环控制内部状态
self._loop_active: bool = False # 循环是否正在运行
self._loop_task: Optional[asyncio.Task] = None # 主循环任务
@@ -205,9 +110,9 @@ class HeartFChatting:
self._cycle_counter = 0
self._cycle_history: Deque[CycleDetail] = deque(maxlen=10) # 保留最近10个循环的信息
self._current_cycle: Optional[CycleDetail] = None
- self.total_no_reply_count: int = 0 # <--- 新增:连续不回复计数器
- self._shutting_down: bool = False # <--- 新增:关闭标志位
- self.total_waiting_time: float = 0.0 # <--- 新增:累计等待时间
+ self.total_no_reply_count: int = 0 # 连续不回复计数器
+ self._shutting_down: bool = False # 关闭标志位
+ self.total_waiting_time: float = 0.0 # 累计等待时间
async def _initialize(self) -> bool:
"""
@@ -230,7 +135,6 @@ class HeartFChatting:
return True
try:
- self.is_group_chat, self.chat_target_info = await get_chat_type_and_target_info(self.stream_id)
await self.expressor.initialize()
self.chat_stream = await asyncio.to_thread(chat_manager.get_stream, self.stream_id)
self.expressor.chat_stream = self.chat_stream
@@ -244,6 +148,13 @@ class HeartFChatting:
logger.debug(f"{self.log_prefix} 初始化完成,准备开始处理消息")
return True
+ def _register_default_processors(self):
+ """注册默认的信息处理器"""
+ self.processors.append(ChattingInfoProcessor())
+ self.processors.append(MindProcessor(subheartflow_id=self.stream_id))
+ self.processors.append(ToolProcessor(subheartflow_id=self.stream_id))
+ logger.info(f"{self.log_prefix} 已注册默认处理器: {[p.__class__.__name__ for p in self.processors]}")
+
async def start(self):
"""
启动 HeartFChatting 的主循环。
@@ -272,10 +183,7 @@ class HeartFChatting:
pass # 忽略取消或超时错误
self._loop_task = None # 清理旧任务引用
- logger.debug(f"{self.log_prefix} 启动认真水群(HFC)主循环...")
- # 创建新的循环任务
- self._loop_task = asyncio.create_task(self._hfc_loop())
- # 添加完成回调
+ self._loop_task = asyncio.create_task(self._run_focus_chat())
self._loop_task.add_done_callback(self._handle_loop_completion)
def _handle_loop_completion(self, task: asyncio.Task):
@@ -286,7 +194,6 @@ class HeartFChatting:
logger.error(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天(异常): {exception}")
logger.error(traceback.format_exc()) # Log full traceback for exceptions
else:
- # Loop completing normally now means it was cancelled/shutdown externally
logger.info(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天 (外部停止)")
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天(任务取消)")
@@ -297,16 +204,14 @@ class HeartFChatting:
logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。")
self._processing_lock.release()
- async def _hfc_loop(self):
+ async def _run_focus_chat(self):
"""主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try:
while True: # 主循环
logger.debug(f"{self.log_prefix} 开始第{self._cycle_counter}次循环")
- # --- 在循环开始处检查关闭标志 ---
if self._shutting_down:
- logger.info(f"{self.log_prefix} 检测到关闭标志,退出 HFC 循环。")
+ logger.info(f"{self.log_prefix} 检测到关闭标志,退出 Focus Chat 循环。")
break
- # --------------------------------
# 创建新的循环信息
self._cycle_counter += 1
@@ -317,63 +222,49 @@ class HeartFChatting:
loop_cycle_start_time = time.monotonic()
# 执行规划和处理阶段
- async with self._get_cycle_context() as acquired_lock:
- if not acquired_lock:
- # 如果未能获取锁(理论上不太可能,除非 shutdown 过程中释放了但又被抢了?)
- # 或者也可以在这里再次检查 self._shutting_down
- if self._shutting_down:
- break # 再次检查,确保退出
- logger.warning(f"{self.log_prefix} 未能获取循环处理锁,跳过本次循环。")
- await asyncio.sleep(0.1) # 短暂等待避免空转
- continue
-
- # thinking_id 是思考过程的ID,用于标记每一轮思考
+ async with self._get_cycle_context():
thinking_id = "tid" + str(round(time.time(), 2))
-
+ self._current_cycle.set_thinking_id(thinking_id)
# 主循环:思考->决策->执行
- action_taken = await self._think_plan_execute_loop(cycle_timers, thinking_id)
+ loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id)
- # 更新循环信息
- self._current_cycle.set_thinking_id(thinking_id)
+ self._current_cycle.set_loop_info(loop_info)
+
+ self.hfcloop_observation.add_loop_info(self._current_cycle)
self._current_cycle.timers = cycle_timers
# 防止循环过快消耗资源
- await _handle_cycle_delay(action_taken, loop_cycle_start_time, self.log_prefix)
+ await _handle_cycle_delay(
+ loop_info["loop_action_info"]["action_taken"], loop_cycle_start_time, self.log_prefix
+ )
# 完成当前循环并保存历史
self._current_cycle.complete_cycle()
self._cycle_history.append(self._current_cycle)
- # 保存CycleInfo到文件
- try:
- filepath = CycleDetail.save_to_file(self._current_cycle, self.stream_id)
- logger.info(f"{self.log_prefix} 已保存循环信息到文件: {filepath}")
- except Exception as e:
- logger.error(f"{self.log_prefix} 保存循环信息到文件时出错: {e}")
-
# 记录循环信息和计时器结果
timer_strings = []
for name, elapsed in cycle_timers.items():
formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
timer_strings.append(f"{name}: {formatted_time}")
- logger.debug(
- f"{self.log_prefix} 第 #{self._current_cycle.cycle_id}次思考完成,"
- f"耗时: {self._current_cycle.end_time - self._current_cycle.start_time:.2f}秒, "
- f"动作: {self._current_cycle.action_type}"
- + (f"\n计时器详情: {'; '.join(timer_strings)}" if timer_strings else "")
+ logger.info(
+ f"{self.log_prefix} 第{self._current_cycle.cycle_id}次思考,"
+ f"耗时: {self._current_cycle.end_time - self._current_cycle.start_time:.1f}秒, "
+ f"动作: {self._current_cycle.loop_plan_info['action_result']['action_type']}"
+ + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
except asyncio.CancelledError:
# 设置了关闭标志位后被取消是正常流程
if not self._shutting_down:
- logger.warning(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环意外被取消")
+ logger.warning(f"{self.log_prefix} 麦麦Focus聊天模式意外被取消")
else:
- logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环已取消 (正常关闭)")
+ logger.info(f"{self.log_prefix} 麦麦已离开Focus聊天模式")
except Exception as e:
- logger.error(f"{self.log_prefix} HeartFChatting: 意外错误: {e}")
- logger.error(traceback.format_exc())
+ logger.error(f"{self.log_prefix} 麦麦Focus聊天模式意外错误: {e}")
+ print(traceback.format_exc())
@contextlib.asynccontextmanager
async def _get_cycle_context(self):
@@ -394,135 +285,145 @@ class HeartFChatting:
if acquired and self._processing_lock.locked():
self._processing_lock.release()
- async def _think_plan_execute_loop(self, cycle_timers: dict, thinking_id: str) -> tuple[bool, str]:
+ async def _process_processors(
+ self, observations: List[Observation], running_memorys: List[Dict[str, Any]], cycle_timers: dict
+ ) -> List[InfoBase]:
+ # 记录并行任务开始时间
+ parallel_start_time = time.time()
+ logger.debug(f"{self.log_prefix} 开始信息处理器并行任务")
+
+ processor_tasks = []
+ task_to_name_map = {}
+
+ for processor in self.processors:
+ processor_name = processor.__class__.log_prefix
+ task = asyncio.create_task(
+ processor.process_info(observations=observations, running_memorys=running_memorys)
+ )
+ processor_tasks.append(task)
+ task_to_name_map[task] = processor_name
+ logger.debug(f"{self.log_prefix} 启动处理器任务: {processor_name}")
+
+ pending_tasks = set(processor_tasks)
+ all_plan_info: List[InfoBase] = []
+
+ while pending_tasks:
+ done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
+
+ for task in done:
+ processor_name = task_to_name_map[task]
+ task_completed_time = time.time()
+ duration_since_parallel_start = task_completed_time - parallel_start_time
+
+ try:
+ # 使用 await task 来获取结果或触发异常
+ result_list = await task
+ logger.info(
+ f"{self.log_prefix} 处理器 {processor_name} 已完成,信息已处理: {duration_since_parallel_start:.2f}秒"
+ )
+ if result_list is not None:
+ all_plan_info.extend(result_list)
+ else:
+ logger.warning(f"{self.log_prefix} 处理器 {processor_name} 返回了 None")
+ except Exception as e:
+ logger.error(
+ f"{self.log_prefix} 处理器 {processor_name} 执行失败,耗时 (自并行开始): {duration_since_parallel_start:.2f}秒. 错误: {e}",
+ exc_info=True,
+ )
+ # 即使出错,也认为该任务结束了,已从 pending_tasks 中移除
+
+ if pending_tasks:
+ current_progress_time = time.time()
+ elapsed_for_log = current_progress_time - parallel_start_time
+ pending_names_for_log = [task_to_name_map[t] for t in pending_tasks]
+ logger.info(
+ f"{self.log_prefix} 信息处理已进行 {elapsed_for_log:.2f}秒,待完成任务: {', '.join(pending_names_for_log)}"
+ )
+
+ # 所有任务完成后的最终日志
+ parallel_end_time = time.time()
+ total_duration = parallel_end_time - parallel_start_time
+ logger.info(f"{self.log_prefix} 所有处理器任务全部完成,总耗时: {total_duration:.2f}秒")
+ # logger.debug(f"{self.log_prefix} 所有信息处理器处理后的信息: {all_plan_info}")
+
+ return all_plan_info
+
+ async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> tuple[bool, str]:
try:
with Timer("观察", cycle_timers):
await self.observations[0].observe()
await self.memory_observation.observe()
await self.working_observation.observe()
await self.hfcloop_observation.observe()
- observations: List[Observation] = []
- observations.append(self.observations[0])
- observations.append(self.memory_observation)
- observations.append(self.working_observation)
- observations.append(self.hfcloop_observation)
+ observations: List[Observation] = []
+ observations.append(self.observations[0])
+ observations.append(self.memory_observation)
+ observations.append(self.working_observation)
+ observations.append(self.hfcloop_observation)
- for observation in observations:
- logger.debug(f"{self.log_prefix} 观察信息: {observation}")
+ loop_observation_info = {
+ "observations": observations,
+ }
with Timer("回忆", cycle_timers):
running_memorys = await self.memory_activator.activate_memory(observations)
- # 记录并行任务开始时间
- parallel_start_time = time.time()
- logger.debug(f"{self.log_prefix} 开始信息处理器并行任务")
-
- # 并行执行两个任务:思考和工具执行
with Timer("执行 信息处理器", cycle_timers):
- # 1. 子思维思考 - 不执行工具调用
- think_task = asyncio.create_task(
- self.mind_processor.process_info(observations=observations, running_memorys=running_memorys)
- )
- logger.debug(f"{self.log_prefix} 启动子思维思考任务")
+ all_plan_info = await self._process_processors(observations, running_memorys, cycle_timers)
- # 2. 工具执行器 - 专门处理工具调用
- tool_task = asyncio.create_task(
- self.tool_processor.process_info(observations=observations, running_memorys=running_memorys)
- )
- logger.debug(f"{self.log_prefix} 启动工具执行任务")
+ loop_processor_info = {
+ "all_plan_info": all_plan_info,
+ }
- # 3. 聊天信息处理器
- chatting_info_task = asyncio.create_task(
- self.chatting_info_processor.process_info(
- observations=observations, running_memorys=running_memorys
- )
- )
- logger.debug(f"{self.log_prefix} 启动聊天信息处理器任务")
-
- # 创建任务完成状态追踪
- tasks = {"思考任务": think_task, "工具任务": tool_task, "聊天信息处理任务": chatting_info_task}
- pending = set(tasks.values())
-
- # 等待所有任务完成,同时追踪每个任务的完成情况
- results: dict[str, list[InfoBase]] = {}
- while pending:
- # 等待任务完成
- done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED, timeout=1.0)
-
- # 记录完成的任务
- for task in done:
- for name, t in tasks.items():
- if task == t:
- task_end_time = time.time()
- task_duration = task_end_time - parallel_start_time
- logger.info(f"{self.log_prefix} {name}已完成,耗时: {task_duration:.2f}秒")
- results[name] = task.result()
- break
-
- # 如果仍有未完成任务,记录进行中状态
- if pending:
- current_time = time.time()
- elapsed = current_time - parallel_start_time
- pending_names = [name for name, t in tasks.items() if t in pending]
- logger.info(
- f"{self.log_prefix} 并行处理已进行{elapsed:.2f}秒,待完成任务: {', '.join(pending_names)}"
- )
-
- # 所有任务完成,从结果中提取数据
- mind_processed_infos = results.get("思考任务", [])
- tool_processed_infos = results.get("工具任务", [])
- chatting_info_processed_infos = results.get("聊天信息处理任务", [])
-
- # 记录总耗时
- parallel_end_time = time.time()
- total_duration = parallel_end_time - parallel_start_time
- logger.info(f"{self.log_prefix} 思考和工具并行任务全部完成,总耗时: {total_duration:.2f}秒")
-
- all_plan_info = mind_processed_infos + tool_processed_infos + chatting_info_processed_infos
-
- logger.debug(f"{self.log_prefix} 所有信息处理器处理后的信息: {all_plan_info}")
- # 串行执行规划器 - 使用刚获取的思考结果
- logger.debug(f"{self.log_prefix} 开始 规划器")
with Timer("规划器", cycle_timers):
- planner_result = await self._planner(all_plan_info, cycle_timers)
+ plan_result = await self.action_planner.plan(all_plan_info, cycle_timers)
- action = planner_result.get("action", "error")
- action_data = planner_result.get("action_data", {}) # 新增获取动作数据
- reasoning = planner_result.get("reasoning", "未提供理由")
+ loop_plan_info = {
+ "action_result": plan_result.get("action_result", {}),
+ "current_mind": plan_result.get("current_mind", ""),
+ "observed_messages": plan_result.get("observed_messages", ""),
+ }
- logger.debug(f"{self.log_prefix} 动作和动作信息: {action}, {action_data}, {reasoning}")
+ with Timer("执行动作", cycle_timers):
+ action_type, action_data, reasoning = (
+ plan_result.get("action_result", {}).get("action_type", "error"),
+ plan_result.get("action_result", {}).get("action_data", {}),
+ plan_result.get("action_result", {}).get("reasoning", "未提供理由"),
+ )
- # 更新循环信息
- self._current_cycle.set_action_info(
- action_type=action,
- action_data=action_data,
- reasoning=reasoning,
- action_taken=True,
- )
+ # 在此处添加日志记录
+ if action_type == "reply":
+ action_str = "回复"
+ elif action_type == "no_reply":
+ action_str = "不回复"
+ else:
+ action_type = "unknown"
+ action_str = "未知动作"
- # 处理LLM错误
- if planner_result.get("llm_error"):
- logger.error(f"{self.log_prefix} LLM失败: {reasoning}")
- return False, ""
+ logger.info(f"{self.log_prefix} 麦麦决定'{action_str}', 原因'{reasoning}'")
- # 在此处添加日志记录
- if action == "reply":
- action_str = "回复"
- elif action == "no_reply":
- action_str = "不回复"
- else:
- action_str = "位置动作"
+ success, reply_text = await self._handle_action(
+ action_type, reasoning, action_data, cycle_timers, thinking_id
+ )
- logger.info(f"{self.log_prefix} 麦麦决定'{action_str}', 原因'{reasoning}'")
+ loop_action_info = {
+ "action_taken": success,
+ "reply_text": reply_text,
+ }
- self.hfcloop_observation.add_loop_info(self._current_cycle)
+ loop_info = {
+ "loop_observation_info": loop_observation_info,
+ "loop_processor_info": loop_processor_info,
+ "loop_plan_info": loop_plan_info,
+ "loop_action_info": loop_action_info,
+ }
- return await self._handle_action(action, reasoning, action_data, cycle_timers, thinking_id)
+ return loop_info
except Exception as e:
- logger.error(f"{self.log_prefix} 并行+串行处理失败: {e}")
+ logger.error(f"{self.log_prefix} FOCUS聊天处理失败: {e}")
logger.error(traceback.format_exc())
- return False, ""
+ return {}
async def _handle_action(
self,
@@ -533,148 +434,59 @@ class HeartFChatting:
thinking_id: str,
) -> tuple[bool, str]:
"""
- 处理规划动作
+ 处理规划动作,使用动作工厂创建相应的动作处理器
参数:
action: 动作类型
reasoning: 决策理由
action_data: 动作数据,包含不同动作需要的参数
cycle_timers: 计时器字典
- planner_start_db_time: 规划开始时间
+ thinking_id: 思考ID
返回:
tuple[bool, str]: (是否执行了动作, 思考消息ID)
"""
- action_handlers = {
- "reply": self._handle_reply,
- "no_reply": self._handle_no_reply,
- }
-
- handler = action_handlers.get(action)
- if not handler:
- logger.warning(f"{self.log_prefix} 未知动作: {action}, 原因: {reasoning}")
- return False, ""
-
try:
- if action == "reply":
- return await handler(reasoning, action_data, cycle_timers, thinking_id)
- else: # no_reply
- return await handler(reasoning, cycle_timers, thinking_id)
+ # 使用工厂创建动作处理器实例
+ action_handler = self.action_manager.create_action(
+ action_name=action,
+ action_data=action_data,
+ reasoning=reasoning,
+ cycle_timers=cycle_timers,
+ thinking_id=thinking_id,
+ observations=self.observations,
+ expressor=self.expressor,
+ chat_stream=self.chat_stream,
+ current_cycle=self._current_cycle,
+ log_prefix=self.log_prefix,
+ on_consecutive_no_reply_callback=self.on_consecutive_no_reply_callback,
+ total_no_reply_count=self.total_no_reply_count,
+ total_waiting_time=self.total_waiting_time,
+ shutting_down=self._shutting_down,
+ )
+
+ if not action_handler:
+ logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}, 原因: {reasoning}")
+ return False, ""
+
+ # 处理动作并获取结果
+ success, reply_text = await action_handler.handle_action()
+
+ # 更新状态计数器
+ if action == "no_reply":
+ self.total_no_reply_count = getattr(action_handler, "total_no_reply_count", self.total_no_reply_count)
+ self.total_waiting_time = getattr(action_handler, "total_waiting_time", self.total_waiting_time)
+ elif action == "reply":
+ self.total_no_reply_count = 0
+ self.total_waiting_time = 0.0
+
+ return success, reply_text
+
except Exception as e:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
traceback.print_exc()
return False, ""
- async def _handle_no_reply(self, reasoning: str, cycle_timers: dict, thinking_id: str) -> bool:
- """
- 处理不回复的情况
-
- 工作流程:
- 1. 等待新消息、超时或关闭信号
- 2. 根据等待结果更新连续不回复计数
- 3. 如果达到阈值,触发回调
-
- 参数:
- reasoning: 不回复的原因
- planner_start_db_time: 规划开始时间
- cycle_timers: 计时器字典
-
- 返回:
- bool: 是否成功处理
- """
- logger.info(f"{self.log_prefix} 决定不回复: {reasoning}")
-
- observation = self.observations[0] if self.observations else None
-
- try:
- with Timer("等待新消息", cycle_timers):
- # 等待新消息、超时或关闭信号,并获取结果
- await self._wait_for_new_message(observation, thinking_id, self.log_prefix)
- # 从计时器获取实际等待时间
- current_waiting = cycle_timers.get("等待新消息", 0.0)
-
- if not self._shutting_down:
- self.total_no_reply_count += 1
- self.total_waiting_time += current_waiting # 累加等待时间
- logger.debug(
- f"{self.log_prefix} 连续不回复计数增加: {self.total_no_reply_count}/{CONSECUTIVE_NO_REPLY_THRESHOLD}, "
- f"本次等待: {current_waiting:.2f}秒, 累计等待: {self.total_waiting_time:.2f}秒"
- )
-
- # 检查是否同时达到次数和时间阈值
- time_threshold = 0.66 * WAITING_TIME_THRESHOLD * CONSECUTIVE_NO_REPLY_THRESHOLD
- if (
- self.total_no_reply_count >= CONSECUTIVE_NO_REPLY_THRESHOLD
- and self.total_waiting_time >= time_threshold
- ):
- logger.info(
- f"{self.log_prefix} 连续不回复达到阈值 ({self.total_no_reply_count}次) "
- f"且累计等待时间达到 {self.total_waiting_time:.2f}秒 (阈值 {time_threshold}秒),"
- f"调用回调请求状态转换"
- )
- # 调用回调。注意:这里不重置计数器和时间,依赖回调函数成功改变状态来隐式重置上下文。
- await self.on_consecutive_no_reply_callback()
- elif self.total_no_reply_count >= CONSECUTIVE_NO_REPLY_THRESHOLD:
- # 仅次数达到阈值,但时间未达到
- logger.debug(
- f"{self.log_prefix} 连续不回复次数达到阈值 ({self.total_no_reply_count}次) "
- f"但累计等待时间 {self.total_waiting_time:.2f}秒 未达到时间阈值 ({time_threshold}秒),暂不调用回调"
- )
- # else: 次数和时间都未达到阈值,不做处理
-
- return True, thinking_id
-
- except asyncio.CancelledError:
- logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)")
- raise
- except Exception as e: # 捕获调用管理器或其他地方可能发生的错误
- logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}")
- logger.error(traceback.format_exc())
- return False, thinking_id
-
- async def _wait_for_new_message(self, observation: ChattingObservation, thinking_id: str, log_prefix: str) -> bool:
- """
- 等待新消息 或 检测到关闭信号
-
- 参数:
- observation: 观察实例
- planner_start_db_time: 开始等待的时间
- log_prefix: 日志前缀
-
- 返回:
- bool: 是否检测到新消息 (如果因关闭信号退出则返回 False)
- """
- wait_start_time = time.monotonic()
- while True:
- # --- 在每次循环开始时检查关闭标志 ---
- if self._shutting_down:
- logger.info(f"{log_prefix} 等待新消息时检测到关闭信号,中断等待。")
- return False # 表示因为关闭而退出
- # -----------------------------------
-
- thinking_id_timestamp = parse_thinking_id_to_timestamp(thinking_id)
-
- # 检查新消息
- if await observation.has_new_messages_since(thinking_id_timestamp):
- logger.info(f"{log_prefix} 检测到新消息")
- return True
-
- # 检查超时 (放在检查新消息和关闭之后)
- if time.monotonic() - wait_start_time > WAITING_TIME_THRESHOLD:
- logger.warning(f"{log_prefix} 等待新消息超时({WAITING_TIME_THRESHOLD}秒)")
- return False
-
- try:
- # 短暂休眠,让其他任务有机会运行,并能更快响应取消或关闭
- await asyncio.sleep(0.5) # 缩短休眠时间
- except asyncio.CancelledError:
- # 如果在休眠时被取消,再次检查关闭标志
- # 如果是正常关闭,则不需要警告
- if not self._shutting_down:
- logger.warning(f"{log_prefix} _wait_for_new_message 的休眠被意外取消")
- # 无论如何,重新抛出异常,让上层处理
- raise
-
async def shutdown(self):
"""优雅关闭HeartFChatting实例,取消活动循环任务"""
logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...")
@@ -717,284 +529,4 @@ class HeartFChatting:
history = history[-last_n:]
return [cycle.to_dict() for cycle in history]
- async def _planner(self, all_plan_info: List[InfoBase], cycle_timers: dict) -> Dict[str, Any]:
- """
- 规划器 (Planner): 使用LLM根据上下文决定是否和如何回复。
- 重构为:让LLM返回结构化JSON文本,然后在代码中解析。
- 参数:
- current_mind: 子思维的当前思考结果
- cycle_timers: 计时器字典
- is_re_planned: 是否为重新规划 (此重构中暂时简化,不处理 is_re_planned 的特殊逻辑)
- """
- logger.info(f"{self.log_prefix}开始 规划")
-
- actions_to_remove_temporarily = []
- # --- 检查历史动作并决定临时移除动作 (逻辑保持不变) ---
- lian_xu_wen_ben_hui_fu = 0
- probability_roll = random.random()
- for cycle in reversed(self._cycle_history):
- if cycle.action_taken:
- if cycle.action_type == "text_reply":
- lian_xu_wen_ben_hui_fu += 1
- else:
- break
- if len(self._cycle_history) > 0 and cycle.cycle_id <= self._cycle_history[0].cycle_id + (
- len(self._cycle_history) - 4
- ):
- break
- logger.debug(f"{self.log_prefix}[Planner] 检测到连续文本回复次数: {lian_xu_wen_ben_hui_fu}")
-
- if lian_xu_wen_ben_hui_fu >= 3:
- logger.info(f"{self.log_prefix}[Planner] 连续回复 >= 3 次,强制移除 text_reply 和 emoji_reply")
- actions_to_remove_temporarily.extend(["text_reply", "emoji_reply"])
- elif lian_xu_wen_ben_hui_fu == 2:
- if probability_roll < 0.8:
- logger.info(f"{self.log_prefix}[Planner] 连续回复 2 次,80% 概率移除 text_reply 和 emoji_reply (触发)")
- actions_to_remove_temporarily.extend(["text_reply", "emoji_reply"])
- else:
- logger.info(
- f"{self.log_prefix}[Planner] 连续回复 2 次,80% 概率移除 text_reply 和 emoji_reply (未触发)"
- )
- elif lian_xu_wen_ben_hui_fu == 1:
- if probability_roll < 0.4:
- logger.info(f"{self.log_prefix}[Planner] 连续回复 1 次,40% 概率移除 text_reply (触发)")
- actions_to_remove_temporarily.append("text_reply")
- else:
- logger.info(f"{self.log_prefix}[Planner] 连续回复 1 次,40% 概率移除 text_reply (未触发)")
- # --- 结束检查历史动作 ---
-
- # 获取观察信息
- for info in all_plan_info:
- if isinstance(info, ObsInfo):
- logger.debug(f"{self.log_prefix} 观察信息: {info}")
- observed_messages = info.get_talking_message()
- observed_messages_str = info.get_talking_message_str_truncate()
- chat_type = info.get_chat_type()
- if chat_type == "group":
- is_group_chat = True
- else:
- is_group_chat = False
- elif isinstance(info, MindInfo):
- logger.debug(f"{self.log_prefix} 思维信息: {info}")
- current_mind = info.get_current_mind()
- elif isinstance(info, CycleInfo):
- logger.debug(f"{self.log_prefix} 循环信息: {info}")
- cycle_info = info.get_observe_info()
- elif isinstance(info, StructuredInfo):
- logger.debug(f"{self.log_prefix} 结构化信息: {info}")
- structured_info = info.get_data()
-
- # --- 使用 LLM 进行决策 (JSON 输出模式) --- #
- action = "no_reply" # 默认动作
- reasoning = "规划器初始化默认"
- llm_error = False # LLM 请求或解析错误标志
-
- # 获取我们将传递给 prompt 构建器和用于验证的当前可用动作
- current_available_actions = self.action_manager.get_available_actions()
-
- try:
- # --- 应用临时动作移除 ---
- if actions_to_remove_temporarily:
- self.action_manager.temporarily_remove_actions(actions_to_remove_temporarily)
- # 更新 current_available_actions 以反映移除后的状态
- current_available_actions = self.action_manager.get_available_actions()
- logger.debug(
- f"{self.log_prefix}[Planner] 临时移除的动作: {actions_to_remove_temporarily}, 当前可用: {list(current_available_actions.keys())}"
- )
-
- # 需要获取用于上下文的历史消息
- message_list_before_now = get_raw_msg_before_timestamp_with_chat(
- chat_id=self.stream_id,
- timestamp=time.time(), # 使用当前时间作为参考点
- limit=global_config.observation_context_size, # 使用与 prompt 构建一致的 limit
- )
- # 调用工具函数获取格式化后的绰号字符串
- nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(
- self.chat_stream, message_list_before_now
- )
- # --- 构建提示词 (调用修改后的 PromptBuilder 方法) ---
- prompt = await prompt_builder.build_planner_prompt(
- is_group_chat=is_group_chat, # <-- Pass HFC state
- chat_target_info=None,
- observed_messages_str=observed_messages_str, # <-- Pass local variable
- current_mind=current_mind, # <-- Pass argument
- structured_info=structured_info, # <-- Pass SubMind info
- current_available_actions=current_available_actions, # <-- Pass determined actions
- cycle_info=cycle_info, # <-- Pass cycle info
- nickname_info=nickname_injection_str, # <-- Pass nickname injection
- )
-
- # --- 调用 LLM (普通文本生成) ---
- llm_content = None
- try:
- llm_content, _, _ = await self.planner_llm.generate_response(prompt=prompt)
- logger.debug(f"{self.log_prefix}[Planner] LLM 原始 JSON 响应 (预期): {llm_content}")
- except Exception as req_e:
- logger.error(f"{self.log_prefix}[Planner] LLM 请求执行失败: {req_e}")
- reasoning = f"LLM 请求失败: {req_e}"
- llm_error = True
- # 直接使用默认动作返回错误结果
- action = "no_reply" # 明确设置为默认值
-
- # --- 解析 LLM 返回的 JSON (仅当 LLM 请求未出错时进行) ---
- if not llm_error and llm_content:
- try:
- # 尝试去除可能的 markdown 代码块标记
- cleaned_content = (
- llm_content.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
- )
- if not cleaned_content:
- raise json.JSONDecodeError("Cleaned content is empty", cleaned_content, 0)
- parsed_json = json.loads(cleaned_content)
-
- # 提取决策,提供默认值
- extracted_action = parsed_json.get("action", "no_reply")
- extracted_reasoning = parsed_json.get("reasoning", "LLM未提供理由")
- # extracted_emoji_query = parsed_json.get("emoji_query", "")
-
- # 新的reply格式
- if extracted_action == "reply":
- action_data = {
- "text": parsed_json.get("text", []),
- "emojis": parsed_json.get("emojis", []),
- "target": parsed_json.get("target", ""),
- }
- else:
- action_data = {} # 其他动作可能不需要额外数据
-
- # 验证动作是否在当前可用列表中
- # !! 使用调用 prompt 时实际可用的动作列表进行验证
- if extracted_action not in current_available_actions:
- logger.warning(
- f"{self.log_prefix}[Planner] LLM 返回了当前不可用或无效的动作: '{extracted_action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'"
- )
- action = "no_reply"
- reasoning = f"LLM 返回了当前不可用的动作 '{extracted_action}' (可用: {list(current_available_actions.keys())})。原始理由: {extracted_reasoning}"
- # 检查 no_reply 是否也恰好被移除了 (极端情况)
- if "no_reply" not in current_available_actions:
- logger.error(
- f"{self.log_prefix}[Planner] 严重错误:'no_reply' 动作也不可用!无法执行任何动作。"
- )
- action = "error" # 回退到错误状态
- reasoning = "无法执行任何有效动作,包括 no_reply"
- llm_error = True # 标记为严重错误
- else:
- llm_error = False # 视为逻辑修正而非 LLM 错误
- else:
- # 动作有效且可用
- action = extracted_action
- reasoning = extracted_reasoning
- llm_error = False # 解析成功
- logger.debug(
- f"{self.log_prefix}[要做什么]\nPrompt:\n{prompt}\n\n决策结果 (来自JSON): {action}, 理由: {reasoning}"
- )
- logger.debug(f"{self.log_prefix}动作信息: '{action_data}'")
-
- except Exception as json_e:
- logger.warning(
- f"{self.log_prefix}[Planner] 解析LLM响应JSON失败: {json_e}. LLM原始输出: '{llm_content}'"
- )
- reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'."
- action = "no_reply" # 解析失败则默认不回复
- llm_error = True # 标记解析错误
- elif not llm_error and not llm_content:
- # LLM 请求成功但返回空内容
- logger.warning(f"{self.log_prefix}[Planner] LLM 返回了空内容。")
- reasoning = "LLM 返回了空内容,使用默认动作 'no_reply'."
- action = "no_reply"
- llm_error = True # 标记为空响应错误
-
- except Exception as outer_e:
- logger.error(f"{self.log_prefix}[Planner] Planner 处理过程中发生意外错误: {outer_e}")
- traceback.print_exc()
- action = "error" # 发生未知错误,标记为 error 动作
- reasoning = f"Planner 内部处理错误: {outer_e}"
- llm_error = True
- finally:
- # --- 确保动作恢复 ---
- if self.action_manager._original_actions_backup is not None:
- self.action_manager.restore_actions()
- logger.debug(
- f"{self.log_prefix}[Planner] 恢复了原始动作集, 当前可用: {list(self.action_manager.get_available_actions().keys())}"
- )
-
- # --- 概率性忽略文本回复附带的表情 (逻辑保持不变) ---
- try:
- emoji = action_data.get("emojis")
- if action == "reply" and emoji:
- logger.debug(f"{self.log_prefix}[Planner] 大模型建议文字回复带表情: '{emoji}'")
- if random.random() > EMOJI_SEND_PRO:
- logger.info(
- f"{self.log_prefix}但是麦麦这次不想加表情 ({1 - EMOJI_SEND_PRO:.0%}),忽略表情 '{emoji}'"
- )
- action_data["emojis"] = "" # 清空表情请求
- else:
- logger.info(f"{self.log_prefix}好吧,加上表情 '{emoji}'")
- except Exception as e:
- logger.error(f"{self.log_prefix}[Planner] 概率性忽略表情时发生错误: {e}")
- traceback.print_exc()
- # --- 结束概率性忽略 ---
-
- # 返回结果字典
- return {
- "action": action,
- "action_data": action_data,
- "reasoning": reasoning,
- "current_mind": current_mind,
- "observed_messages": observed_messages,
- "llm_error": llm_error, # 返回错误状态
- }
-
- async def _handle_reply(
- self, reasoning: str, reply_data: dict, cycle_timers: dict, thinking_id: str
- ) -> tuple[bool, str]:
- """
- 处理统一的回复动作 - 可包含文本和表情,顺序任意
-
- reply_data格式:
- {
- "text": "你好啊" # 文本内容列表(可选)
- "target": "锚定消息", # 锚定消息的文本内容
- "emojis": "微笑" # 表情关键词列表(可选)
- }
- """
- # 重置连续不回复计数器
- self.total_no_reply_count = 0
- self.total_waiting_time = 0.0
-
- # 从聊天观察获取锚定消息
- observations: ChattingObservation = self.observations[0]
- anchor_message = observations.serch_message_by_text(reply_data["target"])
-
- # 如果没有找到锚点消息,创建一个占位符
- if not anchor_message:
- logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
- anchor_message = await create_empty_anchor_message(
- self.chat_stream.platform, self.chat_stream.group_info, self.chat_stream
- )
- else:
- anchor_message.update_chat_stream(self.chat_stream)
-
- success, reply_set = await self.expressor.deal_reply(
- cycle_timers=cycle_timers,
- action_data=reply_data,
- anchor_message=anchor_message,
- reasoning=reasoning,
- thinking_id=thinking_id,
- )
-
- reply_text = ""
- for reply in reply_set:
- type = reply[0]
- data = reply[1]
- if type == "text":
- reply_text += data
- elif type == "emoji":
- reply_text += data
-
- self._current_cycle.set_response_info(
- response_text=reply_text,
- )
-
- return success, reply_text
diff --git a/src/chat/focus_chat/heartflow_prompt_builder.py b/src/chat/focus_chat/heartflow_prompt_builder.py
index deea9aec..5044b307 100644
--- a/src/chat/focus_chat/heartflow_prompt_builder.py
+++ b/src/chat/focus_chat/heartflow_prompt_builder.py
@@ -17,7 +17,6 @@ import traceback
import random
from src.plugins.group_nickname.nickname_manager import nickname_manager
-
logger = get_logger("prompt")
@@ -54,69 +53,11 @@ def init_prompt():
"info_from_tools",
)
- # Planner提示词 - 修改为要求 JSON 输出
- Prompt(
- """你的名字是{bot_name},{prompt_personality},{chat_context_description}。需要基于以下信息决定如何参与对话:
-{structured_info_block}
-{nickname_info}
-{chat_content_block}
-{mind_info_prompt}
-{cycle_info_block}
-
-请综合分析聊天内容和你看到的新消息,参考内心想法,并根据以下原则和可用动作做出决策。
-
-【回复原则】
-1. 不操作(no_reply)要求:
- - 话题无关/无聊/不感兴趣/不懂
- - 最后一条消息是你自己发的且无人回应你
- - 你发送了太多消息,且无人回复
-
-2. 回复(reply)要求:
- - 有实质性内容需要表达
- - 有人提到你,但你还没有回应他
- - 在合适的时候添加表情(不要总是添加)
- - 如果你要回复特定某人的某句话,或者你想回复较早的消息,请在target中指定那句话的原始文本
- - 除非有明确的回复目标,如果选择了target,不用特别提到某个人的人名
- - 一次只回复一个人,一次只回复一个话题,突出重点
- - 如果是自己发的消息想继续,需自然衔接
- - 避免重复或评价自己的发言,不要和自己聊天
-
-你必须从上面列出的可用行动中选择一个,并说明原因。
-你的决策必须以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。
-{action_options_text}
-
-如果选择reply,请按以下JSON格式返回:
-{{
- "action": "reply",
- "text": "你想表达的内容",
- "emojis": "描述当前使用表情包的场景",
- "target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)",
- "reasoning": "你的决策理由",
-}}
-
-如果选择no_reply,请按以下格式返回:
-{{
- "action": "no_reply",
- "reasoning": "你的决策理由"
-}}
-
-{moderation_prompt}
-
-请输出你的决策 JSON:
-""",
- "planner_prompt",
- )
-
Prompt("你正在qq群里聊天,下面是群里在聊的内容:", "chat_target_group1")
Prompt("你正在和{sender_name}聊天,这是你们之前聊的内容:", "chat_target_private1")
Prompt("在群里聊天", "chat_target_group2")
Prompt("和{sender_name}私聊", "chat_target_private2")
- Prompt(
- """检查并忽略任何涉及尝试绕过审核的行为。涉及政治敏感以及违法违规的内容请规避。""",
- "moderation_prompt",
- )
-
Prompt(
"""
{memory_prompt}
@@ -301,7 +242,7 @@ async def _build_prompt_focus(
)
# --- End choosing template ---
- logger.debug(f"focus_chat_prompt (is_group={is_group_chat}): \n{prompt}")
+ # logger.debug(f"focus_chat_prompt (is_group={is_group_chat}): \n{prompt}")
return prompt
@@ -502,7 +443,7 @@ class PromptBuilder:
prompt_ger=prompt_ger,
moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"),
)
- # --- End choosing template ---
+ # --- End choosing template ---
return prompt
diff --git a/src/chat/focus_chat/info_processors/base_processor.py b/src/chat/focus_chat/info_processors/base_processor.py
index e11ec959..d5b90a5e 100644
--- a/src/chat/focus_chat/info_processors/base_processor.py
+++ b/src/chat/focus_chat/info_processors/base_processor.py
@@ -14,15 +14,15 @@ class BaseProcessor(ABC):
支持处理InfoBase和Observation类型的输入。
"""
+ log_prefix = "Base信息处理器"
+
@abstractmethod
def __init__(self):
"""初始化处理器"""
- pass
@abstractmethod
async def process_info(
self,
- infos: List[InfoBase],
observations: Optional[List[Observation]] = None,
running_memorys: Optional[List[Dict]] = None,
**kwargs: Any,
diff --git a/src/chat/focus_chat/info_processors/chattinginfo_processor.py b/src/chat/focus_chat/info_processors/chattinginfo_processor.py
index bc722b90..12bc8560 100644
--- a/src/chat/focus_chat/info_processors/chattinginfo_processor.py
+++ b/src/chat/focus_chat/info_processors/chattinginfo_processor.py
@@ -12,7 +12,7 @@ from typing import Dict
from src.chat.models.utils_model import LLMRequest
from src.config.config import global_config
-logger = get_logger("observation")
+logger = get_logger("processor")
class ChattingInfoProcessor(BaseProcessor):
@@ -21,12 +21,14 @@ class ChattingInfoProcessor(BaseProcessor):
用于处理Observation对象,将其转换为ObsInfo对象。
"""
+ log_prefix = "聊天信息处理"
+
def __init__(self):
"""初始化观察处理器"""
+ super().__init__()
self.llm_summary = LLMRequest(
model=global_config.llm_observation, temperature=0.7, max_tokens=300, request_type="chat_observation"
)
- super().__init__()
async def process_info(
self,
diff --git a/src/chat/focus_chat/info_processors/mind_processor.py b/src/chat/focus_chat/info_processors/mind_processor.py
index ec32ea7e..1a104e12 100644
--- a/src/chat/focus_chat/info_processors/mind_processor.py
+++ b/src/chat/focus_chat/info_processors/mind_processor.py
@@ -24,17 +24,16 @@ from src.chat.focus_chat.info_processors.processor_utils import (
from typing import Dict
from src.chat.focus_chat.info.info_base import InfoBase
-logger = get_logger("sub_heartflow")
+logger = get_logger("processor")
def init_prompt():
# --- Group Chat Prompt ---
group_prompt = """
+你的名字是{bot_name}
{memory_str}
{extra_info}
{relation_prompt}
-你的名字是{bot_name}
-{mood_info}
{cycle_info_block}
现在是{time_now},你正在上网,和qq群里的网友们聊天,以下是正在进行的聊天内容:
{chat_observe_info}
@@ -74,8 +73,11 @@ def init_prompt():
class MindProcessor(BaseProcessor):
+ log_prefix = "聊天思考"
+
def __init__(self, subheartflow_id: str):
super().__init__()
+
self.subheartflow_id = subheartflow_id
self.llm_model = LLMRequest(
@@ -228,7 +230,7 @@ class MindProcessor(BaseProcessor):
bot_name=individuality.name,
time_now=time_now,
chat_observe_info=chat_observe_info,
- mood_info="mood_info",
+ # mood_info="mood_info",
hf_do_next=spark_prompt,
last_mind=previous_mind,
cycle_info_block=hfcloop_observe_info,
diff --git a/src/chat/focus_chat/info_processors/tool_processor.py b/src/chat/focus_chat/info_processors/tool_processor.py
index 79dba4cd..8840c1ae 100644
--- a/src/chat/focus_chat/info_processors/tool_processor.py
+++ b/src/chat/focus_chat/info_processors/tool_processor.py
@@ -14,7 +14,7 @@ from src.chat.heart_flow.observation.observation import Observation
from src.chat.heart_flow.observation.working_observation import WorkingObservation
from src.chat.focus_chat.info.structured_info import StructuredInfo
-logger = get_logger("tool_use")
+logger = get_logger("processor")
def init_prompt():
@@ -45,6 +45,8 @@ def init_prompt():
class ToolProcessor(BaseProcessor):
+ log_prefix = "工具执行器"
+
def __init__(self, subheartflow_id: str):
super().__init__()
self.subheartflow_id = subheartflow_id
diff --git a/src/chat/focus_chat/memory_activator.py b/src/chat/focus_chat/memory_activator.py
index 76be3e5d..2d7fea03 100644
--- a/src/chat/focus_chat/memory_activator.py
+++ b/src/chat/focus_chat/memory_activator.py
@@ -8,6 +8,7 @@ from src.chat.utils.prompt_builder import Prompt
from datetime import datetime
from src.chat.memory_system.Hippocampus import HippocampusManager
from typing import List, Dict
+import difflib
logger = get_logger("memory_activator")
@@ -82,7 +83,7 @@ class MemoryActivator:
text=obs_info_text, max_memory_num=3, max_memory_length=2, max_depth=3, fast_retrieval=True
)
- logger.debug(f"获取到的记忆: {related_memory}")
+ # logger.debug(f"获取到的记忆: {related_memory}")
# 激活时,所有已有记忆的duration+1,达到3则移除
for m in self.running_memory[:]:
@@ -91,14 +92,21 @@ class MemoryActivator:
if related_memory:
for topic, memory in related_memory:
- # 检查是否已存在相同topic和content的记忆
- exists = any(m["topic"] == topic and m["content"] == memory for m in self.running_memory)
+ # 检查是否已存在相同topic或相似内容(相似度>=0.7)的记忆
+ exists = any(
+ m["topic"] == topic or difflib.SequenceMatcher(None, m["content"], memory).ratio() >= 0.7
+ for m in self.running_memory
+ )
if not exists:
self.running_memory.append(
{"topic": topic, "content": memory, "timestamp": datetime.now().isoformat(), "duration": 1}
)
logger.debug(f"添加新记忆: {topic} - {memory}")
+ # 限制同时加载的记忆条数,最多保留最后3条
+ if len(self.running_memory) > 3:
+ self.running_memory = self.running_memory[-3:]
+
return self.running_memory
diff --git a/src/chat/focus_chat/planners/action_factory.py b/src/chat/focus_chat/planners/action_factory.py
new file mode 100644
index 00000000..257156a2
--- /dev/null
+++ b/src/chat/focus_chat/planners/action_factory.py
@@ -0,0 +1,287 @@
+from typing import Dict, List, Optional, Callable, Coroutine, Type, Any, Union
+import os
+import importlib
+from src.chat.focus_chat.planners.actions.base_action import BaseAction, _ACTION_REGISTRY, _DEFAULT_ACTIONS
+from src.chat.heart_flow.observation.observation import Observation
+from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
+from src.chat.message_receive.chat_stream import ChatStream
+from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
+from src.common.logger_manager import get_logger
+
+# 导入动作类,确保装饰器被执行
+from src.chat.focus_chat.planners.actions.reply_action import ReplyAction
+from src.chat.focus_chat.planners.actions.no_reply_action import NoReplyAction
+
+logger = get_logger("action_factory")
+
+# 定义动作信息类型
+ActionInfo = Dict[str, Any]
+
+
+class ActionManager:
+ """
+ 动作管理器,用于管理各种类型的动作
+ """
+
+ def __init__(self):
+ """初始化动作管理器"""
+ # 所有注册的动作集合
+ self._registered_actions: Dict[str, ActionInfo] = {}
+ # 当前正在使用的动作集合,默认加载默认动作
+ self._using_actions: Dict[str, ActionInfo] = {}
+ # 临时备份原始使用中的动作
+ self._original_actions_backup: Optional[Dict[str, ActionInfo]] = None
+
+ # 默认动作集,仅作为快照,用于恢复默认
+ self._default_actions: Dict[str, ActionInfo] = {}
+
+ # 加载所有已注册动作
+ self._load_registered_actions()
+
+ # 初始化时将默认动作加载到使用中的动作
+ self._using_actions = self._default_actions.copy()
+
+ # logger.info(f"当前可用动作: {list(self._using_actions.keys())}")
+ # for action_name, action_info in self._using_actions.items():
+ # logger.info(f"动作名称: {action_name}, 动作信息: {action_info}")
+
+
+ def _load_registered_actions(self) -> None:
+ """
+ 加载所有通过装饰器注册的动作
+ """
+ try:
+ # 从_ACTION_REGISTRY获取所有已注册动作
+ for action_name, action_class in _ACTION_REGISTRY.items():
+ # 获取动作相关信息
+ action_description:str = getattr(action_class, "action_description", "")
+ action_parameters:dict[str:str] = getattr(action_class, "action_parameters", {})
+ action_require:list[str] = getattr(action_class, "action_require", [])
+ is_default:bool = getattr(action_class, "default", False)
+
+ if action_name and action_description:
+ # 创建动作信息字典
+ action_info = {
+ "description": action_description,
+ "parameters": action_parameters,
+ "require": action_require
+ }
+
+ # 注册2
+ print("注册2")
+ print(action_info)
+
+ # 添加到所有已注册的动作
+ self._registered_actions[action_name] = action_info
+
+ # 添加到默认动作(如果是默认动作)
+ if is_default:
+ self._default_actions[action_name] = action_info
+
+ logger.info(f"所有注册动作: {list(self._registered_actions.keys())}")
+ logger.info(f"默认动作: {list(self._default_actions.keys())}")
+ # for action_name, action_info in self._default_actions.items():
+ # logger.info(f"动作名称: {action_name}, 动作信息: {action_info}")
+
+ except Exception as e:
+ logger.error(f"加载已注册动作失败: {e}")
+
+ def create_action(
+ self,
+ action_name: str,
+ action_data: dict,
+ reasoning: str,
+ cycle_timers: dict,
+ thinking_id: str,
+ observations: List[Observation],
+ expressor: DefaultExpressor,
+ chat_stream: ChatStream,
+ current_cycle: CycleDetail,
+ log_prefix: str,
+ on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]],
+ total_no_reply_count: int = 0,
+ total_waiting_time: float = 0.0,
+ shutting_down: bool = False,
+ ) -> Optional[BaseAction]:
+ """
+ 创建动作处理器实例
+
+ Args:
+ action_name: 动作名称
+ action_data: 动作数据
+ reasoning: 执行理由
+ cycle_timers: 计时器字典
+ thinking_id: 思考ID
+ observations: 观察列表
+ expressor: 表达器
+ chat_stream: 聊天流
+ current_cycle: 当前循环信息
+ log_prefix: 日志前缀
+ on_consecutive_no_reply_callback: 连续不回复回调
+ total_no_reply_count: 连续不回复计数
+ total_waiting_time: 累计等待时间
+ shutting_down: 是否正在关闭
+
+ Returns:
+ Optional[BaseAction]: 创建的动作处理器实例,如果动作名称未注册则返回None
+ """
+ # 检查动作是否在当前使用的动作集中
+ if action_name not in self._using_actions:
+ logger.warning(f"当前不可用的动作类型: {action_name}")
+ return None
+
+ handler_class = _ACTION_REGISTRY.get(action_name)
+ if not handler_class:
+ logger.warning(f"未注册的动作类型: {action_name}")
+ return None
+
+ try:
+ # 创建动作实例并传递所有必要参数
+ instance = handler_class(
+ action_name=action_name,
+ action_data=action_data,
+ reasoning=reasoning,
+ cycle_timers=cycle_timers,
+ thinking_id=thinking_id,
+ observations=observations,
+ on_consecutive_no_reply_callback=on_consecutive_no_reply_callback,
+ current_cycle=current_cycle,
+ log_prefix=log_prefix,
+ total_no_reply_count=total_no_reply_count,
+ total_waiting_time=total_waiting_time,
+ shutting_down=shutting_down,
+ expressor=expressor,
+ chat_stream=chat_stream,
+ )
+
+ return instance
+
+ except Exception as e:
+ logger.error(f"创建动作处理器实例失败: {e}")
+ return None
+
+ def get_registered_actions(self) -> Dict[str, ActionInfo]:
+ """获取所有已注册的动作集"""
+ return self._registered_actions.copy()
+
+ def get_default_actions(self) -> Dict[str, ActionInfo]:
+ """获取默认动作集"""
+ return self._default_actions.copy()
+
+ def get_using_actions(self) -> Dict[str, ActionInfo]:
+ """获取当前正在使用的动作集"""
+ return self._using_actions.copy()
+
+ def add_action_to_using(self, action_name: str) -> bool:
+ """
+ 添加已注册的动作到当前使用的动作集
+
+ Args:
+ action_name: 动作名称
+
+ Returns:
+ bool: 添加是否成功
+ """
+ if action_name not in self._registered_actions:
+ logger.warning(f"添加失败: 动作 {action_name} 未注册")
+ return False
+
+ if action_name in self._using_actions:
+ logger.info(f"动作 {action_name} 已经在使用中")
+ return True
+
+ self._using_actions[action_name] = self._registered_actions[action_name]
+ logger.info(f"添加动作 {action_name} 到使用集")
+ return True
+
+ def remove_action_from_using(self, action_name: str) -> bool:
+ """
+ 从当前使用的动作集中移除指定动作
+
+ Args:
+ action_name: 动作名称
+
+ Returns:
+ bool: 移除是否成功
+ """
+ if action_name not in self._using_actions:
+ logger.warning(f"移除失败: 动作 {action_name} 不在当前使用的动作集中")
+ return False
+
+ del self._using_actions[action_name]
+ logger.info(f"已从使用集中移除动作 {action_name}")
+ return True
+
+ def add_action(self, action_name: str, description: str, parameters: Dict = None, require: List = None) -> bool:
+ """
+ 添加新的动作到注册集
+
+ Args:
+ action_name: 动作名称
+ description: 动作描述
+ parameters: 动作参数定义,默认为空字典
+ require: 动作依赖项,默认为空列表
+
+ Returns:
+ bool: 添加是否成功
+ """
+ if action_name in self._registered_actions:
+ return False
+
+ if parameters is None:
+ parameters = {}
+ if require is None:
+ require = []
+
+ action_info = {
+ "description": description,
+ "parameters": parameters,
+ "require": require
+ }
+
+ self._registered_actions[action_name] = action_info
+ return True
+
+ def remove_action(self, action_name: str) -> bool:
+ """从注册集移除指定动作"""
+ if action_name not in self._registered_actions:
+ return False
+ del self._registered_actions[action_name]
+ # 如果在使用集中也存在,一并移除
+ if action_name in self._using_actions:
+ del self._using_actions[action_name]
+ return True
+
+ def temporarily_remove_actions(self, actions_to_remove: List[str]) -> None:
+ """临时移除使用集中的指定动作,备份原始使用集"""
+ if self._original_actions_backup is None:
+ self._original_actions_backup = self._using_actions.copy()
+ for name in actions_to_remove:
+ self._using_actions.pop(name, None)
+
+ def restore_actions(self) -> None:
+ """恢复之前备份的原始使用集"""
+ if self._original_actions_backup is not None:
+ self._using_actions = self._original_actions_backup.copy()
+ self._original_actions_backup = None
+
+ def restore_default_actions(self) -> None:
+ """恢复默认动作集到使用集"""
+ self._using_actions = self._default_actions.copy()
+ self._original_actions_backup = None
+
+ def get_action(self, action_name: str) -> Optional[Type[BaseAction]]:
+ """
+ 获取指定动作的处理器类
+
+ Args:
+ action_name: 动作名称
+
+ Returns:
+ Optional[Type[BaseAction]]: 动作处理器类,如果不存在则返回None
+ """
+ return _ACTION_REGISTRY.get(action_name)
+
+
+# 创建全局实例
+ActionFactory = ActionManager()
diff --git a/src/chat/focus_chat/planners/actions/base_action.py b/src/chat/focus_chat/planners/actions/base_action.py
new file mode 100644
index 00000000..7c77c300
--- /dev/null
+++ b/src/chat/focus_chat/planners/actions/base_action.py
@@ -0,0 +1,84 @@
+from abc import ABC, abstractmethod
+from typing import Tuple, Dict, Type
+from src.common.logger_manager import get_logger
+
+logger = get_logger("base_action")
+
+# 全局动作注册表
+_ACTION_REGISTRY: Dict[str, Type["BaseAction"]] = {}
+_DEFAULT_ACTIONS: Dict[str, str] = {}
+
+
+def register_action(cls):
+ """
+ 动作注册装饰器
+
+ 用法:
+ @register_action
+ class MyAction(BaseAction):
+ action_name = "my_action"
+ action_description = "我的动作"
+ ...
+ """
+ # 检查类是否有必要的属性
+ if not hasattr(cls, "action_name") or not hasattr(cls, "action_description"):
+ logger.error(f"动作类 {cls.__name__} 缺少必要的属性: action_name 或 action_description")
+ return cls
+
+ action_name = getattr(cls, "action_name")
+ action_description = getattr(cls, "action_description")
+ is_default = getattr(cls, "default", False)
+
+ if not action_name or not action_description:
+ logger.error(f"动作类 {cls.__name__} 的 action_name 或 action_description 为空")
+ return cls
+
+ # 将动作类注册到全局注册表
+ _ACTION_REGISTRY[action_name] = cls
+
+ # 如果是默认动作,添加到默认动作集
+ if is_default:
+ _DEFAULT_ACTIONS[action_name] = action_description
+
+ logger.info(f"已注册动作: {action_name} -> {cls.__name__},默认: {is_default}")
+ return cls
+
+
+class BaseAction(ABC):
+ """动作基类接口
+
+ 所有具体的动作类都应该继承这个基类,并实现handle_action方法。
+ """
+
+ def __init__(self, action_data: dict, reasoning: str, cycle_timers: dict, thinking_id: str):
+ """初始化动作
+
+ Args:
+ action_name: 动作名称
+ action_data: 动作数据
+ reasoning: 执行该动作的理由
+ cycle_timers: 计时器字典
+ thinking_id: 思考ID
+ """
+ #每个动作必须实现
+ self.action_name:str = "base_action"
+ self.action_description:str = "基础动作"
+ self.action_parameters:dict = {}
+ self.action_require:list[str] = []
+
+ self.default:bool = False
+
+
+ self.action_data = action_data
+ self.reasoning = reasoning
+ self.cycle_timers = cycle_timers
+ self.thinking_id = thinking_id
+
+ @abstractmethod
+ async def handle_action(self) -> Tuple[bool, str]:
+ """处理动作的抽象方法,需要被子类实现
+
+ Returns:
+ Tuple[bool, str]: (是否执行成功, 回复文本)
+ """
+ pass
diff --git a/src/chat/focus_chat/planners/actions/no_reply_action.py b/src/chat/focus_chat/planners/actions/no_reply_action.py
new file mode 100644
index 00000000..a29812c7
--- /dev/null
+++ b/src/chat/focus_chat/planners/actions/no_reply_action.py
@@ -0,0 +1,179 @@
+import asyncio
+import traceback
+from src.common.logger_manager import get_logger
+from src.chat.utils.timer_calculator import Timer
+from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
+from typing import Tuple, List, Callable, Coroutine
+from src.chat.heart_flow.observation.observation import Observation
+from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
+from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
+from src.chat.focus_chat.hfc_utils import parse_thinking_id_to_timestamp
+
+logger = get_logger("action_taken")
+
+# 常量定义
+WAITING_TIME_THRESHOLD = 300 # 等待新消息时间阈值,单位秒
+CONSECUTIVE_NO_REPLY_THRESHOLD = 3 # 连续不回复的阈值
+
+
+@register_action
+class NoReplyAction(BaseAction):
+ """不回复动作处理类
+
+ 处理决定不回复的动作。
+ """
+
+ action_name = "no_reply"
+ action_description = "不回复"
+ action_parameters = {}
+ action_require = [
+ "话题无关/无聊/不感兴趣/不懂",
+ "最后一条消息是你自己发的且无人回应你",
+ "你发送了太多消息,且无人回复"
+ ]
+ default = True
+
+ def __init__(
+ self,
+ action_data: dict,
+ reasoning: str,
+ cycle_timers: dict,
+ thinking_id: str,
+ observations: List[Observation],
+ on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]],
+ current_cycle: CycleDetail,
+ log_prefix: str,
+ total_no_reply_count: int = 0,
+ total_waiting_time: float = 0.0,
+ shutting_down: bool = False,
+ **kwargs
+ ):
+ """初始化不回复动作处理器
+
+ Args:
+ action_name: 动作名称
+ action_data: 动作数据
+ reasoning: 执行该动作的理由
+ cycle_timers: 计时器字典
+ thinking_id: 思考ID
+ observations: 观察列表
+ on_consecutive_no_reply_callback: 连续不回复达到阈值时调用的回调函数
+ current_cycle: 当前循环信息
+ log_prefix: 日志前缀
+ total_no_reply_count: 连续不回复计数
+ total_waiting_time: 累计等待时间
+ shutting_down: 是否正在关闭
+ """
+ super().__init__(action_data, reasoning, cycle_timers, thinking_id)
+ self.observations = observations
+ self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback
+ self._current_cycle = current_cycle
+ self.log_prefix = log_prefix
+ self.total_no_reply_count = total_no_reply_count
+ self.total_waiting_time = total_waiting_time
+ self._shutting_down = shutting_down
+
+ async def handle_action(self) -> Tuple[bool, str]:
+ """
+ 处理不回复的情况
+
+ 工作流程:
+ 1. 等待新消息、超时或关闭信号
+ 2. 根据等待结果更新连续不回复计数
+ 3. 如果达到阈值,触发回调
+
+ Returns:
+ Tuple[bool, str]: (是否执行成功, 空字符串)
+ """
+ logger.info(f"{self.log_prefix} 决定不回复: {self.reasoning}")
+
+ observation = self.observations[0] if self.observations else None
+
+ try:
+ with Timer("等待新消息", self.cycle_timers):
+ # 等待新消息、超时或关闭信号,并获取结果
+ await self._wait_for_new_message(observation, self.thinking_id, self.log_prefix)
+ # 从计时器获取实际等待时间
+ current_waiting = self.cycle_timers.get("等待新消息", 0.0)
+
+ if not self._shutting_down:
+ self.total_no_reply_count += 1
+ self.total_waiting_time += current_waiting # 累加等待时间
+ logger.debug(
+ f"{self.log_prefix} 连续不回复计数增加: {self.total_no_reply_count}/{CONSECUTIVE_NO_REPLY_THRESHOLD}, "
+ f"本次等待: {current_waiting:.2f}秒, 累计等待: {self.total_waiting_time:.2f}秒"
+ )
+
+ # 检查是否同时达到次数和时间阈值
+ time_threshold = 0.66 * WAITING_TIME_THRESHOLD * CONSECUTIVE_NO_REPLY_THRESHOLD
+ if (
+ self.total_no_reply_count >= CONSECUTIVE_NO_REPLY_THRESHOLD
+ and self.total_waiting_time >= time_threshold
+ ):
+ logger.info(
+ f"{self.log_prefix} 连续不回复达到阈值 ({self.total_no_reply_count}次) "
+ f"且累计等待时间达到 {self.total_waiting_time:.2f}秒 (阈值 {time_threshold}秒),"
+ f"调用回调请求状态转换"
+ )
+ # 调用回调。注意:这里不重置计数器和时间,依赖回调函数成功改变状态来隐式重置上下文。
+ await self.on_consecutive_no_reply_callback()
+ elif self.total_no_reply_count >= CONSECUTIVE_NO_REPLY_THRESHOLD:
+ # 仅次数达到阈值,但时间未达到
+ logger.debug(
+ f"{self.log_prefix} 连续不回复次数达到阈值 ({self.total_no_reply_count}次) "
+ f"但累计等待时间 {self.total_waiting_time:.2f}秒 未达到时间阈值 ({time_threshold}秒),暂不调用回调"
+ )
+ # else: 次数和时间都未达到阈值,不做处理
+
+ return True, "" # 不回复动作没有回复文本
+
+ except asyncio.CancelledError:
+ logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)")
+ raise
+ except Exception as e: # 捕获调用管理器或其他地方可能发生的错误
+ logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}")
+ logger.error(traceback.format_exc())
+ return False, ""
+
+ async def _wait_for_new_message(self, observation: ChattingObservation, thinking_id: str, log_prefix: str) -> bool:
+ """
+ 等待新消息 或 检测到关闭信号
+
+ 参数:
+ observation: 观察实例
+ thinking_id: 思考ID
+ log_prefix: 日志前缀
+
+ 返回:
+ bool: 是否检测到新消息 (如果因关闭信号退出则返回 False)
+ """
+ wait_start_time = asyncio.get_event_loop().time()
+ while True:
+ # --- 在每次循环开始时检查关闭标志 ---
+ if self._shutting_down:
+ logger.info(f"{log_prefix} 等待新消息时检测到关闭信号,中断等待。")
+ return False # 表示因为关闭而退出
+ # -----------------------------------
+
+ thinking_id_timestamp = parse_thinking_id_to_timestamp(thinking_id)
+
+ # 检查新消息
+ if await observation.has_new_messages_since(thinking_id_timestamp):
+ logger.info(f"{log_prefix} 检测到新消息")
+ return True
+
+ # 检查超时 (放在检查新消息和关闭之后)
+ if asyncio.get_event_loop().time() - wait_start_time > WAITING_TIME_THRESHOLD:
+ logger.warning(f"{log_prefix} 等待新消息超时({WAITING_TIME_THRESHOLD}秒)")
+ return False
+
+ try:
+ # 短暂休眠,让其他任务有机会运行,并能更快响应取消或关闭
+ await asyncio.sleep(0.5) # 缩短休眠时间
+ except asyncio.CancelledError:
+ # 如果在休眠时被取消,再次检查关闭标志
+ # 如果是正常关闭,则不需要警告
+ if not self._shutting_down:
+ logger.warning(f"{log_prefix} _wait_for_new_message 的休眠被意外取消")
+ # 无论如何,重新抛出异常,让上层处理
+ raise
diff --git a/src/chat/focus_chat/planners/actions/reply_action.py b/src/chat/focus_chat/planners/actions/reply_action.py
new file mode 100644
index 00000000..7b2e88fa
--- /dev/null
+++ b/src/chat/focus_chat/planners/actions/reply_action.py
@@ -0,0 +1,142 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from src.common.logger_manager import get_logger
+from src.chat.utils.timer_calculator import Timer
+from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
+from typing import Tuple, List, Optional
+from src.chat.heart_flow.observation.observation import Observation
+from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
+from src.chat.message_receive.chat_stream import ChatStream
+from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
+from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
+from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
+
+logger = get_logger("action_taken")
+
+
+@register_action
+class ReplyAction(BaseAction):
+ """回复动作处理类
+
+ 处理构建和发送消息回复的动作。
+ """
+
+ action_name:str = "reply"
+ action_description:str = "表达想法,可以只包含文本、表情或两者都有"
+ action_parameters:dict[str:str] = {
+ "text": "你想要表达的内容(可选)",
+ "emojis": "描述当前使用表情包的场景(可选)",
+ "target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)(可选)",
+ }
+ action_require:list[str] = [
+ "有实质性内容需要表达",
+ "有人提到你,但你还没有回应他",
+ "在合适的时候添加表情(不要总是添加)",
+ "如果你要回复特定某人的某句话,或者你想回复较早的消息,请在target中指定那句话的原始文本",
+ "除非有明确的回复目标,如果选择了target,不用特别提到某个人的人名",
+ "一次只回复一个人,一次只回复一个话题,突出重点",
+ "如果是自己发的消息想继续,需自然衔接",
+ "避免重复或评价自己的发言,不要和自己聊天",
+ "注意:回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。"
+ ]
+ default = True
+
+ def __init__(
+ self,
+ action_name: str,
+ action_data: dict,
+ reasoning: str,
+ cycle_timers: dict,
+ thinking_id: str,
+ observations: List[Observation],
+ expressor: DefaultExpressor,
+ chat_stream: ChatStream,
+ current_cycle: CycleDetail,
+ log_prefix: str,
+ **kwargs
+ ):
+ """初始化回复动作处理器
+
+ Args:
+ action_name: 动作名称
+ action_data: 动作数据,包含 message, emojis, target 等
+ reasoning: 执行该动作的理由
+ cycle_timers: 计时器字典
+ thinking_id: 思考ID
+ observations: 观察列表
+ expressor: 表达器
+ chat_stream: 聊天流
+ current_cycle: 当前循环信息
+ log_prefix: 日志前缀
+ """
+ super().__init__(action_data, reasoning, cycle_timers, thinking_id)
+ self.observations = observations
+ self.expressor = expressor
+ self.chat_stream = chat_stream
+ self._current_cycle = current_cycle
+ self.log_prefix = log_prefix
+
+ async def handle_action(self) -> Tuple[bool, str]:
+ """
+ 处理回复动作
+
+ Returns:
+ Tuple[bool, str]: (是否执行成功, 回复文本)
+ """
+ # 注意: 此处可能会使用不同的expressor实现根据任务类型切换不同的回复策略
+ return await self._handle_reply(
+ reasoning=self.reasoning,
+ reply_data=self.action_data,
+ cycle_timers=self.cycle_timers,
+ thinking_id=self.thinking_id
+ )
+
+ async def _handle_reply(
+ self, reasoning: str, reply_data: dict, cycle_timers: dict, thinking_id: str
+ ) -> tuple[bool, str]:
+ """
+ 处理统一的回复动作 - 可包含文本和表情,顺序任意
+
+ reply_data格式:
+ {
+ "text": "你好啊" # 文本内容列表(可选)
+ "target": "锚定消息", # 锚定消息的文本内容
+ "emojis": "微笑" # 表情关键词列表(可选)
+ }
+ """
+ # 重置连续不回复计数器
+ self.total_no_reply_count = 0
+ self.total_waiting_time = 0.0
+
+ # 从聊天观察获取锚定消息
+ observations: ChattingObservation = self.observations[0]
+ anchor_message = observations.serch_message_by_text(reply_data["target"])
+
+ # 如果没有找到锚点消息,创建一个占位符
+ if not anchor_message:
+ logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
+ anchor_message = await create_empty_anchor_message(
+ self.chat_stream.platform, self.chat_stream.group_info, self.chat_stream
+ )
+ else:
+ anchor_message.update_chat_stream(self.chat_stream)
+
+ success, reply_set = await self.expressor.deal_reply(
+ cycle_timers=cycle_timers,
+ action_data=reply_data,
+ anchor_message=anchor_message,
+ reasoning=reasoning,
+ thinking_id=thinking_id,
+ )
+
+ reply_text = ""
+ for reply in reply_set:
+ type = reply[0]
+ data = reply[1]
+ if type == "text":
+ reply_text += data
+ elif type == "emoji":
+ reply_text += data
+
+ return success, reply_text
diff --git a/src/chat/focus_chat/planners/planner.py b/src/chat/focus_chat/planners/planner.py
new file mode 100644
index 00000000..4671ce6a
--- /dev/null
+++ b/src/chat/focus_chat/planners/planner.py
@@ -0,0 +1,301 @@
+import time
+import json # <--- 确保导入 json
+import traceback
+from typing import List, Dict, Any, Optional
+from rich.traceback import install
+from MaiMBot.src.chat.message_receive.chat_stream import ChatStream
+from src.chat.models.utils_model import LLMRequest
+from src.config.config import global_config
+from src.chat.focus_chat.heartflow_prompt_builder import prompt_builder
+from src.chat.focus_chat.info.info_base import InfoBase
+from src.chat.focus_chat.info.obs_info import ObsInfo
+from src.chat.focus_chat.info.cycle_info import CycleInfo
+from src.chat.focus_chat.info.mind_info import MindInfo
+from src.chat.focus_chat.info.structured_info import StructuredInfo
+from src.common.logger_manager import get_logger
+from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
+from src.individuality.individuality import Individuality
+from src.chat.focus_chat.planners.action_factory import ActionManager
+from src.chat.focus_chat.planners.action_factory import ActionInfo
+from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat
+from src.plugins.group_nickname.nickname_manager import nickname_manager
+logger = get_logger("planner")
+
+install(extra_lines=3)
+
+def init_prompt():
+ Prompt(
+ """你的名字是{bot_name},{prompt_personality},{chat_context_description}。需要基于以下信息决定如何参与对话:
+{nickname_info_block}
+{chat_content_block}
+{mind_info_block}
+{cycle_info_block}
+
+请综合分析聊天内容和你看到的新消息,参考聊天规划,选择合适的action:
+
+{action_options_text}
+
+你必须从上面列出的可用action中选择一个,并说明原因。
+你的决策必须以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。
+
+请你以下面格式输出你选择的action:
+{{
+ "action": "action_name",
+ "reasoning": "你的决策理由",
+ "参数1": "参数1的值",
+ "参数2": "参数2的值",
+ "参数3": "参数3的值",
+ ...
+}}
+
+请输出你的决策 JSON:""",
+"planner_prompt",)
+
+ Prompt(
+ """
+action_name: {action_name}
+ 描述:{action_description}
+ 参数:
+ {action_parameters}
+ 动作要求:
+ {action_require}
+ """,
+ "action_prompt",
+ )
+
+
+class ActionPlanner:
+ def __init__(self, log_prefix: str, action_manager: ActionManager, stream_id: str, chat_stream: ChatStream):
+ self.log_prefix = log_prefix
+ # LLM规划器配置
+ self.planner_llm = LLMRequest(
+ model=global_config.llm_plan,
+ max_tokens=1000,
+ request_type="action_planning", # 用于动作规划
+ )
+ self.action_manager = action_manager
+ self.stream_id = stream_id
+ self.chat_stream = chat_stream
+
+ async def plan(self, all_plan_info: List[InfoBase], cycle_timers: dict) -> Dict[str, Any]:
+ """
+ 规划器 (Planner): 使用LLM根据上下文决定做出什么动作。
+
+ 参数:
+ all_plan_info: 所有计划信息
+ cycle_timers: 计时器字典
+ """
+
+ action = "no_reply" # 默认动作
+ reasoning = "规划器初始化默认"
+
+ try:
+ # 获取观察信息
+ for info in all_plan_info:
+ if isinstance(info, ObsInfo):
+ logger.debug(f"{self.log_prefix} 观察信息: {info}")
+ observed_messages = info.get_talking_message()
+ observed_messages_str = info.get_talking_message_str_truncate()
+ chat_type = info.get_chat_type()
+ if chat_type == "group":
+ is_group_chat = True
+ else:
+ is_group_chat = False
+ elif isinstance(info, MindInfo):
+ logger.debug(f"{self.log_prefix} 思维信息: {info}")
+ current_mind = info.get_current_mind()
+ elif isinstance(info, CycleInfo):
+ logger.debug(f"{self.log_prefix} 循环信息: {info}")
+ cycle_info = info.get_observe_info()
+ elif isinstance(info, StructuredInfo):
+ logger.debug(f"{self.log_prefix} 结构化信息: {info}")
+ structured_info = info.get_data()
+
+ current_available_actions = self.action_manager.get_using_actions()
+
+ # --- 构建提示词 (调用修改后的 PromptBuilder 方法) ---
+ prompt = await self.build_planner_prompt(
+ is_group_chat=is_group_chat, # <-- Pass HFC state
+ chat_target_info=None,
+ observed_messages_str=observed_messages_str, # <-- Pass local variable
+ current_mind=current_mind, # <-- Pass argument
+ # structured_info=structured_info, # <-- Pass SubMind info
+ current_available_actions=current_available_actions, # <-- Pass determined actions
+ cycle_info=cycle_info, # <-- Pass cycle info
+ )
+
+ # --- 调用 LLM (普通文本生成) ---
+ llm_content = None
+ try:
+ llm_content, _, _ = await self.planner_llm.generate_response(prompt=prompt)
+ logger.debug(f"{self.log_prefix}[Planner] LLM 原始 JSON 响应 (预期): {llm_content}")
+ except Exception as req_e:
+ logger.error(f"{self.log_prefix}[Planner] LLM 请求执行失败: {req_e}")
+ reasoning = f"LLM 请求失败,你的模型出现问题: {req_e}"
+ action = "no_reply"
+
+ if llm_content:
+ try:
+ # 尝试去除可能的 markdown 代码块标记
+ cleaned_content = (
+ llm_content.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
+ )
+ if not cleaned_content:
+ raise json.JSONDecodeError("Cleaned content is empty", cleaned_content, 0)
+ parsed_json = json.loads(cleaned_content)
+
+ # 提取决策,提供默认值
+ extracted_action = parsed_json.get("action", "no_reply")
+ extracted_reasoning = parsed_json.get("reasoning", "LLM未提供理由")
+
+ # 新的reply格式
+ if extracted_action == "reply":
+ action_data = {
+ "text": parsed_json.get("text", []),
+ "emojis": parsed_json.get("emojis", []),
+ "target": parsed_json.get("target", ""),
+ }
+ else:
+ action_data = {} # 其他动作可能不需要额外数据
+
+ if extracted_action not in current_available_actions:
+ logger.warning(
+ f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{extracted_action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'"
+ )
+ action = "no_reply"
+ reasoning = f"LLM 返回了当前不可用的动作 '{extracted_action}' (可用: {list(current_available_actions.keys())})。原始理由: {extracted_reasoning}"
+ else:
+ # 动作有效且可用
+ action = extracted_action
+ reasoning = extracted_reasoning
+
+ except Exception as json_e:
+ logger.warning(
+ f"{self.log_prefix}解析LLM响应JSON失败,模型返回不标准: {json_e}. LLM原始输出: '{llm_content}'"
+ )
+ reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'."
+ action = "no_reply"
+
+ except Exception as outer_e:
+ logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_reply: {outer_e}")
+ traceback.print_exc()
+ action = "no_reply" # 发生未知错误,标记为 error 动作
+ reasoning = f"Planner 内部处理错误: {outer_e}"
+
+ logger.debug(
+ f"{self.log_prefix}规划器Prompt:\n{prompt}\n\n决策动作:{action},\n动作信息: '{action_data}'\n理由: {reasoning}"
+ )
+
+ # 恢复原始动作集
+ self.action_manager.restore_actions()
+ logger.debug(
+ f"{self.log_prefix}恢复了原始动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}"
+ )
+
+ action_result = {"action_type": action, "action_data": action_data, "reasoning": reasoning}
+
+ plan_result = {
+ "action_result": action_result,
+ "current_mind": current_mind,
+ "observed_messages": observed_messages,
+ }
+
+ # 返回结果字典
+ return plan_result
+
+
+ async def build_planner_prompt(
+ self,
+ is_group_chat: bool, # Now passed as argument
+ chat_target_info: Optional[dict], # Now passed as argument
+ observed_messages_str: str,
+ current_mind: Optional[str],
+ current_available_actions: Dict[str, ActionInfo],
+ cycle_info: Optional[str],
+ ) -> str:
+ """构建 Planner LLM 的提示词 (获取模板并填充数据)"""
+ try:
+ # --- Determine chat context ---
+ chat_context_description = "你现在正在一个群聊中"
+ chat_target_name = None # Only relevant for private
+ if not is_group_chat and chat_target_info:
+ chat_target_name = (
+ chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
+ )
+ chat_context_description = f"你正在和 {chat_target_name} 私聊"
+
+
+ chat_content_block = ""
+ if observed_messages_str:
+ chat_content_block = f"聊天记录:\n{observed_messages_str}"
+ else:
+ chat_content_block = "你还未开始聊天"
+
+ mind_info_block = ""
+ if current_mind:
+ mind_info_block = f"对聊天的规划:{current_mind}"
+ else:
+ mind_info_block = "你刚参与聊天"
+
+ individuality = Individuality.get_instance()
+ personality_block = individuality.get_prompt(x_person=2, level=2)
+
+
+ action_options_block = ""
+ for using_actions_name, using_actions_info in current_available_actions.items():
+ # print(using_actions_name)
+ # print(using_actions_info)
+ # print(using_actions_info["parameters"])
+ # print(using_actions_info["require"])
+ # print(using_actions_info["description"])
+
+ using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
+
+ param_text = ""
+ for param_name, param_description in using_actions_info["parameters"].items():
+ param_text += f"{param_name}: {param_description}\n"
+
+ require_text = ""
+ for require_item in using_actions_info["require"]:
+ require_text += f"- {require_item}\n"
+
+ using_action_prompt = using_action_prompt.format(
+ action_name=using_actions_name,
+ action_description=using_actions_info["description"],
+ action_parameters=param_text,
+ action_require=require_text,
+ )
+
+ action_options_block += using_action_prompt
+
+ # 需要获取用于上下文的历史消息
+ message_list_before_now = get_raw_msg_before_timestamp_with_chat(
+ chat_id=self.stream_id,
+ timestamp=time.time(), # 使用当前时间作为参考点
+ limit=global_config.observation_context_size, # 使用与 prompt 构建一致的 limit
+ )
+ # 调用工具函数获取格式化后的绰号字符串
+ nickname_injection_str = await nickname_manager.get_nickname_prompt_injection(
+ self.chat_stream, message_list_before_now
+ )
+
+ planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
+ prompt = planner_prompt_template.format(
+ bot_name=global_config.BOT_NICKNAME,
+ prompt_personality=personality_block,
+ chat_context_description=chat_context_description,
+ chat_content_block=chat_content_block,
+ mind_info_block=mind_info_block,
+ cycle_info_block=cycle_info,
+ action_options_text=action_options_block,
+ nickname_info_block=nickname_injection_str,
+ )
+ return prompt
+
+ except Exception as e:
+ logger.error(f"构建 Planner 提示词时出错: {e}")
+ logger.error(traceback.format_exc())
+ return "构建 Planner Prompt 时出错"
+
+
+init_prompt()
diff --git a/src/chat/heart_flow/observation/chatting_observation.py b/src/chat/heart_flow/observation/chatting_observation.py
index 5a16792b..ccc414c9 100644
--- a/src/chat/heart_flow/observation/chatting_observation.py
+++ b/src/chat/heart_flow/observation/chatting_observation.py
@@ -19,7 +19,7 @@ from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
from src.chat.utils.prompt_builder import Prompt
-logger = get_logger(__name__)
+logger = get_logger("observation")
Prompt(
diff --git a/src/chat/heart_flow/observation/hfcloop_observation.py b/src/chat/heart_flow/observation/hfcloop_observation.py
index f2f33671..470671e2 100644
--- a/src/chat/heart_flow/observation/hfcloop_observation.py
+++ b/src/chat/heart_flow/observation/hfcloop_observation.py
@@ -23,19 +23,16 @@ class HFCloopObservation:
def add_loop_info(self, loop_info: CycleDetail):
# logger.debug(f"添加循环信息111111111111111111111111111111111111: {loop_info}")
# print(f"添加循环信息111111111111111111111111111111111111: {loop_info}")
- print(f"action_taken: {loop_info.action_taken}")
- print(f"action_type: {loop_info.action_type}")
- print(f"response_info: {loop_info.response_info}")
self.history_loop.append(loop_info)
async def observe(self):
recent_active_cycles: List[CycleDetail] = []
for cycle in reversed(self.history_loop):
# 只关心实际执行了动作的循环
- if cycle.action_taken:
+ action_taken = cycle.loop_action_info["action_taken"]
+ if action_taken:
recent_active_cycles.append(cycle)
- # 最多找最近的3个活动循环
- if len(recent_active_cycles) == 3:
+ if len(recent_active_cycles) == 5:
break
cycle_info_block = ""
@@ -44,10 +41,10 @@ class HFCloopObservation:
# 检查这最近的活动循环中有多少是连续的文本回复 (从最近的开始看)
for cycle in recent_active_cycles:
- if cycle.action_type == "reply":
+ action_type = cycle.loop_plan_info["action_result"]["action_type"]
+ if action_type == "reply":
consecutive_text_replies += 1
- # 获取回复内容,如果不存在则返回'[空回复]'
- response_text = cycle.response_info.get("response_text", "[空回复]")
+ response_text = cycle.loop_plan_info["action_result"]["action_data"].get("text", "[空回复]")
responses_for_prompt.append(response_text)
else:
break
@@ -70,7 +67,7 @@ class HFCloopObservation:
# 获取history_loop中最新添加的
if self.history_loop:
- last_loop = self.history_loop[-1]
+ last_loop = self.history_loop[0]
start_time = last_loop.start_time
end_time = last_loop.end_time
if start_time is not None and end_time is not None:
diff --git a/src/chat/utils/chat_message_builder.py b/src/chat/utils/chat_message_builder.py
index 2d34b669..f8ae89d6 100644
--- a/src/chat/utils/chat_message_builder.py
+++ b/src/chat/utils/chat_message_builder.py
@@ -71,8 +71,9 @@ def get_raw_msg_by_timestamp_random(
# 随机选一条
msg = random.choice(all_msgs)
chat_id = msg["chat_id"]
+ timestamp_start = msg["time"]
# 用 chat_id 获取该聊天在指定时间戳范围内的消息
- return get_raw_msg_by_timestamp_with_chat(chat_id, timestamp_start, timestamp_end, limit, limit_mode)
+ return get_raw_msg_by_timestamp_with_chat(chat_id, timestamp_start, timestamp_end, limit, "earliest")
def get_raw_msg_by_timestamp_with_users(
@@ -418,15 +419,25 @@ async def build_readable_messages(
async def build_anonymous_messages(messages: List[Dict[str, Any]]) -> str:
"""
构建匿名可读消息,将不同人的名称转为唯一占位符(A、B、C...),bot自己用SELF。
+ 处理 回复