diff --git a/docs/manual_deploy_windows.md b/docs/manual_deploy_windows.md index 37f0a5e3..d5115120 100644 --- a/docs/manual_deploy_windows.md +++ b/docs/manual_deploy_windows.md @@ -75,22 +75,22 @@ conda activate maimbot pip install -r requirements.txt ``` -### 2️⃣ **然后你需要启动MongoDB数据库,来存储信息** +### 3️⃣ **然后你需要启动MongoDB数据库,来存储信息** - 安装并启动MongoDB服务 - 默认连接本地27017端口 -### 3️⃣ **配置NapCat,让麦麦bot与qq取得联系** +### 4️⃣ **配置NapCat,让麦麦bot与qq取得联系** - 安装并登录NapCat(用你的qq小号) - 添加反向WS: `ws://127.0.0.1:8080/onebot/v11/ws` -### 4️⃣ **配置文件设置,让麦麦Bot正常工作** +### 5️⃣ **配置文件设置,让麦麦Bot正常工作** - 修改环境配置文件:`.env.prod` - 修改机器人配置文件:`bot_config.toml` -### 5️⃣ **启动麦麦机器人** +### 6️⃣ **启动麦麦机器人** - 打开命令行,cd到对应路径 @@ -104,7 +104,7 @@ nb run python bot.py ``` -### 6️⃣ **其他组件(可选)** +### 7️⃣ **其他组件(可选)** - `run_thingking.bat`: 启动可视化推理界面(未完善) - 直接运行 knowledge.py生成知识库 diff --git a/src/common/logger.py b/src/common/logger.py index 91f1a1da..45d6f415 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -86,6 +86,25 @@ MEMORY_STYLE_CONFIG = { }, } + +#MOOD +MOOD_STYLE_CONFIG = { + "advanced": { + "console_format": ( + "{time:YYYY-MM-DD HH:mm:ss} | " + "{level: <8} | " + "{extra[module]: <12} | " + "心情 | " + "{message}" + ), + "file_format": ("{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 心情 | {message}"), + }, + "simple": { + "console_format": ("{time:MM-DD HH:mm} | 心情 | {message}"), + "file_format": ("{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 心情 | {message}"), + }, +} + SENDER_STYLE_CONFIG = { "advanced": { "console_format": ( @@ -163,7 +182,7 @@ TOPIC_STYLE_CONFIG = TOPIC_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else TOPIC_ST SENDER_STYLE_CONFIG = SENDER_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else SENDER_STYLE_CONFIG["advanced"] LLM_STYLE_CONFIG = LLM_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else LLM_STYLE_CONFIG["advanced"] CHAT_STYLE_CONFIG = CHAT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else CHAT_STYLE_CONFIG["advanced"] - +MOOD_STYLE_CONFIG = MOOD_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else MOOD_STYLE_CONFIG["advanced"] def is_registered_module(record: dict) -> bool: """检查是否为已注册的模块""" diff --git a/src/plugins/chat/__init__.py b/src/plugins/chat/__init__.py index 78e026ca..f51184a7 100644 --- a/src/plugins/chat/__init__.py +++ b/src/plugins/chat/__init__.py @@ -150,7 +150,7 @@ async def merge_memory_task(): # print("\033[1;32m[记忆整合]\033[0m 记忆整合完成") -@scheduler.scheduled_job("interval", seconds=30, id="print_mood") +@scheduler.scheduled_job("interval", seconds=15, id="print_mood") async def print_mood_task(): """每30秒打印一次情绪状态""" mood_manager = MoodManager.get_instance() diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py index ef9878c4..bc7090ac 100644 --- a/src/plugins/chat/utils.py +++ b/src/plugins/chat/utils.py @@ -46,11 +46,18 @@ def is_mentioned_bot_in_message(message: MessageRecv) -> bool: """检查消息是否提到了机器人""" keywords = [global_config.BOT_NICKNAME] nicknames = global_config.BOT_ALIAS_NAMES + message_id = int(re.findall(r"\[CQ:reply,id=([0-9]*)\]", message.raw_message)[0]) + result = db.messages.find_one({"message_id": message_id}) + message_content = re.sub(r'\[CQ:reply,[\s\S]*?\]','', message.raw_message) + message_content = re.sub(r'\[CQ:cq,[\s\S]*?\]','', message_content) + logger.exception(f"raw_message : {message.raw_message}") for keyword in keywords: - if keyword in message.processed_plain_text: + if (f"[回复 {keyword} 的消息: " in message.processed_plain_text) and result == None: + return True + if keyword in message_content: return True for nickname in nicknames: - if nickname in message.processed_plain_text: + if nickname in message_content: return True return False @@ -539,4 +546,4 @@ def is_western_char(char): def is_western_paragraph(paragraph): """检测是否为西文字符段落""" return all(is_western_char(char) for char in paragraph if char.isalnum()) - \ No newline at end of file + diff --git a/src/plugins/moods/moods.py b/src/plugins/moods/moods.py index 3e977d02..986075da 100644 --- a/src/plugins/moods/moods.py +++ b/src/plugins/moods/moods.py @@ -4,9 +4,14 @@ import time from dataclasses import dataclass from ..chat.config import global_config -from src.common.logger import get_module_logger +from src.common.logger import get_module_logger, LogConfig, MOOD_STYLE_CONFIG -logger = get_module_logger("mood_manager") +mood_config = LogConfig( + # 使用海马体专用样式 + console_format=MOOD_STYLE_CONFIG["console_format"], + file_format=MOOD_STYLE_CONFIG["file_format"], +) +logger = get_module_logger("mood_manager", config=mood_config) @dataclass diff --git a/src/plugins/remote/remote.py b/src/plugins/remote/remote.py index fdc805df..8586aa67 100644 --- a/src/plugins/remote/remote.py +++ b/src/plugins/remote/remote.py @@ -54,7 +54,9 @@ def send_heartbeat(server_url, client_id): sys = platform.system() try: headers = {"Client-ID": client_id, "User-Agent": f"HeartbeatClient/{client_id[:8]}"} - data = json.dumps({"system": sys}) + data = json.dumps( + {"system": sys, "Version": global_config.MAI_VERSION}, + ) response = requests.post(f"{server_url}/api/clients", headers=headers, data=data) if response.status_code == 201: @@ -92,9 +94,9 @@ class HeartbeatThread(threading.Thread): logger.info(f"{self.interval}秒后发送下一次心跳...") else: logger.info(f"{self.interval}秒后重试...") - + self.last_heartbeat_time = time.time() - + # 使用可中断的等待代替 sleep # 每秒检查一次是否应该停止或发送心跳 remaining_wait = self.interval @@ -104,7 +106,7 @@ class HeartbeatThread(threading.Thread): if self.stop_event.wait(wait_time): break # 如果事件被设置,立即退出等待 remaining_wait -= wait_time - + # 检查是否由于外部原因导致间隔异常延长 if time.time() - self.last_heartbeat_time >= self.interval * 1.5: logger.warning("检测到心跳间隔异常延长,立即发送心跳") diff --git a/src/plugins/schedule/schedule_generator copy.py b/src/plugins/schedule/schedule_generator copy.py deleted file mode 100644 index eff0a08d..00000000 --- a/src/plugins/schedule/schedule_generator copy.py +++ /dev/null @@ -1,191 +0,0 @@ -import datetime -import json -import re -import os -import sys -from typing import Dict, Union - - -# 添加项目根目录到 Python 路径 -root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) -sys.path.append(root_path) - -from src.common.database import db # noqa: E402 -from src.common.logger import get_module_logger # noqa: E402 -from src.plugins.schedule.offline_llm import LLMModel # noqa: E402 -from src.plugins.chat.config import global_config # noqa: E402 - -logger = get_module_logger("scheduler") - - -class ScheduleGenerator: - enable_output: bool = True - - def __init__(self): - # 使用离线LLM模型 - self.llm_scheduler = LLMModel(model_name="Pro/deepseek-ai/DeepSeek-V3", temperature=0.9) - self.today_schedule_text = "" - self.today_schedule = {} - self.tomorrow_schedule_text = "" - self.tomorrow_schedule = {} - self.yesterday_schedule_text = "" - self.yesterday_schedule = {} - - async def initialize(self): - today = datetime.datetime.now() - tomorrow = datetime.datetime.now() + datetime.timedelta(days=1) - yesterday = datetime.datetime.now() - datetime.timedelta(days=1) - - self.today_schedule_text, self.today_schedule = await self.generate_daily_schedule(target_date=today) - self.tomorrow_schedule_text, self.tomorrow_schedule = await self.generate_daily_schedule( - target_date=tomorrow, read_only=True - ) - self.yesterday_schedule_text, self.yesterday_schedule = await self.generate_daily_schedule( - target_date=yesterday, read_only=True - ) - - async def generate_daily_schedule( - self, target_date: datetime.datetime = None, read_only: bool = False - ) -> Dict[str, str]: - date_str = target_date.strftime("%Y-%m-%d") - weekday = target_date.strftime("%A") - - schedule_text = str - - existing_schedule = db.schedule.find_one({"date": date_str}) - if existing_schedule: - if self.enable_output: - logger.debug(f"{date_str}的日程已存在:") - schedule_text = existing_schedule["schedule"] - # print(self.schedule_text) - - elif not read_only: - logger.debug(f"{date_str}的日程不存在,准备生成新的日程。") - prompt = ( - f"""我是{global_config.BOT_NICKNAME},{global_config.PROMPT_SCHEDULE_GEN},请为我生成{date_str}({weekday})的日程安排,包括:""" - + """ - 1. 早上的学习和工作安排 - 2. 下午的活动和任务 - 3. 晚上的计划和休息时间 - 请按照时间顺序列出具体时间点和对应的活动,用一个时间点而不是时间段来表示时间,用JSON格式返回日程表, - 仅返回内容,不要返回注释,不要添加任何markdown或代码块样式,时间采用24小时制, - 格式为{"时间": "活动","时间": "活动",...}。""" - ) - - try: - schedule_text, _ = self.llm_scheduler.generate_response(prompt) - db.schedule.insert_one({"date": date_str, "schedule": schedule_text}) - self.enable_output = True - except Exception as e: - logger.error(f"生成日程失败: {str(e)}") - schedule_text = "生成日程时出错了" - # print(self.schedule_text) - else: - if self.enable_output: - logger.debug(f"{date_str}的日程不存在。") - schedule_text = "忘了" - - return schedule_text, None - - schedule_form = self._parse_schedule(schedule_text) - return schedule_text, schedule_form - - def _parse_schedule(self, schedule_text: str) -> Union[bool, Dict[str, str]]: - """解析日程文本,转换为时间和活动的字典""" - try: - reg = r"\{(.|\r|\n)+\}" - matched = re.search(reg, schedule_text)[0] - schedule_dict = json.loads(matched) - return schedule_dict - except json.JSONDecodeError: - logger.exception("解析日程失败: {}".format(schedule_text)) - return False - - def _parse_time(self, time_str: str) -> str: - """解析时间字符串,转换为时间""" - return datetime.datetime.strptime(time_str, "%H:%M") - - def get_current_task(self) -> str: - """获取当前时间应该进行的任务""" - current_time = datetime.datetime.now().strftime("%H:%M") - - # 找到最接近当前时间的任务 - closest_time = None - min_diff = float("inf") - - # 检查今天的日程 - if not self.today_schedule: - return "摸鱼" - for time_str in self.today_schedule.keys(): - diff = abs(self._time_diff(current_time, time_str)) - if closest_time is None or diff < min_diff: - closest_time = time_str - min_diff = diff - - # 检查昨天的日程中的晚间任务 - if self.yesterday_schedule: - for time_str in self.yesterday_schedule.keys(): - if time_str >= "20:00": # 只考虑晚上8点之后的任务 - # 计算与昨天这个时间点的差异(需要加24小时) - diff = abs(self._time_diff(current_time, time_str)) - if diff < min_diff: - closest_time = time_str - min_diff = diff - return closest_time, self.yesterday_schedule[closest_time] - - if closest_time: - return closest_time, self.today_schedule[closest_time] - return "摸鱼" - - def _time_diff(self, time1: str, time2: str) -> int: - """计算两个时间字符串之间的分钟差""" - if time1 == "24:00": - time1 = "23:59" - if time2 == "24:00": - time2 = "23:59" - t1 = datetime.datetime.strptime(time1, "%H:%M") - t2 = datetime.datetime.strptime(time2, "%H:%M") - diff = int((t2 - t1).total_seconds() / 60) - # 考虑时间的循环性 - if diff < -720: - diff += 1440 # 加一天的分钟 - elif diff > 720: - diff -= 1440 # 减一天的分钟 - # print(f"时间1[{time1}]: 时间2[{time2}],差值[{diff}]分钟") - return diff - - def print_schedule(self): - """打印完整的日程安排""" - if not self._parse_schedule(self.today_schedule_text): - logger.warning("今日日程有误,将在下次运行时重新生成") - db.schedule.delete_one({"date": datetime.datetime.now().strftime("%Y-%m-%d")}) - else: - logger.info("=== 今日日程安排 ===") - for time_str, activity in self.today_schedule.items(): - logger.info(f"时间[{time_str}]: 活动[{activity}]") - logger.info("==================") - self.enable_output = False - - -async def main(): - # 使用示例 - scheduler = ScheduleGenerator() - await scheduler.initialize() - scheduler.print_schedule() - print("\n当前任务:") - print(await scheduler.get_current_task()) - - print("昨天日程:") - print(scheduler.yesterday_schedule) - print("今天日程:") - print(scheduler.today_schedule) - print("明天日程:") - print(scheduler.tomorrow_schedule) - -# 当作为组件导入时使用的实例 -bot_schedule = ScheduleGenerator() - -if __name__ == "__main__": - import asyncio - # 当直接运行此文件时执行 - asyncio.run(main()) diff --git a/src/plugins/schedule/schedule_generator_pro.py b/src/plugins/schedule/schedule_generator_pro.py new file mode 100644 index 00000000..5a2c2a68 --- /dev/null +++ b/src/plugins/schedule/schedule_generator_pro.py @@ -0,0 +1,222 @@ +import datetime +import json +import re +import os +import sys +from typing import Dict, Union +# 添加项目根目录到 Python 路径 +root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) +sys.path.append(root_path) + +from src.common.database import db # noqa: E402 +from src.common.logger import get_module_logger # noqa: E402 +from src.plugins.schedule.offline_llm import LLMModel # noqa: E402 + +logger = get_module_logger("scheduler") + + +class ScheduleGenerator: + enable_output: bool = True + + def __init__(self, name: str = "bot_name", personality: str = "你是一个爱国爱党的新时代青年", behavior: str = "你非常外向,喜欢尝试新事物和人交流"): + # 使用离线LLM模型 + self.llm_scheduler = LLMModel(model_name="Pro/deepseek-ai/DeepSeek-V3", temperature=0.9) + + self.today_schedule_text = "" + self.today_done_list = [] + + self.yesterday_schedule_text = "" + self.yesterday_done_list = [] + + self.name = name + self.personality = personality + self.behavior = behavior + + self.start_time = datetime.datetime.now() + + async def mai_schedule_start(self): + """启动日程系统,每5分钟执行一次move_doing,并在日期变化时重新检查日程""" + try: + logger.info(f"日程系统启动/刷新时间: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}") + # 初始化日程 + await self.check_and_create_today_schedule() + self.print_schedule() + + while True: + current_time = datetime.datetime.now() + + # 检查是否需要重新生成日程(日期变化) + if current_time.date() != self.start_time.date(): + logger.info("检测到日期变化,重新生成日程") + self.start_time = current_time + await self.check_and_create_today_schedule() + self.print_schedule() + + # 执行当前活动 + current_activity = await self.move_doing() + logger.info(f"当前活动: {current_activity}") + + # 等待5分钟 + await asyncio.sleep(300) # 300秒 = 5分钟 + + except Exception as e: + logger.error(f"日程系统运行时出错: {str(e)}") + logger.exception("详细错误信息:") + + async def check_and_create_today_schedule(self): + """检查昨天的日程,并确保今天有日程安排 + + Returns: + tuple: (today_schedule_text, today_schedule) 今天的日程文本和解析后的日程字典 + """ + today = datetime.datetime.now() + yesterday = today - datetime.timedelta(days=1) + + # 先检查昨天的日程 + self.yesterday_schedule_text, self.yesterday_done_list = self.load_schedule_from_db(yesterday) + if self.yesterday_schedule_text: + logger.debug(f"已加载{yesterday.strftime('%Y-%m-%d')}的日程") + + # 检查今天的日程 + self.today_schedule_text, self.today_done_list = self.load_schedule_from_db(today) + if not self.today_schedule_text: + logger.info(f"{today.strftime('%Y-%m-%d')}的日程不存在,准备生成新的日程") + self.today_schedule_text = await self.generate_daily_schedule(target_date=today) + + self.save_today_schedule_to_db() + + def construct_daytime_prompt(self, target_date: datetime.datetime): + date_str = target_date.strftime("%Y-%m-%d") + weekday = target_date.strftime("%A") + + prompt = f"我是{self.name},{self.personality},{self.behavior}" + prompt += f"我昨天的日程是:{self.yesterday_schedule_text}\n" + prompt += f"请为我生成{date_str}({weekday})的日程安排,结合我的个人特点和行为习惯\n" + prompt += "推测我的日程安排,包括我一天都在做什么,有什么发现和思考,具体一些,详细一些,记得写明时间\n" + prompt += "直接返回我的日程,不要输出其他内容:" + return prompt + + def construct_doing_prompt(self,time: datetime.datetime): + now_time = time.strftime("%H:%M") + previous_doing = self.today_done_list[-20:] if len(self.today_done_list) > 20 else self.today_done_list + prompt = f"我是{self.name},{self.personality},{self.behavior}" + prompt += f"我今天的日程是:{self.today_schedule_text}\n" + prompt += f"我之前做了的事情是:{previous_doing}\n" + prompt += f"现在是{now_time},结合我的个人特点和行为习惯," + prompt += "推测我现在做什么,具体一些,详细一些\n" + prompt += "直接返回我在做的事情,不要输出其他内容:" + return prompt + + async def generate_daily_schedule( + self, target_date: datetime.datetime = None,) -> Dict[str, str]: + daytime_prompt = self.construct_daytime_prompt(target_date) + daytime_response, _ = await self.llm_scheduler.generate_response(daytime_prompt) + return daytime_response + + def _time_diff(self, time1: str, time2: str) -> int: + """计算两个时间字符串之间的分钟差""" + if time1 == "24:00": + time1 = "23:59" + if time2 == "24:00": + time2 = "23:59" + t1 = datetime.datetime.strptime(time1, "%H:%M") + t2 = datetime.datetime.strptime(time2, "%H:%M") + diff = int((t2 - t1).total_seconds() / 60) + # 考虑时间的循环性 + if diff < -720: + diff += 1440 # 加一天的分钟 + elif diff > 720: + diff -= 1440 # 减一天的分钟 + # print(f"时间1[{time1}]: 时间2[{time2}],差值[{diff}]分钟") + return diff + + def print_schedule(self): + """打印完整的日程安排""" + if not self.today_schedule_text: + logger.warning("今日日程有误,将在下次运行时重新生成") + db.schedule.delete_one({"date": datetime.datetime.now().strftime("%Y-%m-%d")}) + else: + logger.info("=== 今日日程安排 ===") + logger.info(self.today_schedule_text) + logger.info("==================") + self.enable_output = False + + async def update_today_done_list(self): + # 更新数据库中的 today_done_list + today_str = datetime.datetime.now().strftime("%Y-%m-%d") + existing_schedule = db.schedule.find_one({"date": today_str}) + + if existing_schedule: + # 更新数据库中的 today_done_list + db.schedule.update_one( + {"date": today_str}, + {"$set": {"today_done_list": self.today_done_list}} + ) + logger.debug(f"已更新{today_str}的已完成活动列表") + else: + logger.warning(f"未找到{today_str}的日程记录") + + async def move_doing(self): + current_time = datetime.datetime.now() + time_str = current_time.strftime("%H:%M") + doing_prompt = self.construct_doing_prompt(current_time) + doing_response, _ = await self.llm_scheduler.generate_response(doing_prompt) + self.today_done_list.append(current_time,time_str + "在" + doing_response) + + await self.update_today_done_list() + + return doing_response + + + + + def save_today_schedule_to_db(self): + """保存日程到数据库,同时初始化 today_done_list""" + date_str = datetime.datetime.now().strftime("%Y-%m-%d") + schedule_data = { + "date": date_str, + "schedule": self.today_schedule_text, + "today_done_list": self.today_done_list if hasattr(self, 'today_done_list') else [] + } + # 使用 upsert 操作,如果存在则更新,不存在则插入 + db.schedule.update_one( + {"date": date_str}, + {"$set": schedule_data}, + upsert=True + ) + logger.debug(f"已保存{date_str}的日程到数据库") + + def load_schedule_from_db(self, date: datetime.datetime): + """从数据库加载日程,同时加载 today_done_list""" + date_str = date.strftime("%Y-%m-%d") + existing_schedule = db.schedule.find_one({"date": date_str}) + + if existing_schedule: + schedule_text = existing_schedule["schedule"] + return schedule_text, existing_schedule.get("today_done_list", []) + else: + logger.debug(f"{date_str}的日程不存在") + return None, None + +async def main(): + # 使用示例 + scheduler = ScheduleGenerator(name="麦麦", personality="你叫麦麦,你19岁,是一个大二的女大学生,你有一头黑色短发,你会刷贴吧,你现在在学习心理学", behavior="你比较内向") + await scheduler.check_and_create_today_schedule() + scheduler.print_schedule() + print("\n当前任务:") + print(await scheduler.get_current_task()) + + print("昨天日程:") + print(scheduler.yesterday_schedule) + print("今天日程:") + print(scheduler.today_schedule) + print("明天日程:") + print(scheduler.tomorrow_schedule) + +# 当作为组件导入时使用的实例 +bot_schedule = ScheduleGenerator() + +if __name__ == "__main__": + import asyncio + # 当直接运行此文件时执行 + asyncio.run(main()) diff --git a/配置文件错误排查.py b/配置文件错误排查.py new file mode 100644 index 00000000..c8b91173 --- /dev/null +++ b/配置文件错误排查.py @@ -0,0 +1,617 @@ +import tomli +import sys +import re +from pathlib import Path +from typing import Dict, Any, List, Set, Tuple + +def load_toml_file(file_path: str) -> Dict[str, Any]: + """加载TOML文件""" + try: + with open(file_path, "rb") as f: + return tomli.load(f) + except Exception as e: + print(f"错误: 无法加载配置文件 {file_path}: {str(e)} 请检查文件是否存在或者他妈的有没有东西没写值") + sys.exit(1) + +def load_env_file(file_path: str) -> Dict[str, str]: + """加载.env文件中的环境变量""" + env_vars = {} + try: + with open(file_path, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + if '=' in line: + key, value = line.split('=', 1) + key = key.strip() + value = value.strip() + + # 处理注释 + if '#' in value: + value = value.split('#', 1)[0].strip() + + # 处理引号 + if (value.startswith('"') and value.endswith('"')) or \ + (value.startswith("'") and value.endswith("'")): + value = value[1:-1] + + env_vars[key] = value + return env_vars + except Exception as e: + print(f"警告: 无法加载.env文件 {file_path}: {str(e)}") + return {} + +def check_required_sections(config: Dict[str, Any]) -> List[str]: + """检查必要的配置段是否存在""" + required_sections = [ + "inner", "bot", "personality", "message", "emoji", + "cq_code", "response", "willing", "memory", "mood", + "groups", "model" + ] + missing_sections = [] + + for section in required_sections: + if section not in config: + missing_sections.append(section) + + return missing_sections + +def check_probability_sum(config: Dict[str, Any]) -> List[Tuple[str, float]]: + """检查概率总和是否为1""" + errors = [] + + # 检查人格概率 + if "personality" in config: + personality = config["personality"] + prob_sum = sum([ + personality.get("personality_1_probability", 0), + personality.get("personality_2_probability", 0), + personality.get("personality_3_probability", 0) + ]) + if abs(prob_sum - 1.0) > 0.001: # 允许有小数点精度误差 + errors.append(("人格概率总和", prob_sum)) + + # 检查响应模型概率 + if "response" in config: + response = config["response"] + model_prob_sum = sum([ + response.get("model_r1_probability", 0), + response.get("model_v3_probability", 0), + response.get("model_r1_distill_probability", 0) + ]) + if abs(model_prob_sum - 1.0) > 0.001: + errors.append(("响应模型概率总和", model_prob_sum)) + + return errors + +def check_probability_range(config: Dict[str, Any]) -> List[Tuple[str, float]]: + """检查概率值是否在0-1范围内""" + errors = [] + + # 收集所有概率值 + prob_fields = [] + + # 人格概率 + if "personality" in config: + personality = config["personality"] + prob_fields.extend([ + ("personality.personality_1_probability", personality.get("personality_1_probability")), + ("personality.personality_2_probability", personality.get("personality_2_probability")), + ("personality.personality_3_probability", personality.get("personality_3_probability")) + ]) + + # 消息概率 + if "message" in config: + message = config["message"] + prob_fields.append(("message.emoji_chance", message.get("emoji_chance"))) + + # 响应模型概率 + if "response" in config: + response = config["response"] + prob_fields.extend([ + ("response.model_r1_probability", response.get("model_r1_probability")), + ("response.model_v3_probability", response.get("model_v3_probability")), + ("response.model_r1_distill_probability", response.get("model_r1_distill_probability")) + ]) + + # 情绪衰减率 + if "mood" in config: + mood = config["mood"] + prob_fields.append(("mood.mood_decay_rate", mood.get("mood_decay_rate"))) + + # 中文错别字概率 + if "chinese_typo" in config and config["chinese_typo"].get("enable", False): + typo = config["chinese_typo"] + prob_fields.extend([ + ("chinese_typo.error_rate", typo.get("error_rate")), + ("chinese_typo.tone_error_rate", typo.get("tone_error_rate")), + ("chinese_typo.word_replace_rate", typo.get("word_replace_rate")) + ]) + + # 检查所有概率值是否在0-1范围内 + for field_name, value in prob_fields: + if value is not None and (value < 0 or value > 1): + errors.append((field_name, value)) + + return errors + +def check_model_configurations(config: Dict[str, Any], env_vars: Dict[str, str]) -> List[str]: + """检查模型配置是否完整,并验证provider是否正确""" + errors = [] + + if "model" not in config: + return ["缺少[model]部分"] + + required_models = [ + "llm_reasoning", "llm_reasoning_minor", "llm_normal", + "llm_normal_minor", "llm_emotion_judge", "llm_topic_judge", + "llm_summary_by_topic", "vlm", "embedding" + ] + + # 从环境变量中提取有效的API提供商 + valid_providers = set() + for key in env_vars: + if key.endswith('_BASE_URL'): + provider_name = key.replace('_BASE_URL', '') + valid_providers.add(provider_name) + + # 将provider名称标准化以便比较 + provider_mapping = { + "SILICONFLOW": ["SILICONFLOW", "SILICON_FLOW", "SILICON-FLOW"], + "CHAT_ANY_WHERE": ["CHAT_ANY_WHERE", "CHAT-ANY-WHERE", "CHATANYWHERE"], + "DEEP_SEEK": ["DEEP_SEEK", "DEEP-SEEK", "DEEPSEEK"] + } + + # 创建反向映射表,用于检查错误拼写 + reverse_mapping = {} + for standard, variants in provider_mapping.items(): + for variant in variants: + reverse_mapping[variant.upper()] = standard + + for model_name in required_models: + # 检查model下是否有对应子部分 + if model_name not in config["model"]: + errors.append(f"缺少[model.{model_name}]配置") + else: + model_config = config["model"][model_name] + if "name" not in model_config: + errors.append(f"[model.{model_name}]缺少name属性") + + if "provider" not in model_config: + errors.append(f"[model.{model_name}]缺少provider属性") + else: + provider = model_config["provider"].upper() + + # 检查拼写错误 + for known_provider, correct_provider in reverse_mapping.items(): + # 使用模糊匹配检测拼写错误 + if provider != known_provider and _similar_strings(provider, known_provider) and provider not in reverse_mapping: + errors.append(f"[model.{model_name}]的provider '{model_config['provider']}' 可能拼写错误,应为 '{known_provider}'") + break + + return errors + +def _similar_strings(s1: str, s2: str) -> bool: + """简单检查两个字符串是否相似(用于检测拼写错误)""" + # 如果两个字符串长度相差过大,则认为不相似 + if abs(len(s1) - len(s2)) > 2: + return False + + # 计算相同字符的数量 + common_chars = sum(1 for c1, c2 in zip(s1, s2) if c1 == c2) + # 如果相同字符比例超过80%,则认为相似 + return common_chars / max(len(s1), len(s2)) > 0.8 + +def check_api_providers(config: Dict[str, Any], env_vars: Dict[str, str]) -> List[str]: + """检查配置文件中的API提供商是否与环境变量中的一致""" + errors = [] + + if "model" not in config: + return ["缺少[model]部分"] + + # 从环境变量中提取有效的API提供商 + valid_providers = {} + for key in env_vars: + if key.endswith('_BASE_URL'): + provider_name = key.replace('_BASE_URL', '') + base_url = env_vars[key] + valid_providers[provider_name] = { + "base_url": base_url, + "key": env_vars.get(f"{provider_name}_KEY", "") + } + + # 检查配置文件中使用的所有提供商 + used_providers = set() + for model_category, model_config in config["model"].items(): + if "provider" in model_config: + provider = model_config["provider"] + used_providers.add(provider) + + # 检查此提供商是否在环境变量中定义 + normalized_provider = provider.replace(" ", "_").upper() + found = False + for env_provider in valid_providers: + if normalized_provider == env_provider: + found = True + break + # 尝试更宽松的匹配(例如SILICONFLOW可能匹配SILICON_FLOW) + elif normalized_provider.replace("_", "") == env_provider.replace("_", ""): + found = True + errors.append(f"提供商 '{provider}' 在环境变量中的名称是 '{env_provider}', 建议统一命名") + break + + if not found: + errors.append(f"提供商 '{provider}' 在环境变量中未定义") + + # 特别检查常见的拼写错误 + for provider in used_providers: + if provider.upper() == "SILICONFOLW": + errors.append(f"提供商 'SILICONFOLW' 存在拼写错误,应为 'SILICONFLOW'") + + return errors + +def check_groups_configuration(config: Dict[str, Any]) -> List[str]: + """检查群组配置""" + errors = [] + + if "groups" not in config: + return ["缺少[groups]部分"] + + groups = config["groups"] + + # 检查talk_allowed是否为列表 + if "talk_allowed" not in groups: + errors.append("缺少groups.talk_allowed配置") + elif not isinstance(groups["talk_allowed"], list): + errors.append("groups.talk_allowed应该是一个列表") + else: + # 检查talk_allowed是否包含默认示例值123 + if 123 in groups["talk_allowed"]: + errors.append({ + "main": "groups.talk_allowed中存在默认示例值'123',请修改为真实的群号", + "details": [ + f" 当前值: {groups['talk_allowed']}", + f" '123'为示例值,需要替换为真实群号" + ] + }) + + # 检查是否存在重复的群号 + talk_allowed = groups["talk_allowed"] + duplicates = [] + seen = set() + for gid in talk_allowed: + if gid in seen and gid not in duplicates: + duplicates.append(gid) + seen.add(gid) + + if duplicates: + errors.append({ + "main": "groups.talk_allowed中存在重复的群号", + "details": [f" 重复的群号: {duplicates}"] + }) + + # 检查其他群组配置 + if "talk_frequency_down" in groups and not isinstance(groups["talk_frequency_down"], list): + errors.append("groups.talk_frequency_down应该是一个列表") + + if "ban_user_id" in groups and not isinstance(groups["ban_user_id"], list): + errors.append("groups.ban_user_id应该是一个列表") + + return errors + +def check_keywords_reaction(config: Dict[str, Any]) -> List[str]: + """检查关键词反应配置""" + errors = [] + + if "keywords_reaction" not in config: + return ["缺少[keywords_reaction]部分"] + + kr = config["keywords_reaction"] + + # 检查enable字段 + if "enable" not in kr: + errors.append("缺少keywords_reaction.enable配置") + + # 检查规则配置 + if "rules" not in kr: + errors.append("缺少keywords_reaction.rules配置") + elif not isinstance(kr["rules"], list): + errors.append("keywords_reaction.rules应该是一个列表") + else: + for i, rule in enumerate(kr["rules"]): + if "enable" not in rule: + errors.append(f"关键词规则 #{i+1} 缺少enable字段") + if "keywords" not in rule: + errors.append(f"关键词规则 #{i+1} 缺少keywords字段") + elif not isinstance(rule["keywords"], list): + errors.append(f"关键词规则 #{i+1} 的keywords应该是一个列表") + if "reaction" not in rule: + errors.append(f"关键词规则 #{i+1} 缺少reaction字段") + + return errors + +def check_willing_mode(config: Dict[str, Any]) -> List[str]: + """检查回复意愿模式配置""" + errors = [] + + if "willing" not in config: + return ["缺少[willing]部分"] + + willing = config["willing"] + + if "willing_mode" not in willing: + errors.append("缺少willing.willing_mode配置") + elif willing["willing_mode"] not in ["classical", "dynamic", "custom"]: + errors.append(f"willing.willing_mode值无效: {willing['willing_mode']}, 应为classical/dynamic/custom") + + return errors + +def check_memory_config(config: Dict[str, Any]) -> List[str]: + """检查记忆系统配置""" + errors = [] + + if "memory" not in config: + return ["缺少[memory]部分"] + + memory = config["memory"] + + # 检查必要的参数 + required_fields = [ + "build_memory_interval", "memory_compress_rate", + "forget_memory_interval", "memory_forget_time", + "memory_forget_percentage" + ] + + for field in required_fields: + if field not in memory: + errors.append(f"缺少memory.{field}配置") + + # 检查参数值的有效性 + if "memory_compress_rate" in memory and (memory["memory_compress_rate"] <= 0 or memory["memory_compress_rate"] > 1): + errors.append(f"memory.memory_compress_rate值无效: {memory['memory_compress_rate']}, 应在0-1之间") + + if "memory_forget_percentage" in memory and (memory["memory_forget_percentage"] <= 0 or memory["memory_forget_percentage"] > 1): + errors.append(f"memory.memory_forget_percentage值无效: {memory['memory_forget_percentage']}, 应在0-1之间") + + return errors + +def check_personality_config(config: Dict[str, Any]) -> List[str]: + """检查人格配置""" + errors = [] + + if "personality" not in config: + return ["缺少[personality]部分"] + + personality = config["personality"] + + # 检查prompt_personality是否存在且为数组 + if "prompt_personality" not in personality: + errors.append("缺少personality.prompt_personality配置") + elif not isinstance(personality["prompt_personality"], list): + errors.append("personality.prompt_personality应该是一个数组") + else: + # 检查数组长度 + if len(personality["prompt_personality"]) < 1: + errors.append(f"personality.prompt_personality数组长度不足,当前长度: {len(personality['prompt_personality'])}, 需要至少1项") + else: + # 模板默认值 + template_values = [ + "用一句话或几句话描述性格特点和其他特征", + "用一句话或几句话描述性格特点和其他特征", + "例如,是一个热爱国家热爱党的新时代好青年" + ] + + # 检查是否仍然使用默认模板值 + error_details = [] + for i, (current, template) in enumerate(zip(personality["prompt_personality"][:3], template_values)): + if current == template: + error_details.append({ + "main": f"personality.prompt_personality第{i+1}项仍使用默认模板值,请自定义", + "details": [ + f" 当前值: '{current}'", + f" 请不要使用模板值: '{template}'" + ] + }) + + # 将错误添加到errors列表 + for error in error_details: + errors.append(error) + + return errors + +def check_bot_config(config: Dict[str, Any]) -> List[str]: + """检查机器人基础配置""" + errors = [] + infos = [] + + if "bot" not in config: + return ["缺少[bot]部分"] + + bot = config["bot"] + + # 检查QQ号是否为默认值或测试值 + if "qq" not in bot: + errors.append("缺少bot.qq配置") + elif bot["qq"] == 1 or bot["qq"] == 123: + errors.append(f"QQ号 '{bot['qq']}' 似乎是默认值或测试值,请设置为真实的QQ号") + else: + infos.append(f"当前QQ号: {bot['qq']}") + + # 检查昵称是否设置 + if "nickname" not in bot or not bot["nickname"]: + errors.append("缺少bot.nickname配置或昵称为空") + elif bot["nickname"]: + infos.append(f"当前昵称: {bot['nickname']}") + + # 检查别名是否为列表 + if "alias_names" in bot and not isinstance(bot["alias_names"], list): + errors.append("bot.alias_names应该是一个列表") + + return errors, infos + +def format_results(all_errors): + """格式化检查结果""" + sections_errors, prob_sum_errors, prob_range_errors, model_errors, api_errors, groups_errors, kr_errors, willing_errors, memory_errors, personality_errors, bot_results = all_errors + bot_errors, bot_infos = bot_results + + if not any([sections_errors, prob_sum_errors, prob_range_errors, model_errors, api_errors, groups_errors, kr_errors, willing_errors, memory_errors, personality_errors, bot_errors]): + result = "✅ 配置文件检查通过,未发现问题。" + + # 添加机器人信息 + if bot_infos: + result += "\n\n【机器人信息】" + for info in bot_infos: + result += f"\n - {info}" + + return result + + output = [] + output.append("❌ 配置文件检查发现以下问题:") + + if sections_errors: + output.append("\n【缺失的配置段】") + for section in sections_errors: + output.append(f" - {section}") + + if prob_sum_errors: + output.append("\n【概率总和错误】(应为1.0)") + for name, value in prob_sum_errors: + output.append(f" - {name}: {value:.4f}") + + if prob_range_errors: + output.append("\n【概率值范围错误】(应在0-1之间)") + for name, value in prob_range_errors: + output.append(f" - {name}: {value}") + + if model_errors: + output.append("\n【模型配置错误】") + for error in model_errors: + output.append(f" - {error}") + + if api_errors: + output.append("\n【API提供商错误】") + for error in api_errors: + output.append(f" - {error}") + + if groups_errors: + output.append("\n【群组配置错误】") + for error in groups_errors: + if isinstance(error, dict): + output.append(f" - {error['main']}") + for detail in error['details']: + output.append(f"{detail}") + else: + output.append(f" - {error}") + + if kr_errors: + output.append("\n【关键词反应配置错误】") + for error in kr_errors: + output.append(f" - {error}") + + if willing_errors: + output.append("\n【回复意愿配置错误】") + for error in willing_errors: + output.append(f" - {error}") + + if memory_errors: + output.append("\n【记忆系统配置错误】") + for error in memory_errors: + output.append(f" - {error}") + + if personality_errors: + output.append("\n【人格配置错误】") + for error in personality_errors: + if isinstance(error, dict): + output.append(f" - {error['main']}") + for detail in error['details']: + output.append(f"{detail}") + else: + output.append(f" - {error}") + + if bot_errors: + output.append("\n【机器人基础配置错误】") + for error in bot_errors: + output.append(f" - {error}") + + # 添加机器人信息,即使有错误 + if bot_infos: + output.append("\n【机器人信息】") + for info in bot_infos: + output.append(f" - {info}") + + return "\n".join(output) + +def main(): + # 获取配置文件路径 + config_path = Path("config/bot_config.toml") + env_path = Path(".env.prod") + + if not config_path.exists(): + print(f"错误: 找不到配置文件 {config_path}") + return + + if not env_path.exists(): + print(f"警告: 找不到环境变量文件 {env_path}, 将跳过API提供商检查") + env_vars = {} + else: + env_vars = load_env_file(env_path) + + # 加载配置文件 + config = load_toml_file(config_path) + + # 运行各种检查 + sections_errors = check_required_sections(config) + prob_sum_errors = check_probability_sum(config) + prob_range_errors = check_probability_range(config) + model_errors = check_model_configurations(config, env_vars) + api_errors = check_api_providers(config, env_vars) + groups_errors = check_groups_configuration(config) + kr_errors = check_keywords_reaction(config) + willing_errors = check_willing_mode(config) + memory_errors = check_memory_config(config) + personality_errors = check_personality_config(config) + bot_results = check_bot_config(config) + + # 格式化并打印结果 + all_errors = (sections_errors, prob_sum_errors, prob_range_errors, model_errors, api_errors, groups_errors, kr_errors, willing_errors, memory_errors, personality_errors, bot_results) + result = format_results(all_errors) + print("📋 机器人配置检查结果:") + print(result) + + # 综合评估 + total_errors = 0 + + # 解包bot_results + bot_errors, _ = bot_results + + # 计算普通错误列表的长度 + for errors in [sections_errors, model_errors, api_errors, groups_errors, kr_errors, willing_errors, memory_errors, bot_errors]: + total_errors += len(errors) + + # 计算元组列表的长度(概率相关错误) + total_errors += len(prob_sum_errors) + total_errors += len(prob_range_errors) + + # 特殊处理personality_errors和groups_errors + for errors_list in [personality_errors, groups_errors]: + for error in errors_list: + if isinstance(error, dict): + # 每个字典表示一个错误,而不是每行都算一个 + total_errors += 1 + else: + total_errors += 1 + + if total_errors > 0: + print(f"\n总计发现 {total_errors} 个配置问题。") + print("\n建议:") + print("1. 修复所有错误后再运行机器人") + print("2. 特别注意拼写错误,例如不!要!写!错!别!字!!!!!") + print("3. 确保所有API提供商名称与环境变量中一致") + print("4. 检查概率值设置,确保总和为1") + else: + print("\n您的配置文件完全正确!机器人可以正常运行。") + +if __name__ == "__main__": + main() + input("\n按任意键退出...") \ No newline at end of file