diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index 399966e3..9df77726 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -20,26 +20,36 @@ from ...config.config import global_config logger = get_module_logger("chat_utils") # 预编译正则表达式以提高性能 -_LETTER_NOT_HAN_REGEX = regex.compile(r'[\p{L}&&\P{Han}]') -_HAN_CHAR_REGEX = regex.compile(r'\p{Han}') +_L_REGEX = regex.compile(r'\p{L}') # 匹配任何Unicode字母 +_HAN_CHAR_REGEX = regex.compile(r'\p{Han}') # 匹配汉字 (Unicode属性) + +def is_letter_not_han(char_str: str) -> bool: + """ + 检查字符是否为“字母”且“非汉字”。 + 例如拉丁字母、西里尔字母、韩文等返回True。 + 汉字、数字、标点、空格等返回False。 + """ + if not isinstance(char_str, str) or len(char_str) != 1: + return False + + is_letter = _L_REGEX.fullmatch(char_str) is not None + if not is_letter: + return False + + # 使用 \p{Han} 属性进行汉字判断,更为准确 + is_han = _HAN_CHAR_REGEX.fullmatch(char_str) is not None + return not is_han + +def is_han_character(char_str: str) -> bool: + """检查字符是否为汉字 (使用 \p{Han} Unicode 属性)""" + if not isinstance(char_str, str) or len(char_str) != 1: + return False + return _HAN_CHAR_REGEX.fullmatch(char_str) is not None def is_english_letter(char: str) -> bool: """检查字符是否为英文字母(忽略大小写)""" return "a" <= char.lower() <= "z" -def is_letter_not_han(char_str: str) -> bool: - """检查字符是否为非汉字字母 (例如拉丁字母、西里尔字母、韩文等)""" - if not isinstance(char_str, str) or len(char_str) != 1: - return False - return _LETTER_NOT_HAN_REGEX.fullmatch(char_str) is not None - - -def is_han_character(char_str: str) -> bool: - """检查字符是否为汉字""" - if not isinstance(char_str, str) or len(char_str) != 1: - return False - return _HAN_CHAR_REGEX.fullmatch(char_str) is not None - def db_message_to_str(message_dict: dict) -> str: logger.debug(f"message_dict: {message_dict}") time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(message_dict["time"])) @@ -190,120 +200,126 @@ def get_recent_group_speaker(chat_stream_id: int, sender, limit: int = 12) -> li def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: """将文本分割成句子,并根据概率合并 - 1. 识别分割点(, , 。 ; 空格),但如果分割点左右都是英文字母则不分割。 - 2. 将文本分割成 (内容, 分隔符) 的元组。 - 3. 根据原始文本长度计算合并概率,概率性地合并相邻段落。 - 注意:此函数假定颜文字已在上层被保护。 Args: text: 要分割的文本字符串 (假定颜文字已被保护) Returns: List[str]: 分割和合并后的句子列表 """ - # 预处理:处理多余的换行符 + # 预处理: # 1. 将连续的换行符替换为单个换行符 - text = regex.sub(r"\n\s*\n+", "\n", text) # 使用 regex 保持一致性,虽然 re 也能处理 - # 2. 处理换行符和其他分隔符的组合 - text = regex.sub(r"\n\s*([,,。;\s])", r"\1", text) - text = regex.sub(r"([,,。;\s])\s*\n", r"\1", text) - - # 处理两个汉字中间的换行符 - # text = re.sub(r"([\u4e00-\u9fff])\n([\u4e00-\u9fff])", r"\1。\2", text) # 原代码 - text = regex.sub(r"(\p{Han})\n(\p{Han})", r"\1。\2", text) # 修改后:使用 regex 和 \p{Han} + text = regex.sub(r"\n\s*\n+", "\n", text) + # 2. 处理换行符和其他分隔符的组合 (增加了 . 和 —) + text = regex.sub(r"\n\s*([—。.,,;\s\xa0])", r"\1", text) + text = regex.sub(r"([—。.,,;\s\xa0])\s*\n", r"\1", text) + # 3. 处理两个汉字中间的换行符 + def replace_han_newline(match): + char1 = match.group(1) + char2 = match.group(2) + if is_han_character(char1) and is_han_character(char2): + return char1 + "。" + char2 + return match.group(0) + text = regex.sub(r"(.)\n(.)", replace_han_newline, text) len_text = len(text) if len_text < 3: if random.random() < 0.01: - return list(text) # 如果文本很短且触发随机条件,直接按字符分割 + return list(text) else: return [text] - # 定义分隔符 - separators = {",", ",", " ", "。", ";"} + # 定义分隔符,增加了 \n, ., — + separators = {",", ",", " ", "。", ";", "\xa0", "\n", ".", "—"} segments = [] current_segment = "" - # 1. 分割成 (内容, 分隔符) 元组 i = 0 while i < len(text): char = text[i] + if char in separators: - # 检查分割条件:如果分隔符左右都是非汉字字母 (如英文、俄文、韩文等),则不分割 can_split = True - if 0 < i < len(text) - 1: - prev_char = text[i - 1] - next_char = text[i + 1] - # if is_english_letter(prev_char) and is_english_letter(next_char): # 原代码 - if is_letter_not_han(prev_char) and is_letter_not_han(next_char): # 修改后:使用 is_letter_not_han - can_split = False + + if char == ' ' or char == '\xa0': + if 0 < i < len(text) - 1: + prev_char = text[i - 1] + next_char = text[i + 1] + is_prev_letter_not_han = is_letter_not_han(prev_char) + is_next_letter_not_han = is_letter_not_han(next_char) + if is_prev_letter_not_han and is_next_letter_not_han: + can_split = False if can_split: - # 只有当当前段不为空时才添加 if current_segment: segments.append((current_segment, char)) - # 如果当前段为空,但分隔符是空格,则也添加一个空段(保留空格) - elif char == " ": + elif char not in [' ', '\xa0']: + segments.append(("", char)) + elif char in [' ', '\xa0']: segments.append(("", char)) current_segment = "" else: - # 不分割,将分隔符加入当前段 current_segment += char else: current_segment += char i += 1 - # 添加最后一个段(没有后续分隔符) if current_segment: segments.append((current_segment, "")) - # 过滤掉完全空的段(内容和分隔符都为空) - segments = [(content, sep) for content, sep in segments if content or sep] + temp_segments_for_filter = [] + for content, sep in segments: + if content.strip(): + temp_segments_for_filter.append((content,sep)) + elif sep and sep not in [' ', '\xa0']: + temp_segments_for_filter.append((content,sep)) + segments = temp_segments_for_filter - # 如果分割后为空(例如,输入全是分隔符且不满足保留条件),恢复颜文字并返回 if not segments: - return [text] if text else [] + return [text] if text.strip() else [] + + preliminary_final_sentences = [] + current_sentence_build = "" + for content, sep in segments: + current_sentence_build += content + if sep and sep not in [' ', '\xa0']: + current_sentence_build += sep + if current_sentence_build.strip(): + preliminary_final_sentences.append(current_sentence_build.strip()) + current_sentence_build = "" + elif sep: + current_sentence_build += sep + if current_sentence_build.strip(): + preliminary_final_sentences.append(current_sentence_build.strip()) + + if not preliminary_final_sentences: + return [] - # 2. 概率合并 if len_text < 12: split_strength = 0.2 elif len_text < 32: split_strength = 0.5 else: split_strength = 0.7 - # 合并概率与分割强度相反 merge_probability = 1.0 - split_strength - merged_segments = [] - idx = 0 - while idx < len(segments): - current_content, current_sep = segments[idx] + if merge_probability == 1.0: + return [" ".join(preliminary_final_sentences).strip()] if preliminary_final_sentences else [] - # 检查是否可以与下一段合并 - # 条件:不是最后一段,且随机数小于合并概率,且当前段有内容(避免合并空段) - if idx + 1 < len(segments) and random.random() < merge_probability and current_content: - next_content, next_sep = segments[idx + 1] - # 合并: (内容1 + 分隔符1 + 内容2, 分隔符2) - # 只有当下一段也有内容时才合并文本,否则只传递分隔符 - if next_content: - merged_content = current_content + current_sep + next_content - merged_segments.append((merged_content, next_sep)) - else: # 下一段内容为空,只保留当前内容和下一段的分隔符 - merged_segments.append((current_content, next_sep)) + final_sentences_merged = [] + temp_sentence = "" + if preliminary_final_sentences: + temp_sentence = preliminary_final_sentences[0] + for i in range(1, len(preliminary_final_sentences)): + if random.random() < merge_probability and temp_sentence: + temp_sentence += " " + preliminary_final_sentences[i] + else: + if temp_sentence: + final_sentences_merged.append(temp_sentence) + temp_sentence = preliminary_final_sentences[i] + if temp_sentence: + final_sentences_merged.append(temp_sentence) - idx += 2 # 跳过下一段,因为它已被合并 - else: - # 不合并,直接添加当前段 - merged_segments.append((current_content, current_sep)) - idx += 1 + final_sentences = [s.strip() for s in final_sentences_merged if s.strip()] - # 提取最终的句子内容 - final_sentences = [content for content, sep in merged_segments if content] # 只保留有内容的段 - - # 清理可能引入的空字符串和仅包含空白的字符串 - final_sentences = [ - s for s in final_sentences if s.strip() - ] # 过滤掉空字符串以及仅包含空白(如换行符、空格)的字符串 - - logger.debug(f"分割并合并后的句子: {final_sentences}") return final_sentences