pull/1001/head
SnowindMe 2025-05-05 17:26:04 +08:00
commit 936a5a318f
31 changed files with 5753 additions and 239 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@ mongodb/
NapCat.Framework.Windows.Once/ NapCat.Framework.Windows.Once/
log/ log/
logs/ logs/
out/
tool_call_benchmark.py tool_call_benchmark.py
run_maibot_core.bat run_maibot_core.bat
run_napcat_adapter.bat run_napcat_adapter.bat

5
@flet_new_.mdc 100644
View File

@ -0,0 +1,5 @@
---
description:
globs:
alwaysApply: false
---

31
bot.py
View File

@ -226,6 +226,7 @@ def raw_main():
if __name__ == "__main__": if __name__ == "__main__":
exit_code = 0 # 用于记录程序最终的退出状态
try: try:
# 获取MainSystem实例 # 获取MainSystem实例
main_system = raw_main() main_system = raw_main()
@ -241,13 +242,29 @@ if __name__ == "__main__":
except KeyboardInterrupt: except KeyboardInterrupt:
# loop.run_until_complete(global_api.stop()) # loop.run_until_complete(global_api.stop())
logger.warning("收到中断信号,正在优雅关闭...") logger.warning("收到中断信号,正在优雅关闭...")
loop.run_until_complete(graceful_shutdown()) if loop and not loop.is_closed():
finally: try:
loop.close() loop.run_until_complete(graceful_shutdown())
except Exception as ge: # 捕捉优雅关闭时可能发生的错误
logger.error(f"优雅关闭时发生错误: {ge}")
# except Exception as e: # 将主异常捕获移到外层 try...except
# logger.error(f"事件循环内发生错误: {str(e)} {str(traceback.format_exc())}")
# exit_code = 1
# finally: # finally 块移到最外层,确保 loop 关闭和暂停总是执行
# if loop and not loop.is_closed():
# loop.close()
# # 在这里添加 input() 来暂停
# input("按 Enter 键退出...") # <--- 添加这行
# sys.exit(exit_code) # <--- 使用记录的退出码
except Exception as e: except Exception as e:
logger.error(f"主程序异常: {str(e)} {str(traceback.format_exc())}") logger.error(f"主程序发生异常: {str(e)} {str(traceback.format_exc())}")
if loop and not loop.is_closed(): exit_code = 1 # 标记发生错误
loop.run_until_complete(graceful_shutdown()) finally:
# 确保 loop 在任何情况下都尝试关闭(如果存在且未关闭)
if "loop" in locals() and loop and not loop.is_closed():
loop.close() loop.close()
sys.exit(1) logger.info("事件循环已关闭")
# 在程序退出前暂停,让你有机会看到输出
input("按 Enter 键退出...") # <--- 添加这行
sys.exit(exit_code) # <--- 使用记录的退出码

261
launcher.py 100644
View File

@ -0,0 +1,261 @@
import flet as ft
import os
import atexit
import psutil # Keep for initial PID checks maybe, though state should handle it
# --- Import refactored modules --- #
from src.MaiGoi.state import AppState
from src.MaiGoi.process_manager import (
start_bot_and_show_console,
output_processor_loop, # Needed for restarting on navigate
cleanup_on_exit,
handle_disconnect,
)
from src.MaiGoi.ui_views import (
create_main_view,
create_console_view,
create_adapters_view,
create_process_output_view,
)
from src.MaiGoi.config_manager import load_config
# --- Import the new settings view --- #
from src.MaiGoi.ui_settings_view import create_settings_view
# --- Global AppState instance --- #
# This holds all the state previously scattered as globals
app_state = AppState()
# --- File Picker Result Handler Placeholder ---
# We need a placeholder function or logic to handle the result here if needed
# For now, the result will be handled within the adapters view itself.
# def handle_file_picker_result(e: ft.FilePickerResultEvent):
# print("File picker result in launcher (should be handled in view):", e.files)
# --- atexit Cleanup Registration --- #
# Register the cleanup function from the process manager module
# It needs access to the app_state
atexit.register(cleanup_on_exit, app_state)
print("[Main Script] atexit cleanup handler from process_manager registered.", flush=True)
# --- Routing Logic --- #
def route_change(route: ft.RouteChangeEvent):
"""Handles Flet route changes, creating and appending views."""
page = route.page
target_route = route.route
# Clear existing views before adding new ones
page.views.clear()
# Always add the main view
main_view = create_main_view(page, app_state)
page.views.append(main_view)
# --- Handle Specific Routes --- #
if target_route == "/console":
console_view = create_console_view(page, app_state)
page.views.append(console_view)
# Check process status and potentially restart processor loop if needed
is_running = app_state.bot_pid is not None and psutil.pid_exists(app_state.bot_pid)
print(
f"[Route Change /console] Checking status: PID={app_state.bot_pid}, is_running={is_running}, stop_event={app_state.stop_event.is_set()}",
flush=True,
)
if is_running:
print("[Route Change /console] Process is running.", flush=True)
# If the processor loop was stopped (e.g., by navigating away or stop button),
# but the process is still running, restart the loop.
if app_state.stop_event.is_set():
print("[Route Change /console] Stop event was set, clearing and restarting processor loop.", flush=True)
app_state.stop_event.clear()
# Make sure output_list_view is available before starting loop
if not app_state.output_list_view:
print("[Route Change /console] Warning: output_list_view is None when restarting loop. Creating.")
app_state.output_list_view = ft.ListView(
expand=True, spacing=2, auto_scroll=app_state.is_auto_scroll_enabled, padding=5
)
console_view.controls[1].controls[0].content = app_state.output_list_view # Update content in view
page.run_task(output_processor_loop, page, app_state)
else:
print("[Route Change /console] Process is not running.", flush=True)
# Ensure console view shows the 'not running' state if needed
if app_state.output_list_view:
# Check if already has the message? Might add duplicates.
# Simple approach: just add it if the list is empty or last msg isn't it.
add_not_running_msg = True
if app_state.output_list_view.controls:
last_control = app_state.output_list_view.controls[-1]
# Check if it's Text and value is not None before checking content
if (
isinstance(last_control, ft.Text)
and last_control.value is not None
and "Bot 进程未运行" in last_control.value
):
add_not_running_msg = False
if add_not_running_msg:
app_state.output_list_view.controls.append(ft.Text("--- Bot 进程未运行 ---", italic=True))
else:
# If list view doesn't exist here, create it and add the message
print("[Route Change /console] Creating ListView to show 'not running' message.")
app_state.output_list_view = ft.ListView(
expand=True, spacing=2, auto_scroll=app_state.is_auto_scroll_enabled, padding=5
)
app_state.output_list_view.controls.append(ft.Text("--- Bot 进程未运行 ---", italic=True))
# Update the console view container's content
console_view.controls[1].controls[0].content = app_state.output_list_view
elif target_route == "/adapters":
adapters_view = create_adapters_view(page, app_state)
page.views.append(adapters_view)
elif target_route == "/settings":
# Call the new settings view function
settings_view = create_settings_view(page, app_state)
page.views.append(settings_view)
# --- Handle Dynamic Adapter Output Route --- #
# Check if the route matches the pattern /adapters/<something>
elif target_route.startswith("/adapters/") and len(target_route.split("/")) == 3:
parts = target_route.split("/")
process_id = parts[2] # Extract the process ID (which is the script path for now)
print(f"[Route Change] Detected adapter output route for ID: {process_id}")
adapter_output_view = create_process_output_view(page, app_state, process_id)
if adapter_output_view:
page.views.append(adapter_output_view)
else:
# If view creation failed (e.g., process state not found), show error and stay on previous view?
# Or redirect back to /adapters? Let's go back to adapters list.
print(f"[Route Change] Failed to create output view for {process_id}. Redirecting to /adapters.")
# Avoid infinite loop if /adapters also fails
if len(page.views) > 1: # Ensure we don't pop the main view
page.views.pop() # Pop the failed view attempt
# Find the adapters view if it exists, otherwise just update
adapters_view_index = -1
for i, view in enumerate(page.views):
if view.route == "/adapters":
adapters_view_index = i
break
if adapters_view_index == -1: # Adapters view wasn't in stack? Add it.
adapters_view = create_adapters_view(page, app_state)
page.views.append(adapters_view)
# Go back to the adapters list route to rebuild the view stack correctly
page.go("/adapters")
return # Prevent page.update() below
# Update the page to show the correct view(s)
page.update()
def view_pop(e: ft.ViewPopEvent):
"""Handles view popping (e.g., back navigation)."""
page = e.page
# Remove the top view
page.views.pop()
if page.views:
top_view = page.views[-1]
# Go to the route of the view now at the top of the stack
# This will trigger route_change again to rebuild the view stack correctly
page.go(top_view.route)
# else: print("Warning: Popped the last view.")
# --- Main Application Setup --- #
def main(page: ft.Page):
# Load initial config and store in state
# 启动时清除/logs/interest/interest_history.log
if os.path.exists("logs/interest/interest_history.log"):
os.remove("logs/interest/interest_history.log")
loaded_config = load_config()
app_state.gui_config = loaded_config
app_state.adapter_paths = loaded_config.get("adapters", []).copy() # Get adapter paths
print(f"[Main] Initial adapters loaded: {app_state.adapter_paths}")
# Set script_dir in AppState early
app_state.script_dir = os.path.dirname(os.path.abspath(__file__))
print(f"[Main] Script directory set in state: {app_state.script_dir}", flush=True)
# --- Setup File Picker --- #
# Create the FilePicker instance
# The on_result handler will be set dynamically in the view that uses it
app_state.file_picker = ft.FilePicker()
# Add the FilePicker to the page's overlay controls
page.overlay.append(app_state.file_picker)
print("[Main] FilePicker created and added to page overlay.")
page.title = "MaiBot 启动器"
page.window.width = 1400
page.window.height = 1000 # Increased height slightly for monitor
page.vertical_alignment = ft.MainAxisAlignment.START
page.horizontal_alignment = ft.CrossAxisAlignment.CENTER
# --- Apply Theme from Config --- #
saved_theme = app_state.gui_config.get("theme", "System").upper()
try:
page.theme_mode = ft.ThemeMode[saved_theme]
print(f"[Main] Applied theme from config: {page.theme_mode}")
except KeyError:
print(f"[Main] Warning: Invalid theme '{saved_theme}' in config. Falling back to System.")
page.theme_mode = ft.ThemeMode.SYSTEM
page.padding = 0 # <-- 将页面 padding 设置为 0
# --- Create the main 'Start Bot' button and store in state --- #
# This button needs to exist before the first route_change call
app_state.start_bot_button = ft.FilledButton(
"启动 MaiBot 主程序 (bot.py)",
icon=ft.icons.SMART_TOY_OUTLINED,
# The click handler now calls the function from process_manager
on_click=lambda _: start_bot_and_show_console(page, app_state),
expand=True,
tooltip="启动主程序并在新视图中显示控制台输出",
)
print("[Main] Start Bot Button created and stored in state.", flush=True)
# --- Routing Setup --- #
page.on_route_change = route_change
page.on_view_pop = view_pop
# --- Disconnect Handler --- #
# Pass app_state to the disconnect handler
page.on_disconnect = lambda e: handle_disconnect(page, app_state, e)
print("[Main] Registered page.on_disconnect handler.", flush=True)
# Prevent immediate close to allow cleanup
page.window_prevent_close = True
# --- Hide Native Title Bar --- #
# page.window_title_bar_hidden = True
# page.window.frameless = True
# --- Initial Navigation --- #
# Trigger the initial route change to build the first view
page.go(page.route if page.route else "/")
# --- Run Flet App --- #
if __name__ == "__main__":
# No need to initialize globals here anymore, AppState handles it.
ft.app(target=main)
# This print will appear *after* the Flet window closes,
# but *before* the atexit handler runs.
print("[Main Script] Flet app exited. atexit handler should run next.", flush=True)
# --- Removed Code Sections (Previously Globals and Functions) ---
# (Keep this comment block or similar for reference if desired)
# Removed: bot_process, bot_pid, output_queue, stop_event, interest_monitor_control,
# output_list_view, start_bot_button (now in AppState),
# is_auto_scroll_enabled (now in AppState)
# Removed: ansi_converter
# Removed: cleanup_on_exit (moved to process_manager)
# Removed: update_page_safe (moved to utils)
# Removed: show_snackbar (moved to utils)
# Removed: run_script (moved to utils)
# Removed: handle_disconnect (moved to process_manager)
# Removed: stop_bot_process (moved to process_manager)
# Removed: read_process_output (moved to process_manager)
# Removed: output_processor_loop (moved to process_manager)
# Removed: start_bot_and_show_console (moved to process_manager)
# Removed: create_console_view (moved to ui_views)
# (Main view creation logic also moved to ui_views within create_main_view)

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 215 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 284 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 491 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 MiB

View File

@ -0,0 +1,229 @@
"""
Parses log lines containing ANSI escape codes or Loguru-style color tags
into a list of Flet TextSpan objects for colored output.
"""
import re
import flet as ft
# Basic ANSI SGR (Select Graphic Rendition) codes mapping
# See: https://en.wikipedia.org/wiki/ANSI_escape_code#SGR_(Select_Graphic_Rendition)_parameters
# Focusing on common foreground colors and styles used by Loguru
ANSI_CODES = {
# Styles
"1": ft.FontWeight.BOLD,
"3": ft.TextStyle(italic=True), # Italic
"4": ft.TextStyle(decoration=ft.TextDecoration.UNDERLINE), # Underline
"22": ft.FontWeight.NORMAL, # Reset bold
"23": ft.TextStyle(italic=False), # Reset italic
"24": ft.TextStyle(decoration=ft.TextDecoration.NONE), # Reset underline
# Foreground Colors (30-37)
"30": ft.colors.BLACK,
"31": ft.colors.RED,
"32": ft.colors.GREEN,
"33": ft.colors.YELLOW,
"34": ft.colors.BLUE,
"35": ft.colors.PINK,
"36": ft.colors.CYAN,
"37": ft.colors.WHITE,
"39": None, # Default foreground color
# Bright Foreground Colors (90-97)
"90": ft.colors.with_opacity(0.7, ft.colors.BLACK), # Often rendered as gray
"91": ft.colors.RED_ACCENT, # Or RED_400 / LIGHT_RED
"92": ft.colors.LIGHT_GREEN, # Or GREEN_ACCENT
"93": ft.colors.YELLOW_ACCENT, # Or LIGHT_YELLOW
"94": ft.colors.LIGHT_BLUE, # Or BLUE_ACCENT
"95": ft.colors.PINK, # ANSI bright magenta maps well to Flet's PINK
"96": ft.colors.CYAN_ACCENT,
"97": ft.colors.WHITE70, # Brighter white
}
# Loguru simple tags mapping (add more as needed from your logger.py)
# Using lowercase for matching
LOGURU_TAGS = {
"red": ft.colors.RED,
"green": ft.colors.GREEN,
"yellow": ft.colors.YELLOW,
"blue": ft.colors.BLUE,
"magenta": ft.colors.PINK,
"cyan": ft.colors.CYAN,
"white": ft.colors.WHITE,
"light-yellow": ft.colors.YELLOW_ACCENT, # Or specific yellow shade
"light-green": ft.colors.LIGHT_GREEN,
"light-magenta": ft.colors.PINK, # Or specific magenta shade
"light-cyan": ft.colors.CYAN_ACCENT, # Or specific cyan shade
"light-blue": ft.colors.LIGHT_BLUE,
"fg #ffd700": "#FFD700", # Handle specific hex colors like emoji
"fg #3399ff": "#3399FF", # Handle specific hex colors like emoji
"fg #66ccff": "#66CCFF",
"fg #005ba2": "#005BA2",
"fg #7cffe6": "#7CFFE6", # 海马体
"fg #37ffb4": "#37FFB4", # LPMM
"fg #00788a": "#00788A", # 远程
"fg #3fc1c9": "#3FC1C9", # Tools
# Add other colors used in your logger.py simple formats
}
# Regex to find ANSI codes (basic SGR, true-color fg) OR Loguru tags
# Added specific capture for 38;2;r;g;b
ANSI_COLOR_REGEX = re.compile(
r"(\x1b\[(?:(?:(?:3[0-7]|9[0-7]|1|3|4|22|23|24);?)+|39|0)m)" # Group 1: Basic SGR codes (like 31, 1;32, 0, 39)
r"|"
r"(\x1b\[38;2;(\d{1,3});(\d{1,3});(\d{1,3})m)" # Group 2: Truecolor FG ( captures full code, Grp 3: R, Grp 4: G, Grp 5: B )
# r"|(\x1b\[48;2;...m)" # Placeholder for Truecolor BG if needed later
r"|"
r"(<(/?)([^>]+)?>)" # Group 6: Loguru tags ( Grp 7: slash, Grp 8: content )
)
def parse_log_line_to_spans(line: str) -> list[ft.TextSpan]:
"""
Parses a log line potentially containing ANSI codes OR Loguru tags
into a list of Flet TextSpan objects.
Uses a style stack for basic nesting.
"""
spans = []
current_pos = 0
# Stack holds TextStyle objects. Base style is default.
style_stack = [ft.TextStyle()]
for match in ANSI_COLOR_REGEX.finditer(line):
start, end = match.span()
basic_ansi_code = match.group(1)
truecolor_ansi_code = match.group(2)
tc_r, tc_g, tc_b = match.group(3), match.group(4), match.group(5)
loguru_full_tag = match.group(6)
loguru_closing_slash = match.group(7)
loguru_tag_content = match.group(8)
current_style = style_stack[-1]
if start > current_pos:
spans.append(ft.TextSpan(line[current_pos:start], current_style))
if basic_ansi_code:
# --- Handle Basic ANSI ---
params = basic_ansi_code[2:-1]
if not params or params == "0": # Reset code
style_stack = [ft.TextStyle()] # Reset stack
else:
temp_style_dict = {
k: getattr(current_style, k, None) for k in ["color", "weight", "italic", "decoration"]
}
codes = params.split(";")
for code in filter(None, codes):
style_attr = ANSI_CODES.get(code)
if isinstance(style_attr, str):
temp_style_dict["color"] = style_attr
elif isinstance(style_attr, ft.FontWeight):
temp_style_dict["weight"] = None if code == "22" else style_attr
elif isinstance(style_attr, ft.TextStyle):
if style_attr.italic is not None:
temp_style_dict["italic"] = False if code == "23" else style_attr.italic
if style_attr.decoration is not None:
temp_style_dict["decoration"] = (
ft.TextDecoration.NONE if code == "24" else style_attr.decoration
)
elif style_attr is None and code == "39":
temp_style_dict["color"] = None
style_stack[-1] = ft.TextStyle(**{k: v for k, v in temp_style_dict.items() if v is not None})
elif truecolor_ansi_code:
# --- Handle Truecolor ANSI ---
try:
r, g, b = int(tc_r), int(tc_g), int(tc_b)
hex_color = f"#{r:02x}{g:02x}{b:02x}"
# print(f"--- TrueColor Debug: Parsed RGB ({r},{g},{b}) -> {hex_color} ---")
# Update color in the current style on stack top
temp_style_dict = {
k: getattr(current_style, k, None) for k in ["color", "weight", "italic", "decoration"]
}
temp_style_dict["color"] = hex_color
style_stack[-1] = ft.TextStyle(**{k: v for k, v in temp_style_dict.items() if v is not None})
except (ValueError, TypeError) as e:
print(f"Error parsing truecolor ANSI: {e}, Code: {truecolor_ansi_code}")
# Keep current style if parsing fails
elif loguru_full_tag:
if loguru_closing_slash:
if len(style_stack) > 1:
style_stack.pop()
# print(f"--- Loguru Debug: Closing Tag processed. Stack size: {len(style_stack)} ---")
elif loguru_tag_content: # Opening tag
tag_lower = loguru_tag_content.lower()
style_attr = LOGURU_TAGS.get(tag_lower)
# print(f"--- Loguru Debug: Opening Tag --- ")
# print(f" Raw Content : {repr(loguru_tag_content)}")
# print(f" Lowercase Key: {repr(tag_lower)}")
# print(f" Found Attr : {repr(style_attr)} --- ")
temp_style_dict = {
k: getattr(current_style, k, None) for k in ["color", "weight", "italic", "decoration"]
}
if style_attr:
if isinstance(style_attr, str):
temp_style_dict["color"] = style_attr
# print(f" Applied Color: {style_attr}")
# ... (handle other style types if needed)
# Push the new style only if the tag was recognized and resulted in a change
# (or check if style_attr is not None)
new_style = ft.TextStyle(**{k: v for k, v in temp_style_dict.items() if v is not None})
# Avoid pushing identical style
if new_style != current_style:
style_stack.append(new_style)
# print(f" Pushed Style. Stack size: {len(style_stack)}")
# else:
# print(f" Style unchanged, stack not pushed.")
# else:
# print(f" Tag NOT FOUND in LOGURU_TAGS.")
# else: Invalid tag format?
current_pos = end
# Add any remaining text after the last match
final_style = style_stack[-1]
if current_pos < len(line):
spans.append(ft.TextSpan(line[current_pos:], final_style))
return [span for span in spans if span.text]
if __name__ == "__main__":
# Example Usage & Testing
test_lines = [
"This is normal text.",
"\\x1b[31mThis is red text.\\x1b[0m And back to normal.",
"\\x1b[1;32mThis is bold green.\\x1b[0m",
"Text with <red>red tag</red> inside.",
"Nested <yellow>yellow <bold>bold</bold> yellow</yellow>.", # Bold tag not handled yet
"<light-green>Light green message</light-green>",
"<fg #FFD700>Emoji color</fg #FFD700>",
"\\x1b[94mBright Blue ANSI\\x1b[0m",
"\\x1b[3mItalic ANSI\\x1b[0m",
# Example from user image (simplified)
"\\x1b[37m2025-05-03 23:00:44\\x1b[0m | \\x1b[1mINFO\\x1b[0m | \\x1b[96m配置\\x1b[0m | \\x1b[1m成功加载配置文件: ...\\x1b[0m",
"\\x1b[1mDEBUG\\x1b[0m | \\x1b[94m人物信息\\x1b[0m | \\x1b[1m已加载 81 个用户名\\x1b[0m",
"<level>TIME</level> | <light-green>模块</light-green> | <light-green>消息</light-green>", # Loguru format string itself
]
# Simple print test (won't show colors in standard terminal)
for t_line in test_lines:
print(f"--- Input: {repr(t_line)} ---")
parsed_spans = parse_log_line_to_spans(t_line)
print("Parsed Spans:")
for s in parsed_spans:
print(
f" Text: {repr(s.text)}, Style: color={s.style.color}, weight={s.style.weight}, italic={s.style.italic}, decoration={s.style.decoration}"
)
print("-" * 20)
# To visually test with Flet, you'd run this in a simple Flet app:
# import flet as ft
# def main(page: ft.Page):
# page.add(ft.Column([
# ft.Text(spans=parse_log_line_to_spans(line)) for line in test_lines
# ]))
# ft.app(target=main)

View File

@ -0,0 +1,100 @@
import toml
# Use tomlkit for dumping to preserve comments/formatting if needed,
# but stick to `toml` for loading unless specific features are required.
import tomlkit
from pathlib import Path
from typing import Dict, Any, Optional
CONFIG_DIR = Path("config")
# Define default filenames for different config types
CONFIG_FILES = {"gui": "gui_config.toml", "lpmm": "lpmm_config.toml", "bot": "bot_config.toml"}
DEFAULT_GUI_CONFIG = {"adapters": [], "theme": "System"} # Add default theme
def get_config_path(config_type: str = "gui") -> Optional[Path]:
"""Gets the full path to the specified config file type."""
filename = CONFIG_FILES.get(config_type)
if not filename:
print(f"[Config] Error: Unknown config type '{config_type}'")
return None
# Determine the base directory relative to this file
# Assumes config_manager.py is in src/MaiGoi/
try:
script_dir = Path(__file__).parent.parent.parent # Project Root (MaiBot-Core/)
config_path = script_dir / CONFIG_DIR / filename
return config_path
except Exception as e:
print(f"[Config] Error determining config path for type '{config_type}': {e}")
return None
def load_config(config_type: str = "gui") -> Dict[str, Any]:
"""Loads the configuration from the specified TOML file type."""
config_path = get_config_path(config_type)
if not config_path:
return {} # Return empty dict if path is invalid
print(f"[Config] Loading {config_type} config from: {config_path}")
default_config_to_use = DEFAULT_GUI_CONFIG if config_type == "gui" else {}
try:
config_path.parent.mkdir(parents=True, exist_ok=True) # Ensure directory exists
if config_path.is_file():
with open(config_path, "r", encoding="utf-8") as f:
# Use standard toml for loading, it's generally more robust
config_data = toml.load(f)
print(f"[Config] {config_type} config loaded successfully.")
# Basic check for GUI config default keys
if config_type == "gui":
if "adapters" not in config_data:
config_data["adapters"] = DEFAULT_GUI_CONFIG["adapters"]
if "theme" not in config_data:
config_data["theme"] = DEFAULT_GUI_CONFIG["theme"]
return config_data
else:
print(f"[Config] {config_type} config file not found, using default.")
# Save default config only if it's the GUI config
if config_type == "gui":
save_config(default_config_to_use, config_type=config_type)
return default_config_to_use.copy() # Return a copy
except FileNotFoundError:
print(f"[Config] {config_type} config file not found (FileNotFoundError), using default.")
if config_type == "gui":
save_config(default_config_to_use, config_type=config_type) # Attempt to save default
return default_config_to_use.copy()
except toml.TomlDecodeError as e:
print(f"[Config] Error decoding {config_type} TOML file: {e}. Using default.")
return default_config_to_use.copy()
except Exception as e:
print(f"[Config] An unexpected error occurred loading {config_type} config: {e}.")
import traceback
traceback.print_exc()
return default_config_to_use.copy()
def save_config(config_data: Dict[str, Any], config_type: str = "gui") -> bool:
"""Saves the configuration dictionary to the specified TOML file type."""
config_path = get_config_path(config_type)
if not config_path:
return False # Cannot save if path is invalid
print(f"[Config] Saving {config_type} config to: {config_path}")
try:
config_path.parent.mkdir(parents=True, exist_ok=True) # Ensure directory exists
with open(config_path, "w", encoding="utf-8") as f:
# Use tomlkit.dump if preserving format/comments is important
# Otherwise, stick to toml.dump for simplicity
tomlkit.dump(config_data, f) # Using tomlkit here
print(f"[Config] {config_type} config saved successfully.")
return True
except IOError as e:
print(f"[Config] Error writing {config_type} config file (IOError): {e}")
except Exception as e:
print(f"[Config] An unexpected error occurred saving {config_type} config: {e}")
import traceback
traceback.print_exc()
return False

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,159 @@
"""
Flet UI开发的规则和最佳实践
这个文件记录了在使用Flet开发UI界面时发现的重要规则和最佳实践
可以帮助避免常见错误并提高代码质量
"""
# ===== Container相关规则 =====
"""
规则1: Container没有controls属性
Container只有content属性不能直接访问controls必须通过container.content访问内容
错误示例:
container.controls.append(...) # 错误! Container没有controls属性
正确示例:
container.content = ft.Column([]) # 先设置content为一个有controls属性的控件
container.content.controls.append(...) # 然后通过content访问controls
"""
"""
规则2: Card没有padding属性
Card控件不直接支持padding必须用Container包装来添加padding
错误示例:
ft.Card(padding=10, content=...) # 错误! Card没有padding属性
正确示例:
ft.Card(
content=ft.Container(
content=...,
padding=10
)
)
"""
# ===== UI更新规则 =====
"""
规则3: 控件必须先添加到页面才能调用update()
调用控件的update()方法前确保该控件已经添加到页面中否则会报错
错误示例:
new_column = ft.Column([])
new_column.update() # 错误! 控件还未添加到页面
正确示例:
# 区分初始加载和用户交互
def add_item(e=None, is_initial=False):
# 创建新控件...
items_column.controls.append(new_control)
# 只在用户交互时更新UI
if not is_initial and e is not None:
items_column.update()
"""
"""
规则4: 嵌套结构展开/折叠时的更新策略
处理嵌套数据结构(如字典)的展开/折叠时要小心控制update()的调用时机
最佳实践:
1. 在生成UI结构时不要调用update()
2. 在用户交互(如点击展开按钮)后再调用update()
3. 始终从父容器调用update()而不是每个子控件都调用
4. 添加异常处理防止动态生成控件时的错误导致整个UI崩溃
"""
# ===== 数据类型处理规则 =====
"""
规则5: 特殊处理集合类型(set)
Python中的set类型在UI表示时需要特殊处理将其转换为可编辑的表单控件
最佳实践:
1. 为set类型实现专门的UI控件(如_create_set_control)
2. 添加错误处理即使创建控件失败也要提供备选显示方式
3. 小心处理类型转换确保UI中的数据变更能正确应用到set类型
示例:
if isinstance(value, set):
try:
return create_set_control(value)
except Exception:
return ft.Text(f"{value} (不可编辑)", italic=True)
"""
"""
规则6: 动态UI组件的初始化与更新分离
创建动态UI组件时将初始化和更新逻辑分开处理
最佳实践:
1. 初始化时只创建控件不调用update()
2. 使用标志(如is_initial)区分初始加载和用户交互
3. 只在用户交互时调用update()
4. 更新数据模型和更新UI分开处理
示例:
# 添加现有项目使用is_initial=True标记为初始化
for item in values:
add_item(item, is_initial=True)
# 用户添加新项目时不使用is_initial参数
add_button.on_click = lambda e: add_item(new_value)
"""
# ===== 其他实用规则 =====
"""
规则7: 始终使用正确的padding格式
Flet中padding必须使用正确的格式不能直接传入数字
错误示例:
ft.Padding(padding=10, content=...) # 错误
正确示例:
ft.Padding(padding=ft.padding.all(10), content=...)
ft.Container(padding=ft.padding.all(10), content=...)
"""
"""
规则8: 控件引用路径注意层级关系
访问嵌套控件时注意层级关系特别是当使用Container包装其他控件时
错误示例:
# 如果card的内容是Container且Container的内容是Column
button = card.controls[-1] # 错误! Card没有controls属性
正确示例:
# 正确的访问路径
button = card.content.content.controls[-1]
"""
# ===== 自定义控件规则 (Flet v0.21.0+) =====
"""
规则9: 弃用 UserControl直接继承基础控件
Flet v0.21.0 及更高版本已弃用 `ft.UserControl`
创建自定义控件时应直接继承自 Flet 的基础控件 `ft.Column`, `ft.Row`, `ft.Card`, `ft.Text`
修改步骤:
1. 更改类定义: `class MyControl(ft.Column):` 替换 `class MyControl(ft.UserControl):`
2. `build()` 方法中的 UI 构建逻辑移至 `__init__` 方法
3. `__init__` 中调用 `super().__init__(...)` 并传递基础控件所需的参数
4. `__init__` 中直接将子控件添加到 `self.controls`
5. 移除 `build()` 方法
错误示例 (已弃用):
class OldCustom(ft.UserControl):
def build(self):
return ft.Text("Old way")
正确示例 (继承 ft.Column):
class NewCustom(ft.Column):
def __init__(self):
super().__init__(spacing=5)
self.controls.append(ft.Text("New way"))
"""

View File

