diff --git a/src/chat/frequency_control/frequency_control.py b/src/chat/frequency_control/frequency_control.py index 78041ae7..897a92cc 100644 --- a/src/chat/frequency_control/frequency_control.py +++ b/src/chat/frequency_control/frequency_control.py @@ -1,5 +1,6 @@ from datetime import datetime import time +import asyncio from typing import Dict from src.chat.utils.chat_message_builder import ( @@ -46,6 +47,8 @@ class FrequencyControl: self.frequency_model = LLMRequest( model_set=model_config.model_task_config.utils_small, request_type="frequency.adjust" ) + # 频率调整锁,防止并发执行 + self._adjust_lock = asyncio.Lock() def get_talk_frequency_adjust(self) -> float: """获取发言频率调整值""" @@ -56,68 +59,78 @@ class FrequencyControl: self.talk_frequency_adjust = max(0.1, min(5.0, value)) async def trigger_frequency_adjust(self) -> None: - msg_list = get_raw_msg_by_timestamp_with_chat( - chat_id=self.chat_id, - timestamp_start=self.last_frequency_adjust_time, - timestamp_end=time.time(), - ) - - if time.time() - self.last_frequency_adjust_time < 160 or len(msg_list) <= 20: - return - else: - new_msg_list = get_raw_msg_by_timestamp_with_chat( + # 使用异步锁防止并发执行 + async with self._adjust_lock: + # 在锁内检查,避免并发触发 + current_time = time.time() + previous_adjust_time = self.last_frequency_adjust_time + + msg_list = get_raw_msg_by_timestamp_with_chat( chat_id=self.chat_id, - timestamp_start=self.last_frequency_adjust_time, - timestamp_end=time.time(), - limit=20, - limit_mode="latest", + timestamp_start=previous_adjust_time, + timestamp_end=current_time, ) - message_str = build_readable_messages( - new_msg_list, - replace_bot_name=True, - timestamp_mode="relative", - read_mark=0.0, - show_actions=False, - ) - time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - bot_name = global_config.bot.nickname - bot_nickname = ( - f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else "" - ) - name_block = f"你的名字是{bot_name}{bot_nickname},请注意哪些是你自己的发言。" + if current_time - previous_adjust_time < 160 or len(msg_list) <= 20: + return - prompt = await global_prompt_manager.format_prompt( - "frequency_adjust_prompt", - name_block=name_block, - time_block=time_block, - message_str=message_str, - ) - response, (reasoning_content, _, _) = await self.frequency_model.generate_response_async( - prompt, - ) + # 立即更新调整时间,防止并发触发 + self.last_frequency_adjust_time = current_time - # logger.info(f"频率调整 prompt: {prompt}") - # logger.info(f"频率调整 response: {response}") + try: + new_msg_list = get_raw_msg_by_timestamp_with_chat( + chat_id=self.chat_id, + timestamp_start=previous_adjust_time, + timestamp_end=current_time, + limit=20, + limit_mode="latest", + ) - if global_config.debug.show_prompt: - logger.info(f"频率调整 prompt: {prompt}") - logger.info(f"频率调整 response: {response}") - logger.info(f"频率调整 reasoning_content: {reasoning_content}") + message_str = build_readable_messages( + new_msg_list, + replace_bot_name=True, + timestamp_mode="relative", + read_mark=0.0, + show_actions=False, + ) + time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + bot_name = global_config.bot.nickname + bot_nickname = ( + f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else "" + ) + name_block = f"你的名字是{bot_name}{bot_nickname},请注意哪些是你自己的发言。" - final_value_by_api = frequency_api.get_current_talk_value(self.chat_id) + prompt = await global_prompt_manager.format_prompt( + "frequency_adjust_prompt", + name_block=name_block, + time_block=time_block, + message_str=message_str, + ) + response, (reasoning_content, _, _) = await self.frequency_model.generate_response_async( + prompt, + ) - # LLM依然输出过多内容时取消本次调整。合法最多4个字,但有的模型可能会输出一些markdown换行符等,需要长度宽限 - if len(response) < 20: - if "过于频繁" in response: - logger.info(f"频率调整: 过于频繁,调整值到{final_value_by_api}") - self.talk_frequency_adjust = max(0.1, min(1.5, self.talk_frequency_adjust * 0.8)) - elif "过少" in response: - logger.info(f"频率调整: 过少,调整值到{final_value_by_api}") - self.talk_frequency_adjust = max(0.1, min(1.5, self.talk_frequency_adjust * 1.2)) - self.last_frequency_adjust_time = time.time() - else: - logger.info("频率调整:response不符合要求,取消本次调整") + # logger.info(f"频率调整 prompt: {prompt}") + # logger.info(f"频率调整 response: {response}") + + if global_config.debug.show_prompt: + logger.info(f"频率调整 prompt: {prompt}") + logger.info(f"频率调整 response: {response}") + logger.info(f"频率调整 reasoning_content: {reasoning_content}") + + final_value_by_api = frequency_api.get_current_talk_value(self.chat_id) + + # LLM依然输出过多内容时取消本次调整。合法最多4个字,但有的模型可能会输出一些markdown换行符等,需要长度宽限 + if len(response) < 20: + if "过于频繁" in response: + logger.info(f"频率调整: 过于频繁,调整值到{final_value_by_api}") + self.talk_frequency_adjust = max(0.1, min(1.5, self.talk_frequency_adjust * 0.8)) + elif "过少" in response: + logger.info(f"频率调整: 过少,调整值到{final_value_by_api}") + self.talk_frequency_adjust = max(0.1, min(1.5, self.talk_frequency_adjust * 1.2)) + except Exception as e: + logger.error(f"频率调整失败: {e}") + # 即使失败也保持时间戳更新,避免频繁重试 class FrequencyControlManager: diff --git a/src/express/expression_learner.py b/src/express/expression_learner.py index 76cc8408..af69cad6 100644 --- a/src/express/expression_learner.py +++ b/src/express/expression_learner.py @@ -2,6 +2,7 @@ import time import json import os import re +import asyncio from typing import List, Optional, Tuple import traceback from src.common.logger import get_logger @@ -91,6 +92,9 @@ class ExpressionLearner: # 维护每个chat的上次学习时间 self.last_learning_time: float = time.time() + # 学习锁,防止并发执行学习任务 + self._learning_lock = asyncio.Lock() + # 学习参数 _, self.enable_learning, self.learning_intensity = global_config.expression.get_expression_config_for_chat( self.chat_id @@ -139,32 +143,45 @@ class ExpressionLearner: Returns: bool: 是否成功触发学习 """ - if not self.should_trigger_learning(): - return + # 使用异步锁防止并发执行 + async with self._learning_lock: + # 在锁内检查,避免并发触发 + # 如果锁被持有,其他协程会等待,但等待期间条件可能已变化,所以需要再次检查 + if not self.should_trigger_learning(): + return - try: - logger.info(f"在聊天流 {self.chat_name} 学习表达方式") - # 学习语言风格 - learnt_style = await self.learn_and_store(num=25) + # 保存学习开始前的时间戳,用于获取消息范围 + learning_start_timestamp = time.time() + previous_learning_time = self.last_learning_time + + # 立即更新学习时间,防止并发触发 + self.last_learning_time = learning_start_timestamp - # 更新学习时间 - self.last_learning_time = time.time() + try: + logger.info(f"在聊天流 {self.chat_name} 学习表达方式") + # 学习语言风格,传递学习开始前的时间戳 + learnt_style = await self.learn_and_store(num=25, timestamp_start=previous_learning_time) - if learnt_style: - logger.info(f"聊天流 {self.chat_name} 表达学习完成") - else: - logger.warning(f"聊天流 {self.chat_name} 表达学习未获得有效结果") + if learnt_style: + logger.info(f"聊天流 {self.chat_name} 表达学习完成") + else: + logger.warning(f"聊天流 {self.chat_name} 表达学习未获得有效结果") - except Exception as e: - logger.error(f"为聊天流 {self.chat_name} 触发学习失败: {e}") - traceback.print_exc() - return + except Exception as e: + logger.error(f"为聊天流 {self.chat_name} 触发学习失败: {e}") + traceback.print_exc() + # 即使失败也保持时间戳更新,避免频繁重试 + return - async def learn_and_store(self, num: int = 10) -> List[Tuple[str, str, str]]: + async def learn_and_store(self, num: int = 10, timestamp_start: Optional[float] = None) -> List[Tuple[str, str, str]]: """ 学习并存储表达方式 + + Args: + num: 学习数量 + timestamp_start: 学习开始的时间戳,如果为None则使用self.last_learning_time """ - learnt_expressions = await self.learn_expression(num) + learnt_expressions = await self.learn_expression(num, timestamp_start=timestamp_start) if learnt_expressions is None: logger.info("没有学习到表达风格") @@ -374,18 +391,22 @@ class ExpressionLearner: return matched_expressions - async def learn_expression(self, num: int = 10) -> Optional[List[Tuple[str, str, str, str]]]: + async def learn_expression(self, num: int = 10, timestamp_start: Optional[float] = None) -> Optional[List[Tuple[str, str, str, str]]]: """从指定聊天流学习表达方式 Args: num: 学习数量 + timestamp_start: 学习开始的时间戳,如果为None则使用self.last_learning_time """ current_time = time.time() + + # 使用传入的时间戳,如果没有则使用self.last_learning_time + start_timestamp = timestamp_start if timestamp_start is not None else self.last_learning_time # 获取上次学习之后的消息 random_msg = get_raw_msg_by_timestamp_with_chat_inclusive( chat_id=self.chat_id, - timestamp_start=self.last_learning_time, + timestamp_start=start_timestamp, timestamp_end=current_time, limit=num, ) diff --git a/src/jargon/jargon_miner.py b/src/jargon/jargon_miner.py index 77cb15ce..b7f6c857 100644 --- a/src/jargon/jargon_miner.py +++ b/src/jargon/jargon_miner.py @@ -182,6 +182,9 @@ class JargonMiner: self.stream_name = stream_name if stream_name else self.chat_id self.cache_limit = 100 self.cache: OrderedDict[str, None] = OrderedDict() + + # 黑话提取锁,防止并发执行 + self._extraction_lock = asyncio.Lock() def _add_to_cache(self, content: str) -> None: """将提取到的黑话加入缓存,保持LRU语义""" @@ -436,261 +439,265 @@ class JargonMiner: return bool(recent_messages and len(recent_messages) >= self.min_messages_for_learning) async def run_once(self) -> None: - try: - if not self.should_trigger(): - return - - chat_stream = get_chat_manager().get_stream(self.chat_id) - if not chat_stream: - return - - # 记录本次提取的时间窗口,避免重复提取 - extraction_start_time = self.last_learning_time - extraction_end_time = time.time() - - # 拉取学习窗口内的消息 - messages = get_raw_msg_by_timestamp_with_chat_inclusive( - chat_id=self.chat_id, - timestamp_start=extraction_start_time, - timestamp_end=extraction_end_time, - limit=20, - ) - if not messages: - return - - # 按时间排序,确保编号与上下文一致 - messages = sorted(messages, key=lambda msg: msg.time or 0) - - chat_str, message_id_list = build_readable_messages_with_id( - messages=messages, - replace_bot_name=True, - timestamp_mode="relative", - truncate=False, - show_actions=False, - show_pic=True, - pic_single=True, - ) - if not chat_str.strip(): - return - - msg_id_to_index: Dict[str, int] = {} - for idx, (msg_id, _msg) in enumerate(message_id_list or []): - if not msg_id: - continue - msg_id_to_index[msg_id] = idx - if not msg_id_to_index: - logger.warning("未能生成消息ID映射,跳过本次提取") - return - - prompt: str = await global_prompt_manager.format_prompt( - "extract_jargon_prompt", - bot_name=global_config.bot.nickname, - chat_str=chat_str, - ) - - response, _ = await self.llm.generate_response_async(prompt, temperature=0.2) - if not response: - return - - if global_config.debug.show_jargon_prompt: - logger.info(f"jargon提取提示词: {prompt}") - logger.info(f"jargon提取结果: {response}") - - # 解析为JSON - entries: List[dict] = [] + # 使用异步锁防止并发执行 + async with self._extraction_lock: try: - resp = response.strip() - parsed = None - if resp.startswith("[") and resp.endswith("]"): - parsed = json.loads(resp) - else: - repaired = repair_json(resp) - if isinstance(repaired, str): - parsed = json.loads(repaired) - else: - parsed = repaired - - if isinstance(parsed, dict): - parsed = [parsed] - - if not isinstance(parsed, list): + # 在锁内检查,避免并发触发 + if not self.should_trigger(): return - for item in parsed: - if not isinstance(item, dict): - continue + chat_stream = get_chat_manager().get_stream(self.chat_id) + if not chat_stream: + return - content = str(item.get("content", "")).strip() - msg_id_value = item.get("msg_id") - - if not content: - continue - - if contains_bot_self_name(content): - logger.info(f"解析阶段跳过包含机器人昵称/别名的词条: {content}") - continue - - msg_id_str = str(msg_id_value or "").strip() - if not msg_id_str: - logger.warning(f"解析jargon失败:msg_id缺失,content={content}") - continue - - msg_index = msg_id_to_index.get(msg_id_str) - if msg_index is None: - logger.warning(f"解析jargon失败:msg_id未找到,content={content}, msg_id={msg_id_str}") - continue - - target_msg = messages[msg_index] - if is_bot_message(target_msg): - logger.info(f"解析阶段跳过引用机器人自身消息的词条: content={content}, msg_id={msg_id_str}") - continue - - context_paragraph = build_context_paragraph(messages, msg_index) - if not context_paragraph: - logger.warning(f"解析jargon失败:上下文为空,content={content}, msg_id={msg_id_str}") - continue - - entries.append({"content": content, "raw_content": [context_paragraph]}) - cached_entries = self._collect_cached_entries(messages) - if cached_entries: - entries.extend(cached_entries) - except Exception as e: - logger.error(f"解析jargon JSON失败: {e}; 原始: {response}") - return - - if not entries: - return - - # 去重并合并raw_content(按 content 聚合) - merged_entries: OrderedDict[str, Dict[str, List[str]]] = OrderedDict() - for entry in entries: - content_key = entry["content"] - raw_list = entry.get("raw_content", []) or [] - if content_key in merged_entries: - merged_entries[content_key]["raw_content"].extend(raw_list) - else: - merged_entries[content_key] = { - "content": content_key, - "raw_content": list(raw_list), - } - - uniq_entries = [] - for merged_entry in merged_entries.values(): - raw_content_list = merged_entry["raw_content"] - if raw_content_list: - merged_entry["raw_content"] = list(dict.fromkeys(raw_content_list)) - uniq_entries.append(merged_entry) - - saved = 0 - updated = 0 - for entry in uniq_entries: - content = entry["content"] - raw_content_list = entry["raw_content"] # 已经是列表 - - try: - # 查询所有content匹配的记录 - query = Jargon.select().where(Jargon.content == content) - - # 查找匹配的记录 - matched_obj = None - for obj in query: - if global_config.jargon.all_global: - # 开启all_global:所有content匹配的记录都可以 - matched_obj = obj - break - else: - # 关闭all_global:需要检查chat_id列表是否包含目标chat_id - chat_id_list = parse_chat_id_list(obj.chat_id) - if chat_id_list_contains(chat_id_list, self.chat_id): - matched_obj = obj - break - - if matched_obj: - obj = matched_obj - try: - obj.count = (obj.count or 0) + 1 - except Exception: - obj.count = 1 - - # 合并raw_content列表:读取现有列表,追加新值,去重 - existing_raw_content = [] - if obj.raw_content: - try: - existing_raw_content = ( - json.loads(obj.raw_content) if isinstance(obj.raw_content, str) else obj.raw_content - ) - if not isinstance(existing_raw_content, list): - existing_raw_content = [existing_raw_content] if existing_raw_content else [] - except (json.JSONDecodeError, TypeError): - existing_raw_content = [obj.raw_content] if obj.raw_content else [] - - # 合并并去重 - merged_list = list(dict.fromkeys(existing_raw_content + raw_content_list)) - obj.raw_content = json.dumps(merged_list, ensure_ascii=False) - - # 更新chat_id列表:增加当前chat_id的计数 - chat_id_list = parse_chat_id_list(obj.chat_id) - updated_chat_id_list = update_chat_id_list(chat_id_list, self.chat_id, increment=1) - obj.chat_id = json.dumps(updated_chat_id_list, ensure_ascii=False) - - # 开启all_global时,确保记录标记为is_global=True - if global_config.jargon.all_global: - obj.is_global = True - # 关闭all_global时,保持原有is_global不变(不修改) - - obj.save() - - # 检查是否需要推断(达到阈值且超过上次判定值) - if _should_infer_meaning(obj): - # 异步触发推断,不阻塞主流程 - # 重新加载对象以确保数据最新 - jargon_id = obj.id - asyncio.create_task(self._infer_meaning_by_id(jargon_id)) - - updated += 1 - else: - # 没找到匹配记录,创建新记录 - if global_config.jargon.all_global: - # 开启all_global:新记录默认为is_global=True - is_global_new = True - else: - # 关闭all_global:新记录is_global=False - is_global_new = False - - # 使用新格式创建chat_id列表:[[chat_id, count]] - chat_id_list = [[self.chat_id, 1]] - chat_id_json = json.dumps(chat_id_list, ensure_ascii=False) - - Jargon.create( - content=content, - raw_content=json.dumps(raw_content_list, ensure_ascii=False), - chat_id=chat_id_json, - is_global=is_global_new, - count=1, - ) - saved += 1 - except Exception as e: - logger.error(f"保存jargon失败: chat_id={self.chat_id}, content={content}, err={e}") - continue - finally: - self._add_to_cache(content) - - # 固定输出提取的jargon结果,格式化为可读形式(只要有提取结果就输出) - if uniq_entries: - # 收集所有提取的jargon内容 - jargon_list = [entry["content"] for entry in uniq_entries] - jargon_str = ",".join(jargon_list) - - # 输出格式化的结果(使用logger.info会自动应用jargon模块的颜色) - logger.info(f"[{self.stream_name}]疑似黑话: {jargon_str}") - - # 更新为本次提取的结束时间,确保不会重复提取相同的消息窗口 + # 记录本次提取的时间窗口,避免重复提取 + extraction_start_time = self.last_learning_time + extraction_end_time = time.time() + + # 立即更新学习时间,防止并发触发 self.last_learning_time = extraction_end_time - if saved or updated: - logger.info(f"jargon写入: 新增 {saved} 条,更新 {updated} 条,chat_id={self.chat_id}") - except Exception as e: - logger.error(f"JargonMiner 运行失败: {e}") + # 拉取学习窗口内的消息 + messages = get_raw_msg_by_timestamp_with_chat_inclusive( + chat_id=self.chat_id, + timestamp_start=extraction_start_time, + timestamp_end=extraction_end_time, + limit=20, + ) + if not messages: + return + + # 按时间排序,确保编号与上下文一致 + messages = sorted(messages, key=lambda msg: msg.time or 0) + + chat_str, message_id_list = build_readable_messages_with_id( + messages=messages, + replace_bot_name=True, + timestamp_mode="relative", + truncate=False, + show_actions=False, + show_pic=True, + pic_single=True, + ) + if not chat_str.strip(): + return + + msg_id_to_index: Dict[str, int] = {} + for idx, (msg_id, _msg) in enumerate(message_id_list or []): + if not msg_id: + continue + msg_id_to_index[msg_id] = idx + if not msg_id_to_index: + logger.warning("未能生成消息ID映射,跳过本次提取") + return + + prompt: str = await global_prompt_manager.format_prompt( + "extract_jargon_prompt", + bot_name=global_config.bot.nickname, + chat_str=chat_str, + ) + + response, _ = await self.llm.generate_response_async(prompt, temperature=0.2) + if not response: + return + + if global_config.debug.show_jargon_prompt: + logger.info(f"jargon提取提示词: {prompt}") + logger.info(f"jargon提取结果: {response}") + + # 解析为JSON + entries: List[dict] = [] + try: + resp = response.strip() + parsed = None + if resp.startswith("[") and resp.endswith("]"): + parsed = json.loads(resp) + else: + repaired = repair_json(resp) + if isinstance(repaired, str): + parsed = json.loads(repaired) + else: + parsed = repaired + + if isinstance(parsed, dict): + parsed = [parsed] + + if not isinstance(parsed, list): + return + + for item in parsed: + if not isinstance(item, dict): + continue + + content = str(item.get("content", "")).strip() + msg_id_value = item.get("msg_id") + + if not content: + continue + + if contains_bot_self_name(content): + logger.info(f"解析阶段跳过包含机器人昵称/别名的词条: {content}") + continue + + msg_id_str = str(msg_id_value or "").strip() + if not msg_id_str: + logger.warning(f"解析jargon失败:msg_id缺失,content={content}") + continue + + msg_index = msg_id_to_index.get(msg_id_str) + if msg_index is None: + logger.warning(f"解析jargon失败:msg_id未找到,content={content}, msg_id={msg_id_str}") + continue + + target_msg = messages[msg_index] + if is_bot_message(target_msg): + logger.info(f"解析阶段跳过引用机器人自身消息的词条: content={content}, msg_id={msg_id_str}") + continue + + context_paragraph = build_context_paragraph(messages, msg_index) + if not context_paragraph: + logger.warning(f"解析jargon失败:上下文为空,content={content}, msg_id={msg_id_str}") + continue + + entries.append({"content": content, "raw_content": [context_paragraph]}) + cached_entries = self._collect_cached_entries(messages) + if cached_entries: + entries.extend(cached_entries) + except Exception as e: + logger.error(f"解析jargon JSON失败: {e}; 原始: {response}") + return + + if not entries: + return + + # 去重并合并raw_content(按 content 聚合) + merged_entries: OrderedDict[str, Dict[str, List[str]]] = OrderedDict() + for entry in entries: + content_key = entry["content"] + raw_list = entry.get("raw_content", []) or [] + if content_key in merged_entries: + merged_entries[content_key]["raw_content"].extend(raw_list) + else: + merged_entries[content_key] = { + "content": content_key, + "raw_content": list(raw_list), + } + + uniq_entries = [] + for merged_entry in merged_entries.values(): + raw_content_list = merged_entry["raw_content"] + if raw_content_list: + merged_entry["raw_content"] = list(dict.fromkeys(raw_content_list)) + uniq_entries.append(merged_entry) + + saved = 0 + updated = 0 + for entry in uniq_entries: + content = entry["content"] + raw_content_list = entry["raw_content"] # 已经是列表 + + try: + # 查询所有content匹配的记录 + query = Jargon.select().where(Jargon.content == content) + + # 查找匹配的记录 + matched_obj = None + for obj in query: + if global_config.jargon.all_global: + # 开启all_global:所有content匹配的记录都可以 + matched_obj = obj + break + else: + # 关闭all_global:需要检查chat_id列表是否包含目标chat_id + chat_id_list = parse_chat_id_list(obj.chat_id) + if chat_id_list_contains(chat_id_list, self.chat_id): + matched_obj = obj + break + + if matched_obj: + obj = matched_obj + try: + obj.count = (obj.count or 0) + 1 + except Exception: + obj.count = 1 + + # 合并raw_content列表:读取现有列表,追加新值,去重 + existing_raw_content = [] + if obj.raw_content: + try: + existing_raw_content = ( + json.loads(obj.raw_content) if isinstance(obj.raw_content, str) else obj.raw_content + ) + if not isinstance(existing_raw_content, list): + existing_raw_content = [existing_raw_content] if existing_raw_content else [] + except (json.JSONDecodeError, TypeError): + existing_raw_content = [obj.raw_content] if obj.raw_content else [] + + # 合并并去重 + merged_list = list(dict.fromkeys(existing_raw_content + raw_content_list)) + obj.raw_content = json.dumps(merged_list, ensure_ascii=False) + + # 更新chat_id列表:增加当前chat_id的计数 + chat_id_list = parse_chat_id_list(obj.chat_id) + updated_chat_id_list = update_chat_id_list(chat_id_list, self.chat_id, increment=1) + obj.chat_id = json.dumps(updated_chat_id_list, ensure_ascii=False) + + # 开启all_global时,确保记录标记为is_global=True + if global_config.jargon.all_global: + obj.is_global = True + # 关闭all_global时,保持原有is_global不变(不修改) + + obj.save() + + # 检查是否需要推断(达到阈值且超过上次判定值) + if _should_infer_meaning(obj): + # 异步触发推断,不阻塞主流程 + # 重新加载对象以确保数据最新 + jargon_id = obj.id + asyncio.create_task(self._infer_meaning_by_id(jargon_id)) + + updated += 1 + else: + # 没找到匹配记录,创建新记录 + if global_config.jargon.all_global: + # 开启all_global:新记录默认为is_global=True + is_global_new = True + else: + # 关闭all_global:新记录is_global=False + is_global_new = False + + # 使用新格式创建chat_id列表:[[chat_id, count]] + chat_id_list = [[self.chat_id, 1]] + chat_id_json = json.dumps(chat_id_list, ensure_ascii=False) + + Jargon.create( + content=content, + raw_content=json.dumps(raw_content_list, ensure_ascii=False), + chat_id=chat_id_json, + is_global=is_global_new, + count=1, + ) + saved += 1 + except Exception as e: + logger.error(f"保存jargon失败: chat_id={self.chat_id}, content={content}, err={e}") + continue + finally: + self._add_to_cache(content) + + # 固定输出提取的jargon结果,格式化为可读形式(只要有提取结果就输出) + if uniq_entries: + # 收集所有提取的jargon内容 + jargon_list = [entry["content"] for entry in uniq_entries] + jargon_str = ",".join(jargon_list) + + # 输出格式化的结果(使用logger.info会自动应用jargon模块的颜色) + logger.info(f"[{self.stream_name}]疑似黑话: {jargon_str}") + + if saved or updated: + logger.info(f"jargon写入: 新增 {saved} 条,更新 {updated} 条,chat_id={self.chat_id}") + except Exception as e: + logger.error(f"JargonMiner 运行失败: {e}") + # 即使失败也保持时间戳更新,避免频繁重试 class JargonMinerManager: