From 29f818b43558024c91a12484a68f74eddef9d361 Mon Sep 17 00:00:00 2001
From: SnowindMe <1945743455@qq.com>
Date: Wed, 4 Jun 2025 16:29:06 +0800
Subject: [PATCH] trying resolves conflicts
---
README.md | 126 --
scripts/070configexe.py | 1137 -----------------
src/chat/focus_chat/heartFC_Cycleinfo.py | 135 --
src/chat/focus_chat/heartFC_chat.py | 636 ---------
.../observation/chatting_observation.py | 290 -----
.../observation/hfcloop_observation.py | 125 --
src/config/config.py | 190 ---
src/config/official_configs.py | 468 -------
8 files changed, 3107 deletions(-)
delete mode 100644 README.md
delete mode 100644 scripts/070configexe.py
delete mode 100644 src/chat/focus_chat/heartFC_Cycleinfo.py
delete mode 100644 src/chat/focus_chat/heartFC_chat.py
delete mode 100644 src/chat/heart_flow/observation/chatting_observation.py
delete mode 100644 src/chat/heart_flow/observation/hfcloop_observation.py
delete mode 100644 src/config/config.py
delete mode 100644 src/config/official_configs.py
diff --git a/README.md b/README.md
deleted file mode 100644
index 965c6dca..00000000
--- a/README.md
+++ /dev/null
@@ -1,126 +0,0 @@
-
-
-# 麦麦!MaiCore-MaiBot
-
-
-
-
-
-
-
-
-[](https://deepwiki.com/DrSmoothl/MaiBot)
-
-
-
-## 🎉 介绍
-
-**🍔MaiCore 是一个基于大语言模型的可交互智能体**
-
-- 💭 **智能对话系统**:基于 LLM 的自然语言交互。
-- 🤔 **实时思维系统**:模拟人类思考过程。
-- 💝 **情感表达系统**:丰富的表情包和情绪表达。
-- 🧠 **持久记忆系统**:基于图的长期记忆存储。
-- 🔄 **动态人格系统**:自适应的性格特征和表达方式。
-
-
-
-## 🔥 更新和安装
-
-**最新版本: v0.7.0** ([更新日志](changelogs/changelog.md))
-可前往 [Release](https://github.com/MaiM-with-u/MaiBot/releases/) 页面下载最新版本
-可前往 [启动器发布页面](https://github.com/MaiM-with-u/mailauncher/releases/tag/v0.1.0)下载最新启动器
-**GitHub 分支说明:**
-- `main`: 稳定发布版本(推荐)
-- `dev`: 开发测试版本(不稳定)
-- `classical`: 旧版本(停止维护)
-
-### 最新版本部署教程
-- [从0.6升级须知](https://docs.mai-mai.org/faq/maibot/update_to_07.html)
-- [🚀 最新版本部署教程](https://docs.mai-mai.org/manual/deployment/mmc_deploy_windows.html) - 基于 MaiCore 的新版本部署方式(与旧版本不兼容)
-
-> [!WARNING]
-> - 从 0.6.x 旧版本升级前请务必阅读:[升级指南](https://docs.mai-mai.org/faq/maibot/update_to_07.html)
-> - 项目处于活跃开发阶段,功能和 API 可能随时调整。
-> - 文档未完善,有问题可以提交 Issue 或者 Discussion。
-> - QQ 机器人存在被限制风险,请自行了解,谨慎使用。
-> - 由于持续迭代,可能存在一些已知或未知的 bug。
-> - 由于程序处于开发中,可能消耗较多 token。
-
-## 💬 讨论
-
-- [一群](https://qm.qq.com/q/VQ3XZrWgMs) |
- [四群](https://qm.qq.com/q/wGePTl1UyY) |
- [二群](https://qm.qq.com/q/RzmCiRtHEW) |
- [五群](https://qm.qq.com/q/JxvHZnxyec)(已满) |
- [三群](https://qm.qq.com/q/wlH5eT8OmQ)(已满)
-
-## 📚 文档
-
-**部分内容可能更新不够及时,请注意版本对应**
-
-- [📚 核心 Wiki 文档](https://docs.mai-mai.org) - 项目最全面的文档中心,你可以了解麦麦有关的一切。
-
-### 设计理念(原始时代的火花)
-
-> **千石可乐说:**
-> - 这个项目最初只是为了给牛牛 bot 添加一点额外的功能,但是功能越写越多,最后决定重写。其目的是为了创造一个活跃在 QQ 群聊的"生命体"。目的并不是为了写一个功能齐全的机器人,而是一个尽可能让人感知到真实的类人存在。
-> - 程序的功能设计理念基于一个核心的原则:"最像而不是好"。
-> - 如果人类真的需要一个 AI 来陪伴自己,并不是所有人都需要一个完美的,能解决所有问题的"helpful assistant",而是一个会犯错的,拥有自己感知和想法的"生命形式"。
-> - 代码会保持开源和开放,但个人希望 MaiMbot 的运行时数据保持封闭,尽量避免以显式命令来对其进行控制和调试。我认为一个你无法完全掌控的个体才更能让你感觉到它的自主性,而视其成为一个对话机器。
-> - SengokuCola~~纯编程外行,面向 cursor 编程,很多代码写得不好多多包涵~~已得到大脑升级。
-
-## 🙋 贡献和致谢
-你可以阅读[开发文档](https://docs.mai-mai.org/develop/)来更好的了解麦麦!
-MaiCore 是一个开源项目,我们非常欢迎你的参与。你的贡献,无论是提交 bug 报告、功能需求还是代码 pr,都对项目非常宝贵。我们非常感谢你的支持!🎉
-但无序的讨论会降低沟通效率,进而影响问题的解决速度,因此在提交任何贡献前,请务必先阅读本项目的[贡献指南](docs/CONTRIBUTE.md)。(待补完)
-
-### 贡献者
-
-感谢各位大佬!
-
-
-
-
-
-### 致谢
-
-- [略nd](https://space.bilibili.com/1344099355): 为麦麦绘制人设。
-- [NapCat](https://github.com/NapNeko/NapCatQQ): 现代化的基于 NTQQ 的 Bot 协议端实现。
-
-**也感谢每一位给麦麦发展提出宝贵意见与建议的用户,感谢陪伴麦麦走到现在的你们!**
-
-## 📌 注意事项
-
-> [!WARNING]
-> 使用本项目前必须阅读和同意[用户协议](EULA.md)和[隐私协议](PRIVACY.md)。
-> 本应用生成内容来自人工智能模型,由 AI 生成,请仔细甄别,请勿用于违反法律的用途,AI 生成内容不代表本项目团队的观点和立场。
-
-## 麦麦仓库状态
-
-
-
-### Star 趋势
-
-[](https://starchart.cc/MaiM-with-u/MaiBot)
-
-## License
-
-GPL-3.0
diff --git a/scripts/070configexe.py b/scripts/070configexe.py
deleted file mode 100644
index 7eaa6cef..00000000
--- a/scripts/070configexe.py
+++ /dev/null
@@ -1,1137 +0,0 @@
-import tkinter as tk
-from tkinter import ttk, messagebox, filedialog
-import tomli
-import tomli_w
-import os
-from typing import Any, Dict, List
-import threading
-import time
-import sys
-
-
-class ConfigEditor:
- def __init__(self, root):
- self.root = root
- self.root.title("麦麦配置编辑器")
-
- # 加载编辑器配置
- self.load_editor_config()
-
- # 设置窗口大小
- self.root.geometry(f"{self.window_width}x{self.window_height}")
-
- # 加载配置
- self.load_config()
-
- # 加载环境变量
- self.load_env_vars()
-
- # 自动保存相关
- self.last_save_time = time.time()
- self.save_timer = None
- self.save_lock = threading.Lock()
- self.current_section = None # 当前编辑的节
- self.pending_save = False # 是否有待保存的更改
-
- # 存储控件的字典
- self.widgets = {}
-
- # 创建主框架
- self.main_frame = ttk.Frame(self.root, padding="10")
- self.main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
-
- # 创建版本号显示
- self.create_version_label()
-
- # 创建左侧导航栏
- self.create_navbar()
-
- # 创建右侧编辑区
- self.create_editor()
-
- # 创建底部按钮
- self.create_buttons()
-
- # 配置网格权重
- self.root.columnconfigure(0, weight=1)
- self.root.rowconfigure(0, weight=1)
- self.main_frame.columnconfigure(1, weight=1)
- self.main_frame.rowconfigure(1, weight=1) # 修改为1,因为第0行是版本号
-
- # 默认选择快捷设置栏
- self.current_section = "quick_settings"
- self.create_quick_settings_widgets()
- # 选中导航树中的快捷设置项
- for item in self.tree.get_children():
- if self.tree.item(item)["values"][0] == "quick_settings":
- self.tree.selection_set(item)
- break
-
- def load_editor_config(self):
- """加载编辑器配置"""
- try:
- editor_config_path = os.path.join(os.path.dirname(__file__), "configexe.toml")
- with open(editor_config_path, "rb") as f:
- self.editor_config = tomli.load(f) # 保存整个配置对象
-
- # 设置配置路径
- self.config_path = self.editor_config["config"]["bot_config_path"]
- # 如果路径是相对路径,转换为绝对路径
- if not os.path.isabs(self.config_path):
- self.config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), self.config_path)
-
- # 设置编辑器参数
- self.window_width = self.editor_config["editor"]["window_width"]
- self.window_height = self.editor_config["editor"]["window_height"]
- self.save_delay = self.editor_config["editor"]["save_delay"]
-
- # 加载翻译
- self.translations = self.editor_config.get("translations", {})
-
- except Exception as e:
- messagebox.showerror("错误", f"加载编辑器配置失败: {str(e)}")
- # 使用默认值
- self.editor_config = {} # 初始化空配置
- self.config_path = "config/bot_config.toml"
- self.window_width = 1000
- self.window_height = 800
- self.save_delay = 1.0
- self.translations = {}
-
- def load_config(self):
- try:
- with open(self.config_path, "rb") as f:
- self.config = tomli.load(f)
- except Exception as e:
- messagebox.showerror("错误", f"加载配置文件失败: {str(e)}")
- self.config = {}
- # 自动打开配置路径窗口
- self.open_path_config()
-
- def load_env_vars(self):
- """加载并解析环境变量文件"""
- try:
- # 从配置中获取环境文件路径
- env_path = self.config.get("inner", {}).get("env_file", ".env")
- if not os.path.isabs(env_path):
- env_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), env_path)
-
- if not os.path.exists(env_path):
- print(f"环境文件不存在: {env_path}")
- return
-
- # 读取环境文件
- with open(env_path, "r", encoding="utf-8") as f:
- env_content = f.read()
-
- # 解析环境变量
- env_vars = {}
- for line in env_content.split("\n"):
- line = line.strip()
- if not line or line.startswith("#"):
- continue
-
- if "=" in line:
- key, value = line.split("=", 1)
- key = key.strip()
- value = value.strip()
-
- # 检查是否是目标变量
- if key.endswith("_BASE_URL") or key.endswith("_KEY"):
- # 提取前缀(去掉_BASE_URL或_KEY)
- prefix = key[:-9] if key.endswith("_BASE_URL") else key[:-4]
- if prefix not in env_vars:
- env_vars[prefix] = {}
- env_vars[prefix][key] = value
-
- # 将解析的环境变量添加到配置中
- if "env_vars" not in self.config:
- self.config["env_vars"] = {}
- self.config["env_vars"].update(env_vars)
-
- except Exception as e:
- print(f"加载环境变量失败: {str(e)}")
-
- def create_version_label(self):
- """创建版本号显示标签"""
- version = self.config.get("inner", {}).get("version", "未知版本")
- version_frame = ttk.Frame(self.main_frame)
- version_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
-
- # 添加配置按钮
- config_button = ttk.Button(version_frame, text="配置路径", command=self.open_path_config)
- config_button.pack(side=tk.LEFT, padx=5)
-
- version_label = ttk.Label(version_frame, text=f"麦麦版本:{version}", font=("微软雅黑", 10, "bold"))
- version_label.pack(side=tk.LEFT, padx=5)
-
- def create_navbar(self):
- # 创建左侧导航栏
- self.nav_frame = ttk.Frame(self.main_frame, padding="5")
- self.nav_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
-
- # 创建导航树
- self.tree = ttk.Treeview(self.nav_frame)
- self.tree.pack(fill=tk.BOTH, expand=True)
-
- # 添加快捷设置节
- self.tree.insert("", "end", text="快捷设置", values=("quick_settings",))
-
- # 添加env_vars节,显示为"配置你的模型APIKEY"
- self.tree.insert("", "end", text="配置你的模型APIKEY", values=("env_vars",))
-
- # 只显示bot_config.toml实际存在的section
- for section in self.config:
- if section not in (
- "inner",
- "env_vars",
- "telemetry",
- "experimental",
- "maim_message",
- "keyword_reaction",
- "message_receive",
- "relationship",
- ):
- section_trans = self.translations.get("sections", {}).get(section, {})
- section_name = section_trans.get("name", section)
- self.tree.insert("", "end", text=section_name, values=(section,))
- # 绑定选择事件
- self.tree.bind("<>", self.on_section_select)
-
- def create_editor(self):
- # 创建右侧编辑区
- self.editor_frame = ttk.Frame(self.main_frame, padding="5")
- self.editor_frame.grid(row=1, column=1, sticky=(tk.W, tk.E, tk.N, tk.S))
-
- # 创建编辑区标题
- # self.editor_title = ttk.Label(self.editor_frame, text="")
- # self.editor_title.pack(fill=tk.X)
-
- # 创建编辑区内容
- self.editor_content = ttk.Frame(self.editor_frame)
- self.editor_content.pack(fill=tk.BOTH, expand=True)
-
- # 创建滚动条
- self.scrollbar = ttk.Scrollbar(self.editor_content)
- self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
-
- # 创建画布和框架
- self.canvas = tk.Canvas(self.editor_content, yscrollcommand=self.scrollbar.set)
- self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- self.scrollbar.config(command=self.canvas.yview)
-
- # 创建内容框架
- self.content_frame = ttk.Frame(self.canvas)
- self.canvas.create_window((0, 0), window=self.content_frame, anchor=tk.NW)
-
- # 绑定画布大小变化事件
- self.content_frame.bind("", self.on_frame_configure)
- self.canvas.bind("", self.on_canvas_configure)
-
- def on_frame_configure(self, event=None):
- self.canvas.configure(scrollregion=self.canvas.bbox("all"))
-
- def on_canvas_configure(self, event):
- # 更新内容框架的宽度以适应画布
- self.canvas.itemconfig(self.canvas.find_withtag("all")[0], width=event.width)
-
- def create_buttons(self):
- # 创建底部按钮区
- self.button_frame = ttk.Frame(self.main_frame, padding="5")
- self.button_frame.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E))
-
- # 刷新按钮
- # self.refresh_button = ttk.Button(self.button_frame, text="刷新", command=self.refresh_config)
- # self.refresh_button.pack(side=tk.RIGHT, padx=5)
-
- # 高级选项按钮(左下角)
- self.advanced_button = ttk.Button(self.button_frame, text="高级选项", command=self.open_advanced_options)
- self.advanced_button.pack(side=tk.LEFT, padx=5)
-
- def create_widget_for_value(self, parent: ttk.Frame, key: str, value: Any, path: List[str]) -> None:
- """为不同类型的值创建对应的编辑控件"""
- frame = ttk.Frame(parent)
- frame.pack(fill=tk.X, padx=5, pady=2)
-
- # --- 修改开始: 改进翻译查找逻辑 ---
- full_config_path_key = ".".join(path + [key]) # 例如 "chinese_typo.enable"
-
- model_item_translations = {
- "name": ("模型名称", "模型的唯一标识或名称"),
- "provider": ("模型提供商", "模型API的提供商"),
- "pri_in": ("输入价格", "模型输入的价格/消耗"),
- "pri_out": ("输出价格", "模型输出的价格/消耗"),
- "temp": ("模型温度", "控制模型输出的多样性"),
- }
-
- item_name_to_display = key # 默认显示原始键名
- item_desc_to_display = "" # 默认无描述
-
- # 1. 尝试使用完整路径的特定翻译
- specific_translation = self.translations.get("items", {}).get(full_config_path_key)
- if specific_translation and specific_translation.get("name"):
- item_name_to_display = specific_translation.get("name")
- item_desc_to_display = specific_translation.get("description", "")
- else:
- # 2. 如果特定翻译未找到或没有name,尝试使用通用键名的翻译
- generic_translation = self.translations.get("items", {}).get(key)
- if generic_translation and generic_translation.get("name"):
- item_name_to_display = generic_translation.get("name")
- item_desc_to_display = generic_translation.get("description", "")
- elif key in model_item_translations:
- item_name_to_display, item_desc_to_display = model_item_translations[key]
- # --- 修改结束 ---
-
- # 配置名(大号字体)
- label = ttk.Label(frame, text=item_name_to_display, font=("微软雅黑", 16, "bold"))
- label.grid(row=0, column=0, sticky=tk.W, padx=5, pady=(0, 0))
-
- # 星星图标快捷设置(与配置名同一行)
- content_col_offset_for_star = 1 # 星标按钮占一列
- quick_settings = self.editor_config.get("editor", {}).get("quick_settings", {}).get("items", [])
- already_in_quick = any(item.get("path") == full_config_path_key for item in quick_settings)
- icon = "★" if already_in_quick else "☆"
- icon_fg = "#FFD600" # 始终金色
-
- def on_star_click():
- self.toggle_quick_setting(
- full_config_path_key, widget_type, item_name_to_display, item_desc_to_display, already_in_quick
- )
- # 立即刷新本分组
- for widget in parent.winfo_children():
- widget.destroy()
- self.widgets.clear()
- # 判断parent是不是self.content_frame
- if parent == self.content_frame:
- # 主界面
- if (
- hasattr(self, "current_section")
- and self.current_section
- and self.current_section != "quick_settings"
- ):
- self.create_section_widgets(
- parent, self.current_section, self.config[self.current_section], [self.current_section]
- )
- elif hasattr(self, "current_section") and self.current_section == "quick_settings":
- self.create_quick_settings_widgets()
- else:
- # 弹窗Tab
- # 重新渲染当前Tab的内容
- if path:
- section = path[0]
- self.create_section_widgets(parent, section, self.config[section], path)
-
- pin_btn = ttk.Button(frame, text=icon, width=2, command=on_star_click)
- pin_btn.grid(row=0, column=content_col_offset_for_star, sticky=tk.W, padx=5)
- try:
- pin_btn.configure(style="Pin.TButton")
- style = ttk.Style()
- style.configure("Pin.TButton", foreground=icon_fg)
- except Exception:
- pass
-
- # 配置项描述(第二行)
- desc_row = 1
- if item_desc_to_display:
- desc_label = ttk.Label(frame, text=item_desc_to_display, foreground="gray", font=("微软雅黑", 10))
- desc_label.grid(
- row=desc_row, column=0, columnspan=content_col_offset_for_star + 1, sticky=tk.W, padx=5, pady=(0, 4)
- )
- widget_row = desc_row + 1 # 内容控件在描述下方
- else:
- widget_row = desc_row # 内容控件直接在第二行
-
- # 配置内容控件(第三行或第二行)
- if path[0] == "inner":
- value_label = ttk.Label(frame, text=str(value), font=("微软雅黑", 16))
- value_label.grid(row=widget_row, column=0, columnspan=content_col_offset_for_star + 1, sticky=tk.W, padx=5)
- return
-
- if isinstance(value, bool):
- # 布尔值使用复选框
- var = tk.BooleanVar(value=value)
- checkbox = ttk.Checkbutton(frame, variable=var, command=lambda: self.on_value_changed())
- checkbox.grid(row=widget_row, column=0, columnspan=content_col_offset_for_star + 1, sticky=tk.W, padx=5)
- self.widgets[tuple(path + [key])] = var
- widget_type = "bool"
-
- elif isinstance(value, (int, float)):
- # 数字使用数字输入框
- var = tk.StringVar(value=str(value))
- entry = ttk.Entry(frame, textvariable=var, font=("微软雅黑", 16))
- entry.grid(row=widget_row, column=0, columnspan=content_col_offset_for_star + 1, sticky=tk.W + tk.E, padx=5)
- var.trace_add("write", lambda *args: self.on_value_changed())
- self.widgets[tuple(path + [key])] = var
- widget_type = "number"
-
- elif isinstance(value, list):
- # 列表使用每行一个输入框的形式
- frame_list = ttk.Frame(frame)
- frame_list.grid(
- row=widget_row, column=0, columnspan=content_col_offset_for_star + 1, sticky=tk.W + tk.E, padx=5
- )
-
- # 创建添加和删除按钮
- button_frame = ttk.Frame(frame_list)
- button_frame.pack(side=tk.RIGHT, padx=5)
-
- add_button = ttk.Button(
- button_frame, text="+", width=3, command=lambda p=path + [key]: self.add_list_item(frame_list, p)
- )
- add_button.pack(side=tk.TOP, pady=2)
-
- # 创建列表项框架
- items_frame = ttk.Frame(frame_list)
- items_frame.pack(side=tk.LEFT, fill=tk.X, expand=True)
-
- # 存储所有输入框的变量
- entry_vars = []
-
- # 为每个列表项创建输入框
- for i, item in enumerate(value):
- self.create_list_item(items_frame, item, i, entry_vars, path + [key])
-
- # 存储控件引用
- self.widgets[tuple(path + [key])] = (items_frame, entry_vars)
- widget_type = "list"
-
- else:
- # 其他类型(字符串等)使用普通文本框
- var = tk.StringVar(value=str(value))
-
- # 特殊处理provider字段
- full_path = ".".join(path + [key])
- if key == "provider" and full_path.startswith("model."):
- # print(f"处理provider字段,完整路径: {full_path}")
- # print(f"当前config中的env_vars: {self.config.get('env_vars', {})}")
- # 获取所有可用的provider选项
- providers = []
- if "env_vars" in self.config:
- # print(f"找到env_vars节,内容: {self.config['env_vars']}")
- # 遍历env_vars中的所有配置对
- for prefix, values in self.config["env_vars"].items():
- # print(f"检查配置对 {prefix}: {values}")
- # 检查是否同时有BASE_URL和KEY
- if f"{prefix}_BASE_URL" in values and f"{prefix}_KEY" in values:
- providers.append(prefix)
- # print(f"添加provider: {prefix}")
-
- # print(f"最终providers列表: {providers}")
- if providers:
- # 创建模型名称标签(大字体)
- # model_name = var.get() if var.get() else providers[0]
- # section_translations = {
- # "model.utils": "麦麦组件模型",
- # "model.utils_small": "小型麦麦组件模型",
- # "model.memory_summary": "记忆概括模型",
- # "model.vlm": "图像识别模型",
- # "model.embedding": "嵌入模型",
- # "model.normal_chat_1": "普通聊天:主要聊天模型",
- # "model.normal_chat_2": "普通聊天:次要聊天模型",
- # "model.focus_working_memory": "专注模式:工作记忆模型",
- # "model.focus_tool_use": "专注模式:工具调用模型",
- # "model.focus_planner": "专注模式:决策模型",
- # "model.focus_expressor": "专注模式:表达器模型",
- # }
- # 获取当前节的名称
- # current_section = ".".join(path[:-1]) # 去掉最后一个key
- # section_name = section_translations.get(current_section, current_section)
-
- # 创建节名称标签(大字体)
- # section_label = ttk.Label(frame, text="11", font=("微软雅黑", 24, "bold"))
- # section_label.grid(row=widget_row, column=0, columnspan=content_col_offset_for_star +1, sticky=tk.W, padx=5, pady=(0, 5))
-
- # 创建下拉菜单(小字体)
- combo = ttk.Combobox(
- frame, textvariable=var, values=providers, font=("微软雅黑", 12), state="readonly"
- )
- combo.grid(
- row=widget_row + 1,
- column=0,
- columnspan=content_col_offset_for_star + 1,
- sticky=tk.W + tk.E,
- padx=5,
- )
- combo.bind("<>", lambda e: self.on_value_changed())
- self.widgets[tuple(path + [key])] = var
- widget_type = "provider"
- # print(f"创建了下拉菜单,选项: {providers}")
- else:
- # 如果没有可用的provider,使用普通文本框
- # print(f"没有可用的provider,使用普通文本框")
- entry = ttk.Entry(frame, textvariable=var, font=("微软雅黑", 16))
- entry.grid(
- row=widget_row, column=0, columnspan=content_col_offset_for_star + 1, sticky=tk.W + tk.E, padx=5
- )
- var.trace_add("write", lambda *args: self.on_value_changed())
- self.widgets[tuple(path + [key])] = var
- widget_type = "text"
- else:
- # 普通文本框
- entry = ttk.Entry(frame, textvariable=var, font=("微软雅黑", 16))
- entry.grid(
- row=widget_row, column=0, columnspan=content_col_offset_for_star + 1, sticky=tk.W + tk.E, padx=5
- )
- var.trace_add("write", lambda *args: self.on_value_changed())
- self.widgets[tuple(path + [key])] = var
- widget_type = "text"
-
- def create_section_widgets(self, parent: ttk.Frame, section: str, data: Dict, path=None) -> None:
- """为配置节创建编辑控件"""
- if path is None:
- path = [section]
- # section完整路径
- full_section_path = ".".join(path)
- # 获取节的中文名称和描述
- section_translations = {
- "model.utils": "工具模型",
- "model.utils_small": "小型工具模型",
- "model.memory_summary": "记忆概括模型",
- "model.vlm": "图像识别模型",
- "model.embedding": "嵌入模型",
- "model.normal_chat_1": "主要聊天模型",
- "model.normal_chat_2": "次要聊天模型",
- "model.focus_working_memory": "工作记忆模型",
- "model.focus_tool_use": "工具调用模型",
- "model.focus_planner": "决策模型",
- "model.focus_expressor": "表达器模型",
- }
- section_trans = self.translations.get("sections", {}).get(full_section_path, {})
- section_name = section_trans.get("name") or section_translations.get(full_section_path) or section
- section_desc = section_trans.get("description", "")
-
- # 创建节的标签框架
- section_frame = ttk.Frame(parent)
- section_frame.pack(fill=tk.X, padx=5, pady=10)
-
- # 创建节的名称标签
- section_label = ttk.Label(section_frame, text=f"[{section_name}]", font=("微软雅黑", 18, "bold"))
- section_label.pack(side=tk.LEFT, padx=5)
-
- # 创建节的描述标签
- if isinstance(section_trans.get("description"), dict):
- # 如果是多语言描述,优先取en,否则取第一个
- desc_en = section_trans["description"].get("en") or next(iter(section_trans["description"].values()), "")
- desc_label = ttk.Label(section_frame, text=desc_en, foreground="gray", font=("微软雅黑", 10))
- else:
- desc_label = ttk.Label(section_frame, text=section_desc, foreground="gray", font=("微软雅黑", 10))
- desc_label.pack(side=tk.LEFT, padx=5)
-
- # 为每个配置项创建对应的控件
- for key, value in data.items():
- if isinstance(value, dict):
- self.create_section_widgets(parent, key, value, path + [key])
- else:
- self.create_widget_for_value(parent, key, value, path)
-
- def on_value_changed(self):
- """当值改变时触发自动保存"""
- self.pending_save = True
- current_time = time.time()
- if current_time - self.last_save_time > self.save_delay:
- if self.save_timer:
- self.root.after_cancel(self.save_timer)
- self.save_timer = self.root.after(int(self.save_delay * 1000), self.save_config)
-
- def on_section_select(self, event):
- # 如果有待保存的更改,先保存
- if self.pending_save:
- self.save_config()
-
- selection = self.tree.selection()
- if not selection:
- return
-
- section = self.tree.item(selection[0])["values"][0] # 使用values中的原始节名
- self.current_section = section
-
- # 清空编辑器
- for widget in self.content_frame.winfo_children():
- widget.destroy()
-
- # 清空控件字典
- self.widgets.clear()
-
- # 创建编辑控件
- if section == "quick_settings":
- self.create_quick_settings_widgets()
- elif section == "env_vars":
- self.create_env_vars_section(self.content_frame)
- elif section in self.config:
- self.create_section_widgets(self.content_frame, section, self.config[section])
-
- def create_quick_settings_widgets(self):
- """创建快捷设置编辑界面"""
- # 获取快捷设置配置
- quick_settings = self.editor_config.get("editor", {}).get("quick_settings", {}).get("items", [])
-
- # 创建快捷设置控件
- for setting in quick_settings:
- frame = ttk.Frame(self.content_frame)
- frame.pack(fill=tk.X, padx=5, pady=2)
-
- # 获取当前值
- path = setting["path"].split(".")
- current = self.config
- for key in path[:-1]: # 除了最后一个键
- current = current.get(key, {})
- value = current.get(path[-1]) # 获取最后一个键的值
-
- # 创建名称标签(加粗)
- name_label = ttk.Label(frame, text=setting["name"], font=("微软雅黑", 16, "bold"))
- name_label.pack(fill=tk.X, padx=5, pady=(2, 0))
-
- # 创建描述标签
- if setting.get("description"):
- desc_label = ttk.Label(frame, text=setting["description"], foreground="gray", font=("微软雅黑", 10))
- desc_label.pack(fill=tk.X, padx=5, pady=(0, 2))
-
- # 根据类型创建不同的控件
- setting_type = setting.get("type", "bool")
-
- if setting_type == "bool":
- value = bool(value) if value is not None else False
- var = tk.BooleanVar(value=value)
- checkbox = ttk.Checkbutton(
- frame, text="", variable=var, command=lambda p=path, v=var: self.on_quick_setting_changed(p, v)
- )
- checkbox.pack(anchor=tk.W, padx=5, pady=(0, 5))
-
- elif setting_type == "text":
- value = str(value) if value is not None else ""
- var = tk.StringVar(value=value)
- entry = ttk.Entry(frame, textvariable=var, width=40, font=("微软雅黑", 12))
- entry.pack(fill=tk.X, padx=5, pady=(0, 5))
- var.trace_add("write", lambda *args, p=path, v=var: self.on_quick_setting_changed(p, v))
-
- elif setting_type == "number":
- value = str(value) if value is not None else "0"
- var = tk.StringVar(value=value)
- entry = ttk.Entry(frame, textvariable=var, width=10, font=("微软雅黑", 12))
- entry.pack(fill=tk.X, padx=5, pady=(0, 5))
- var.trace_add("write", lambda *args, p=path, v=var: self.on_quick_setting_changed(p, v))
-
- elif setting_type == "list":
- # 对于列表类型,创建一个按钮来打开编辑窗口
- button = ttk.Button(
- frame, text="编辑列表", command=lambda p=path, s=setting: self.open_list_editor(p, s)
- )
- button.pack(anchor=tk.W, padx=5, pady=(0, 5))
-
- def create_list_item(self, parent, value, index, entry_vars, path):
- """创建单个列表项的输入框"""
- item_frame = ttk.Frame(parent)
- item_frame.pack(fill=tk.X, pady=1)
-
- # 创建输入框
- var = tk.StringVar(value=str(value))
- entry = ttk.Entry(item_frame, textvariable=var)
- entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
- var.trace_add("write", lambda *args: self.on_value_changed())
-
- # 创建删除按钮
- del_button = ttk.Button(
- item_frame,
- text="-",
- width=3,
- command=lambda: self.remove_list_item(parent, item_frame, entry_vars, index, path),
- )
- del_button.pack(side=tk.RIGHT, padx=5)
-
- # 存储变量引用
- entry_vars.append(var)
-
- def add_list_item(self, parent, path):
- """添加新的列表项"""
- items_frame = parent.winfo_children()[1] # 获取列表项框架
- entry_vars = self.widgets[tuple(path)][1] # 获取变量列表
-
- # 创建新的列表项
- self.create_list_item(items_frame, "", len(entry_vars), entry_vars, path)
- self.on_value_changed()
-
- def remove_list_item(self, parent, item_frame, entry_vars, index, path):
- """删除列表项"""
- item_frame.destroy()
- entry_vars.pop(index)
- self.on_value_changed()
-
- def get_widget_value(self, widget) -> Any:
- """获取控件的值"""
- if isinstance(widget, tk.BooleanVar):
- return widget.get()
- elif isinstance(widget, tk.StringVar):
- value = widget.get()
- try:
- # 尝试转换为数字
- if "." in value:
- return float(value)
- return int(value)
- except ValueError:
- return value
- elif isinstance(widget, tuple): # 列表类型
- items_frame, entry_vars = widget
- # 获取所有非空输入框的值
- return [var.get() for var in entry_vars if var.get().strip()]
- return None
-
- def save_config(self):
- """保存配置到文件"""
- if not self.pending_save:
- return
-
- with self.save_lock:
- try:
- # 获取所有控件的值
- for path, widget in self.widgets.items():
- # 跳过 env_vars 的控件赋值(只用于.env,不写回config)
- if len(path) >= 2 and path[0] == "env_vars":
- continue
- value = self.get_widget_value(widget)
- current = self.config
- for key in path[:-1]:
- current = current[key]
- final_key = path[-1]
- current[final_key] = value
-
- # === 只保存 TOML,不包含 env_vars ===
- env_vars = self.config.pop("env_vars", None)
- with open(self.config_path, "wb") as f:
- tomli_w.dump(self.config, f)
- if env_vars is not None:
- self.config["env_vars"] = env_vars
-
- # === 保存 env_vars 到 .env 文件(只覆盖特定key,其他内容保留) ===
- env_path = self.editor_config["config"].get("env_file", ".env")
- if not os.path.isabs(env_path):
- env_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), env_path)
- # 1. 读取原有.env内容
- old_lines = []
- if os.path.exists(env_path):
- with open(env_path, "r", encoding="utf-8") as f:
- old_lines = f.readlines()
- # 2. 收集所有目标key的新值(直接从widgets取)
- new_env_dict = {}
- for path, widget in self.widgets.items():
- if len(path) == 2 and path[0] == "env_vars":
- k = path[1]
- if k.endswith("_BASE_URL") or k.endswith("_KEY"):
- new_env_dict[k] = self.get_widget_value(widget)
- # 3. 遍历原有行,替换目标key,保留所有其他内容
- result_lines = []
- found_keys = set()
- for line in old_lines:
- if "=" in line and not line.strip().startswith("#"):
- k = line.split("=", 1)[0].strip()
- if k in new_env_dict:
- result_lines.append(f"{k}={new_env_dict[k]}\n")
- found_keys.add(k)
- else:
- result_lines.append(line)
- else:
- result_lines.append(line)
- # 4. 新key如果原.env没有,则追加
- for k, v in new_env_dict.items():
- if k not in found_keys:
- result_lines.append(f"{k}={v}\n")
- # 5. 写回.env
- with open(env_path, "w", encoding="utf-8") as f:
- f.writelines(result_lines)
- # === 结束 ===
-
- # === 保存完 .env 后,同步 widgets 的值回 self.config['env_vars'] ===
- for path, widget in self.widgets.items():
- if len(path) == 2 and path[0] == "env_vars":
- prefix_key = path[1]
- if prefix_key.endswith("_BASE_URL") or prefix_key.endswith("_KEY"):
- prefix = prefix_key[:-9] if prefix_key.endswith("_BASE_URL") else prefix_key[:-4]
- if "env_vars" not in self.config:
- self.config["env_vars"] = {}
- if prefix not in self.config["env_vars"]:
- self.config["env_vars"][prefix] = {}
- self.config["env_vars"][prefix][prefix_key] = self.get_widget_value(widget)
-
- self.last_save_time = time.time()
- self.pending_save = False
- except Exception as e:
- messagebox.showerror("错误", f"保存配置失败: {str(e)}")
-
- def refresh_config(self):
- # 如果有待保存的更改,先保存
- if self.pending_save:
- self.save_config()
-
- self.load_config()
- self.tree.delete(*self.tree.get_children())
- for section in self.config:
- # 获取节的中文名称
- section_trans = self.translations.get("sections", {}).get(section, {})
- section_name = section_trans.get("name", section)
- self.tree.insert("", "end", text=section_name, values=(section,))
- messagebox.showinfo("成功", "配置已刷新")
-
- def open_list_editor(self, path, setting):
- """打开列表编辑窗口"""
- # 创建新窗口
- dialog = tk.Toplevel(self.root)
- dialog.title(f"编辑 {setting['name']}")
- dialog.geometry("400x300")
-
- # 获取当前值
- current = self.config
- for key in path[:-1]:
- current = current.get(key, {})
- value = current.get(path[-1], [])
-
- # 创建编辑区
- frame = ttk.Frame(dialog, padding="10")
- frame.pack(fill=tk.BOTH, expand=True)
-
- # 创建列表项框架
- items_frame = ttk.Frame(frame)
- items_frame.pack(fill=tk.BOTH, expand=True)
-
- # 存储所有输入框的变量
- entry_vars = []
-
- # 为每个列表项创建输入框
- for i, item in enumerate(value):
- self.create_list_item(items_frame, item, i, entry_vars, path)
-
- # 创建按钮框架
- button_frame = ttk.Frame(frame)
- button_frame.pack(fill=tk.X, pady=10)
-
- # 添加按钮
- add_button = ttk.Button(button_frame, text="添加", command=lambda: self.add_list_item(items_frame, path))
- add_button.pack(side=tk.LEFT, padx=5)
-
- # 保存按钮
- save_button = ttk.Button(
- button_frame, text="保存", command=lambda: self.save_list_editor(dialog, path, entry_vars)
- )
- save_button.pack(side=tk.RIGHT, padx=5)
-
- def save_list_editor(self, dialog, path, entry_vars):
- """保存列表编辑窗口的内容"""
- # 获取所有非空输入框的值
- values = [var.get() for var in entry_vars if var.get().strip()]
-
- # 更新配置
- current = self.config
- for key in path[:-1]:
- if key not in current:
- current[key] = {}
- current = current[key]
- current[path[-1]] = values
-
- # 触发保存
- self.on_value_changed()
-
- # 关闭窗口
- dialog.destroy()
-
- def on_quick_setting_changed(self, path, var):
- """快捷设置值改变时的处理"""
- # 更新配置
- current = self.config
- for key in path[:-1]:
- if key not in current:
- current[key] = {}
- current = current[key]
- # 根据变量类型设置值
- if isinstance(var, tk.BooleanVar):
- current[path[-1]] = var.get()
- elif isinstance(var, tk.StringVar):
- value = var.get()
- try:
- # 尝试转换为数字
- if "." in value:
- current[path[-1]] = float(value)
- else:
- current[path[-1]] = int(value)
- except ValueError:
- current[path[-1]] = value
- # 触发保存
- self.on_value_changed()
-
- def toggle_quick_setting(self, full_path, widget_type, name, desc, already_in_quick):
- quick_settings = (
- self.editor_config.setdefault("editor", {}).setdefault("quick_settings", {}).setdefault("items", [])
- )
- if already_in_quick:
- # 移除
- self.editor_config["editor"]["quick_settings"]["items"] = [
- item for item in quick_settings if item.get("path") != full_path
- ]
- else:
- # 添加
- quick_settings.append({"name": name, "description": desc, "path": full_path, "type": widget_type})
- # 保存到configexe.toml
- import tomli_w
- import os
-
- config_path = os.path.join(os.path.dirname(__file__), "configexe.toml")
- with open(config_path, "wb") as f:
- tomli_w.dump(self.editor_config, f)
- self.refresh_quick_settings()
-
- def refresh_quick_settings(self):
- # 重新渲染快捷设置栏(如果当前在快捷设置页)
- if self.current_section == "quick_settings":
- for widget in self.content_frame.winfo_children():
- widget.destroy()
- self.widgets.clear()
- self.create_quick_settings_widgets()
-
- def create_env_var_group(self, parent: ttk.Frame, prefix: str, values: Dict[str, str], path: List[str]) -> None:
- """创建环境变量组"""
- frame = ttk.Frame(parent)
- frame.pack(fill=tk.X, padx=5, pady=2)
-
- # 创建组标题
- title_frame = ttk.Frame(frame)
- title_frame.pack(fill=tk.X, pady=(5, 0))
-
- title_label = ttk.Label(title_frame, text=f"API配置组: {prefix}", font=("微软雅黑", 16, "bold"))
- title_label.pack(side=tk.LEFT, padx=5)
-
- # 删除按钮
- del_button = ttk.Button(title_frame, text="删除组", command=lambda: self.delete_env_var_group(prefix))
- del_button.pack(side=tk.RIGHT, padx=5)
-
- # 创建BASE_URL输入框
- base_url_frame = ttk.Frame(frame)
- base_url_frame.pack(fill=tk.X, padx=5, pady=2)
-
- base_url_label = ttk.Label(base_url_frame, text="BASE_URL:", font=("微软雅黑", 12))
- base_url_label.pack(side=tk.LEFT, padx=5)
-
- base_url_var = tk.StringVar(value=values.get(f"{prefix}_BASE_URL", ""))
- base_url_entry = ttk.Entry(base_url_frame, textvariable=base_url_var, font=("微软雅黑", 12))
- base_url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
- base_url_var.trace_add("write", lambda *args: self.on_value_changed())
-
- # 创建KEY输入框
- key_frame = ttk.Frame(frame)
- key_frame.pack(fill=tk.X, padx=5, pady=2)
-
- key_label = ttk.Label(key_frame, text="API KEY:", font=("微软雅黑", 12))
- key_label.pack(side=tk.LEFT, padx=5)
-
- key_var = tk.StringVar(value=values.get(f"{prefix}_KEY", ""))
- key_entry = ttk.Entry(key_frame, textvariable=key_var, font=("微软雅黑", 12))
- key_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
- key_var.trace_add("write", lambda *args: self.on_value_changed())
-
- # 存储变量引用
- self.widgets[tuple(path + [f"{prefix}_BASE_URL"])] = base_url_var
- self.widgets[tuple(path + [f"{prefix}_KEY"])] = key_var
-
- # 添加分隔线
- separator = ttk.Separator(frame, orient="horizontal")
- separator.pack(fill=tk.X, pady=5)
-
- def create_env_vars_section(self, parent: ttk.Frame) -> None:
- """创建环境变量编辑区"""
- # 创建添加新组的按钮
- add_button = ttk.Button(parent, text="添加新的API配置组", command=self.add_new_env_var_group)
- add_button.pack(pady=10)
-
- # 创建现有组的编辑区
- if "env_vars" in self.config:
- for prefix, values in self.config["env_vars"].items():
- self.create_env_var_group(parent, prefix, values, ["env_vars"])
-
- def add_new_env_var_group(self):
- """添加新的环境变量组"""
- # 创建新窗口
- dialog = tk.Toplevel(self.root)
- dialog.title("添加新的API配置组")
- dialog.geometry("400x200")
-
- # 创建输入框架
- frame = ttk.Frame(dialog, padding="10")
- frame.pack(fill=tk.BOTH, expand=True)
-
- # 前缀输入
- prefix_label = ttk.Label(frame, text="API前缀名称:", font=("微软雅黑", 12))
- prefix_label.pack(pady=5)
-
- prefix_var = tk.StringVar()
- prefix_entry = ttk.Entry(frame, textvariable=prefix_var, font=("微软雅黑", 12))
- prefix_entry.pack(fill=tk.X, pady=5)
-
- # 确认按钮
- def on_confirm():
- prefix = prefix_var.get().strip()
- if prefix:
- if "env_vars" not in self.config:
- self.config["env_vars"] = {}
- self.config["env_vars"][prefix] = {f"{prefix}_BASE_URL": "", f"{prefix}_KEY": ""}
- # 刷新显示
- self.refresh_env_vars_section()
- self.on_value_changed()
- dialog.destroy()
-
- confirm_button = ttk.Button(frame, text="确认", command=on_confirm)
- confirm_button.pack(pady=10)
-
- def delete_env_var_group(self, prefix: str):
- """删除环境变量组"""
- if messagebox.askyesno("确认", f"确定要删除 {prefix} 配置组吗?"):
- if "env_vars" in self.config:
- del self.config["env_vars"][prefix]
- # 刷新显示
- self.refresh_env_vars_section()
- self.on_value_changed()
-
- def refresh_env_vars_section(self):
- """刷新环境变量编辑区"""
- # 清空当前显示
- for widget in self.content_frame.winfo_children():
- widget.destroy()
- self.widgets.clear()
-
- # 重新创建编辑区
- self.create_env_vars_section(self.content_frame)
-
- def open_advanced_options(self):
- """弹窗显示高级配置"""
- dialog = tk.Toplevel(self.root)
- dialog.title("高级选项")
- dialog.geometry("700x800")
-
- notebook = ttk.Notebook(dialog)
- notebook.pack(fill=tk.BOTH, expand=True)
-
- # 遥测栏
- if "telemetry" in self.config:
- telemetry_frame = ttk.Frame(notebook)
- notebook.add(telemetry_frame, text="遥测")
- self.create_section_widgets(telemetry_frame, "telemetry", self.config["telemetry"], ["telemetry"])
- # 实验性功能栏
- if "experimental" in self.config:
- exp_frame = ttk.Frame(notebook)
- notebook.add(exp_frame, text="实验性功能")
- self.create_section_widgets(exp_frame, "experimental", self.config["experimental"], ["experimental"])
- # 消息服务栏
- if "maim_message" in self.config:
- msg_frame = ttk.Frame(notebook)
- notebook.add(msg_frame, text="消息服务")
- self.create_section_widgets(msg_frame, "maim_message", self.config["maim_message"], ["maim_message"])
- # 消息接收栏
- if "message_receive" in self.config:
- recv_frame = ttk.Frame(notebook)
- notebook.add(recv_frame, text="消息接收")
- self.create_section_widgets(
- recv_frame, "message_receive", self.config["message_receive"], ["message_receive"]
- )
- # 关系栏
- if "relationship" in self.config:
- rel_frame = ttk.Frame(notebook)
- notebook.add(rel_frame, text="关系")
- self.create_section_widgets(rel_frame, "relationship", self.config["relationship"], ["relationship"])
-
- def open_path_config(self):
- """打开路径配置对话框"""
- dialog = tk.Toplevel(self.root)
- dialog.title("配置路径")
- dialog.geometry("600x200")
-
- # 创建输入框架
- frame = ttk.Frame(dialog, padding="10")
- frame.pack(fill=tk.BOTH, expand=True)
-
- # bot_config.toml路径配置
- bot_config_frame = ttk.Frame(frame)
- bot_config_frame.pack(fill=tk.X, pady=5)
-
- bot_config_label = ttk.Label(bot_config_frame, text="bot_config.toml路径:", font=("微软雅黑", 12))
- bot_config_label.pack(side=tk.LEFT, padx=5)
-
- bot_config_var = tk.StringVar(value=self.config_path)
- bot_config_entry = ttk.Entry(bot_config_frame, textvariable=bot_config_var, font=("微软雅黑", 12))
- bot_config_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
-
- def apply_config():
- new_bot_config_path = bot_config_var.get().strip()
- new_env_path = env_var.get().strip()
-
- if not new_bot_config_path or not new_env_path:
- messagebox.showerror("错误", "路径不能为空")
- return
-
- if not os.path.exists(new_bot_config_path):
- messagebox.showerror("错误", "bot_config.toml文件不存在")
- return
-
- # 更新配置
- self.config_path = new_bot_config_path
- self.editor_config["config"]["bot_config_path"] = new_bot_config_path
- self.editor_config["config"]["env_file"] = new_env_path
-
- # 保存编辑器配置
- config_path = os.path.join(os.path.dirname(__file__), "configexe.toml")
- with open(config_path, "wb") as f:
- tomli_w.dump(self.editor_config, f)
-
- # 重新加载配置
- self.load_config()
- self.load_env_vars()
-
- # 刷新显示
- self.refresh_config()
-
- messagebox.showinfo("成功", "路径配置已更新,程序将重新启动")
- dialog.destroy()
-
- # 重启程序
- self.root.quit()
- os.execv(sys.executable, ["python"] + sys.argv)
-
- def browse_bot_config():
- file_path = filedialog.askopenfilename(
- title="选择bot_config.toml文件", filetypes=[("TOML文件", "*.toml"), ("所有文件", "*.*")]
- )
- if file_path:
- bot_config_var.set(file_path)
- apply_config()
-
- browse_bot_config_btn = ttk.Button(bot_config_frame, text="浏览", command=browse_bot_config)
- browse_bot_config_btn.pack(side=tk.LEFT, padx=5)
-
- # .env路径配置
- env_frame = ttk.Frame(frame)
- env_frame.pack(fill=tk.X, pady=5)
-
- env_label = ttk.Label(env_frame, text=".env路径:", font=("微软雅黑", 12))
- env_label.pack(side=tk.LEFT, padx=5)
-
- env_path = self.editor_config["config"].get("env_file", ".env")
- if not os.path.isabs(env_path):
- env_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), env_path)
- env_var = tk.StringVar(value=env_path)
- env_entry = ttk.Entry(env_frame, textvariable=env_var, font=("微软雅黑", 12))
- env_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
-
- def browse_env():
- file_path = filedialog.askopenfilename(
- title="选择.env文件", filetypes=[("环境变量文件", "*.env"), ("所有文件", "*.*")]
- )
- if file_path:
- env_var.set(file_path)
- apply_config()
-
- browse_env_btn = ttk.Button(env_frame, text="浏览", command=browse_env)
- browse_env_btn.pack(side=tk.LEFT, padx=5)
-
-
-def main():
- root = tk.Tk()
- _app = ConfigEditor(root)
- root.mainloop()
-
-
-if __name__ == "__main__":
- main()
diff --git a/src/chat/focus_chat/heartFC_Cycleinfo.py b/src/chat/focus_chat/heartFC_Cycleinfo.py
deleted file mode 100644
index b8fc1ef2..00000000
--- a/src/chat/focus_chat/heartFC_Cycleinfo.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import time
-import os
-from typing import Optional, Dict, Any
-from src.common.logger_manager import get_logger
-import json
-
-logger = get_logger("hfc") # Logger Name Changed
-
-log_dir = "log/log_cycle_debug/"
-
-
-class CycleDetail:
- """循环信息记录类"""
-
- def __init__(self, cycle_id: int):
- self.cycle_id = cycle_id
- self.prefix = ""
- self.thinking_id = ""
- self.start_time = time.time()
- self.end_time: Optional[float] = None
- self.timers: Dict[str, float] = {}
-
- # 新字段
- self.loop_observation_info: Dict[str, Any] = {}
- self.loop_process_info: Dict[str, Any] = {}
- self.loop_plan_info: Dict[str, Any] = {}
- self.loop_action_info: Dict[str, Any] = {}
-
- def to_dict(self) -> Dict[str, Any]:
- """将循环信息转换为字典格式"""
-
- def convert_to_serializable(obj, depth=0, seen=None):
- if seen is None:
- seen = set()
-
- # 防止递归过深
- if depth > 5: # 降低递归深度限制
- return str(obj)
-
- # 防止循环引用
- obj_id = id(obj)
- if obj_id in seen:
- return str(obj)
- seen.add(obj_id)
-
- try:
- if hasattr(obj, "to_dict"):
- # 对于有to_dict方法的对象,直接调用其to_dict方法
- return obj.to_dict()
- elif isinstance(obj, dict):
- # 对于字典,只保留基本类型和可序列化的值
- return {
- k: convert_to_serializable(v, depth + 1, seen)
- for k, v in obj.items()
- if isinstance(k, (str, int, float, bool))
- }
- elif isinstance(obj, (list, tuple)):
- # 对于列表和元组,只保留可序列化的元素
- return [
- convert_to_serializable(item, depth + 1, seen)
- for item in obj
- if not isinstance(item, (dict, list, tuple))
- or isinstance(item, (str, int, float, bool, type(None)))
- ]
- elif isinstance(obj, (str, int, float, bool, type(None))):
- return obj
- else:
- return str(obj)
- finally:
- seen.remove(obj_id)
-
- return {
- "cycle_id": self.cycle_id,
- "start_time": self.start_time,
- "end_time": self.end_time,
- "timers": self.timers,
- "thinking_id": self.thinking_id,
- "loop_observation_info": convert_to_serializable(self.loop_observation_info),
- "loop_process_info": convert_to_serializable(self.loop_process_info),
- "loop_plan_info": convert_to_serializable(self.loop_plan_info),
- "loop_action_info": convert_to_serializable(self.loop_action_info),
- }
-
- def complete_cycle(self):
- """完成循环,记录结束时间"""
- self.end_time = time.time()
-
- # 处理 prefix,只保留中英文字符和基本标点
- if not self.prefix:
- self.prefix = "group"
- else:
- # 只保留中文、英文字母、数字和基本标点
- allowed_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")
- self.prefix = (
- "".join(char for char in self.prefix if "\u4e00" <= char <= "\u9fff" or char in allowed_chars)
- or "group"
- )
-
- current_time_minute = time.strftime("%Y%m%d_%H%M", time.localtime())
- try:
- self.log_cycle_to_file(
- log_dir + self.prefix + f"/{current_time_minute}_cycle_" + str(self.cycle_id) + ".json"
- )
- except Exception as e:
- logger.warning(f"写入文件日志,可能是群名称包含非法字符: {e}")
-
- def log_cycle_to_file(self, file_path: str):
- """将循环信息写入文件"""
- # 如果目录不存在,则创建目
- dir_name = os.path.dirname(file_path)
- # 去除特殊字符,保留字母、数字、下划线、中划线和中文
- dir_name = "".join(
- char for char in dir_name if char.isalnum() or char in ["_", "-", "/"] or "\u4e00" <= char <= "\u9fff"
- )
- # print("dir_name:", dir_name)
- if dir_name and not os.path.exists(dir_name):
- os.makedirs(dir_name, exist_ok=True)
- # 写入文件
-
-
- file_path = os.path.join(dir_name, os.path.basename(file_path))
- # print("file_path:", file_path)
- with open(file_path, "a", encoding="utf-8") as f:
- f.write(json.dumps(self.to_dict(), ensure_ascii=False) + "\n")
-
- def set_thinking_id(self, thinking_id: str):
- """设置思考消息ID"""
- self.thinking_id = thinking_id
-
- def set_loop_info(self, loop_info: Dict[str, Any]):
- """设置循环信息"""
- self.loop_observation_info = loop_info["loop_observation_info"]
- self.loop_processor_info = loop_info["loop_processor_info"]
- self.loop_plan_info = loop_info["loop_plan_info"]
- self.loop_action_info = loop_info["loop_action_info"]
diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py
deleted file mode 100644
index 53e213ac..00000000
--- a/src/chat/focus_chat/heartFC_chat.py
+++ /dev/null
@@ -1,636 +0,0 @@
-import asyncio
-import contextlib
-import time
-import traceback
-from collections import deque
-from typing import List, Optional, Dict, Any, Deque, Callable, Awaitable
-from src.chat.message_receive.chat_stream import ChatStream
-from src.chat.message_receive.chat_stream import chat_manager
-from rich.traceback import install
-from src.chat.utils.prompt_builder import global_prompt_manager
-from src.common.logger_manager import get_logger
-from src.chat.utils.timer_calculator import Timer
-from src.chat.heart_flow.observation.observation import Observation
-from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
-from src.chat.focus_chat.info.info_base import InfoBase
-from src.chat.focus_chat.info_processors.chattinginfo_processor import ChattingInfoProcessor
-from src.chat.focus_chat.info_processors.relationship_processor import RelationshipProcessor
-from src.chat.focus_chat.info_processors.mind_processor import MindProcessor
-from src.chat.focus_chat.info_processors.working_memory_processor import WorkingMemoryProcessor
-
-# from src.chat.focus_chat.info_processors.action_processor import ActionProcessor
-from src.chat.heart_flow.observation.hfcloop_observation import HFCloopObservation
-from src.chat.heart_flow.observation.working_observation import WorkingMemoryObservation
-from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
-from src.chat.heart_flow.observation.structure_observation import StructureObservation
-from src.chat.heart_flow.observation.actions_observation import ActionObservation
-from src.chat.focus_chat.info_processors.tool_processor import ToolProcessor
-from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
-from src.chat.focus_chat.replyer.default_replyer import DefaultReplyer
-from src.chat.focus_chat.memory_activator import MemoryActivator
-from src.chat.focus_chat.info_processors.base_processor import BaseProcessor
-from src.chat.focus_chat.info_processors.self_processor import SelfProcessor
-from src.chat.focus_chat.planners.planner_factory import PlannerFactory
-from src.chat.focus_chat.planners.modify_actions import ActionModifier
-from src.chat.focus_chat.planners.action_manager import ActionManager
-from src.chat.focus_chat.working_memory.working_memory import WorkingMemory
-from src.config.config import global_config
-
-install(extra_lines=3)
-
-# 定义观察器映射:键是观察器名称,值是 (观察器类, 初始化参数)
-OBSERVATION_CLASSES = {
- "ChattingObservation": (ChattingObservation, "chat_id"),
- "WorkingMemoryObservation": (WorkingMemoryObservation, "observe_id"),
- "HFCloopObservation": (HFCloopObservation, "observe_id"),
- "StructureObservation": (StructureObservation, "observe_id"),
-}
-
-# 定义处理器映射:键是处理器名称,值是 (处理器类, 可选的配置键名)
-PROCESSOR_CLASSES = {
- "ChattingInfoProcessor": (ChattingInfoProcessor, None),
- "MindProcessor": (MindProcessor, "mind_processor"),
- "ToolProcessor": (ToolProcessor, "tool_use_processor"),
- "WorkingMemoryProcessor": (WorkingMemoryProcessor, "working_memory_processor"),
- "SelfProcessor": (SelfProcessor, "self_identify_processor"),
- "RelationshipProcessor": (RelationshipProcessor, "relationship_processor"),
-}
-
-logger = get_logger("hfc") # Logger Name Changed
-
-
-async def _handle_cycle_delay(action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str):
- """处理循环延迟"""
- cycle_duration = time.monotonic() - cycle_start_time
-
- try:
- sleep_duration = 0.0
- if not action_taken_this_cycle and cycle_duration < 1:
- sleep_duration = 1 - cycle_duration
- elif cycle_duration < 0.2:
- sleep_duration = 0.2
-
- if sleep_duration > 0:
- await asyncio.sleep(sleep_duration)
-
- except asyncio.CancelledError:
- logger.info(f"{log_prefix} Sleep interrupted, loop likely cancelling.")
- raise
-
-
-class HeartFChatting:
- """
- 管理一个连续的Focus Chat循环
- 用于在特定聊天流中生成回复。
- 其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。
- """
-
- def __init__(
- self,
- chat_id: str,
- on_stop_focus_chat: Optional[Callable[[], Awaitable[None]]] = None,
- ):
- """
- HeartFChatting 初始化函数
-
- 参数:
- chat_id: 聊天流唯一标识符(如stream_id)
- on_stop_focus_chat: 当收到stop_focus_chat命令时调用的回调函数
- """
- # 基础属性
- self.stream_id: str = chat_id # 聊天流ID
- self.chat_stream = chat_manager.get_stream(self.stream_id)
- self.log_prefix = f"[{chat_manager.get_stream_name(self.stream_id) or self.stream_id}]"
-
- self.memory_activator = MemoryActivator()
-
- # 初始化观察器
- self.observations: List[Observation] = []
- self._register_observations()
-
- # 根据配置文件和默认规则确定启用的处理器
- config_processor_settings = global_config.focus_chat_processor
- self.enabled_processor_names = [
- proc_name for proc_name, (_proc_class, config_key) in PROCESSOR_CLASSES.items()
- if not config_key or getattr(config_processor_settings, config_key, True)
- ]
-
- # logger.info(f"{self.log_prefix} 将启用的处理器: {self.enabled_processor_names}")
-
- self.processors: List[BaseProcessor] = []
- self._register_default_processors()
-
- self.expressor = DefaultExpressor(chat_stream=self.chat_stream)
- self.replyer = DefaultReplyer(chat_stream=self.chat_stream)
-
-
- self.action_manager = ActionManager()
- self.action_planner = PlannerFactory.create_planner(
- log_prefix=self.log_prefix, action_manager=self.action_manager
- )
- self.action_modifier = ActionModifier(action_manager=self.action_manager)
- self.action_observation = ActionObservation(observe_id=self.stream_id)
- self.action_observation.set_action_manager(self.action_manager)
-
-
- self._processing_lock = asyncio.Lock()
-
- # 循环控制内部状态
- self._loop_active: bool = False # 循环是否正在运行
- self._loop_task: Optional[asyncio.Task] = None # 主循环任务
-
- # 添加循环信息管理相关的属性
- self._cycle_counter = 0
- self._cycle_history: Deque[CycleDetail] = deque(maxlen=10) # 保留最近10个循环的信息
- self._current_cycle_detail: Optional[CycleDetail] = None
- self._shutting_down: bool = False # 关闭标志位
-
- # 存储回调函数
- self.on_stop_focus_chat = on_stop_focus_chat
-
- def _register_observations(self):
- """注册所有观察器"""
- self.observations = [] # 清空已有的
-
- for name, (observation_class, param_name) in OBSERVATION_CLASSES.items():
- try:
- # 根据参数名使用正确的参数
- kwargs = {param_name: self.stream_id}
- observation = observation_class(**kwargs)
- self.observations.append(observation)
- logger.debug(f"{self.log_prefix} 注册观察器 {name}")
- except Exception as e:
- logger.error(f"{self.log_prefix} 观察器 {name} 构造失败: {e}")
-
- if self.observations:
- logger.info(f"{self.log_prefix} 已注册观察器: {[o.__class__.__name__ for o in self.observations]}")
- else:
- logger.warning(f"{self.log_prefix} 没有注册任何观察器")
-
- def _register_default_processors(self):
- """根据 self.enabled_processor_names 注册信息处理器"""
- self.processors = [] # 清空已有的
-
- for name in self.enabled_processor_names: # 'name' is "ChattingInfoProcessor", etc.
- processor_info = PROCESSOR_CLASSES.get(name) # processor_info is (ProcessorClass, config_key)
- if processor_info:
- processor_actual_class = processor_info[0] # 获取实际的类定义
- # 根据处理器类名判断是否需要 subheartflow_id
- if name in ["MindProcessor", "ToolProcessor", "WorkingMemoryProcessor", "SelfProcessor", "RelationshipProcessor"]:
- self.processors.append(processor_actual_class(subheartflow_id=self.stream_id))
- elif name == "ChattingInfoProcessor":
- self.processors.append(processor_actual_class())
- else:
- # 对于PROCESSOR_CLASSES中定义但此处未明确处理构造的处理器
- # (例如, 新增了一个处理器到PROCESSOR_CLASSES, 它不需要id, 也不叫ChattingInfoProcessor)
- try:
- self.processors.append(processor_actual_class()) # 尝试无参构造
- logger.debug(f"{self.log_prefix} 注册处理器 {name} (尝试无参构造).")
- except TypeError:
- logger.error(
- f"{self.log_prefix} 处理器 {name} 构造失败。它可能需要参数(如 subheartflow_id)但未在注册逻辑中明确处理。"
- )
- else:
- # 这理论上不应该发生,因为 enabled_processor_names 是从 PROCESSOR_CLASSES 的键生成的
- logger.warning(
- f"{self.log_prefix} 在 PROCESSOR_CLASSES 中未找到名为 '{name}' 的处理器定义,将跳过注册。"
- )
-
- if self.processors:
- logger.info(
- f"{self.log_prefix} 已注册处理器: {[p.__class__.__name__ for p in self.processors]}"
- )
- else:
- logger.warning(f"{self.log_prefix} 没有注册任何处理器。这可能是由于配置错误或所有处理器都被禁用了。")
-
- async def start(self):
- """检查是否需要启动主循环,如果未激活则启动。"""
- # 如果循环已经激活,直接返回
- if self._loop_active:
- return
-
- # 标记为活动状态,防止重复启动
- self._loop_active = True
-
- # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False)
- if self._loop_task and not self._loop_task.done():
- logger.warning(f"{self.log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。")
- self._loop_task.cancel()
- try:
- # 等待旧任务确实被取消
- await asyncio.wait_for(self._loop_task, timeout=0.5)
- except (asyncio.CancelledError, asyncio.TimeoutError):
- pass # 忽略取消或超时错误
- self._loop_task = None # 清理旧任务引用
-
- self._loop_task = asyncio.create_task(self._run_focus_chat())
- self._loop_task.add_done_callback(self._handle_loop_completion)
-
- def _handle_loop_completion(self, task: asyncio.Task):
- """当 _hfc_loop 任务完成时执行的回调。"""
- try:
- exception = task.exception()
- if exception:
- logger.error(f"{self.log_prefix} HeartFChatting: 脱离了聊天(异常): {exception}")
- logger.error(traceback.format_exc()) # Log full traceback for exceptions
- else:
- logger.info(f"{self.log_prefix} HeartFChatting: 脱离了聊天 (外部停止)")
- except asyncio.CancelledError:
- logger.info(f"{self.log_prefix} HeartFChatting: 脱离了聊天(任务取消)")
- finally:
- self._loop_active = False
- self._loop_task = None
- if self._processing_lock.locked():
- logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。")
- self._processing_lock.release()
-
- async def _run_focus_chat(self):
- """主循环,持续进行计划并可能回复消息,直到被外部取消。"""
- try:
- while True: # 主循环
- logger.debug(f"{self.log_prefix} 开始第{self._cycle_counter}次循环")
- if self._shutting_down:
- logger.info(f"{self.log_prefix} 检测到关闭标志,退出 Focus Chat 循环。")
- break
-
- # 创建新的循环信息
- self._cycle_counter += 1
- self._current_cycle_detail = CycleDetail(self._cycle_counter)
- self._current_cycle_detail.prefix = self.log_prefix
-
- # 初始化周期状态
- cycle_timers = {}
- loop_cycle_start_time = time.monotonic()
-
- # 执行规划和处理阶段
- async with self._get_cycle_context():
- thinking_id = "tid" + str(round(time.time(), 2))
- self._current_cycle_detail.set_thinking_id(thinking_id)
- # 主循环:思考->决策->执行
- async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
- logger.debug(f"模板 {self.chat_stream.context.get_template_name()}")
- loop_info = await self._observe_process_plan_action_loop(cycle_timers, thinking_id)
-
- if loop_info["loop_action_info"]["command"] == "stop_focus_chat":
- logger.info(f"{self.log_prefix} 麦麦决定停止专注聊天")
- # 如果设置了回调函数,则调用它
- if self.on_stop_focus_chat:
- try:
- await self.on_stop_focus_chat()
- logger.info(f"{self.log_prefix} 成功调用回调函数处理停止专注聊天")
- except Exception as e:
- logger.error(f"{self.log_prefix} 调用停止专注聊天回调函数时出错: {e}")
- logger.error(traceback.format_exc())
- break
-
- self._current_cycle_detail.set_loop_info(loop_info)
-
- # 从observations列表中获取HFCloopObservation
- hfcloop_observation = next((obs for obs in self.observations if isinstance(obs, HFCloopObservation)), None)
- if hfcloop_observation:
- hfcloop_observation.add_loop_info(self._current_cycle_detail)
- else:
- logger.warning(f"{self.log_prefix} 未找到HFCloopObservation实例")
-
- self._current_cycle_detail.timers = cycle_timers
-
- # 防止循环过快消耗资源
- await _handle_cycle_delay(
- loop_info["loop_action_info"]["action_taken"], loop_cycle_start_time, self.log_prefix
- )
-
- # 完成当前循环并保存历史
- self._current_cycle_detail.complete_cycle()
- self._cycle_history.append(self._current_cycle_detail)
-
- # 记录循环信息和计时器结果
- timer_strings = []
- for name, elapsed in cycle_timers.items():
- formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
- timer_strings.append(f"{name}: {formatted_time}")
-
- # 新增:输出每个处理器的耗时
- processor_time_costs = self._current_cycle_detail.loop_processor_info.get("processor_time_costs", {})
- processor_time_strings = []
- for pname, ptime in processor_time_costs.items():
- formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}秒"
- processor_time_strings.append(f"{pname}: {formatted_ptime}")
- processor_time_log = (
- ("\n各处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else ""
- )
-
- logger.info(
- f"{self.log_prefix} 第{self._current_cycle_detail.cycle_id}次思考,"
- f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, "
- f"动作: {self._current_cycle_detail.loop_plan_info['action_result']['action_type']}"
- + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
- + processor_time_log
- )
-
- await asyncio.sleep(global_config.focus_chat.think_interval)
-
- except asyncio.CancelledError:
- # 设置了关闭标志位后被取消是正常流程
- if not self._shutting_down:
- logger.warning(f"{self.log_prefix} 麦麦Focus聊天模式意外被取消")
- else:
- logger.info(f"{self.log_prefix} 麦麦已离开Focus聊天模式")
- except Exception as e:
- logger.error(f"{self.log_prefix} 麦麦Focus聊天模式意外错误: {e}")
- print(traceback.format_exc())
-
- @contextlib.asynccontextmanager
- async def _get_cycle_context(self):
- """
- 循环周期的上下文管理器
-
- 用于确保资源的正确获取和释放:
- 1. 获取处理锁
- 2. 执行操作
- 3. 释放锁
- """
- acquired = False
- try:
- await self._processing_lock.acquire()
- acquired = True
- yield acquired
- finally:
- if acquired and self._processing_lock.locked():
- self._processing_lock.release()
-
- async def _process_processors(
- self, observations: List[Observation], running_memorys: List[Dict[str, Any]]
- ) -> tuple[List[InfoBase], Dict[str, float]]:
- # 记录并行任务开始时间
- parallel_start_time = time.time()
- logger.debug(f"{self.log_prefix} 开始信息处理器并行任务")
-
- processor_tasks = []
- task_to_name_map = {}
- processor_time_costs = {} # 新增: 记录每个处理器耗时
-
- for processor in self.processors:
- processor_name = processor.__class__.log_prefix
-
- async def run_with_timeout(proc=processor):
- return await asyncio.wait_for(
- proc.process_info(observations=observations, running_memorys=running_memorys),
- timeout=global_config.focus_chat.processor_max_time,
- )
-
- task = asyncio.create_task(run_with_timeout())
- processor_tasks.append(task)
- task_to_name_map[task] = processor_name
- logger.debug(f"{self.log_prefix} 启动处理器任务: {processor_name}")
-
- pending_tasks = set(processor_tasks)
- all_plan_info: List[InfoBase] = []
-
- while pending_tasks:
- done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
-
- for task in done:
- processor_name = task_to_name_map[task]
- task_completed_time = time.time()
- duration_since_parallel_start = task_completed_time - parallel_start_time
-
- try:
- result_list = await task
- logger.info(f"{self.log_prefix} 处理器 {processor_name} 已完成!")
- if result_list is not None:
- all_plan_info.extend(result_list)
- else:
- logger.warning(f"{self.log_prefix} 处理器 {processor_name} 返回了 None")
- # 记录耗时
- processor_time_costs[processor_name] = duration_since_parallel_start
- except asyncio.TimeoutError:
- logger.info(
- f"{self.log_prefix} 处理器 {processor_name} 超时(>{global_config.focus_chat.processor_max_time}s),已跳过"
- )
- processor_time_costs[processor_name] = global_config.focus_chat.processor_max_time
- except Exception as e:
- logger.error(
- f"{self.log_prefix} 处理器 {processor_name} 执行失败,耗时 (自并行开始): {duration_since_parallel_start:.2f}秒. 错误: {e}",
- exc_info=True,
- )
- traceback.print_exc()
- processor_time_costs[processor_name] = duration_since_parallel_start
-
- if pending_tasks:
- current_progress_time = time.time()
- elapsed_for_log = current_progress_time - parallel_start_time
- pending_names_for_log = [task_to_name_map[t] for t in pending_tasks]
- logger.info(
- f"{self.log_prefix} 信息处理已进行 {elapsed_for_log:.2f}秒,待完成任务: {', '.join(pending_names_for_log)}"
- )
-
- # 所有任务完成后的最终日志
- parallel_end_time = time.time()
- total_duration = parallel_end_time - parallel_start_time
- logger.info(f"{self.log_prefix} 所有处理器任务全部完成,总耗时: {total_duration:.2f}秒")
- # logger.debug(f"{self.log_prefix} 所有信息处理器处理后的信息: {all_plan_info}")
-
- return all_plan_info, processor_time_costs
-
- async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict:
- try:
- with Timer("观察", cycle_timers):
- # 执行所有观察器的观察
- for observation in self.observations:
- await observation.observe()
-
- loop_observation_info = {
- "observations": self.observations,
- }
-
- with Timer("调整动作", cycle_timers):
- # 处理特殊的观察
- await self.action_modifier.modify_actions(observations=self.observations)
- await self.action_observation.observe()
- self.observations.append(self.action_observation)
-
- # 根据配置决定是否并行执行回忆和处理器阶段
- # print(global_config.focus_chat.parallel_processing)
- if global_config.focus_chat.parallel_processing:
- # 并行执行回忆和处理器阶段
- with Timer("并行回忆和处理", cycle_timers):
- memory_task = asyncio.create_task(self.memory_activator.activate_memory(self.observations))
- processor_task = asyncio.create_task(self._process_processors(self.observations, []))
-
- # 等待两个任务完成
- running_memorys, (all_plan_info, processor_time_costs) = await asyncio.gather(
- memory_task, processor_task
- )
- else:
- # 串行执行
- with Timer("回忆", cycle_timers):
- running_memorys = await self.memory_activator.activate_memory(self.observations)
-
- with Timer("执行 信息处理器", cycle_timers):
- all_plan_info, processor_time_costs = await self._process_processors(self.observations, running_memorys)
-
- loop_processor_info = {
- "all_plan_info": all_plan_info,
- "processor_time_costs": processor_time_costs,
- }
-
- with Timer("规划器", cycle_timers):
- plan_result = await self.action_planner.plan(all_plan_info, running_memorys)
-
- loop_plan_info = {
- "action_result": plan_result.get("action_result", {}),
- "current_mind": plan_result.get("current_mind", ""),
- "observed_messages": plan_result.get("observed_messages", ""),
- }
-
- with Timer("执行动作", cycle_timers):
- action_type, action_data, reasoning = (
- plan_result.get("action_result", {}).get("action_type", "error"),
- plan_result.get("action_result", {}).get("action_data", {}),
- plan_result.get("action_result", {}).get("reasoning", "未提供理由"),
- )
-
- if action_type == "reply":
- action_str = "回复"
- elif action_type == "no_reply":
- action_str = "不回复"
- else:
- action_str = action_type
-
- logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'")
-
- success, reply_text, command = await self._handle_action(
- action_type, reasoning, action_data, cycle_timers, thinking_id, self.observations
- )
-
- loop_action_info = {
- "action_taken": success,
- "reply_text": reply_text,
- "command": command,
- "taken_time": time.time(),
- }
-
- loop_info = {
- "loop_observation_info": loop_observation_info,
- "loop_processor_info": loop_processor_info,
- "loop_plan_info": loop_plan_info,
- "loop_action_info": loop_action_info,
- }
-
- return loop_info
-
- except Exception as e:
- logger.error(f"{self.log_prefix} FOCUS聊天处理失败: {e}")
- logger.error(traceback.format_exc())
- return {
- "loop_observation_info": {},
- "loop_processor_info": {},
- "loop_plan_info": {},
- "loop_action_info": {"action_taken": False, "reply_text": "", "command": ""},
- }
-
- async def _handle_action(
- self,
- action: str,
- reasoning: str,
- action_data: dict,
- cycle_timers: dict,
- thinking_id: str,
- observations: List[Observation],
- ) -> tuple[bool, str, str]:
- """
- 处理规划动作,使用动作工厂创建相应的动作处理器
-
- 参数:
- action: 动作类型
- reasoning: 决策理由
- action_data: 动作数据,包含不同动作需要的参数
- cycle_timers: 计时器字典
- thinking_id: 思考ID
-
- 返回:
- tuple[bool, str, str]: (是否执行了动作, 思考消息ID, 命令)
- """
- try:
- # 使用工厂创建动作处理器实例
- try:
- action_handler = self.action_manager.create_action(
- action_name=action,
- action_data=action_data,
- reasoning=reasoning,
- cycle_timers=cycle_timers,
- thinking_id=thinking_id,
- observations=observations,
- expressor=self.expressor,
- replyer=self.replyer,
- chat_stream=self.chat_stream,
- log_prefix=self.log_prefix,
- shutting_down=self._shutting_down,
- )
- except Exception as e:
- logger.error(f"{self.log_prefix} 创建动作处理器时出错: {e}")
- traceback.print_exc()
- return False, "", ""
-
- if not action_handler:
- logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}, 原因: {reasoning}")
- return False, "", ""
-
- # 处理动作并获取结果
- result = await action_handler.handle_action()
- if len(result) == 3:
- success, reply_text, command = result
- else:
- success, reply_text = result
- command = ""
- logger.debug(
- f"{self.log_prefix} 麦麦执行了'{action}', 返回结果'{success}', '{reply_text}', '{command}'"
- )
- return success, reply_text, command
-
- except Exception as e:
- logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
- traceback.print_exc()
- return False, "", ""
-
- async def shutdown(self):
- """优雅关闭HeartFChatting实例,取消活动循环任务"""
- logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...")
- self._shutting_down = True # <-- 在开始关闭时设置标志位
-
- # 取消循环任务
- if self._loop_task and not self._loop_task.done():
- logger.info(f"{self.log_prefix} 正在取消HeartFChatting循环任务")
- self._loop_task.cancel()
- try:
- await asyncio.wait_for(self._loop_task, timeout=1.0)
- logger.info(f"{self.log_prefix} HeartFChatting循环任务已取消")
- except (asyncio.CancelledError, asyncio.TimeoutError):
- pass
- except Exception as e:
- logger.error(f"{self.log_prefix} 取消循环任务出错: {e}")
- else:
- logger.info(f"{self.log_prefix} 没有活动的HeartFChatting循环任务")
-
- # 清理状态
- self._loop_active = False
- self._loop_task = None
- if self._processing_lock.locked():
- self._processing_lock.release()
- logger.warning(f"{self.log_prefix} 已释放处理锁")
-
- logger.info(f"{self.log_prefix} HeartFChatting关闭完成")
-
- def get_cycle_history(self, last_n: Optional[int] = None) -> List[Dict[str, Any]]:
- """获取循环历史记录
-
- 参数:
- last_n: 获取最近n个循环的信息,如果为None则获取所有历史记录
-
- 返回:
- List[Dict[str, Any]]: 循环历史记录列表
- """
- history = list(self._cycle_history)
- if last_n is not None:
- history = history[-last_n:]
- return [cycle.to_dict() for cycle in history]
diff --git a/src/chat/heart_flow/observation/chatting_observation.py b/src/chat/heart_flow/observation/chatting_observation.py
deleted file mode 100644
index eeb7ee7f..00000000
--- a/src/chat/heart_flow/observation/chatting_observation.py
+++ /dev/null
@@ -1,290 +0,0 @@
-from datetime import datetime
-from src.config.config import global_config
-import traceback
-from src.chat.utils.chat_message_builder import (
- get_raw_msg_before_timestamp_with_chat,
- build_readable_messages,
- get_raw_msg_by_timestamp_with_chat,
- num_new_messages_since,
- get_person_id_list,
-)
-from src.chat.utils.prompt_builder import global_prompt_manager
-from typing import Optional
-import difflib
-from src.chat.message_receive.message import MessageRecv # 添加 MessageRecv 导入
-from src.chat.heart_flow.observation.observation import Observation
-
-from src.common.logger_manager import get_logger
-from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
-from src.chat.utils.prompt_builder import Prompt
-
-
-logger = get_logger("observation")
-
-
-Prompt(
- """这是qq群聊的聊天记录,请总结以下聊天记录的主题:
-{chat_logs}
-请用一句话概括,包括人物、事件和主要信息,不要分点。""",
- "chat_summary_group_prompt", # Template for group chat
-)
-
-Prompt(
- """这是你和{chat_target}的私聊记录,请总结以下聊天记录的主题:
-{chat_logs}
-请用一句话概括,包括事件,时间,和主要信息,不要分点。""",
- "chat_summary_private_prompt", # Template for private chat
-)
-# --- End Prompt Template Definition ---
-
-
-# 聊天观察
-class ChattingObservation(Observation):
- def __init__(self, chat_id):
- super().__init__(chat_id)
- self.chat_id = chat_id
- self.platform = "qq"
-
- self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_id)
-
- # --- Other attributes initialized in __init__ ---
- self.talking_message = []
- self.talking_message_str = ""
- self.talking_message_str_truncate = ""
- self.name = global_config.bot.nickname
- self.nick_name = global_config.bot.alias_names
- self.max_now_obs_len = global_config.focus_chat.observation_context_size
- self.overlap_len = global_config.focus_chat.compressed_length
- self.mid_memories = []
- self.max_mid_memory_len = global_config.focus_chat.compress_length_limit
- self.mid_memory_info = ""
- self.person_list = []
- self.oldest_messages = []
- self.oldest_messages_str = ""
- self.compressor_prompt = ""
-
- initial_messages = get_raw_msg_before_timestamp_with_chat(self.chat_id, self.last_observe_time, 10)
- self.last_observe_time = initial_messages[-1]["time"] if initial_messages else self.last_observe_time
- self.talking_message = initial_messages
- self.talking_message_str = build_readable_messages(self.talking_message)
-
-
- def to_dict(self) -> dict:
- """将观察对象转换为可序列化的字典"""
- return {
- "chat_id": self.chat_id,
- "platform": self.platform,
- "is_group_chat": self.is_group_chat,
- "chat_target_info": self.chat_target_info,
- "talking_message_str": self.talking_message_str,
- "talking_message_str_truncate": self.talking_message_str_truncate,
- "name": self.name,
- "nick_name": self.nick_name,
- "mid_memory_info": self.mid_memory_info,
- "person_list": self.person_list,
- "oldest_messages_str": self.oldest_messages_str,
- "compressor_prompt": self.compressor_prompt,
- "last_observe_time": self.last_observe_time,
- }
-
- # 进行一次观察 返回观察结果observe_info
- def get_observe_info(self, ids=None):
- mid_memory_str = ""
- if ids:
- for id in ids:
- print(f"id:{id}")
- try:
- for mid_memory in self.mid_memories:
- if mid_memory["id"] == id:
- mid_memory_by_id = mid_memory
- msg_str = ""
- for msg in mid_memory_by_id["messages"]:
- msg_str += f"{msg['detailed_plain_text']}"
- # time_diff = int((datetime.now().timestamp() - mid_memory_by_id["created_at"]) / 60)
- # mid_memory_str += f"距离现在{time_diff}分钟前:\n{msg_str}\n"
- mid_memory_str += f"{msg_str}\n"
- except Exception as e:
- logger.error(f"获取mid_memory_id失败: {e}")
- traceback.print_exc()
- return self.talking_message_str
-
- return mid_memory_str + "现在群里正在聊:\n" + self.talking_message_str
-
- else:
- mid_memory_str = "之前的聊天内容:\n"
- for mid_memory in self.mid_memories:
- mid_memory_str += f"{mid_memory['theme']}\n"
- return mid_memory_str + "现在群里正在聊:\n" + self.talking_message_str
-
- def search_message_by_text(self, text: str) -> Optional[MessageRecv]:
- """
- 根据回复的纯文本
- 1. 在talking_message中查找最新的,最匹配的消息
- 2. 如果找到,则返回消息
- """
- msg_list = []
- find_msg = None
- reverse_talking_message = list(reversed(self.talking_message))
-
- for message in reverse_talking_message:
- if message["processed_plain_text"] == text:
- find_msg = message
- # logger.debug(f"找到的锚定消息:find_msg: {find_msg}")
- break
- else:
- similarity = difflib.SequenceMatcher(None, text, message["processed_plain_text"]).ratio()
- msg_list.append({"message": message, "similarity": similarity})
- # logger.debug(f"对锚定消息检查:message: {message['processed_plain_text']},similarity: {similarity}")
- if not find_msg:
- if msg_list:
- msg_list.sort(key=lambda x: x["similarity"], reverse=True)
- if msg_list[0]["similarity"] >= 0.5: # 只返回相似度大于等于0.5的消息
- find_msg = msg_list[0]["message"]
- else:
- logger.debug("没有找到锚定消息,相似度低")
- return None
- else:
- logger.debug("没有找到锚定消息,没有消息捕获")
- return None
-
- # logger.debug(f"找到的锚定消息:find_msg: {find_msg}")
-
- # 创建所需的user_info字段
- user_info = {
- "platform": find_msg.get("user_platform", ""),
- "user_id": find_msg.get("user_id", ""),
- "user_nickname": find_msg.get("user_nickname", ""),
- "user_cardname": find_msg.get("user_cardname", ""),
- }
-
- # 创建所需的group_info字段,如果是群聊的话
- group_info = {}
- if find_msg.get("chat_info_group_id"):
- group_info = {
- "platform": find_msg.get("chat_info_group_platform", ""),
- "group_id": find_msg.get("chat_info_group_id", ""),
- "group_name": find_msg.get("chat_info_group_name", ""),
- }
-
- content_format = ""
- accept_format = ""
- template_items = {}
-
- format_info = {"content_format": content_format, "accept_format": accept_format}
- template_info = {
- "template_items": template_items,
- }
-
- message_info = {
- "platform": self.platform,
- "message_id": find_msg.get("message_id"),
- "time": find_msg.get("time"),
- "group_info": group_info,
- "user_info": user_info,
- "additional_config": find_msg.get("additional_config"),
- "format_info": format_info,
- "template_info": template_info,
- }
- message_dict = {
- "message_info": message_info,
- "raw_message": find_msg.get("processed_plain_text"),
- "detailed_plain_text": find_msg.get("processed_plain_text"),
- "processed_plain_text": find_msg.get("processed_plain_text"),
- }
- find_rec_msg = MessageRecv(message_dict)
- # logger.debug(f"锚定消息处理后:find_rec_msg: {find_rec_msg}")
- return find_rec_msg
-
- async def observe(self):
- # 自上一次观察的新消息
- new_messages_list = get_raw_msg_by_timestamp_with_chat(
- chat_id=self.chat_id,
- timestamp_start=self.last_observe_time,
- timestamp_end=datetime.now().timestamp(),
- limit=self.max_now_obs_len,
- limit_mode="latest",
- )
-
- # print(f"new_messages_list: {new_messages_list}")
-
- last_obs_time_mark = self.last_observe_time
- if new_messages_list:
- self.last_observe_time = new_messages_list[-1]["time"]
- self.talking_message.extend(new_messages_list)
-
- if len(self.talking_message) > self.max_now_obs_len:
- # 计算需要移除的消息数量,保留最新的 max_now_obs_len 条
- messages_to_remove_count = len(self.talking_message) - self.max_now_obs_len
- oldest_messages = self.talking_message[:messages_to_remove_count]
- self.talking_message = self.talking_message[messages_to_remove_count:] # 保留后半部分,即最新的
-
- # print(f"压缩中:oldest_messages: {oldest_messages}")
- oldest_messages_str = build_readable_messages(
- messages=oldest_messages, timestamp_mode="normal_no_YMD", read_mark=0
- )
-
- # --- Build prompt using template ---
- prompt = None # Initialize prompt as None
- try:
- # 构建 Prompt - 根据 is_group_chat 选择模板
- if self.is_group_chat:
- prompt_template_name = "chat_summary_group_prompt"
- prompt = await global_prompt_manager.format_prompt(
- prompt_template_name, chat_logs=oldest_messages_str
- )
- else:
- # For private chat, add chat_target to the prompt variables
- prompt_template_name = "chat_summary_private_prompt"
- # Determine the target name for the prompt
- chat_target_name = "对方" # Default fallback
- if self.chat_target_info:
- # Prioritize person_name, then nickname
- chat_target_name = (
- self.chat_target_info.get("person_name")
- or self.chat_target_info.get("user_nickname")
- or chat_target_name
- )
-
- # Format the private chat prompt
- prompt = await global_prompt_manager.format_prompt(
- prompt_template_name,
- # Assuming the private prompt template uses {chat_target}
- chat_target=chat_target_name,
- chat_logs=oldest_messages_str,
- )
- except Exception as e:
- logger.error(f"构建总结 Prompt 失败 for chat {self.chat_id}: {e}")
- # prompt remains None
-
- if prompt: # Check if prompt was built successfully
- self.compressor_prompt = prompt
- self.oldest_messages = oldest_messages
- self.oldest_messages_str = oldest_messages_str
-
- # 构建中
- # print(f"构建中:self.talking_message: {self.talking_message}")
- self.talking_message_str = build_readable_messages(
- messages=self.talking_message,
- timestamp_mode="lite",
- read_mark=last_obs_time_mark,
- )
- # print(f"构建中:self.talking_message_str: {self.talking_message_str}")
- self.talking_message_str_truncate = build_readable_messages(
- messages=self.talking_message,
- timestamp_mode="normal_no_YMD",
- read_mark=last_obs_time_mark,
- truncate=True,
- )
- # print(f"构建中:self.talking_message_str_truncate: {self.talking_message_str_truncate}")
-
- self.person_list = await get_person_id_list(self.talking_message)
- # print(f"构建中:self.person_list: {self.person_list}")
-
- logger.trace(
- f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.talking_message_str}"
- )
-
- async def has_new_messages_since(self, timestamp: float) -> bool:
- """检查指定时间戳之后是否有新消息"""
- count = num_new_messages_since(chat_id=self.chat_id, timestamp_start=timestamp)
- return count > 0
diff --git a/src/chat/heart_flow/observation/hfcloop_observation.py b/src/chat/heart_flow/observation/hfcloop_observation.py
deleted file mode 100644
index 02617cba..00000000
--- a/src/chat/heart_flow/observation/hfcloop_observation.py
+++ /dev/null
@@ -1,125 +0,0 @@
-# 定义了来自外部世界的信息
-# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体
-from datetime import datetime
-from src.common.logger_manager import get_logger
-from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
-from typing import List
-# Import the new utility function
-
-logger = get_logger("observation")
-
-
-# 所有观察的基类
-class HFCloopObservation:
- def __init__(self, observe_id):
- self.observe_info = ""
- self.observe_id = observe_id
- self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间
- self.history_loop: List[CycleDetail] = []
-
- def get_observe_info(self):
- return self.observe_info
-
- def add_loop_info(self, loop_info: CycleDetail):
- self.history_loop.append(loop_info)
-
- async def observe(self):
- recent_active_cycles: List[CycleDetail] = []
- for cycle in reversed(self.history_loop):
- # 只关心实际执行了动作的循环
- # action_taken = cycle.loop_action_info["action_taken"]
- # if action_taken:
- recent_active_cycles.append(cycle)
- if len(recent_active_cycles) == 5:
- break
-
- cycle_info_block = ""
- action_detailed_str = ""
- consecutive_text_replies = 0
- responses_for_prompt = []
-
- cycle_last_reason = ""
-
- # 检查这最近的活动循环中有多少是连续的文本回复 (从最近的开始看)
- for cycle in recent_active_cycles:
- action_type = cycle.loop_plan_info["action_result"]["action_type"]
- action_reasoning = cycle.loop_plan_info["action_result"]["reasoning"]
- is_taken = cycle.loop_action_info["action_taken"]
- action_taken_time = cycle.loop_action_info["taken_time"]
- action_taken_time_str = datetime.fromtimestamp(action_taken_time).strftime("%H:%M:%S")
- # print(action_type)
- # print(action_reasoning)
- # print(is_taken)
- # print(action_taken_time_str)
- # print("--------------------------------")
- if action_reasoning != cycle_last_reason:
- cycle_last_reason = action_reasoning
- action_reasoning_str = f"你选择这个action的原因是:{action_reasoning}"
- else:
- action_reasoning_str = ""
-
- if action_type == "reply":
- consecutive_text_replies += 1
- response_text = cycle.loop_action_info["reply_text"]
- responses_for_prompt.append(response_text)
-
- if is_taken:
- action_detailed_str += f"{action_taken_time_str}时,你选择回复(action:{action_type},内容是:'{response_text}')。{action_reasoning_str}\n"
- else:
- action_detailed_str += f"{action_taken_time_str}时,你选择回复(action:{action_type},内容是:'{response_text}'),但是动作失败了。{action_reasoning_str}\n"
- elif action_type == "no_reply":
- # action_detailed_str += (
- # f"{action_taken_time_str}时,你选择不回复(action:{action_type}),{action_reasoning_str}\n"
- # )
- pass
- else:
- if is_taken:
- action_detailed_str += (
- f"{action_taken_time_str}时,你选择执行了(action:{action_type}),{action_reasoning_str}\n"
- )
- else:
- action_detailed_str += f"{action_taken_time_str}时,你选择执行了(action:{action_type}),但是动作失败了。{action_reasoning_str}\n"
-
- if action_detailed_str:
- cycle_info_block = f"\n你最近做的事:\n{action_detailed_str}\n"
- else:
- cycle_info_block = "\n"
-
- # 根据连续文本回复的数量构建提示信息
- if consecutive_text_replies >= 3: # 如果最近的三个活动都是文本回复
- cycle_info_block = f'你已经连续回复了三条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}",第三近: "{responses_for_prompt[2]}")。你回复的有点多了,请注意'
- elif consecutive_text_replies == 2: # 如果最近的两个活动是文本回复
- cycle_info_block = f'你已经连续回复了两条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}"),请注意'
-
- # 包装提示块,增加可读性,即使没有连续回复也给个标记
- # if cycle_info_block:
- # cycle_info_block = f"\n你最近的回复\n{cycle_info_block}\n"
- # else:
- # cycle_info_block = "\n"
-
- # 获取history_loop中最新添加的
- if self.history_loop:
- last_loop = self.history_loop[0]
- start_time = last_loop.start_time
- end_time = last_loop.end_time
- if start_time is not None and end_time is not None:
- time_diff = int(end_time - start_time)
- if time_diff > 60:
- cycle_info_block += f"距离你上一次阅读消息并思考和规划,已经过去了{int(time_diff / 60)}分钟\n"
- else:
- cycle_info_block += f"距离你上一次阅读消息并思考和规划,已经过去了{time_diff}秒\n"
- else:
- cycle_info_block += "你还没看过消息\n"
-
- self.observe_info = cycle_info_block
-
- def to_dict(self) -> dict:
- """将观察对象转换为可序列化的字典"""
- # 只序列化基本信息,避免循环引用
- return {
- "observe_info": self.observe_info,
- "observe_id": self.observe_id,
- "last_observe_time": self.last_observe_time,
- # 不序列化history_loop,避免循环引用
- "history_loop_count": len(self.history_loop),
- }
diff --git a/src/config/config.py b/src/config/config.py
deleted file mode 100644
index a4c18109..00000000
--- a/src/config/config.py
+++ /dev/null
@@ -1,190 +0,0 @@
-import os
-from dataclasses import field, dataclass
-
-import tomlkit
-import shutil
-from datetime import datetime
-
-from tomlkit import TOMLDocument
-from tomlkit.items import Table
-
-from src.common.logger_manager import get_logger
-from rich.traceback import install
-
-from src.config.config_base import ConfigBase
-from src.config.official_configs import (
- BotConfig,
- PersonalityConfig,
- IdentityConfig,
- ExpressionConfig,
- ChatConfig,
- NormalChatConfig,
- FocusChatConfig,
- EmojiConfig,
- MemoryConfig,
- MoodConfig,
- KeywordReactionConfig,
- ChineseTypoConfig,
- ResponseSplitterConfig,
- TelemetryConfig,
- ExperimentalConfig,
- ModelConfig,
- FocusChatProcessorConfig,
- MessageReceiveConfig,
- MaimMessageConfig,
- RelationshipConfig,
-)
-
-install(extra_lines=3)
-
-
-# 配置主程序日志格式
-logger = get_logger("config")
-
-CONFIG_DIR = "config"
-TEMPLATE_DIR = "template"
-
-# 考虑到,实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码
-# 对该字段的更新,请严格参照语义化版本规范:https://semver.org/lang/zh-CN/
-MMC_VERSION = "0.7.1-snapshot.1"
-
-
-def update_config():
- # 获取根目录路径
- old_config_dir = f"{CONFIG_DIR}/old"
-
- # 定义文件路径
- template_path = f"{TEMPLATE_DIR}/bot_config_template.toml"
- old_config_path = f"{CONFIG_DIR}/bot_config.toml"
- new_config_path = f"{CONFIG_DIR}/bot_config.toml"
-
- # 检查配置文件是否存在
- if not os.path.exists(old_config_path):
- logger.info("配置文件不存在,从模板创建新配置")
- os.makedirs(CONFIG_DIR, exist_ok=True) # 创建文件夹
- shutil.copy2(template_path, old_config_path) # 复制模板文件
- logger.info(f"已创建新配置文件,请填写后重新运行: {old_config_path}")
- # 如果是新创建的配置文件,直接返回
- quit()
-
- # 读取旧配置文件和模板文件
- with open(old_config_path, "r", encoding="utf-8") as f:
- old_config = tomlkit.load(f)
- with open(template_path, "r", encoding="utf-8") as f:
- new_config = tomlkit.load(f)
-
- # 检查version是否相同
- if old_config and "inner" in old_config and "inner" in new_config:
- old_version = old_config["inner"].get("version")
- new_version = new_config["inner"].get("version")
- if old_version and new_version and old_version == new_version:
- logger.info(f"检测到配置文件版本号相同 (v{old_version}),跳过更新")
- return
- else:
- logger.info(f"检测到版本号不同: 旧版本 v{old_version} -> 新版本 v{new_version}")
- else:
- logger.info("已有配置文件未检测到版本号,可能是旧版本。将进行更新")
-
- # 创建old目录(如果不存在)
- os.makedirs(old_config_dir, exist_ok=True)
-
- # 生成带时间戳的新文件名
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- old_backup_path = f"{old_config_dir}/bot_config_{timestamp}.toml"
-
- # 移动旧配置文件到old目录
- shutil.move(old_config_path, old_backup_path)
- logger.info(f"已备份旧配置文件到: {old_backup_path}")
-
- # 复制模板文件到配置目录
- shutil.copy2(template_path, new_config_path)
- logger.info(f"已创建新配置文件: {new_config_path}")
-
- def update_dict(target: TOMLDocument | dict, source: TOMLDocument | dict):
- """
- 将source字典的值更新到target字典中(如果target中存在相同的键)
- """
- for key, value in source.items():
- # 跳过version字段的更新
- if key == "version":
- continue
- if key in target:
- if isinstance(value, dict) and isinstance(target[key], (dict, Table)):
- update_dict(target[key], value)
- else:
- try:
- # 对数组类型进行特殊处理
- if isinstance(value, list):
- # 如果是空数组,确保它保持为空数组
- target[key] = tomlkit.array(str(value)) if value else tomlkit.array()
- else:
- # 其他类型使用item方法创建新值
- target[key] = tomlkit.item(value)
- except (TypeError, ValueError):
- # 如果转换失败,直接赋值
- target[key] = value
-
- # 将旧配置的值更新到新配置中
- logger.info("开始合并新旧配置...")
- update_dict(new_config, old_config)
-
- # 保存更新后的配置(保留注释和格式)
- with open(new_config_path, "w", encoding="utf-8") as f:
- f.write(tomlkit.dumps(new_config))
- logger.info("配置文件更新完成,建议检查新配置文件中的内容,以免丢失重要信息")
- quit()
-
-
-@dataclass
-class Config(ConfigBase):
- """总配置类"""
-
- MMC_VERSION: str = field(default=MMC_VERSION, repr=False, init=False) # 硬编码的版本信息
-
- bot: BotConfig
- personality: PersonalityConfig
- identity: IdentityConfig
- relationship: RelationshipConfig
- chat: ChatConfig
- message_receive: MessageReceiveConfig
- normal_chat: NormalChatConfig
- focus_chat: FocusChatConfig
- focus_chat_processor: FocusChatProcessorConfig
- emoji: EmojiConfig
- expression: ExpressionConfig
- memory: MemoryConfig
- mood: MoodConfig
- keyword_reaction: KeywordReactionConfig
- chinese_typo: ChineseTypoConfig
- response_splitter: ResponseSplitterConfig
- telemetry: TelemetryConfig
- experimental: ExperimentalConfig
- model: ModelConfig
- maim_message: MaimMessageConfig
-
-
-def load_config(config_path: str) -> Config:
- """
- 加载配置文件
- :param config_path: 配置文件路径
- :return: Config对象
- """
- # 读取配置文件
- with open(config_path, "r", encoding="utf-8") as f:
- config_data = tomlkit.load(f)
-
- # 创建Config对象
- try:
- return Config.from_dict(config_data)
- except Exception as e:
- logger.critical("配置文件解析失败")
- raise e
-
-
-# 获取配置文件路径
-logger.info(f"MaiCore当前版本: {MMC_VERSION}")
-update_config()
-
-logger.info("正在品鉴配置文件...")
-global_config = load_config(config_path=f"{CONFIG_DIR}/bot_config.toml")
-logger.info("非常的新鲜,非常的美味!")
diff --git a/src/config/official_configs.py b/src/config/official_configs.py
deleted file mode 100644
index a0b32075..00000000
--- a/src/config/official_configs.py
+++ /dev/null
@@ -1,468 +0,0 @@
-from dataclasses import dataclass, field
-from typing import Any, Literal
-import re
-
-from src.config.config_base import ConfigBase
-
-"""
-须知:
-1. 本文件中记录了所有的配置项
-2. 所有新增的class都需要继承自ConfigBase
-3. 所有新增的class都应在config.py中的Config类中添加字段
-4. 对于新增的字段,若为可选项,则应在其后添加field()并设置default_factory或default
-"""
-
-
-@dataclass
-class BotConfig(ConfigBase):
- """QQ机器人配置类"""
-
- qq_account: str
- """QQ账号"""
-
- nickname: str
- """昵称"""
-
- alias_names: list[str] = field(default_factory=lambda: [])
- """别名列表"""
-
-
-@dataclass
-class PersonalityConfig(ConfigBase):
- """人格配置类"""
-
- personality_core: str
- """核心人格"""
-
- personality_sides: list[str] = field(default_factory=lambda: [])
- """人格侧写"""
-
-
-@dataclass
-class IdentityConfig(ConfigBase):
- """个体特征配置类"""
-
- identity_detail: list[str] = field(default_factory=lambda: [])
- """身份特征"""
-
-
-@dataclass
-class RelationshipConfig(ConfigBase):
- """关系配置类"""
-
- give_name: bool = False
- """是否给其他人取名"""
-
-
-@dataclass
-class ChatConfig(ConfigBase):
- """聊天配置类"""
-
- chat_mode: str = "normal"
- """聊天模式"""
-
- auto_focus_threshold: float = 1.0
- """自动切换到专注聊天的阈值,越低越容易进入专注聊天"""
-
- exit_focus_threshold: float = 1.0
- """自动退出专注聊天的阈值,越低越容易退出专注聊天"""
-
-
-@dataclass
-class MessageReceiveConfig(ConfigBase):
- """消息接收配置类"""
-
- ban_words: set[str] = field(default_factory=lambda: set())
- """过滤词列表"""
-
- ban_msgs_regex: set[str] = field(default_factory=lambda: set())
- """过滤正则表达式列表"""
-
-
-@dataclass
-class NormalChatConfig(ConfigBase):
- """普通聊天配置类"""
-
- normal_chat_first_probability: float = 0.3
- """
- 发言时选择推理模型的概率(0-1之间)
- 选择普通模型的概率为 1 - reasoning_normal_model_probability
- """
-
- max_context_size: int = 15
- """上下文长度"""
-
- message_buffer: bool = False
- """消息缓冲器"""
-
- emoji_chance: float = 0.2
- """发送表情包的基础概率"""
-
- thinking_timeout: int = 120
- """最长思考时间"""
-
- willing_mode: str = "classical"
- """意愿模式"""
-
- talk_frequency: float = 1
- """回复频率阈值"""
-
- response_willing_amplifier: float = 1.0
- """回复意愿放大系数"""
-
- response_interested_rate_amplifier: float = 1.0
- """回复兴趣度放大系数"""
-
- talk_frequency_down_groups: list[str] = field(default_factory=lambda: [])
- """降低回复频率的群组"""
-
- down_frequency_rate: float = 3.0
- """降低回复频率的群组回复意愿降低系数"""
-
- emoji_response_penalty: float = 0.0
- """表情包回复惩罚系数"""
-
- mentioned_bot_inevitable_reply: bool = False
- """提及 bot 必然回复"""
-
- at_bot_inevitable_reply: bool = False
- """@bot 必然回复"""
-
- enable_planner: bool = False
- """是否启用动作规划器"""
-
-
-@dataclass
-class FocusChatConfig(ConfigBase):
- """专注聊天配置类"""
-
- observation_context_size: int = 12
- """可观察到的最长上下文大小,超过这个值的上下文会被压缩"""
-
- compressed_length: int = 5
- """心流上下文压缩的最短压缩长度,超过心流观察到的上下文长度,会压缩,最短压缩长度为5"""
-
- compress_length_limit: int = 5
- """最多压缩份数,超过该数值的压缩上下文会被删除"""
-
- think_interval: float = 1
- """思考间隔(秒)"""
-
- consecutive_replies: float = 1
- """连续回复能力,值越高,麦麦连续回复的概率越高"""
-
- parallel_processing: bool = False
- """是否允许处理器阶段和回忆阶段并行执行"""
-
- processor_max_time: int = 25
- """处理器最大时间,单位秒,如果超过这个时间,处理器会自动停止"""
-
- planner_type: str = "simple"
- """规划器类型,可选值:default(默认规划器), simple(简单规划器)"""
-
-
-@dataclass
-class FocusChatProcessorConfig(ConfigBase):
- """专注聊天处理器配置类"""
-
- mind_processor: bool = False
- """是否启用思维处理器"""
-
- self_identify_processor: bool = True
- """是否启用自我识别处理器"""
-
- relation_processor: bool = True
- """是否启用关系识别处理器"""
-
- tool_use_processor: bool = True
- """是否启用工具使用处理器"""
-
- working_memory_processor: bool = True
- """是否启用工作记忆处理器"""
-
-
-@dataclass
-class ExpressionConfig(ConfigBase):
- """表达配置类"""
-
- expression_style: str = ""
- """表达风格"""
-
- learning_interval: int = 300
- """学习间隔(秒)"""
-
- enable_expression_learning: bool = True
- """是否启用表达学习"""
-
-
-@dataclass
-class EmojiConfig(ConfigBase):
- """表情包配置类"""
-
- max_reg_num: int = 200
- """表情包最大注册数量"""
-
- do_replace: bool = True
- """达到最大注册数量时替换旧表情包"""
-
- check_interval: int = 120
- """表情包检查间隔(分钟)"""
-
- steal_emoji: bool = True
- """是否偷取表情包,让麦麦可以发送她保存的这些表情包"""
-
- content_filtration: bool = False
- """是否开启表情包过滤"""
-
- filtration_prompt: str = "符合公序良俗"
- """表情包过滤要求"""
-
-
-@dataclass
-class MemoryConfig(ConfigBase):
- """记忆配置类"""
-
- memory_build_interval: int = 600
- """记忆构建间隔(秒)"""
-
- memory_build_distribution: tuple[
- float,
- float,
- float,
- float,
- float,
- float,
- ] = field(default_factory=lambda: (6.0, 3.0, 0.6, 32.0, 12.0, 0.4))
- """记忆构建分布,参数:分布1均值,标准差,权重,分布2均值,标准差,权重"""
-
- memory_build_sample_num: int = 8
- """记忆构建采样数量"""
-
- memory_build_sample_length: int = 40
- """记忆构建采样长度"""
-
- memory_compress_rate: float = 0.1
- """记忆压缩率"""
-
- forget_memory_interval: int = 1000
- """记忆遗忘间隔(秒)"""
-
- memory_forget_time: int = 24
- """记忆遗忘时间(小时)"""
-
- memory_forget_percentage: float = 0.01
- """记忆遗忘比例"""
-
- consolidate_memory_interval: int = 1000
- """记忆整合间隔(秒)"""
-
- consolidation_similarity_threshold: float = 0.7
- """整合相似度阈值"""
-
- consolidate_memory_percentage: float = 0.01
- """整合检查节点比例"""
-
- memory_ban_words: list[str] = field(default_factory=lambda: ["表情包", "图片", "回复", "聊天记录"])
- """不允许记忆的词列表"""
-
-
-@dataclass
-class MoodConfig(ConfigBase):
- """情绪配置类"""
-
- mood_update_interval: int = 1
- """情绪更新间隔(秒)"""
-
- mood_decay_rate: float = 0.95
- """情绪衰减率"""
-
- mood_intensity_factor: float = 0.7
- """情绪强度因子"""
-
-
-@dataclass
-class KeywordRuleConfig(ConfigBase):
- """关键词规则配置类"""
-
- keywords: list[str] = field(default_factory=lambda: [])
- """关键词列表"""
-
- regex: list[str] = field(default_factory=lambda: [])
- """正则表达式列表"""
-
- reaction: str = ""
- """关键词触发的反应"""
-
- def __post_init__(self):
- """验证配置"""
- if not self.keywords and not self.regex:
- raise ValueError("关键词规则必须至少包含keywords或regex中的一个")
-
- if not self.reaction:
- raise ValueError("关键词规则必须包含reaction")
-
- # 验证正则表达式
- for pattern in self.regex:
- try:
- re.compile(pattern)
- except re.error as e:
- raise ValueError(f"无效的正则表达式 '{pattern}': {str(e)}") from e
-
-
-@dataclass
-class KeywordReactionConfig(ConfigBase):
- """关键词配置类"""
-
- keyword_rules: list[KeywordRuleConfig] = field(default_factory=lambda: [])
- """关键词规则列表"""
-
- regex_rules: list[KeywordRuleConfig] = field(default_factory=lambda: [])
- """正则表达式规则列表"""
-
- def __post_init__(self):
- """验证配置"""
- # 验证所有规则
- for rule in self.keyword_rules + self.regex_rules:
- if not isinstance(rule, KeywordRuleConfig):
- raise ValueError(f"规则必须是KeywordRuleConfig类型,而不是{type(rule).__name__}")
-
-
-@dataclass
-class ChineseTypoConfig(ConfigBase):
- """中文错别字配置类"""
-
- enable: bool = True
- """是否启用中文错别字生成器"""
-
- error_rate: float = 0.01
- """单字替换概率"""
-
- min_freq: int = 9
- """最小字频阈值"""
-
- tone_error_rate: float = 0.1
- """声调错误概率"""
-
- word_replace_rate: float = 0.006
- """整词替换概率"""
-
-
-@dataclass
-class ResponseSplitterConfig(ConfigBase):
- """回复分割器配置类"""
-
- enable: bool = True
- """是否启用回复分割器"""
-
- max_length: int = 256
- """回复允许的最大长度"""
-
- max_sentence_num: int = 3
- """回复允许的最大句子数"""
-
- enable_kaomoji_protection: bool = False
- """是否启用颜文字保护"""
-
-
-@dataclass
-class TelemetryConfig(ConfigBase):
- """遥测配置类"""
-
- enable: bool = True
- """是否启用遥测"""
-
-
-@dataclass
-class ExperimentalConfig(ConfigBase):
- """实验功能配置类"""
-
- debug_show_chat_mode: bool = False
- """是否在回复后显示当前聊天模式"""
-
- enable_friend_chat: bool = False
- """是否启用好友聊天"""
-
- pfc_chatting: bool = False
- """是否启用PFC"""
-
-
-@dataclass
-class MaimMessageConfig(ConfigBase):
- """maim_message配置类"""
-
- use_custom: bool = False
- """是否使用自定义的maim_message配置"""
-
- host: str = "127.0.0.1"
- """主机地址"""
-
- port: int = 8090
- """"端口号"""
-
- mode: Literal["ws", "tcp"] = "ws"
- """连接模式,支持ws和tcp"""
-
- use_wss: bool = False
- """是否使用WSS安全连接"""
-
- cert_file: str = ""
- """SSL证书文件路径,仅在use_wss=True时有效"""
-
- key_file: str = ""
- """SSL密钥文件路径,仅在use_wss=True时有效"""
-
- auth_token: list[str] = field(default_factory=lambda: [])
- """认证令牌,用于API验证,为空则不启用验证"""
-
-
-@dataclass
-class ModelConfig(ConfigBase):
- """模型配置类"""
-
- model_max_output_length: int = 800 # 最大回复长度
-
- utils: dict[str, Any] = field(default_factory=lambda: {})
- """组件模型配置"""
-
- utils_small: dict[str, Any] = field(default_factory=lambda: {})
- """组件小模型配置"""
-
- normal_chat_1: dict[str, Any] = field(default_factory=lambda: {})
- """normal_chat首要回复模型模型配置"""
-
- normal_chat_2: dict[str, Any] = field(default_factory=lambda: {})
- """normal_chat次要回复模型配置"""
-
- memory_summary: dict[str, Any] = field(default_factory=lambda: {})
- """记忆的概括模型配置"""
-
- vlm: dict[str, Any] = field(default_factory=lambda: {})
- """视觉语言模型配置"""
-
- focus_working_memory: dict[str, Any] = field(default_factory=lambda: {})
- """专注工作记忆模型配置"""
-
-
- focus_tool_use: dict[str, Any] = field(default_factory=lambda: {})
- """专注工具使用模型配置"""
-
- planner: dict[str, Any] = field(default_factory=lambda: {})
- """规划模型配置"""
-
- relation: dict[str, Any] = field(default_factory=lambda: {})
- """关系模型配置"""
-
- focus_expressor: dict[str, Any] = field(default_factory=lambda: {})
- """专注表达器模型配置"""
-
- embedding: dict[str, Any] = field(default_factory=lambda: {})
- """嵌入模型配置"""
-
- pfc_action_planner: dict[str, Any] = field(default_factory=lambda: {})
- """PFC动作规划模型配置"""
-
- pfc_chat: dict[str, Any] = field(default_factory=lambda: {})
- """PFC聊天模型配置"""
-
- pfc_reply_checker: dict[str, Any] = field(default_factory=lambda: {})
- """PFC回复检查模型配置"""