@ -0,0 +1,700 @@
import flet as ft
import subprocess
import os
import sys
import platform
import threading
import queue
import traceback
import asyncio
import psutil
from typing import Optional, TYPE_CHECKING, Tuple
# Import the color parser and AppState/ManagedProcessState
from .color_parser import parse_log_line_to_spans
if TYPE_CHECKING:
from .state import AppState
from .utils import show_snackbar, update_page_safe # Add import here
# --- Helper Function to Update Button States (Mostly Unchanged for now) --- #
def update_buttons_state(page: Optional[ft.Page], app_state: "AppState", is_running: bool):
"""Updates the state (text, icon, color, on_click) of the console button."""
console_button = app_state.console_action_button
needs_update = False
# --- Define Button Actions (Point to adapted functions) --- #
# start_action = lambda _: start_bot_and_show_console(page, app_state) if page else None
# stop_action = lambda _: stop_bot_process(page, app_state) if page else None # stop_bot_process now calls stop_managed_process
def _start_action(_):
if page:
start_bot_and_show_console(page, app_state)
def _stop_action(_):
if page:
stop_bot_process(page, app_state)
if console_button:
button_text_control = console_button.content if isinstance(console_button.content, ft.Text) else None
if button_text_control:
if is_running:
new_text = "停止 MaiCore"
new_color = ft.colors.with_opacity(0.6, ft.colors.RED_ACCENT_100)
new_onclick = _stop_action # Use def
if (
button_text_control.value != new_text
or console_button.bgcolor != new_color
or console_button.on_click != new_onclick
):
button_text_control.value = new_text
console_button.bgcolor = new_color
console_button.on_click = new_onclick
needs_update = True
else:
new_text = "启动 MaiCore"
new_color = ft.colors.with_opacity(0.6, ft.colors.GREEN_ACCENT_100)
new_onclick = _start_action # Use def
if (
button_text_control.value != new_text
or console_button.bgcolor != new_color
or console_button.on_click != new_onclick
):
button_text_control.value = new_text
console_button.bgcolor = new_color
console_button.on_click = new_onclick
needs_update = True
else:
print("[Update Buttons] Warning: console_action_button content is not Text?")
if needs_update and page:
print(f"[Update Buttons] State changed, triggering page update. is_running={is_running}")
# from .utils import update_page_safe # Moved import to top
page.run_task(update_page_safe, page)
# --- Generic Process Termination Helper ---
def _terminate_process_gracefully(process_id: str, handle: Optional[subprocess.Popen], pid: Optional[int]):
"""Helper to attempt graceful termination, then kill."""
stopped_cleanly = False
if handle and pid:
print(f"[_terminate] Attempting termination using handle for PID: {pid} (ID: {process_id})...", flush=True)
try:
if handle.poll() is None:
handle.terminate()
print(f"[_terminate] Sent terminate() to PID: {pid}. Waiting briefly...", flush=True)
try:
handle.wait(timeout=1.0)
print(f"[_terminate] Process PID: {pid} stopped after terminate().", flush=True)
stopped_cleanly = True
except subprocess.TimeoutExpired:
print(f"[_terminate] Terminate timed out for PID: {pid}. Attempting kill()...", flush=True)
try:
handle.kill()
print(f"[_terminate] Sent kill() to PID: {pid}.", flush=True)
except Exception as kill_err:
print(f"[_terminate] Error during kill() for PID: {pid}: {kill_err}", flush=True)
else:
print("[_terminate] Process poll() was not None before terminate (already stopped?).", flush=True)
stopped_cleanly = True # Already stopped
except Exception as e:
print(f"[_terminate] Error during terminate/wait for PID: {pid}: {e}", flush=True)
elif pid:
print(
f"[_terminate] No process handle, attempting psutil fallback for PID: {pid} (ID: {process_id})...",
flush=True,
)
try:
if psutil.pid_exists(pid):
proc = psutil.Process(pid)
proc.terminate()
try:
proc.wait(timeout=1.0)
stopped_cleanly = True
except psutil.TimeoutExpired:
proc.kill()
print(f"[_terminate] psutil terminated/killed PID {pid}.", flush=True)
else:
print(f"[_terminate] psutil confirms PID {pid} does not exist.", flush=True)
stopped_cleanly = True # Already gone
except Exception as ps_err:
print(f"[_terminate] Error during psutil fallback for PID {pid}: {ps_err}", flush=True)
else:
print(f"[_terminate] Cannot terminate process ID '{process_id}': No handle or PID provided.", flush=True)
stopped_cleanly = True # Nothing to stop
return stopped_cleanly
# --- Process Management Functions (Refactored for Multi-Process) --- #
def cleanup_on_exit(app_state: "AppState"):
"""Registered with atexit to ensure ALL managed processes are killed on script exit."""
print("--- [atexit Cleanup] Running cleanup function ---", flush=True)
# Iterate through a copy of the keys to avoid modification issues
process_ids = list(app_state.managed_processes.keys())
print(f"[atexit Cleanup] Found managed process IDs: {process_ids}", flush=True)
for process_id in process_ids:
process_state = app_state.managed_processes.get(process_id)
if process_state and process_state.pid:
print(f"[atexit Cleanup] Checking PID: {process_state.pid} for ID: {process_id}...", flush=True)
try:
# Use psutil directly as handles might be invalid in atexit
if psutil.pid_exists(process_state.pid):
print(
f"[atexit Cleanup] PID {process_state.pid} exists. Attempting termination/kill...", flush=True
)
proc = psutil.Process(process_state.pid)
proc.terminate()
try:
proc.wait(timeout=0.5)
except psutil.TimeoutExpired:
proc.kill()
print(
f"[atexit Cleanup] psutil terminate/kill signal sent for PID {process_state.pid}.", flush=True
)
else:
print(f"[atexit Cleanup] PID {process_state.pid} does not exist.", flush=True)
except psutil.NoSuchProcess:
print(f"[atexit Cleanup] psutil.NoSuchProcess error checking PID {process_state.pid}.", flush=True)
except Exception as ps_err:
print(f"[atexit Cleanup] Error cleaning up PID {process_state.pid}: {ps_err}", flush=True)
elif process_state:
print(f"[atexit Cleanup] Process ID '{process_id}' has no PID stored.", flush=True)
# else: Process ID might have been removed already
print("--- [atexit Cleanup] Cleanup function finished ---", flush=True)
def handle_disconnect(page: Optional[ft.Page], app_state: "AppState", e):
"""Handles UI disconnect. Sets the stop_event for the main bot.py process FOR NOW."""
# TODO: In a full multi-process model, this might need to signal all running processes or be handled differently.
print(f"--- [Disconnect Event] Triggered! Setting main stop_event. Event data: {e} ---", flush=True)
if not app_state.stop_event.is_set(): # Still uses the old singleton event
app_state.stop_event.set()
print("[Disconnect Event] Main stop_event set. atexit handler will perform final cleanup.", flush=True)
# --- New Generic Stop Function ---
def stop_managed_process(process_id: str, page: Optional[ft.Page], app_state: "AppState"):
"""Stops a specific managed process by its ID."""
print(f"[Stop Managed] Request to stop process ID: '{process_id}'", flush=True)
process_state = app_state.managed_processes.get(process_id)
if not process_state:
print(f"[Stop Managed] Process ID '{process_id}' not found in managed processes.", flush=True)
if page and process_id == "bot.py": # Show snackbar only for the main bot?
# from .utils import show_snackbar; show_snackbar(page, "Bot process not found or already stopped.") # Already imported at top
show_snackbar(page, "Bot process not found or already stopped.")
# If it's the main bot, ensure button state is correct
if process_id == "bot.py":
update_buttons_state(page, app_state, is_running=False)
return
# Signal the specific stop event for this process
if not process_state.stop_event.is_set():
print(f"[Stop Managed] Setting stop_event for ID: '{process_id}'", flush=True)
process_state.stop_event.set()
# Attempt termination
_terminate_process_gracefully(process_id, process_state.process_handle, process_state.pid)
# Update state in AppState dictionary
process_state.status = "stopped"
process_state.process_handle = None # Clear handle
process_state.pid = None # Clear PID
# Optionally remove the entry from the dictionary entirely?
# del app_state.managed_processes[process_id]
print(f"[Stop Managed] Marked process ID '{process_id}' as stopped in AppState.")
# Update UI (specifically for the main bot for now)
if process_id == "bot.py":
# If the process being stopped is the main bot, update the console button
update_buttons_state(page, app_state, is_running=False)
# Also clear the old singleton state for compatibility
app_state.clear_process() # This now also updates the dict entry
# TODO: Add UI update logic for other processes if a management view exists
# --- Adapted Old Stop Function (Calls the new generic one) ---
def stop_bot_process(page: Optional[ft.Page], app_state: "AppState"):
"""(Called by Button) Stops the main bot.py process by calling stop_managed_process."""
stop_managed_process("bot.py", page, app_state)
# --- Parameterized Reader Thread ---
def read_process_output(
app_state: "AppState", # Still pass app_state for global checks? Or remove? Let's keep for now.
process_handle: Optional[subprocess.Popen] = None,
output_queue: Optional[queue.Queue] = None,
stop_event: Optional[threading.Event] = None,
process_id: str = "bot.py", # ID for logging
):
"""
Background thread function to read raw output from a process and put it into a queue.
Defaults to using AppState singletons if specific handles/queues/events aren't provided.
"""
# Use provided arguments or default to AppState singletons
proc_handle = process_handle if process_handle is not None else app_state.bot_process
proc_queue = output_queue if output_queue is not None else app_state.output_queue
proc_stop_event = stop_event if stop_event is not None else app_state.stop_event
if not proc_handle or not proc_handle.stdout:
if not proc_stop_event.is_set():
print(f"[Reader Thread - {process_id}] Error: Process or stdout not available at start.", flush=True)
return
print(f"[Reader Thread - {process_id}] Started.", flush=True)
try:
for line in iter(proc_handle.stdout.readline, ""):
if proc_stop_event.is_set():
print(f"[Reader Thread - {process_id}] Stop event detected, exiting.", flush=True)
break
if line:
proc_queue.put(line.strip())
else:
break # End of stream
except ValueError:
if not proc_stop_event.is_set():
print(f"[Reader Thread - {process_id}] ValueError likely due to closed stdout.", flush=True)
except Exception as e:
if not proc_stop_event.is_set():
print(f"[Reader Thread - {process_id}] Error reading output: {e}", flush=True)
finally:
if not proc_stop_event.is_set():
try:
proc_queue.put(None) # Signal natural end
except Exception as q_err:
print(f"[Reader Thread - {process_id}] Error putting None signal: {q_err}", flush=True)
print(f"[Reader Thread - {process_id}] Finished.", flush=True)
# --- Parameterized Processor Loop ---
async def output_processor_loop(
page: Optional[ft.Page],
app_state: "AppState", # Pass AppState for PID checks and potentially global state access
process_id: str = "bot.py", # ID to identify the process and its state
# Defaults use AppState singletons for backward compatibility with bot.py
output_queue: Optional[queue.Queue] = None,
stop_event: Optional[threading.Event] = None,
target_list_view: Optional[ft.ListView] = None,
):
"""
Processes a specific output queue and updates the UI until stop_event is set.
Defaults to using AppState singletons if specific queue/event/view aren't provided.
"""
print(f"[Processor Loop - {process_id}] Started.", flush=True)
proc_queue = output_queue if output_queue is not None else app_state.output_queue
proc_stop_event = stop_event if stop_event is not None else app_state.stop_event
output_lv = target_list_view if target_list_view is not None else app_state.output_list_view
# from .utils import update_page_safe # Moved to top
while not proc_stop_event.is_set():
lines_to_add = []
process_ended_signal_received = False
try:
while not proc_queue.empty():
raw_line = proc_queue.get_nowait()
if raw_line is None:
process_ended_signal_received = True
print(f"[Processor Loop - {process_id}] Process ended signal received from reader.", flush=True)
lines_to_add.append(ft.Text(f"--- Process '{process_id}' Finished --- ", italic=True))
break
else:
spans = parse_log_line_to_spans(raw_line)
lines_to_add.append(ft.Text(spans=spans, selectable=True, size=12))
except queue.Empty:
pass
if lines_to_add:
if proc_stop_event.is_set():
break
if output_lv:
# 如果在手动观看模式(自动滚动关闭),记录首个元素索引
if process_id == "bot.py" and hasattr(app_state, "manual_viewing") and app_state.manual_viewing:
# 只有在自动滚动关闭时才保存视图位置
if not getattr(output_lv, "auto_scroll", True):
# 记录当前第一个可见元素的索引
first_visible_idx = 0
if hasattr(output_lv, "first_visible") and output_lv.first_visible is not None:
first_visible_idx = output_lv.first_visible
# 添加新行
output_lv.controls.extend(lines_to_add)
# 移除过多的行
removal_count = 0
while len(output_lv.controls) > 1000:
output_lv.controls.pop(0)
removal_count += 1
# 如果移除了行,需要调整首个可见元素的索引
if removal_count > 0 and first_visible_idx > removal_count:
new_idx = max(0, first_visible_idx - removal_count)
# 设置滚动位置到调整后的索引
output_lv.first_visible = new_idx
else:
# 保持当前滚动位置
output_lv.first_visible = first_visible_idx
else:
# 自动滚动开启时,正常添加
output_lv.controls.extend(lines_to_add)
while len(output_lv.controls) > 1000:
output_lv.controls.pop(0) # Limit lines
else:
# 对于非主控制台输出,或没有手动观看模式,正常处理
output_lv.controls.extend(lines_to_add)
while len(output_lv.controls) > 1000:
output_lv.controls.pop(0) # Limit lines
if output_lv.visible and page:
try:
await update_page_safe(page)
except Exception:
pass
# else: print(f"[Processor Loop - {process_id}] Warning: target_list_view is None...")
if process_ended_signal_received:
print(
f"[Processor Loop - {process_id}] Process ended naturally. Setting stop event and cleaning up.",
flush=True,
)
if not proc_stop_event.is_set():
proc_stop_event.set()
# Update the specific process state in the dictionary
proc_state = app_state.managed_processes.get(process_id)
if proc_state:
proc_state.status = "stopped"
proc_state.process_handle = None
proc_state.pid = None
# If it's the main bot, also update the old state and buttons
if process_id == "bot.py":
app_state.clear_process() # Clears old state and marks new as stopped
update_buttons_state(page, app_state, is_running=False)
break
# Check if the specific process died unexpectedly using its PID from managed_processes
current_proc_state = app_state.managed_processes.get(process_id)
current_pid = current_proc_state.pid if current_proc_state else None
if current_pid is not None and not psutil.pid_exists(current_pid) and not proc_stop_event.is_set():
print(
f"[Processor Loop - {process_id}] Process PID {current_pid} ended unexpectedly. Setting stop event.",
flush=True,
)
proc_stop_event.set()
if current_proc_state: # Update state
current_proc_state.status = "stopped"
current_proc_state.process_handle = None
current_proc_state.pid = None
# Add message to its specific output view
if output_lv:
output_lv.controls.append(ft.Text(f"--- Process '{process_id}' Ended Unexpectedly ---", italic=True))
if page and output_lv.visible:
try:
await update_page_safe(page)
except Exception:
pass
# If it's the main bot, update buttons and old state
if process_id == "bot.py":
app_state.clear_process()
update_buttons_state(page, app_state, is_running=False)
break
try:
await asyncio.sleep(0.2)
except asyncio.CancelledError:
print(f"[Processor Loop - {process_id}] Cancelled during sleep.", flush=True)
if not proc_stop_event.is_set():
proc_stop_event.set()
break
print(f"[Processor Loop - {process_id}] Exited.", flush=True)
# --- New Generic Start Function ---
def start_managed_process(
script_path: str,
display_name: str,
page: ft.Page,
app_state: "AppState",
# target_list_view: Optional[ft.ListView] = None # Removed parameter
) -> Tuple[bool, Optional[str]]:
"""
Starts a managed background process, creates its state, and starts reader/processor.
Returns (success: bool, message: Optional[str])
"""
# from .utils import show_snackbar # Dynamic import - Already imported at top
from .state import ManagedProcessState # Dynamic import
process_id = script_path # Use script path as ID for now, ensure uniqueness later if needed
# Prevent duplicate starts if ID already exists and is running
existing_state = app_state.managed_processes.get(process_id)
if (
existing_state
and existing_state.status == "running"
and existing_state.pid
and psutil.pid_exists(existing_state.pid)
):
msg = f"Process '{display_name}' (ID: {process_id}) is already running."
print(f"[Start Managed] {msg}", flush=True)
# show_snackbar(page, msg) # Maybe too noisy?
return False, msg
full_path = os.path.join(app_state.script_dir, script_path)
if not os.path.exists(full_path):
msg = f"Error: Script file not found {script_path}"
print(f"[Start Managed] {msg}", flush=True)
show_snackbar(page, msg, error=True)
return False, msg
print(f"[Start Managed] Preparing to start NEW process: {display_name} ({script_path})", flush=True)
# Create NEW state object for this process with its OWN queue and event
# UNLESS it's bot.py, in which case we still use the old singletons for now
is_main_bot = script_path == "bot.py"
new_queue = app_state.output_queue if is_main_bot else queue.Queue()
new_event = app_state.stop_event if is_main_bot else threading.Event()
new_process_state = ManagedProcessState(
process_id=process_id,
script_path=script_path,
display_name=display_name,
output_queue=new_queue,
stop_event=new_event,
status="starting",
)
# Add to managed processes *before* starting
app_state.managed_processes[process_id] = new_process_state
# --- Create and store ListView if not main bot --- #
output_lv: Optional[ft.ListView] = None
if is_main_bot:
output_lv = app_state.output_list_view # Use the main console view
else:
# Create and store a new ListView for this specific process
output_lv = ft.ListView(expand=True, spacing=2, padding=5, auto_scroll=True) # 始终默认开启自动滚动
new_process_state.output_list_view = output_lv
# Add starting message to the determined ListView
if output_lv:
output_lv.controls.append(ft.Text(f"--- Starting {display_name} --- ", italic=True))
else: # Should not happen if is_main_bot or created above
print(f"[Start Managed - {process_id}] Error: Could not determine target ListView.")
try:
print(f"[Start Managed - {process_id}] Starting subprocess: {full_path}", flush=True)
sub_env = os.environ.copy()
# Set env vars if needed (e.g., for colorization)
sub_env["LOGURU_COLORIZE"] = "True"
sub_env["FORCE_COLOR"] = "1"
sub_env["SIMPLE_OUTPUT"] = "True"
print(
f"[Start Managed - {process_id}] Subprocess environment set: COLORIZE={sub_env.get('LOGURU_COLORIZE')}, FORCE_COLOR={sub_env.get('FORCE_COLOR')}, SIMPLE_OUTPUT={sub_env.get('SIMPLE_OUTPUT')}",
flush=True,
)
# --- 修改启动命令 ---
cmd_list = []
executable_path = "" # 用于日志记录
if getattr(sys, "frozen", False):
# 打包后运行
executable_dir = os.path.dirname(sys.executable)
# 修改逻辑:这次我们直接指定 _internal 目录下的 Python 解释器
# 而不是尝试其他选项
try:
# _internal 目录是 PyInstaller 默认放置 Python 解释器的位置
internal_dir = os.path.join(executable_dir, "_internal")
if os.path.exists(internal_dir):
print(f"[Start Managed - {process_id}] 找到 _internal 目录: {internal_dir}")
# 在 _internal 目录中查找 python.exe
python_exe = None
python_paths = []
# 首先尝试直接查找
direct_python = os.path.join(internal_dir, "python.exe")
if os.path.exists(direct_python):
python_exe = direct_python
python_paths.append(direct_python)
# 如果没找到,进行递归搜索
if not python_exe:
for root, _, files in os.walk(internal_dir):
if "python.exe" in files:
path = os.path.join(root, "python.exe")
python_paths.append(path)
if not python_exe: # 只取第一个找到的
python_exe = path
# 记录所有找到的路径
if python_paths:
print(f"[Start Managed - {process_id}] 在 _internal 中找到的所有 Python.exe: {python_paths}")
if python_exe:
# 找到 Python 解释器,使用它来运行脚本
cmd_list = [python_exe, "-u", full_path]
executable_path = python_exe
print(f"[Start Managed - {process_id}] 使用打包内部的 Python: {executable_path}")
else:
# 如果找不到,只能使用脚本文件直接执行
print(f"[Start Managed - {process_id}] 无法在 _internal 目录中找到 python.exe")
cmd_list = [full_path]
executable_path = full_path
print(f"[Start Managed - {process_id}] 直接执行脚本: {executable_path}")
else:
# _internal 目录不存在,尝试直接执行脚本
print(f"[Start Managed - {process_id}] _internal 目录不存在: {internal_dir}")
cmd_list = [full_path]
executable_path = full_path
print(f"[Start Managed - {process_id}] 直接执行脚本: {executable_path}")
except Exception as path_err:
print(f"[Start Managed - {process_id}] 查找 Python 路径时出错: {path_err}")
# 如果出现异常,尝试直接执行脚本
cmd_list = [full_path]
executable_path = full_path
print(f"[Start Managed - {process_id}] 出错回退:直接执行脚本 {executable_path}")
else:
# 源码运行,使用当前的 Python 解释器
cmd_list = [sys.executable, "-u", full_path]
executable_path = sys.executable
print(f"[Start Managed - {process_id}] 源码模式:使用当前 Python ({executable_path})")
print(f"[Start Managed - {process_id}] 最终命令列表: {cmd_list}")
process = subprocess.Popen(
cmd_list, # 使用构建好的命令列表
cwd=app_state.script_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="replace",
bufsize=1,
creationflags=subprocess.CREATE_NO_WINDOW if platform.system() == "Windows" else 0,
env=sub_env,
)
# Update the state with handle and PID
new_process_state.process_handle = process
new_process_state.pid = process.pid
new_process_state.status = "running"
print(f"[Start Managed - {process_id}] Subprocess started. PID: {process.pid}", flush=True)
# If it's the main bot, also update the old state vars for compatibility
if is_main_bot:
app_state.bot_process = process
app_state.bot_pid = process.pid
update_buttons_state(page, app_state, is_running=True)
# Start the PARAMETERIZED reader thread
output_thread = threading.Thread(
target=read_process_output,
args=(app_state, process, new_queue, new_event, process_id), # Pass specific objects
daemon=True,
)
output_thread.start()
print(f"[Start Managed - {process_id}] Output reader thread started.", flush=True)
# Start the PARAMETERIZED processor loop task
# Pass the determined output_lv (either main console or the new one)
page.run_task(output_processor_loop, page, app_state, process_id, new_queue, new_event, output_lv)
print(f"[Start Managed - {process_id}] Output processor loop scheduled.", flush=True)
return True, f"Process '{display_name}' started successfully."
except Exception as e:
print(f"[Start Managed - {process_id}] Error during startup:", flush=True)
traceback.print_exc()
# Clean up state if startup failed
new_process_state.status = "error"
new_process_state.process_handle = None
new_process_state.pid = None
if process_id in app_state.managed_processes: # Might be redundant check
app_state.managed_processes[process_id].status = "error"
if is_main_bot: # Update UI/state for main bot failure
app_state.clear_process()
update_buttons_state(page, app_state, is_running=False)
error_message = str(e) if str(e) else repr(e)
show_snackbar(page, f"Error running {script_path}: {error_message}", error=True)
return False, f"Error starting process '{display_name}': {error_message}"
# --- Adapted Old Start Function (Calls the new generic one for bot.py) ---
def start_bot_and_show_console(page: ft.Page, app_state: "AppState"):
"""Starts bot.py or navigates to its console view, managing state via AppState."""
script_path_relative = "bot.py"
display_name = "MaiCore"
# from .utils import show_snackbar, update_page_safe # Dynamic imports - Already imported at top
# Check running status using OLD state for now
is_running = app_state.bot_pid is not None and psutil.pid_exists(app_state.bot_pid)
print(
f"[Start Bot Click] Current state: is_running={is_running} (PID={app_state.bot_pid}), stop_event={app_state.stop_event.is_set()}",
flush=True,
)
if is_running:
print("[Start Bot Click] Process is running. Navigating to console.", flush=True)
show_snackbar(page, "Bot process is already running, showing console.")
# Ensure processor loop is running (it uses the singleton stop_event)
if app_state.stop_event.is_set():
print("[Start Bot Click] Stop event was set, clearing and restarting processor loop.", flush=True)
app_state.stop_event.clear()
# Start the processor loop using defaults (targets main console view)
page.run_task(output_processor_loop, page, app_state)
if page.route != "/console":
page.go("/console")
else:
page.run_task(update_page_safe, page)
return
# --- Start the bot process ---
print("[Start Bot Click] Process not running. Starting new process via start_managed_process.", flush=True)
# Clear and setup OLD ListView from state (used by default processor loop)
if app_state.output_list_view:
app_state.output_list_view.controls.clear()
app_state.output_list_view.auto_scroll = app_state.is_auto_scroll_enabled
print("[Start Bot Click] Cleared console history.", flush=True)
else:
app_state.output_list_view = ft.ListView(
expand=True, spacing=2, auto_scroll=app_state.is_auto_scroll_enabled, padding=5
)
print(
f"[Start Bot Click] Created new ListView with auto_scroll={app_state.is_auto_scroll_enabled}.", flush=True
)
# Reset OLD state (clears queue, event) - this also resets the managed state entry
app_state.reset_process_state()
# Call the generic start function, targeting the main console list view
# This will use the OLD singleton queue/event because script_path == "bot.py"
# and start the default (non-parameterized call) reader/processor
# The call below now implicitly passes app_state.output_list_view because is_main_bot=True inside start_managed_process
success, message = start_managed_process(
script_path=script_path_relative,
display_name=display_name,
page=page,
app_state=app_state,
# target_list_view=app_state.output_list_view # Removed parameter
)
if success:
# Navigate to console view
page.go("/console")
# else: Error message already shown by start_managed_process

139
src/MaiGoi/state.py 100644
View File

@ -0,0 +1,139 @@
import flet as ft
import subprocess
import queue
import threading
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
# 从 flet_interest_monitor 导入,如果需要类型提示
from .flet_interest_monitor import InterestMonitorDisplay
@dataclass
class ManagedProcessState:
"""Holds the state for a single managed background process."""
process_id: str # Unique identifier (e.g., script path or UUID)
script_path: str
display_name: str
process_handle: Optional[subprocess.Popen] = None
pid: Optional[int] = None
output_queue: queue.Queue = field(default_factory=queue.Queue)
stop_event: threading.Event = field(default_factory=threading.Event)
status: str = "stopped" # e.g., "running", "stopped", "error"
# Store UI references if needed later, e.g., for dedicated output views
# output_view_controls: Optional[List[ft.Control]] = None
output_list_view: Optional[ft.ListView] = None # Added to hold the specific ListView for this process
class AppState:
"""Holds the shared state of the launcher application."""
def __init__(self):
# Process related state
self.bot_process: Optional[subprocess.Popen] = None
self.bot_pid: Optional[int] = None
self.output_queue: queue.Queue = queue.Queue()
self.stop_event: threading.Event = threading.Event()
# UI related state
self.output_list_view: Optional[ft.ListView] = None
self.start_bot_button: Optional[ft.FilledButton] = None
self.console_action_button: Optional[ft.ElevatedButton] = None
self.is_auto_scroll_enabled: bool = True # 默认启用自动滚动
self.manual_viewing: bool = False # 手动观看模式标识,用于修复自动滚动关闭时的位移问题
self.interest_monitor_control: Optional[InterestMonitorDisplay] = None
# Script directory (useful for paths)
self.script_dir: str = "" # Will be set during initialization in launcher.py
# --- Configuration State --- #
self.gui_config: Dict[str, Any] = {} # Loaded from gui_config.toml
self.adapter_paths: List[str] = [] # Specific list of adapter paths from config
# --- Process Management State (NEW - For multi-process support) --- #
self.managed_processes: Dict[str, ManagedProcessState] = {}
def reset_process_state(self):
"""Resets variables related to the bot process."""
print("[AppState] Resetting process state.", flush=True)
self.bot_process = None
self.bot_pid = None
# Clear the queue? Maybe not, might lose messages if reset mid-operation
# while not self.output_queue.empty():
# try: self.output_queue.get_nowait()
# except queue.Empty: break
self.stop_event.clear() # Ensure stop event is cleared
# --- Reset corresponding NEW state (if exists) ---
process_id = "bot.py"
if process_id in self.managed_processes:
# Ensure the managed state reflects the reset event/queue
# (Since they point to the same objects for now, this is redundant but good practice)
self.managed_processes[process_id].stop_event = self.stop_event
self.managed_processes[process_id].output_queue = self.output_queue
self.managed_processes[process_id].status = "stopped" # Ensure status is reset before start
print(f"[AppState] Reset NEW managed state event/queue pointers and status for ID: '{process_id}'.")
def set_process(self, process: subprocess.Popen, script_path: str = "bot.py", display_name: str = "MaiCore"):
"""
Sets the process handle and PID.
Also updates the new managed_processes dictionary for compatibility.
"""
# --- Update OLD state ---
self.bot_process = process
self.bot_pid = process.pid
# Reset stop event for the new process run
self.stop_event.clear()
# NOTE: We keep the OLD output_queue and stop_event separate for now,
# as the current reader/processor loops use them directly.
# In the future, the reader/processor will use the queue/event
# from the ManagedProcessState object.
# --- Update NEW state ---
process_id = script_path # Use script_path as ID for now
new_process_state = ManagedProcessState(
process_id=process_id,
script_path=script_path,
display_name=display_name,
process_handle=process,
pid=process.pid,
# IMPORTANT: For now, use the *old* queue/event for the bot.py entry
# to keep existing reader/processor working without immediate changes.
# A true multi-process implementation would give each process its own.
output_queue=self.output_queue,
stop_event=self.stop_event,
status="running",
)
self.managed_processes[process_id] = new_process_state
print(
f"[AppState] Set OLD process state (PID: {self.bot_pid}) and added/updated NEW managed state for ID: '{process_id}'"
)
def clear_process(self):
"""
Clears the process handle and PID.
Also updates the status in the new managed_processes dictionary.
"""
old_pid = self.bot_pid
process_id = "bot.py" # Assuming clear is for the main bot process
# --- Clear OLD state ---
self.bot_process = None
self.bot_pid = None
# Don't clear stop_event here, it should be set to signal stopping.
# Don't clear output_queue, might still contain final messages.
# --- Update NEW state ---
if process_id in self.managed_processes:
self.managed_processes[process_id].process_handle = None
self.managed_processes[process_id].pid = None
self.managed_processes[process_id].status = "stopped"
# Keep queue and event references for now
print(
f"[AppState] Cleared OLD process state (was PID: {old_pid}) and marked NEW managed state for ID: '{process_id}' as stopped."
)
else:
print(
f"[AppState] Cleared OLD process state (was PID: {old_pid}). No corresponding NEW state found for ID: '{process_id}'."
)

View File

@ -0,0 +1,916 @@
import flet as ft
import tomlkit
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
def load_template_with_comments(template_filename: str = "bot_config_template.toml"):
"""
加载指定的模板文件保留所有注释
Args:
template_filename: 要加载的模板文件名 (相对于 template/ 目录)
Returns:
包含注释的TOML文档对象如果失败则返回空文档
"""
try:
# 首先尝试从相对路径加载 (相对于项目根目录)
# 假设此脚本位于 src/MaiGoi/
base_path = Path(__file__).parent.parent.parent
template_path = base_path / "template" / template_filename
if template_path.exists():
print(f"找到模板文件: {template_path}")
with open(template_path, "r", encoding="utf-8") as f:
return tomlkit.parse(f.read())
else:
print(f"警告: 模板文件不存在: {template_path}")
return tomlkit.document()
except Exception as e:
print(f"加载模板文件 '{template_filename}' 出错: {e}")
return tomlkit.document()
def get_comment_for_key(template_doc, key_path: str) -> str:
"""
获取指定键路径的注释 (修正版)
Args:
template_doc: 包含注释的TOML文档
key_path: 点分隔的键路径例如 "bot.qq"
Returns:
该键对应的注释字符串如果没有则返回空字符串
"""
if not template_doc:
return ""
try:
parts = key_path.split(".")
current_item = template_doc
# 逐级导航到目标项或其父表
for i, part in enumerate(parts):
if part not in current_item:
print(f"警告: 路径部分 '{part}'{'.'.join(parts[:i])} 中未找到")
return "" # 路径不存在
# 如果是最后一个部分,我们找到了目标项
if i == len(parts) - 1:
target_item = current_item[part]
# --- 尝试从 trivia 获取注释 ---
if hasattr(target_item, "trivia") and hasattr(target_item.trivia, "comment"):
comment_lines = target_item.trivia.comment.split("\n")
# 去除每行的 '#' 和首尾空格
cleaned_comment = "\n".join([line.strip().lstrip("#").strip() for line in comment_lines])
if cleaned_comment:
return cleaned_comment
# --- 如果是顶级表,也检查容器自身的 trivia ---
# (tomlkit 对于顶级表的注释存储方式可能略有不同)
if isinstance(target_item, (tomlkit.items.Table, tomlkit.container.Container)) and len(parts) == 1:
if hasattr(target_item, "trivia") and hasattr(target_item.trivia, "comment"):
comment_lines = target_item.trivia.comment.split("\n")
cleaned_comment = "\n".join([line.strip().lstrip("#").strip() for line in comment_lines])
if cleaned_comment:
return cleaned_comment
# 如果 trivia 中没有,尝试一些旧版或不常用的属性 (风险较高)
# if hasattr(target_item, '_comment'): # 不推荐
# return str(target_item._comment).strip(" #")
# 如果以上都找不到,返回空
return ""
# 继续导航到下一级
current_item = current_item[part]
# 如果中间路径不是表/字典,则无法继续
if not isinstance(current_item, (dict, tomlkit.items.Table, tomlkit.container.Container)):
print(f"警告: 路径部分 '{part}' 指向的不是表结构,无法继续导航")
return ""
return "" # 理论上不应执行到这里,除非 key_path 为空
except Exception as e:
# 打印更详细的错误信息,包括路径和异常类型
print(f"获取注释时发生意外错误 (路径: {key_path}): {type(e).__name__} - {e}")
# print(traceback.format_exc()) # 可选:打印完整堆栈跟踪
return ""
class TomlFormGenerator:
"""用于将TOML配置生成Flet表单控件的类。"""
def __init__(
self,
page: ft.Page,
config_data: Dict[str, Any],
parent_container: ft.Column,
template_filename: str = "bot_config_template.toml",
):
"""
初始化表单生成器
Args:
page: Flet Page 对象 (用于强制刷新)
config_data: TOML配置数据嵌套字典
parent_container: 要添加控件的父容器
template_filename: 要使用的模板文件名 (相对于 template/ 目录)
"""
self.page = page # <-- 保存 Page 对象
self.config_data = config_data # 保存对原始数据的引用(重要!)
self.parent_container = parent_container
self.controls_map = {} # 映射 full_path 到 Flet 控件
self.expanded_sections = set() # 记录展开的部分
# 加载指定的模板文档
self.template_doc = load_template_with_comments(template_filename)
if not self.template_doc.value:
print(f"警告:加载的模板 '{template_filename}' 为空,注释功能将不可用。")
def build_form(self):
"""构建整个表单。"""
self.parent_container.controls.clear()
self.controls_map.clear() # 清空控件映射
# 使用 self.config_data 构建表单
self._process_toml_section(self.config_data, self.parent_container)
def _get_comment(self, key_path: str) -> str:
"""获取指定键路径的注释,并确保结果是字符串"""
try:
comment = get_comment_for_key(self.template_doc, key_path)
# 确保返回值是字符串
if comment and isinstance(comment, str):
return comment
except Exception as e:
print(f"获取注释出错: {key_path}, {e}")
return "" # 如果出现任何问题,返回空字符串
def _process_toml_section(
self,
section_data: Dict[str, Any],
container: Union[ft.Column, ft.Container],
section_path: str = "",
indent: int = 0,
):
"""
递归处理TOML配置的一个部分
Args:
section_data: 要处理的配置部分
container: 放置控件的容器可以是Column或Container
section_path: 当前部分的路径用于跟踪嵌套层级
indent: 当前缩进级别
"""
# 确保container是有controls属性的对象
if isinstance(container, ft.Container):
if container.content and hasattr(container.content, "controls"):
container = container.content
else:
# 如果Container没有有效的content创建一个Column
container.content = ft.Column([])
container = container.content
if not hasattr(container, "controls"):
raise ValueError(f"传递给_process_toml_section的容器必须有controls属性got: {type(container)}")
# 先处理所有子部分(嵌套表)
subsections = {}
simple_items = {}
# 分离子部分和简单值
for key, value in section_data.items():
if isinstance(value, (dict, tomlkit.items.Table)):
subsections[key] = value
else:
simple_items[key] = value
# 处理简单值
for key, value in simple_items.items():
full_path = f"{section_path}.{key}" if section_path else key
control = self._create_control_for_value(key, value, full_path)
if control:
if indent > 0: # 添加缩进
row = ft.Row(
[
ft.Container(width=indent * 20), # 每级缩进20像素
control,
],
alignment=ft.MainAxisAlignment.START,
)
container.controls.append(row)
else:
container.controls.append(control)
# 处理子部分
for key, value in subsections.items():
full_path = f"{section_path}.{key}" if section_path else key
# 创建一个可展开/折叠的部分
is_expanded = full_path in self.expanded_sections
# 获取此部分的注释(安全获取)
section_comment = self._get_comment(full_path)
# 创建子部分的标题行
section_title_elems = [
ft.Container(width=indent * 20) if indent > 0 else ft.Container(width=0),
ft.IconButton(
icon=ft.icons.ARROW_DROP_DOWN if is_expanded else ft.icons.ARROW_RIGHT,
on_click=lambda e, path=full_path: self._toggle_section(e, path),
),
ft.Text(key, weight=ft.FontWeight.BOLD, size=16),
]
# 如果有注释添加一个Info图标并设置tooltip
if section_comment and len(section_comment) > 0:
try:
section_title_elems.append(
ft.IconButton(icon=ft.icons.INFO_OUTLINE, tooltip=section_comment, icon_size=16)
)
except Exception as e:
print(f"创建信息图标时出错: {full_path}, {e}")
section_title = ft.Row(
section_title_elems,
alignment=ft.MainAxisAlignment.START,
vertical_alignment=ft.CrossAxisAlignment.CENTER,
)
container.controls.append(section_title)
# 创建子部分的容器
subsection_column = ft.Column([])
subsection_container = ft.Container(content=subsection_column, visible=is_expanded)
container.controls.append(subsection_container)
# 递归处理子部分
if is_expanded:
self._process_toml_section(value, subsection_column, full_path, indent + 1)
def _toggle_section(self, e, section_path):
"""切换部分的展开/折叠状态。"""
# 使用一个简化和更稳定的方法来处理toggle
print(f"切换部分: {section_path}")
# 在点击的行的下一个容器中查找
parent_row = e.control.parent
if not parent_row or not isinstance(parent_row, ft.Row):
print(f"错误: 无法找到父行: {e.control.parent}")
return
parent_container = parent_row.parent
if not parent_container or not hasattr(parent_container, "controls"):
print(f"错误: 无法找到父容器: {parent_row.parent}")
return
# 找到当前行在父容器中的索引
try:
row_index = parent_container.controls.index(parent_row)
except ValueError:
print(f"错误: 在父容器中找不到行: {parent_row}")
return
# 检查下一个控件是否是子部分容器
if row_index + 1 >= len(parent_container.controls):
print(f"错误: 行索引超出范围: {row_index + 1} >= {len(parent_container.controls)}")
return
subsection_container = parent_container.controls[row_index + 1]
print(f"找到子部分容器: {type(subsection_container).__name__}")
# 切换展开/折叠状态
if section_path in self.expanded_sections:
# 折叠
e.control.icon = ft.icons.ARROW_RIGHT
self.expanded_sections.remove(section_path)
subsection_container.visible = False
# parent_container.update() # <-- 改为 page.update()
else:
# 展开
e.control.icon = ft.icons.ARROW_DROP_DOWN
self.expanded_sections.add(section_path)
subsection_container.visible = True
# 如果容器刚刚变为可见,且内容为空,则加载内容
if subsection_container.visible:
# 获取子部分的内容列
subsection_content = None
if isinstance(subsection_container, ft.Container) and subsection_container.content:
subsection_content = subsection_container.content
else:
subsection_content = subsection_container
# 如果内容是Column且为空则加载内容
if isinstance(subsection_content, ft.Column) and len(subsection_content.controls) == 0:
# 获取配置数据
parts = section_path.split(".")
current = self.config_data
for part in parts:
if part and part in current:
current = current[part]
else:
print(f"警告: 配置路径不存在: {part} in {section_path}")
# parent_container.update() # <-- 改为 page.update()
self.page.update() # <-- 在这里也强制页面更新
return
# 递归处理子部分
if isinstance(current, (dict, tomlkit.items.Table)):
indent = len(parts) # 使用路径部分数量作为缩进级别
try:
# 处理内容但不立即更新UI
self._process_toml_section(current, subsection_content, section_path, indent)
# 只在完成内容处理后更新一次UI
# parent_container.update() # <-- 改为 page.update()
except Exception as ex:
print(f"处理子部分时出错: {ex}")
else:
print(f"警告: 配置数据不是字典类型: {type(current).__name__}")
# parent_container.update() # <-- 改为 page.update()
# else:
# 如果只是切换可见性,简单更新父容器
# parent_container.update() # <-- 改为 page.update()
# 强制更新整个页面
if self.page:
try:
self.page.update() # <-- 在函数末尾强制页面更新
except Exception as page_update_e:
print(f"强制页面更新失败: {page_update_e}")
else:
print("警告: _toggle_section 中无法访问 Page 对象进行更新")
def _create_control_for_value(self, key: str, value: Any, full_path: str) -> Optional[ft.Control]:
"""
根据值的类型创建适当的控件
Args:
key: 配置键
value: 配置值
full_path: 配置项的完整路径
Returns:
对应类型的Flet控件
"""
# 获取注释(安全获取)
comment = self._get_comment(full_path)
comment_valid = isinstance(comment, str) and len(comment) > 0
# 根据类型创建不同的控件
if isinstance(value, bool):
return self._create_boolean_control(key, value, full_path, comment if comment_valid else "")
elif isinstance(value, (int, float)):
return self._create_number_control(key, value, full_path, comment if comment_valid else "")
elif isinstance(value, str):
return self._create_string_control(key, value, full_path, comment if comment_valid else "")
elif isinstance(value, list):
return self._create_list_control(key, value, full_path, comment if comment_valid else "")
elif isinstance(value, set):
# 特殊处理集合类型groups部分经常使用
print(f"处理集合类型: {key} = {value}")
try:
return self._create_set_control(key, value, full_path, comment if comment_valid else "")
except Exception as e:
print(f"创建集合控件时出错: {e}")
# 如果创建失败,返回只读文本
return ft.Text(f"{key}: {value} (集合类型,处理失败)", italic=True)
else:
# 其他类型默认显示为只读文本
control = ft.Text(f"{key}: {value} (类型不支持编辑: {type(value).__name__})", italic=True)
# 如果有有效的注释,添加图标
if comment_valid:
try:
# 在只读文本旁加上注释图标
return ft.Row([control, ft.IconButton(icon=ft.icons.INFO_OUTLINE, tooltip=comment, icon_size=16)])
except Exception:
pass # 如果添加图标失败,仍返回原始控件
return control
def _update_config_value(self, path: str, new_value: Any):
"""递归地更新 self.config_data 中嵌套字典的值。"""
keys = path.split(".")
d = self.config_data
try:
for key in keys[:-1]:
d = d[key]
# 确保最后一个键存在并且可以赋值
if keys[-1] in d:
# 类型转换 (尝试)
original_value = d[keys[-1]]
try:
if isinstance(original_value, bool):
new_value = str(new_value).lower() in ("true", "1", "yes")
elif isinstance(original_value, int):
new_value = int(new_value)
elif isinstance(original_value, float):
new_value = float(new_value)
# Add other type checks if needed (e.g., list, set)
except (ValueError, TypeError) as e:
print(
f"类型转换错误 ({path}): 输入 '{new_value}' ({type(new_value)}), 期望类型 {type(original_value)}. 错误: {e}"
)
# 保留原始类型或回退?暂时保留新值,让用户修正
# new_value = original_value # 或者可以选择回退
pass # Keep new_value as is for now
d[keys[-1]] = new_value
print(f"配置已更新: {path} = {new_value}")
else:
print(f"警告: 尝试更新不存在的键: {path}")
except KeyError:
print(f"错误: 更新配置时找不到路径: {path}")
except TypeError:
print(f"错误: 尝试在非字典对象中更新键: {path}")
except Exception as e:
print(f"更新配置时发生未知错误 ({path}): {e}")
# 注意:这里不需要调用 page.update(),因为这是内部数据更新
# 调用保存按钮时,会使用更新后的 self.config_data
def _create_boolean_control(self, key: str, value: bool, path: str, comment: str = "") -> ft.Control:
"""创建布尔值的开关控件。"""
def on_change(e):
self._update_config_value(path, e.control.value)
switch = ft.Switch(label=key, value=value, on_change=on_change)
# 如果有注释添加一个Info图标
if comment and len(comment) > 0:
try:
return ft.Row([switch, ft.IconButton(icon=ft.icons.INFO_OUTLINE, tooltip=comment, icon_size=16)])
except Exception as e:
print(f"创建布尔控件的注释图标时出错: {path}, {e}")
return switch
def _create_number_control(self, key: str, value: Union[int, float], path: str, comment: str = "") -> ft.Control:
"""创建数字输入控件。"""
def on_change(e):
try:
# 尝试转换为原始类型
if isinstance(value, int):
converted = int(e.control.value)
else:
converted = float(e.control.value)
self._update_config_value(path, converted)
except (ValueError, TypeError):
pass # 忽略无效输入
text_field = ft.TextField(
label=key,
value=str(value),
input_filter=ft.InputFilter(allow=True, regex_string=r"[0-9.-]"),
on_change=on_change,
)
# 如果有注释,添加一个信息图标
if comment and len(comment) > 0:
try:
return ft.Row([text_field, ft.IconButton(icon=ft.icons.INFO_OUTLINE, tooltip=comment, icon_size=16)])
except Exception as e:
print(f"创建数字控件的注释图标时出错: {path}, {e}")
return text_field
def _create_string_control(self, key: str, value: str, path: str, comment: str = "") -> ft.Control:
"""创建字符串输入控件。"""
def on_change(e):
self._update_config_value(path, e.control.value)
# 若字符串较长,使用多行文本
multiline = len(value) > 30 or "\n" in value
text_field = ft.TextField(
label=key,
value=value,
multiline=multiline,
min_lines=1,
max_lines=5 if multiline else 1,
on_change=on_change,
)
# 如果有注释添加一个Info图标
if comment and len(comment) > 0:
try:
return ft.Row([text_field, ft.IconButton(icon=ft.icons.INFO_OUTLINE, tooltip=comment, icon_size=16)])
except Exception as e:
print(f"创建字符串控件的注释图标时出错: {path}, {e}")
return text_field
def _create_list_control(self, key: str, value: List[Any], path: str, comment: str = "") -> ft.Control:
"""创建列表控件。"""
# 创建一个可编辑的列表控件
# 首先创建一个Column存放列表项目和控制按钮
title_row = ft.Row(
[
ft.Text(f"{key}:", weight=ft.FontWeight.BOLD),
]
)
# 如果有注释添加一个Info图标
if comment and len(comment) > 0:
try:
title_row.controls.append(ft.IconButton(icon=ft.icons.INFO_OUTLINE, tooltip=comment, icon_size=16))
except Exception as e:
print(f"创建列表控件的注释图标时出错: {path}, {e}")
column = ft.Column([title_row])
# 创建一个内部Column用于存放列表项
items_column = ft.Column([], spacing=5, scroll=ft.ScrollMode.AUTO)
# 创建添加新项目的函数
def add_item(e=None, default_value=None, is_initial=False):
# 确定新项目的类型(基于现有项目或默认为字符串)
item_type = str
if value and len(value) > 0:
if isinstance(value[0], int):
item_type = int
elif isinstance(value[0], float):
item_type = float
elif isinstance(value[0], bool):
item_type = bool
# 创建新项目的默认值
if default_value is None:
if item_type is int:
default_value = 0
elif item_type is float:
default_value = 0.0
elif item_type is bool:
default_value = False
else:
default_value = ""
# 创建当前索引
index = len(items_column.controls)
# 创建删除项目的函数
def delete_item(e):
# 删除此项目
items_column.controls.remove(item_row)
# 更新列表中的值
update_list_value()
# 确保UI更新
items_column.update()
# 更新整个表单
column.update()
# 创建项目控件(根据类型)
if item_type is bool:
item_control = ft.Switch(value=default_value)
elif item_type in (int, float):
item_control = ft.TextField(
value=str(default_value),
input_filter=ft.InputFilter(allow=True, regex_string=r"[0-9.-]"),
width=200,
)
else: # 字符串
item_control = ft.TextField(value=default_value, width=200)
# 添加控件的更改事件
def on_item_change(e):
# 获取新值
new_val = e.control.value
# 转换类型
if item_type is int:
try:
new_val = int(new_val)
except ValueError:
new_val = 0
elif item_type is float:
try:
new_val = float(new_val)
except ValueError:
new_val = 0.0
elif item_type is bool:
new_val = bool(new_val)
# 更新列表中的值
update_list_value()
# 添加更改事件
if item_type is bool:
item_control.on_change = on_item_change
else:
item_control.on_change = on_item_change
# 创建行包含项目控件和删除按钮
item_row = ft.Row(
[ft.Text(f"[{index}]"), item_control, ft.IconButton(icon=ft.icons.DELETE, on_click=delete_item)],
alignment=ft.MainAxisAlignment.START,
)
# 将行添加到列表中
items_column.controls.append(item_row)
# 只有在用户交互时更新UI初始加载时不更新
if not is_initial and e is not None:
# 更新UI - 确保整个控件都更新
try:
items_column.update()
column.update()
except Exception as update_e:
print(f"更新列表控件时出错: {path}, {update_e}")
return item_control
# 创建更新列表值的函数
def update_list_value():
new_list = []
for item_row in items_column.controls:
if len(item_row.controls) < 2:
continue # 跳过格式不正确的行
item_control = item_row.controls[1] # 获取TextField或Switch
# 根据控件类型获取值
if isinstance(item_control, ft.Switch):
new_list.append(item_control.value)
elif isinstance(item_control, ft.TextField):
# 根据原始列表中的类型转换值
if value and len(value) > 0:
if isinstance(value[0], int):
try:
new_list.append(int(item_control.value))
except ValueError:
new_list.append(0)
elif isinstance(value[0], float):
try:
new_list.append(float(item_control.value))
except ValueError:
new_list.append(0.0)
else:
new_list.append(item_control.value)
else:
new_list.append(item_control.value)
# 更新TOML配置
try:
self._update_config_value(path, new_list)
except Exception as e:
print(f"更新列表值时出错: {path}, {e}")
# 添加现有项目使用is_initial=True标记为初始化
for item in value:
add_item(default_value=item, is_initial=True)
# 添加按钮行
button_row = ft.Row(
[ft.ElevatedButton("添加项目", icon=ft.icons.ADD, on_click=add_item)], alignment=ft.MainAxisAlignment.START
)
# 将组件添加到主Column
column.controls.append(items_column)
column.controls.append(button_row)
# 将整个列表控件包装在一个Card中让它看起来更独立
# Card不支持padding参数使用Container包裹
return ft.Card(content=ft.Container(content=column, padding=10))
def _create_set_control(self, key: str, value: set, path: str, comment: str = "") -> ft.Control:
"""创建集合控件。"""
# 创建一个可编辑的列表控件
# 首先创建一个Column存放列表项目和控制按钮
title_row = ft.Row(
[
ft.Text(f"{key} (集合):", weight=ft.FontWeight.BOLD),
]
)
# 如果有注释添加一个Info图标
if comment and len(comment) > 0:
try:
title_row.controls.append(ft.IconButton(icon=ft.icons.INFO_OUTLINE, tooltip=comment, icon_size=16))
except Exception as e:
print(f"创建集合控件的注释图标时出错: {path}, {e}")
column = ft.Column([title_row])
# 创建一个内部Column用于存放集合项
items_column = ft.Column([], spacing=5, scroll=ft.ScrollMode.AUTO)
# 创建一个用于输入的文本框
new_item_field = ft.TextField(label="添加新项目", hint_text="输入值后按Enter添加", width=300)
# 创建一个列表存储当前集合值
current_values = list(value)
# 创建添加新项目的函数
def add_item(e=None, item_value=None, is_initial=False):
if e and hasattr(e, "control") and e.control == new_item_field:
# 从文本框获取值
item_value = new_item_field.value.strip()
if not item_value:
return
new_item_field.value = "" # 清空输入框
if not is_initial: # 只有在用户交互时更新
try:
new_item_field.update()
except Exception as update_e:
print(f"更新文本框时出错: {path}, {update_e}")
if item_value is None or item_value == "":
return
# 判断值的类型(假设集合中所有元素类型一致)
item_type = str
if current_values and len(current_values) > 0:
if isinstance(current_values[0], int):
item_type = int
elif isinstance(current_values[0], float):
item_type = float
elif isinstance(current_values[0], bool):
item_type = bool
# 转换类型
if item_type is int:
try:
item_value = int(item_value)
except ValueError:
return # 如果无法转换则忽略
elif item_type is float:
try:
item_value = float(item_value)
except ValueError:
return # 如果无法转换则忽略
elif item_type is bool:
if item_value.lower() in ("true", "yes", "1", "y"):
item_value = True
elif item_value.lower() in ("false", "no", "0", "n"):
item_value = False
else:
return # 无效的布尔值
# 检查是否已存在(集合特性)
if item_value in current_values:
return # 如果已存在则忽略
# 添加到当前值列表
current_values.append(item_value)
# 创建删除项目的函数
def delete_item(e):
# 删除此项目
current_values.remove(item_value)
items_column.controls.remove(item_row)
# 更新集合中的值
update_set_value()
# 确保UI更新
try:
items_column.update()
column.update() # 更新整个表单
except Exception as update_e:
print(f"更新集合UI时出错: {path}, {update_e}")
# 创建行包含项目文本和删除按钮
item_row = ft.Row(
[ft.Text(str(item_value)), ft.IconButton(icon=ft.icons.DELETE, on_click=delete_item)],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
)
# 将行添加到列表中
items_column.controls.append(item_row)
# 只有在用户交互时更新UI初始加载时不更新
if not is_initial and e is not None:
# 更新UI
try:
items_column.update()
column.update() # 确保整个表单都更新
except Exception as update_e:
print(f"更新集合UI时出错: {path}, {update_e}")
# 更新集合值
update_set_value()
# 创建更新集合值的函数
def update_set_value():
# 从current_values创建一个新集合
try:
new_set = set(current_values)
# 更新TOML配置
self._update_config_value(path, new_set)
except Exception as e:
print(f"更新集合值时出错: {path}, {e}")
# 添加键盘事件处理
def on_key_press(e):
if e.key == "Enter":
add_item(e)
new_item_field.on_submit = add_item
# 添加现有项目使用is_initial=True标记为初始化
for item in value:
add_item(item_value=item, is_initial=True)
# 添加输入框
input_row = ft.Row(
[
new_item_field,
ft.IconButton(
icon=ft.icons.ADD, on_click=lambda e: add_item(e, item_value=new_item_field.value.strip())
),
],
alignment=ft.MainAxisAlignment.START,
)
# 将组件添加到主Column
column.controls.append(items_column)
column.controls.append(input_row)
# 将整个集合控件包装在一个Card中让它看起来更独立
# Card不支持padding参数使用Container包裹
return ft.Card(content=ft.Container(content=column, padding=10))
def load_bot_config_template(app_state) -> Dict[str, Any]:
"""
加载bot_config_template.toml文件作为参考
Returns:
带有注释的TOML文档
"""
template_path = Path(app_state.script_dir) / "template/bot_config_template.toml"
if template_path.exists():
try:
with open(template_path, "r", encoding="utf-8") as f:
return tomlkit.parse(f.read()) # 使用parse而不是load以保留注释
except Exception as e:
print(f"加载模板配置文件失败: {e}")
return tomlkit.document()
def get_bot_config_path(app_state) -> Path:
"""
获取配置文件路径
"""
config_path = Path(app_state.script_dir) / "config/bot_config.toml"
return config_path
def load_bot_config(app_state) -> Dict[str, Any]:
"""
加载bot_config.toml文件
如果文件不存在会尝试从模板创建
"""
config_path = get_bot_config_path(app_state)
# 如果配置文件不存在,尝试从模板创建
if not config_path.exists():
template_config = load_bot_config_template(app_state)
if template_config:
print(f"配置文件不存在,尝试从模板创建: {config_path}")
try:
# 确保目录存在
config_path.parent.mkdir(parents=True, exist_ok=True)
# 保存模板内容到配置文件
with open(config_path, "w", encoding="utf-8") as f:
tomlkit.dump(template_config, f)
print(f"成功从模板创建配置文件: {config_path}")
return template_config
except Exception as e:
print(f"从模板创建配置文件失败: {e}")
return {}
return {}
# 加载配置文件
try:
with open(config_path, "r", encoding="utf-8") as f:
return tomlkit.load(f)
except Exception as e:
print(f"加载配置文件失败: {e}")
return {}
def create_toml_form(
page: ft.Page,
config_data: Dict[str, Any],
container: ft.Column,
template_filename: str = "bot_config_template.toml",
):
"""
创建并构建TOML表单
Args:
page: Flet Page 对象
config_data: TOML配置数据
container: 放置表单的父容器
template_filename: 要使用的模板文件名
Returns:
创建的 TomlFormGenerator 实例
"""
generator = TomlFormGenerator(page, config_data, container, template_filename)
generator.build_form()
return generator # Return the generator instance

View File

@ -0,0 +1,265 @@
import flet as ft
from pathlib import Path
from typing import List, Tuple
# --- .env File Handling Logic ---
def load_env_data(env_path: Path) -> List[Tuple[str, str]]:
"""Loads key-value pairs from a .env file, skipping comments and empty lines."""
variables = []
if not env_path.exists():
print(f"[Env Editor] .env file not found at {env_path}")
return variables
try:
with open(env_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
if "=" in line:
key, value = line.split("=", 1)
key = key.strip()
value = value.strip()
# Basic handling for quotes (remove if present at ends)
if len(value) >= 2 and value.startswith(("'", '"')) and value.endswith(("'", '"')):
value = value[1:-1]
variables.append((key, value))
# else: Handle lines without '='? Maybe ignore them.
except Exception as e:
print(f"[Env Editor] Error loading .env file {env_path}: {e}")
return variables
def save_env_data(env_path: Path, variables: List[Tuple[str, str]]):
"""Saves key-value pairs back to the .env file, overwriting existing content."""
try:
with open(env_path, "w", encoding="utf-8") as f:
for key, value in variables:
# Basic quoting if value contains spaces or special chars?
# For simplicity, just write key=value for now.
# Advanced quoting logic can be added if needed.
f.write(f"{key}={value}\n")
print(f"[Env Editor] Successfully saved data to {env_path}")
except Exception as e:
print(f"[Env Editor] Error saving .env file {env_path}: {e}")
# Optionally raise or show error to user
# --- Flet UI Component ---
# Inherit directly from ft.Column instead of ft.UserControl
class EnvEditor(ft.Column):
"""A Flet Column containing controls for editing .env file variables."""
def __init__(self, app_state):
# Initialize the Column base class
# Pass Column properties like spacing, scroll, expand here
super().__init__(spacing=5, scroll=ft.ScrollMode.ADAPTIVE, expand=True)
self.app_state = app_state
self.env_path = Path(self.app_state.script_dir) / ".env"
self.variables = load_env_data(self.env_path)
# UI Controls - Define them as instance attributes
self.variable_rows_column = ft.Column([], spacing=5, scroll=ft.ScrollMode.ADAPTIVE)
self.add_key_field = ft.TextField(label="New Key", width=150)
self.add_value_field = ft.TextField(label="New Value", expand=True)
self.save_button = ft.ElevatedButton("Save Changes", icon=ft.icons.SAVE, on_click=self._save_changes)
self.status_text = ft.Text("") # For showing save status/errors
# --- Build the UI directly within __init__ ---
self._populate_rows() # Populate rows initially
add_row = ft.Row(
[
self.add_key_field,
self.add_value_field,
ft.IconButton(
icon=ft.icons.ADD_CIRCLE_OUTLINE,
tooltip="Add Variable",
on_click=self._add_variable_row_interactive,
),
],
alignment=ft.MainAxisAlignment.START,
)
# Add controls directly to self (the Column)
self.controls.extend(
[
ft.Text(".env File Editor", style=ft.TextThemeStyle.HEADLINE_SMALL),
ft.Text(f"Editing: {self.env_path.name} (in {self.env_path.parent})"),
ft.Divider(),
self.variable_rows_column, # Add the column that holds the variable rows
ft.Divider(),
ft.Text("Add New Variable:", style=ft.TextThemeStyle.LABEL_LARGE),
add_row,
ft.Divider(),
ft.Row([self.save_button, self.status_text], alignment=ft.MainAxisAlignment.START),
]
)
# No need to return anything from __init__
def _populate_rows(self):
"""Clears and refills the variable rows column based on self.variables."""
self.variable_rows_column.controls.clear()
for index, (key, value) in enumerate(self.variables):
self.variable_rows_column.controls.append(self._create_variable_row(index, key, value))
# No need to update here, usually called during init or after add/delete
def _create_variable_row(self, index: int, key: str, value: str) -> ft.Row:
"""Creates a Row control for a single key-value pair."""
key_field = ft.TextField(value=key, expand=2, data=index)
value_field = ft.TextField(value=value, expand=5, data=index)
# Update self.variables when text fields change (optional, safer to update only on save)
# key_field.on_change = self._update_variable_from_ui
# value_field.on_change = self._update_variable_from_ui
return ft.Row(
[
key_field,
value_field,
ft.IconButton(
icon=ft.icons.DELETE_OUTLINE,
tooltip="Delete Variable",
data=index, # Store index to know which one to delete
on_click=self._delete_variable_row,
),
],
alignment=ft.MainAxisAlignment.START,
key=str(index), # Assign a key for potential targeted updates
)
def _add_variable_row_interactive(self, e):
"""Adds a variable row based on the 'Add New' fields and updates the UI."""
new_key = self.add_key_field.value.strip()
new_value = self.add_value_field.value.strip()
if not new_key:
# Access page via self.page if the control is mounted
if self.page:
self.page.show_snack_bar(ft.SnackBar(ft.Text("Key cannot be empty."), open=True))
return
# Check if key already exists? For now, allow duplicates, save will handle last one.
# Add to internal list
self.variables.append((new_key, new_value))
# Add UI row
new_index = len(self.variables) - 1
self.variable_rows_column.controls.append(self._create_variable_row(new_index, new_key, new_value))
# Clear add fields
self.add_key_field.value = ""
self.add_value_field.value = ""
self.update() # Update this Column
# If page exists, update page too (might be redundant if Column update cascades)
# if self.page: self.page.update()
def _delete_variable_row(self, e):
"""Deletes a variable row from the UI and the internal list."""
index_to_delete = e.control.data
if 0 <= index_to_delete < len(self.variables):
# Find the row control to remove
row_to_remove = None
for control in self.variable_rows_column.controls:
# Check the data attribute of the delete button inside the row
if (
isinstance(control, ft.Row)
and len(control.controls) > 2
and isinstance(control.controls[2], ft.IconButton)
and control.controls[2].data == index_to_delete
):
row_to_remove = control
break
# Remove from internal list *first*
if index_to_delete < len(self.variables): # Double check index after finding row
del self.variables[index_to_delete]
else:
print(f"[Env Editor] Error: Index {index_to_delete} out of bounds after finding row.")
return
# Remove from UI column if found
if row_to_remove:
self.variable_rows_column.controls.remove(row_to_remove)
# Need to re-index remaining rows' data attributes
self._reindex_rows()
self.update() # Update this Column
# if self.page: self.page.update()
else:
print(f"[Env Editor] Error: Invalid index to delete: {index_to_delete}")
def _reindex_rows(self):
"""Updates the data attribute (index) of controls in each row after deletion."""
for i, row in enumerate(self.variable_rows_column.controls):
if isinstance(row, ft.Row) and len(row.controls) > 2:
# Update index on key field, value field, and delete button
if isinstance(row.controls[0], ft.TextField):
row.controls[0].data = i
if isinstance(row.controls[1], ft.TextField):
row.controls[1].data = i
if isinstance(row.controls[2], ft.IconButton):
row.controls[2].data = i
def _save_changes(self, e):
"""Collects data from UI rows and saves to the .env file."""
updated_variables = []
has_error = False
keys = set()
for row_index, row in enumerate(self.variable_rows_column.controls):
if isinstance(row, ft.Row) and len(row.controls) >= 2:
key_field = row.controls[0]
value_field = row.controls[1]
if isinstance(key_field, ft.TextField) and isinstance(value_field, ft.TextField):
key = key_field.value.strip()
value = value_field.value # Keep original spacing/quotes for value for now
if not key:
has_error = True
# Use row_index which reflects the current visual order
self.status_text.value = f"Error: Row {row_index + 1} has an empty key."
self.status_text.color = ft.colors.RED
break # Stop processing on first error
if key in keys:
print(f"[Env Editor] Warning: Duplicate key '{key}' found. Last occurrence will be saved.")
# Or show error? Let's allow for now, last wins on save.
keys.add(key)
updated_variables.append((key, value))
else:
has_error = True
self.status_text.value = "Error: Invalid row structure found."
self.status_text.color = ft.colors.RED
break
else: # Handle cases where row might not be what's expected
print(f"[Env Editor] Warning: Skipping unexpected control type in variable column: {type(row)}")
if not has_error:
try:
save_env_data(self.env_path, updated_variables)
self.variables = updated_variables # Update internal state
self.status_text.value = "Changes saved successfully!"
self.status_text.color = ft.colors.GREEN
except Exception as ex:
self.status_text.value = f"Error saving file: {ex}"
self.status_text.color = ft.colors.RED
self.status_text.update()
# --- Function to create the main view containing the editor ---
# This can be called from ui_settings_view.py
def create_env_editor_page_content(page: ft.Page, app_state) -> ft.Control:
"""Creates the EnvEditor control."""
# EnvEditor is now the Column itself
editor = EnvEditor(app_state)
return editor

View File

@ -0,0 +1,348 @@
import flet as ft
import tomlkit
from .state import AppState
from .utils import show_snackbar # Assuming show_snackbar is in utils
from .toml_form_generator import create_toml_form, load_bot_config, get_bot_config_path
from .config_manager import load_config, save_config
from .ui_env_editor import create_env_editor_page_content
def save_bot_config(page: ft.Page, app_state: AppState, new_config_data: dict):
"""将修改后的 Bot 配置保存回文件。"""
config_path = get_bot_config_path(app_state)
try:
with open(config_path, "w", encoding="utf-8") as f:
# Use tomlkit.dumps to preserve formatting/comments as much as possible
# It might need refinement based on how UI controls update the dict
tomlkit.dump(new_config_data, f)
show_snackbar(page, "Bot 配置已保存!")
# Optionally reload config into app_state if needed immediately elsewhere
# app_state.bot_config = new_config_data # Or reload using a dedicated function
except Exception as e:
print(f"Error saving bot config: {e}")
show_snackbar(page, f"保存 Bot 配置失败: {e}", error=True)
def save_bot_config_changes(page: ft.Page, config_to_save: dict):
"""Handles saving changes for bot_config.toml"""
print("[Settings] Saving Bot Config (TOML) changes...")
# Assuming save_config needs path, let's build it or adapt save_config
# For now, let's assume save_config can handle type='bot'
# config_path = get_bot_config_path(app_state) # Need app_state if using this
success = save_config(config_to_save, config_type="bot")
if success:
message = "Bot 配置已保存!"
else:
message = "保存 Bot 配置失败。"
show_snackbar(page, message, error=(not success))
def save_lpmm_config_changes(page: ft.Page, config_to_save: dict):
"""Handles saving changes for lpmm_config.toml"""
print("[Settings] Saving LPMM Config (TOML) changes...")
success = save_config(config_to_save, config_type="lpmm") # Use type 'lpmm'
if success:
message = "LPMM 配置已保存!"
else:
message = "保存 LPMM 配置失败。"
show_snackbar(page, message, error=(not success))
def save_gui_config_changes(page: ft.Page, app_state: AppState):
"""Handles saving changes for gui_config.toml (currently just theme)"""
print("[Settings] Saving GUI Config changes...")
# gui_config is directly in app_state, no need to pass config_to_save
success = save_config(app_state.gui_config, config_type="gui")
if success:
message = "GUI 配置已保存!"
else:
message = "保存 GUI 配置失败。"
show_snackbar(page, message, error=(not success))
def create_settings_view(page: ft.Page, app_state: AppState) -> ft.View:
"""Creates the settings view with sections for different config files."""
# --- State for switching between editors ---
content_area = ft.Column([], expand=True, scroll=ft.ScrollMode.ADAPTIVE)
current_config_data = {} # Store loaded data for saving
# --- Function to load Bot config editor (Original TOML editor) ---
def show_bot_config_editor(e=None):
nonlocal current_config_data
print("[Settings] Loading Bot Config Editor")
try:
current_bot_config = load_bot_config(app_state)
if not current_bot_config:
raise ValueError("Bot config could not be loaded.")
current_config_data = current_bot_config
content_area.controls.clear()
# Pass the correct template filename string
form_generator = create_toml_form(
page, current_bot_config, content_area, template_filename="bot_config_template.toml"
)
save_button = ft.ElevatedButton(
"保存 Bot 配置更改",
icon=ft.icons.SAVE,
on_click=lambda _: save_bot_config_changes(
page, form_generator.config_data if hasattr(form_generator, "config_data") else current_config_data
),
)
content_area.controls.append(ft.Divider())
content_area.controls.append(save_button)
except Exception as ex:
content_area.controls.clear()
content_area.controls.append(ft.Text(f"加载 Bot 配置时出错: {ex}", color=ft.colors.ERROR))
if page:
page.update()
# --- Function to load LPMM config editor ---
def show_lpmm_editor(e=None):
nonlocal current_config_data
print("[Settings] Loading LPMM Config Editor")
try:
lpmm_config = load_config(config_type="lpmm")
if not lpmm_config:
raise ValueError("LPMM config could not be loaded.")
current_config_data = lpmm_config
content_area.controls.clear()
# Pass the correct template filename string
form_generator = create_toml_form(
page, lpmm_config, content_area, template_filename="lpmm_config_template.toml"
)
save_button = ft.ElevatedButton(
"保存 LPMM 配置更改",
icon=ft.icons.SAVE,
on_click=lambda _: save_lpmm_config_changes(
page, form_generator.config_data if hasattr(form_generator, "config_data") else current_config_data
),
)
content_area.controls.append(ft.Divider())
content_area.controls.append(save_button)
except Exception as ex:
content_area.controls.clear()
content_area.controls.append(ft.Text(f"加载 LPMM 配置时出错: {ex}", color=ft.colors.ERROR))
if page:
page.update()
# --- Function to load GUI settings editor ---
def show_gui_settings(e=None):
# GUI config is simpler, might not need full form generator
# We'll load it directly from app_state and save app_state.gui_config
print("[Settings] Loading GUI Settings Editor")
content_area.controls.clear()
def change_theme(ev):
selected_theme = ev.control.value.upper()
page.theme_mode = ft.ThemeMode[selected_theme]
app_state.gui_config["theme"] = selected_theme
print(f"Theme changed to: {page.theme_mode}, updating app_state.gui_config")
page.update() # Update theme immediately
# Get current theme from app_state or page
current_theme_val = app_state.gui_config.get("theme", str(page.theme_mode).split(".")[-1]).capitalize()
if current_theme_val not in ["System", "Light", "Dark"]:
current_theme_val = "System" # Default fallback
theme_dropdown = ft.Dropdown(
label="界面主题",
value=current_theme_val,
options=[
ft.dropdown.Option("System"),
ft.dropdown.Option("Light"),
ft.dropdown.Option("Dark"),
],
on_change=change_theme,
# expand=True, # Maybe not expand in this layout
)
save_button = ft.ElevatedButton(
"保存 GUI 设置", icon=ft.icons.SAVE, on_click=lambda _: save_gui_config_changes(page, app_state)
)
content_area.controls.extend(
[
ft.Text("界面设置:", weight=ft.FontWeight.BOLD),
ft.Row([theme_dropdown]),
# Add more GUI controls here if needed in the future
ft.Divider(),
save_button,
]
)
if page:
page.update()
# --- Function to load .env editor ---
def show_env_editor(e=None):
# No config data to manage here, it handles its own save
print("[Settings] Loading .env Editor")
content_area.controls.clear()
env_editor_content = create_env_editor_page_content(page, app_state)
content_area.controls.append(env_editor_content)
if page:
page.update()
# --- Initial View Setup ---
# Load the Bot config editor by default
show_bot_config_editor()
return ft.View(
"/settings",
[
ft.AppBar(title=ft.Text("设置"), bgcolor=ft.colors.SURFACE_VARIANT),
ft.Row(
[
ft.ElevatedButton("Bot 配置", icon=ft.icons.SETTINGS_SUGGEST, on_click=show_bot_config_editor),
ft.ElevatedButton("LPMM 配置", icon=ft.icons.MEMORY, on_click=show_lpmm_editor),
ft.ElevatedButton("GUI 设置", icon=ft.icons.BRUSH, on_click=show_gui_settings),
ft.ElevatedButton(".env 配置", icon=ft.icons.EDIT, on_click=show_env_editor),
],
alignment=ft.MainAxisAlignment.CENTER,
wrap=True, # Allow buttons to wrap on smaller widths
),
ft.Divider(),
content_area, # This holds the currently selected editor
],
scroll=ft.ScrollMode.ADAPTIVE,
)
# Note: Assumes save_config function exists and can handle saving
# the bot_config dictionary back to its TOML file. You might need to
# adjust the save_bot_config_changes function based on how saving is implemented.
# Also assumes load_bot_config loads the data correctly for the TOML editor.
def create_settings_view_old(page: ft.Page, app_state: AppState) -> ft.View:
"""创建设置页面视图。"""
# --- GUI Settings ---
def change_theme(e):
selected_theme = e.control.value.upper()
page.theme_mode = ft.ThemeMode[selected_theme]
# Persist theme choice? Maybe in gui_config?
app_state.gui_config["theme"] = selected_theme # Example persistence
# Need a way to save gui_config too (similar to bot_config?)
print(f"Theme changed to: {page.theme_mode}")
page.update()
theme_dropdown = ft.Dropdown(
label="界面主题",
value=str(page.theme_mode).split(".")[-1].capitalize()
if page.theme_mode
else "System", # Handle None theme_mode
options=[
ft.dropdown.Option("System"),
ft.dropdown.Option("Light"),
ft.dropdown.Option("Dark"),
],
on_change=change_theme,
expand=True,
)
gui_settings_card = ft.Card(
content=ft.Container(
content=ft.Column(
[
ft.ListTile(title=ft.Text("GUI 设置")),
ft.Row([theme_dropdown], alignment=ft.MainAxisAlignment.SPACE_BETWEEN),
# Add more GUI settings here
]
),
padding=10,
)
)
# --- Bot Settings (Placeholder) ---
# TODO: Load bot_config.toml and dynamically generate controls
config_path = get_bot_config_path(app_state)
bot_config_content_area = ft.Column(expand=True, scroll=ft.ScrollMode.ADAPTIVE)
bot_settings_card = ft.Card(
content=ft.Container(
content=ft.Column(
[
ft.ListTile(title=ft.Text("Bot 配置 (bot_config.toml)")),
ft.Text(f"配置文件路径: {config_path}", italic=True, size=10),
ft.Divider(),
# Placeholder - Controls will be added dynamically
bot_config_content_area,
ft.Divider(),
ft.Row(
[
ft.ElevatedButton(
"重新加载", icon=ft.icons.REFRESH, on_click=lambda _: print("Reload TBD")
), # Placeholder action
ft.ElevatedButton(
"保存 Bot 配置", icon=ft.icons.SAVE, on_click=lambda _: print("Save TBD")
), # Placeholder action
],
alignment=ft.MainAxisAlignment.END,
),
]
),
padding=10,
)
)
# --- Load and Display Bot Config ---
# This needs error handling and dynamic UI generation
try:
# 使用新的加载方法
loaded_bot_config = load_bot_config(app_state)
if loaded_bot_config:
# 使用新的表单生成器创建动态表单
create_toml_form(page, loaded_bot_config, bot_config_content_area, app_state)
# Update the save button's action
save_button = bot_settings_card.content.content.controls[-1].controls[1] # Find the save button
save_button.on_click = lambda _: save_bot_config(
page, app_state, loaded_bot_config
) # Pass the loaded config dict
# Add reload logic here
reload_button = bot_settings_card.content.content.controls[-1].controls[0] # Find the reload button
def reload_action(_):
bot_config_content_area.controls.clear()
try:
reloaded_config = load_bot_config(app_state)
if reloaded_config:
# 重新创建表单
create_toml_form(page, reloaded_config, bot_config_content_area, app_state)
# Update save button reference
save_button.on_click = lambda _: save_bot_config(page, app_state, reloaded_config)
show_snackbar(page, "Bot 配置已重新加载。")
# 确保UI完全更新
bot_config_content_area.update()
bot_settings_card.update()
else:
bot_config_content_area.controls.append(
ft.Text("重新加载失败: 无法加载配置文件", color=ft.colors.ERROR)
)
bot_config_content_area.update()
except Exception as reload_e:
bot_config_content_area.controls.append(ft.Text(f"重新加载失败: {reload_e}", color=ft.colors.ERROR))
bot_config_content_area.update()
page.update()
reload_button.on_click = reload_action
else:
bot_config_content_area.controls.append(
ft.Text(f"错误: 无法加载配置文件 {config_path}", color=ft.colors.ERROR)
)
except Exception as e:
bot_config_content_area.controls.append(ft.Text(f"加载配置文件出错: {e}", color=ft.colors.ERROR))
return ft.View(
"/settings",
[
ft.AppBar(title=ft.Text("设置"), bgcolor=ft.colors.SURFACE_VARIANT),
gui_settings_card,
bot_settings_card, # Add the bot settings card
# Add more settings sections/cards as needed
],
scroll=ft.ScrollMode.ADAPTIVE, # Allow scrolling for the whole view
padding=10,
)

File diff suppressed because it is too large Load Diff

126
src/MaiGoi/utils.py 100644
View File

@ -0,0 +1,126 @@
import flet as ft
import os
import sys
import subprocess
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from .state import AppState # Avoid circular import for type hinting
async def update_page_safe(page: Optional[ft.Page]):
"""Safely call page.update() if the page object is valid."""
if page:
try:
await page.update()
except Exception:
# Reduce noise, perhaps only print if debug is enabled later
# print(f"Error during safe page update: {e}")
pass # Silently ignore update errors, especially during shutdown
def show_snackbar(page: Optional[ft.Page], message: str, error: bool = False):
"""Helper function to display a SnackBar."""
if not page:
print(f"[Snackbar - No Page] {'Error' if error else 'Info'}: {message}")
return
try:
page.snack_bar = ft.SnackBar(
ft.Text(message),
bgcolor=ft.colors.ERROR if error else None,
open=True,
)
page.update()
except Exception as e:
print(f"Error showing snackbar: {e}")
def run_script(script_path: str, page: Optional["ft.Page"], app_state: Optional["AppState"], is_python: bool = False):
"""Runs a script file (.bat or .py) in a new process/window."""
if not app_state or not app_state.script_dir:
print("[run_script] Error: AppState or script_dir not available.", flush=True)
if page:
show_snackbar(page, "错误:无法确定脚本目录", error=True)
return
# Construct the full path to the script
full_script_path = os.path.join(app_state.script_dir, script_path)
print(f"[run_script] Attempting to run: {full_script_path}", flush=True)
try:
if not os.path.exists(full_script_path):
print(f"[run_script] Error: Script file not found: {full_script_path}", flush=True)
if page:
show_snackbar(page, f"错误:脚本文件未找到\\n{script_path}", error=True)
return
# --- Platform-specific execution --- #
if sys.platform == "win32":
if script_path.lower().endswith(".bat"):
print("[run_script] Using 'start cmd /k' for .bat on Windows.", flush=True)
# Use start cmd /k to keep the window open after script finishes
subprocess.Popen(f'start cmd /k "{full_script_path}"', shell=True, cwd=app_state.script_dir)
elif script_path.lower().endswith(".py"):
print("[run_script] Using Python executable for .py on Windows.", flush=True)
# Run Python script using the current interpreter in a new console window
# Using sys.executable ensures the correct Python environment is used.
# 'start' is a cmd command, so shell=True is needed.
# We don't use /k here, the Python process itself will keep the window open if needed (e.g., input()).
subprocess.Popen(
f'start "Running {script_path}" "{sys.executable}" "{full_script_path}"',
shell=True,
cwd=app_state.script_dir,
)
else:
print(
f"[run_script] Attempting generic 'start' for unknown file type on Windows: {script_path}",
flush=True,
)
# Try generic start for other file types, might open associated program
subprocess.Popen(f'start "{full_script_path}"', shell=True, cwd=app_state.script_dir)
else: # Linux/macOS
if script_path.lower().endswith(".py"):
print("[run_script] Using Python executable for .py on non-Windows.", flush=True)
# On Unix-like systems, we typically need a terminal emulator to see output.
# This example uses xterm, adjust if needed for other terminals (gnome-terminal, etc.)
# The '-e' flag is common for executing a command.
try:
subprocess.Popen(["xterm", "-e", sys.executable, full_script_path], cwd=app_state.script_dir)
except FileNotFoundError:
print(
"[run_script] xterm not found. Trying to run Python directly (output might be lost).",
flush=True,
)
try:
subprocess.Popen([sys.executable, full_script_path], cwd=app_state.script_dir)
except Exception as e_direct:
print(f"[run_script] Error running Python script directly: {e_direct}", flush=True)
if page:
show_snackbar(page, f"运行脚本时出错: {e_direct}", error=True)
return
elif os.access(full_script_path, os.X_OK): # Check if it's executable
print("[run_script] Running executable script directly on non-Windows.", flush=True)
# Similar terminal issue might apply here if it's a console app
try:
subprocess.Popen([full_script_path], cwd=app_state.script_dir)
except Exception as e_exec:
print(f"[run_script] Error running executable script: {e_exec}", flush=True)
if page:
show_snackbar(page, f"运行脚本时出错: {e_exec}", error=True)
return
else:
print(
f"[run_script] Don't know how to run non-executable, non-python script on non-Windows: {script_path}",
flush=True,
)
if page:
show_snackbar(page, f"无法运行此类型的文件: {script_path}", error=True)
return
if page:
show_snackbar(page, f"正在尝试运行脚本: {script_path}")
except Exception as e:
print(f"[run_script] Unexpected error running script '{script_path}': {e}", flush=True)
if page:
show_snackbar(page, f"运行脚本时发生意外错误: {e}", error=True)

View File

@ -358,6 +358,23 @@ SUB_HEARTFLOW_STYLE_CONFIG = {
}, },
} }
INTEREST_CHAT_STYLE_CONFIG = {
"advanced": {
"console_format": (
"<white>{time:YYYY-MM-DD HH:mm:ss}</white> | "
"<level>{level: <8}</level> | "
"<light-blue>兴趣</light-blue> | "
"<level>{message}</level>"
),
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 兴趣 | {message}",
},
"simple": {
"console_format": "<level>{time:MM-DD HH:mm}</level> | <fg #55DDFF>兴趣 | {message}</fg #55DDFF>", # noqa: E501
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 兴趣 | {message}",
},
}
SUB_HEARTFLOW_MIND_STYLE_CONFIG = { SUB_HEARTFLOW_MIND_STYLE_CONFIG = {
"advanced": { "advanced": {
"console_format": ( "console_format": (
@ -895,6 +912,9 @@ CHAT_MESSAGE_STYLE_CONFIG = (
CHAT_IMAGE_STYLE_CONFIG = CHAT_IMAGE_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else CHAT_IMAGE_STYLE_CONFIG["advanced"] CHAT_IMAGE_STYLE_CONFIG = CHAT_IMAGE_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else CHAT_IMAGE_STYLE_CONFIG["advanced"]
INIT_STYLE_CONFIG = INIT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else INIT_STYLE_CONFIG["advanced"] INIT_STYLE_CONFIG = INIT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else INIT_STYLE_CONFIG["advanced"]
API_SERVER_STYLE_CONFIG = API_SERVER_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else API_SERVER_STYLE_CONFIG["advanced"] API_SERVER_STYLE_CONFIG = API_SERVER_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else API_SERVER_STYLE_CONFIG["advanced"]
INTEREST_CHAT_STYLE_CONFIG = (
INTEREST_CHAT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else INTEREST_CHAT_STYLE_CONFIG["advanced"]
)
def is_registered_module(record: dict) -> bool: def is_registered_module(record: dict) -> bool:

View File

@ -41,6 +41,7 @@ from src.common.logger import (
CHAT_MESSAGE_STYLE_CONFIG, CHAT_MESSAGE_STYLE_CONFIG,
CHAT_IMAGE_STYLE_CONFIG, CHAT_IMAGE_STYLE_CONFIG,
INIT_STYLE_CONFIG, INIT_STYLE_CONFIG,
INTEREST_CHAT_STYLE_CONFIG,
API_SERVER_STYLE_CONFIG, API_SERVER_STYLE_CONFIG,
) )
@ -87,6 +88,7 @@ MODULE_LOGGER_CONFIGS = {
"chat_message": CHAT_MESSAGE_STYLE_CONFIG, # 聊天消息 "chat_message": CHAT_MESSAGE_STYLE_CONFIG, # 聊天消息
"chat_image": CHAT_IMAGE_STYLE_CONFIG, # 聊天图片 "chat_image": CHAT_IMAGE_STYLE_CONFIG, # 聊天图片
"init": INIT_STYLE_CONFIG, # 初始化 "init": INIT_STYLE_CONFIG, # 初始化
"interest_chat": INTEREST_CHAT_STYLE_CONFIG, # 兴趣
"api": API_SERVER_STYLE_CONFIG, # API服务器 "api": API_SERVER_STYLE_CONFIG, # API服务器
# ...如有更多模块,继续添加... # ...如有更多模块,继续添加...
} }

View File

@ -78,6 +78,7 @@ class BackgroundTaskManager:
self._into_focus_task: Optional[asyncio.Task] = None self._into_focus_task: Optional[asyncio.Task] = None
self._private_chat_activation_task: Optional[asyncio.Task] = None # 新增私聊激活任务引用 self._private_chat_activation_task: Optional[asyncio.Task] = None # 新增私聊激活任务引用
self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks
self._detect_command_from_gui_task: Optional[asyncio.Task] = None # 新增GUI命令检测任务引用
async def start_tasks(self): async def start_tasks(self):
"""启动所有后台任务 """启动所有后台任务
@ -135,6 +136,13 @@ class BackgroundTaskManager:
f"私聊激活检查任务已启动 间隔:{PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS}s", f"私聊激活检查任务已启动 间隔:{PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS}s",
"_private_chat_activation_task", "_private_chat_activation_task",
), ),
# 新增GUI命令检测任务配置
# (
# lambda: self._run_detect_command_from_gui_cycle(3),
# "debug",
# f"GUI命令检测任务已启动 间隔:{3}s",
# "_detect_command_from_gui_task",
# ),
] ]
# 统一启动所有任务 # 统一启动所有任务
@ -296,3 +304,11 @@ class BackgroundTaskManager:
interval=interval, interval=interval,
task_func=self.subheartflow_manager.sbhf_absent_private_into_focus, task_func=self.subheartflow_manager.sbhf_absent_private_into_focus,
) )
# # 有api之后删除
# async def _run_detect_command_from_gui_cycle(self, interval: int):
# await _run_periodic_loop(
# task_name="Detect Command from GUI",
# interval=interval,
# task_func=self.subheartflow_manager.detect_command_from_gui,
# )

View File

@ -0,0 +1,200 @@
import asyncio
from src.config.config import global_config
from typing import Optional, Dict
import traceback
from src.common.logger_manager import get_logger
from src.plugins.chat.message import MessageRecv
import math
# 定义常量 (从 interest.py 移动过来)
MAX_INTEREST = 15.0
logger = get_logger("interest_chatting")
PROBABILITY_INCREASE_RATE_PER_SECOND = 0.1
PROBABILITY_DECREASE_RATE_PER_SECOND = 0.1
MAX_REPLY_PROBABILITY = 1
class InterestChatting:
def __init__(
self,
decay_rate=global_config.default_decay_rate_per_second,
max_interest=MAX_INTEREST,
trigger_threshold=global_config.reply_trigger_threshold,
max_probability=MAX_REPLY_PROBABILITY,
):
# 基础属性初始化
self.interest_level: float = 0.0
self.decay_rate_per_second: float = decay_rate
self.max_interest: float = max_interest
self.trigger_threshold: float = trigger_threshold
self.max_reply_probability: float = max_probability
self.is_above_threshold: bool = False
# 任务相关属性初始化
self.update_task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
self._task_lock = asyncio.Lock()
self._is_running = False
self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {}
self.update_interval = 1.0
self.above_threshold = False
self.start_hfc_probability = 0.0
async def initialize(self):
async with self._task_lock:
if self._is_running:
logger.debug("后台兴趣更新任务已在运行中。")
return
# 清理已完成或已取消的任务
if self.update_task and (self.update_task.done() or self.update_task.cancelled()):
self.update_task = None
if not self.update_task:
self._stop_event.clear()
self._is_running = True
self.update_task = asyncio.create_task(self._run_update_loop(self.update_interval))
logger.debug("后台兴趣更新任务已创建并启动。")
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
"""添加消息到兴趣字典
参数:
message: 接收到的消息
interest_value: 兴趣值
is_mentioned: 是否被提及
功能:
1. 将消息添加到兴趣字典
2. 更新最后交互时间
3. 如果字典长度超过10删除最旧的消息
"""
# 添加新消息
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
# 如果字典长度超过10删除最旧的消息
if len(self.interest_dict) > 10:
oldest_key = next(iter(self.interest_dict))
self.interest_dict.pop(oldest_key)
async def _calculate_decay(self):
"""计算兴趣值的衰减
参数:
current_time: 当前时间戳
处理逻辑:
1. 计算时间差
2. 处理各种异常情况(负值/零值)
3. 正常计算衰减
4. 更新最后更新时间
"""
# 处理极小兴趣值情况
if self.interest_level < 1e-9:
self.interest_level = 0.0
return
# 异常情况处理
if self.decay_rate_per_second <= 0:
logger.warning(f"衰减率({self.decay_rate_per_second})无效重置兴趣值为0")
self.interest_level = 0.0
return
# 正常衰减计算
try:
decay_factor = math.pow(self.decay_rate_per_second, self.update_interval)
self.interest_level *= decay_factor
except ValueError as e:
logger.error(
f"衰减计算错误: {e} 参数: 衰减率={self.decay_rate_per_second} 时间差={self.update_interval} 当前兴趣={self.interest_level}"
)
self.interest_level = 0.0
async def _update_reply_probability(self):
self.above_threshold = self.interest_level >= self.trigger_threshold
if self.above_threshold:
self.start_hfc_probability += PROBABILITY_INCREASE_RATE_PER_SECOND
else:
if self.start_hfc_probability > 0:
self.start_hfc_probability = max(0, self.start_hfc_probability - PROBABILITY_DECREASE_RATE_PER_SECOND)
async def increase_interest(self, value: float):
self.interest_level += value
self.interest_level = min(self.interest_level, self.max_interest)
async def decrease_interest(self, value: float):
self.interest_level -= value
self.interest_level = max(self.interest_level, 0.0)
async def get_interest(self) -> float:
return self.interest_level
async def get_state(self) -> dict:
interest = self.interest_level # 直接使用属性值
return {
"interest_level": round(interest, 2),
"start_hfc_probability": round(self.start_hfc_probability, 4),
"above_threshold": self.above_threshold,
}
# --- 新增后台更新任务相关方法 ---
async def _run_update_loop(self, update_interval: float = 1.0):
"""后台循环,定期更新兴趣和回复概率。"""
try:
while not self._stop_event.is_set():
try:
if self.interest_level != 0:
await self._calculate_decay()
await self._update_reply_probability()
# 等待下一个周期或停止事件
await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval)
except asyncio.TimeoutError:
# 正常超时,继续循环
continue
except Exception as e:
logger.error(f"InterestChatting 更新循环出错: {e}")
logger.error(traceback.format_exc())
# 防止错误导致CPU飙升稍作等待
await asyncio.sleep(5)
except asyncio.CancelledError:
logger.info("InterestChatting 更新循环被取消。")
finally:
self._is_running = False
logger.info("InterestChatting 更新循环已停止。")
async def stop_updates(self):
"""停止后台更新任务,使用锁确保并发安全"""
async with self._task_lock:
if not self._is_running:
logger.debug("后台兴趣更新任务未运行。")
return
logger.info("正在停止 InterestChatting 后台更新任务...")
self._stop_event.set()
if self.update_task and not self.update_task.done():
try:
# 等待任务结束,设置超时
await asyncio.wait_for(self.update_task, timeout=5.0)
logger.info("InterestChatting 后台更新任务已成功停止。")
except asyncio.TimeoutError:
logger.warning("停止 InterestChatting 后台任务超时,尝试取消...")
self.update_task.cancel()
try:
await self.update_task # 等待取消完成
except asyncio.CancelledError:
logger.info("InterestChatting 后台更新任务已被取消。")
except Exception as e:
logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}")
finally:
self.update_task = None
self._is_running = False

View File

@ -29,6 +29,14 @@ def _ensure_log_directory():
logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在") logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在")
def _clear_and_create_log_file():
"""清除日志文件并创建新的日志文件。"""
if os.path.exists(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)):
os.remove(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME))
with open(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME), "w", encoding="utf-8") as f:
f.write("")
class InterestLogger: class InterestLogger:
"""负责定期记录主心流和所有子心流的状态到日志文件。""" """负责定期记录主心流和所有子心流的状态到日志文件。"""
@ -44,6 +52,7 @@ class InterestLogger:
self.heartflow = heartflow # 存储 Heartflow 实例 self.heartflow = heartflow # 存储 Heartflow 实例
self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME) self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)
_ensure_log_directory() _ensure_log_directory()
_clear_and_create_log_file()
async def get_all_subflow_states(self) -> Dict[str, Dict]: async def get_all_subflow_states(self) -> Dict[str, Dict]:
"""并发获取所有活跃子心流的当前完整状态。""" """并发获取所有活跃子心流的当前完整状态。"""
@ -92,7 +101,7 @@ class InterestLogger:
try: try:
current_timestamp = time.time() current_timestamp = time.time()
main_mind = self.heartflow.current_mind # main_mind = self.heartflow.current_mind
# 获取 Mai 状态名称 # 获取 Mai 状态名称
mai_state_name = self.heartflow.current_state.get_current_state().name mai_state_name = self.heartflow.current_state.get_current_state().name
@ -100,7 +109,7 @@ class InterestLogger:
log_entry_base = { log_entry_base = {
"timestamp": round(current_timestamp, 2), "timestamp": round(current_timestamp, 2),
"main_mind": main_mind, # "main_mind": main_mind,
"mai_state": mai_state_name, "mai_state": mai_state_name,
"subflow_count": len(all_subflow_states), "subflow_count": len(all_subflow_states),
"subflows": [], "subflows": [],
@ -135,7 +144,7 @@ class InterestLogger:
"sub_chat_state": state.get("chat_state", "未知"), "sub_chat_state": state.get("chat_state", "未知"),
"interest_level": interest_state.get("interest_level", 0.0), "interest_level": interest_state.get("interest_level", 0.0),
"start_hfc_probability": interest_state.get("start_hfc_probability", 0.0), "start_hfc_probability": interest_state.get("start_hfc_probability", 0.0),
"is_above_threshold": interest_state.get("is_above_threshold", False), # "is_above_threshold": interest_state.get("is_above_threshold", False),
} }
subflow_details.append(subflow_entry) subflow_details.append(subflow_entry)

View File

@ -1,215 +1,22 @@
from .observation import Observation, ChattingObservation from .observation import Observation, ChattingObservation
import asyncio import asyncio
from src.config.config import global_config
import time import time
from typing import Optional, List, Dict, Tuple, Callable, Coroutine from typing import Optional, List, Dict, Tuple, Callable, Coroutine
import traceback import traceback
from src.common.logger_manager import get_logger from src.common.logger_manager import get_logger
from src.plugins.chat.message import MessageRecv from src.plugins.chat.message import MessageRecv
from src.plugins.chat.chat_stream import chat_manager from src.plugins.chat.chat_stream import chat_manager
import math
from src.plugins.heartFC_chat.heartFC_chat import HeartFChatting from src.plugins.heartFC_chat.heartFC_chat import HeartFChatting
from src.plugins.heartFC_chat.normal_chat import NormalChat from src.plugins.heartFC_chat.normal_chat import NormalChat
from src.heart_flow.mai_state_manager import MaiStateInfo from src.heart_flow.mai_state_manager import MaiStateInfo
from src.heart_flow.chat_state_info import ChatState, ChatStateInfo from src.heart_flow.chat_state_info import ChatState, ChatStateInfo
from src.heart_flow.sub_mind import SubMind from src.heart_flow.sub_mind import SubMind
from .utils_chat import get_chat_type_and_target_info from .utils_chat import get_chat_type_and_target_info
from .interest_chatting import InterestChatting
# 定义常量 (从 interest.py 移动过来)
MAX_INTEREST = 15.0
logger = get_logger("sub_heartflow") logger = get_logger("sub_heartflow")
PROBABILITY_INCREASE_RATE_PER_SECOND = 0.1
PROBABILITY_DECREASE_RATE_PER_SECOND = 0.1
MAX_REPLY_PROBABILITY = 1
class InterestChatting:
def __init__(
self,
decay_rate=global_config.default_decay_rate_per_second,
max_interest=MAX_INTEREST,
trigger_threshold=global_config.reply_trigger_threshold,
max_probability=MAX_REPLY_PROBABILITY,
):
# 基础属性初始化
self.interest_level: float = 0.0
self.decay_rate_per_second: float = decay_rate
self.max_interest: float = max_interest
self.trigger_threshold: float = trigger_threshold
self.max_reply_probability: float = max_probability
self.is_above_threshold: bool = False
# 任务相关属性初始化
self.update_task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
self._task_lock = asyncio.Lock()
self._is_running = False
self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {}
self.update_interval = 1.0
self.above_threshold = False
self.start_hfc_probability = 0.0
async def initialize(self):
async with self._task_lock:
if self._is_running:
logger.debug("后台兴趣更新任务已在运行中。")
return
# 清理已完成或已取消的任务
if self.update_task and (self.update_task.done() or self.update_task.cancelled()):
self.update_task = None
if not self.update_task:
self._stop_event.clear()
self._is_running = True
self.update_task = asyncio.create_task(self._run_update_loop(self.update_interval))
logger.debug("后台兴趣更新任务已创建并启动。")
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
"""添加消息到兴趣字典
参数:
message: 接收到的消息
interest_value: 兴趣值
is_mentioned: 是否被提及
功能:
1. 将消息添加到兴趣字典
2. 更新最后交互时间
3. 如果字典长度超过10删除最旧的消息
"""
# 添加新消息
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
# 如果字典长度超过10删除最旧的消息
if len(self.interest_dict) > 10:
oldest_key = next(iter(self.interest_dict))
self.interest_dict.pop(oldest_key)
async def _calculate_decay(self):
"""计算兴趣值的衰减
参数:
current_time: 当前时间戳
处理逻辑:
1. 计算时间差
2. 处理各种异常情况(负值/零值)
3. 正常计算衰减
4. 更新最后更新时间
"""
# 处理极小兴趣值情况
if self.interest_level < 1e-9:
self.interest_level = 0.0
return
# 异常情况处理
if self.decay_rate_per_second <= 0:
logger.warning(f"衰减率({self.decay_rate_per_second})无效重置兴趣值为0")
self.interest_level = 0.0
return
# 正常衰减计算
try:
decay_factor = math.pow(self.decay_rate_per_second, self.update_interval)
self.interest_level *= decay_factor
except ValueError as e:
logger.error(
f"衰减计算错误: {e} 参数: 衰减率={self.decay_rate_per_second} 时间差={self.update_interval} 当前兴趣={self.interest_level}"
)
self.interest_level = 0.0
async def _update_reply_probability(self):
self.above_threshold = self.interest_level >= self.trigger_threshold
if self.above_threshold:
self.start_hfc_probability += PROBABILITY_INCREASE_RATE_PER_SECOND
else:
if self.start_hfc_probability > 0:
self.start_hfc_probability = max(0, self.start_hfc_probability - PROBABILITY_DECREASE_RATE_PER_SECOND)
async def increase_interest(self, value: float):
self.interest_level += value
self.interest_level = min(self.interest_level, self.max_interest)
async def decrease_interest(self, value: float):
self.interest_level -= value
self.interest_level = max(self.interest_level, 0.0)
async def get_interest(self) -> float:
return self.interest_level
async def get_state(self) -> dict:
interest = self.interest_level # 直接使用属性值
return {
"interest_level": round(interest, 2),
"start_hfc_probability": round(self.start_hfc_probability, 4),
"above_threshold": self.above_threshold,
}
# --- 新增后台更新任务相关方法 ---
async def _run_update_loop(self, update_interval: float = 1.0):
"""后台循环,定期更新兴趣和回复概率。"""
try:
while not self._stop_event.is_set():
try:
if self.interest_level != 0:
await self._calculate_decay()
await self._update_reply_probability()
# 等待下一个周期或停止事件
await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval)
except asyncio.TimeoutError:
# 正常超时,继续循环
continue
except Exception as e:
logger.error(f"InterestChatting 更新循环出错: {e}")
logger.error(traceback.format_exc())
# 防止错误导致CPU飙升稍作等待
await asyncio.sleep(5)
except asyncio.CancelledError:
logger.info("InterestChatting 更新循环被取消。")
finally:
self._is_running = False
logger.info("InterestChatting 更新循环已停止。")
async def stop_updates(self):
"""停止后台更新任务,使用锁确保并发安全"""
async with self._task_lock:
if not self._is_running:
logger.debug("后台兴趣更新任务未运行。")
return
logger.info("正在停止 InterestChatting 后台更新任务...")
self._stop_event.set()
if self.update_task and not self.update_task.done():
try:
# 等待任务结束,设置超时
await asyncio.wait_for(self.update_task, timeout=5.0)
logger.info("InterestChatting 后台更新任务已成功停止。")
except asyncio.TimeoutError:
logger.warning("停止 InterestChatting 后台任务超时,尝试取消...")
self.update_task.cancel()
try:
await self.update_task # 等待取消完成
except asyncio.CancelledError:
logger.info("InterestChatting 后台更新任务已被取消。")
except Exception as e:
logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}")
finally:
self.update_task = None
self._is_running = False
# --- 结束 新增方法 ---
class SubHeartflow: class SubHeartflow:
def __init__( def __init__(

View File

@ -852,3 +852,52 @@ class SubHeartflowManager:
# --- 结束新增 --- # # --- 结束新增 --- #
# --- 结束新增:处理来自 HeartFChatting 的状态转换请求 --- # # --- 结束新增:处理来自 HeartFChatting 的状态转换请求 --- #
# 临时函数用于GUI切换有api后删除
# async def detect_command_from_gui(self):
# """检测来自GUI的命令"""
# command_file = Path("temp_command/gui_command.json")
# if not command_file.exists():
# return
# try:
# # 读取并解析命令文件
# command_data = json.loads(command_file.read_text())
# subflow_id = command_data.get("subflow_id")
# target_state = command_data.get("target_state")
# if not subflow_id or not target_state:
# logger.warning("GUI命令文件格式不正确缺少必要字段")
# return
# # 尝试转换为ChatState枚举
# try:
# target_state_enum = ChatState[target_state.upper()]
# except KeyError:
# logger.warning(f"无效的目标状态: {target_state}")
# command_file.unlink()
# return
# # 执行状态转换
# await self.force_change_by_gui(subflow_id, target_state_enum)
# # 转换成功后删除文件
# command_file.unlink()
# logger.debug(f"已处理GUI命令并删除命令文件: {command_file}")
# except json.JSONDecodeError:
# logger.warning("GUI命令文件不是有效的JSON格式")
# except Exception as e:
# logger.error(f"处理GUI命令时发生错误: {e}", exc_info=True)
# async def force_change_by_gui(self, subflow_id: Any, target_state: ChatState):
# """强制改变指定子心流的状态"""
# async with self._lock:
# subflow = self.subheartflows.get(subflow_id)
# if not subflow:
# logger.warning(f"[强制状态转换] 尝试转换不存在的子心流 {subflow_id} 到 {target_state.value}")
# return
# await subflow.change_chat_state(target_state)
# logger.info(f"[强制状态转换] 成功将 {subflow_id} 的状态转换为 {target_state.value}")
# --- 结束新增 --- #

View File

@ -15,33 +15,67 @@ remote_log_config = LogConfig(
) )
logger = get_module_logger("remote", config=remote_log_config) logger = get_module_logger("remote", config=remote_log_config)
# UUID文件路径 # --- 使用向上导航的方式定义路径 ---
UUID_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "client_uuid.json")
# 1. 获取当前文件 (remote.py) 所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 2. 从当前目录向上导航三级找到项目根目录
# (src/plugins/remote/ -> src/plugins/ -> src/ -> project_root)
root_dir = os.path.abspath(os.path.join(current_dir, "..", "..", ".."))
# 3. 定义 data 目录的路径 (位于项目根目录下)
data_dir = os.path.join(root_dir, "data")
# 4. 定义 UUID 文件在 data 目录下的完整路径
UUID_FILE = os.path.join(data_dir, "client_uuid.json")
# --- 路径定义结束 ---
# 生成或获取客户端唯一ID # 生成或获取客户端唯一ID
def get_unique_id(): def get_unique_id():
# --- 在尝试读写 UUID_FILE 之前确保 data 目录存在 ---
# 将目录检查和创建逻辑移到这里,在首次需要写入前执行
try:
# exist_ok=True 意味着如果目录已存在也不会报错
os.makedirs(data_dir, exist_ok=True)
except OSError as e:
# 处理可能的权限错误等
logger.error(f"无法创建数据目录 {data_dir}: {e}")
# 根据你的错误处理逻辑,可能需要在这里返回错误或抛出异常
# 暂且返回 None 或抛出,避免继续执行导致问题
raise RuntimeError(f"无法创建必要的数据目录 {data_dir}") from e
# --- 目录检查结束 ---
# 检查是否已经有保存的UUID # 检查是否已经有保存的UUID
if os.path.exists(UUID_FILE): if os.path.exists(UUID_FILE):
try: try:
with open(UUID_FILE, "r") as f: with open(UUID_FILE, "r", encoding="utf-8") as f: # 指定 encoding
data = json.load(f) data = json.load(f)
if "client_id" in data: if "client_id" in data:
# print("从本地文件读取客户端ID") logger.debug(f"从本地文件读取客户端ID: {UUID_FILE}")
return data["client_id"] return data["client_id"]
except (json.JSONDecodeError, IOError) as e: except (json.JSONDecodeError, IOError) as e:
print(f"读取UUID文件出错: {e}将生成新的UUID") logger.warning(f"读取UUID文件 {UUID_FILE} 出错: {e}将生成新的UUID")
except Exception as e: # 捕捉其他可能的异常
logger.error(f"读取UUID文件 {UUID_FILE} 时发生未知错误: {e}")
# 如果没有保存的UUID或读取出错则生成新的 # 如果没有保存的UUID或读取出错则生成新的
client_id = generate_unique_id() client_id = generate_unique_id()
logger.info(f"生成新的客户端ID: {client_id}")
# 保存UUID到文件 # 保存UUID到文件
try: try:
with open(UUID_FILE, "w") as f: # 再次确认目录存在 (虽然理论上前面已创建,但更保险)
json.dump({"client_id": client_id}, f) os.makedirs(data_dir, exist_ok=True)
logger.info("已保存新生成的客户端ID到本地文件") with open(UUID_FILE, "w", encoding="utf-8") as f: # 指定 encoding
json.dump({"client_id": client_id}, f, indent=4) # 添加 indent 使json可读
logger.info(f"已保存新生成的客户端ID到本地文件: {UUID_FILE}")
except IOError as e: except IOError as e:
logger.error(f"保存UUID时出错: {e}") logger.error(f"保存UUID时出错: {UUID_FILE} - {e}")
except Exception as e: # 捕捉其他可能的异常
logger.error(f"保存UUID文件 {UUID_FILE} 时发生未知错误: {e}")
return client_id return client_id

View File

@ -1,26 +0,0 @@
@echo off
CHCP 65001 > nul
setlocal enabledelayedexpansion
REM 查找venv虚拟环境
set "venv_path=%~dp0venv\Scripts\activate.bat"
if not exist "%venv_path%" (
echo 错误: 未找到虚拟环境请确保venv目录存在
pause
exit /b 1
)
REM 激活虚拟环境
call "%venv_path%"
if %ERRORLEVEL% neq 0 (
echo 错误: 虚拟环境激活失败
pause
exit /b 1
)
echo 虚拟环境已激活,正在启动 GUI...
REM 运行 Python 脚本
python scripts/interest_monitor_gui.py
pause