From abb2ba3ce1620e99a89c311ceeda32df59ab640b Mon Sep 17 00:00:00 2001 From: Bakadax Date: Thu, 15 May 2025 19:00:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9C=A8=E5=8F=A5=E5=AD=90=E5=88=86=E5=89=B2?= =?UTF-8?q?=E9=87=8C=E6=8B=89=E7=9F=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/utils/utils.py | 130 ++++++++++++++++++++++++---------------- 1 file changed, 80 insertions(+), 50 deletions(-) diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index 20338271..a262e306 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -50,6 +50,13 @@ def is_han_character(char_str: str) -> bool: return _HAN_CHAR_REGEX.fullmatch(char_str) is not None +def is_digit(char_str: str) -> bool: + """检查字符是否为Unicode数字""" + if not isinstance(char_str, str) or len(char_str) != 1: + return False + return _Nd_REGEX.fullmatch(char_str) is not None + + def is_relevant_word_char(char_str: str) -> bool: # 新增辅助函数 """ 检查字符是否为“相关词语字符”(非汉字字母 或 数字)。 @@ -229,16 +236,16 @@ def get_recent_group_speaker(chat_stream_id: int, sender, limit: int = 12) -> li def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: """将文本分割成句子,并根据概率合并 Args: - text: 要分割的文本字符串 (假定颜文字已被保护) + text: 要分割的文本字符串 Returns: List[str]: 分割和合并后的句子列表 """ # print(f"DEBUG: 输入文本 (repr): {repr(text)}") - # 预处理: + # 预处理 text = regex.sub(r"\n\s*\n+", "\n", text) - text = regex.sub(r"\n\s*([—。.,,;\s\xa0])", r"\1", text) - text = regex.sub(r"([—。.,,;\s\xa0])\s*\n", r"\1", text) + text = regex.sub(r"\n\s*([—。.,,;\s\xa0!?])", r"\1", text) + text = regex.sub(r"([—。.,,;\s\xa0!?])\s*\n", r"\1", text) def replace_han_newline(match): char1 = match.group(1) char2 = match.group(2) @@ -250,13 +257,13 @@ def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: len_text = len(text) if len_text < 3: - if random.random() < 0.01: - return list(text) - else: - return [text] + stripped_text = text.strip() + if not stripped_text: return [] + if len(stripped_text) == 1 and stripped_text in {"。", ",", ",", ".", ";", "!", "?"}: + return [] + return [stripped_text] - separators = {"。", ",", ",", " ", ";", "\xa0", "\n", ".", "—", "!", "?"} # 保持原有分隔符集合 - # logger.debug(f"DEBUG: 使用的分隔符集合: {separators}") + separators = {"。", ",", ",", " ", ";", "\xa0", "\n", ".", "—", "!", "?"} segments = [] current_segment = "" @@ -264,16 +271,29 @@ def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: while i < len(text): char = text[i] if char in separators: - can_split = True - if char == ' ' or char == '\xa0': # 仅当分隔符是空格或NBSP时,检查两侧字符 + can_split = True # 默认情况下,分隔符会导致分割 + + if char == '.': + # 检查 '.' 是否处于需要特殊处理的上下文中 (例如,小数点或缩写词) + # 只有当 '.' 同时拥有前一个和后一个字符时,这些上下文检查才有意义 + if 0 < i < len(text) - 1: + prev_char_val = text[i-1] + next_char_val = text[i+1] + # 规则1: 小数点 (数字.数字) -> 不分割 + if is_digit(prev_char_val) and is_digit(next_char_val): + can_split = False + # 规则2: 西文缩写/域名 (西文字母.西文字母) -> 不分割 + # 例如 U.S.A., example.com + elif is_letter_not_han(prev_char_val) and is_letter_not_han(next_char_val): + can_split = False + # 如果不满足上述不分割的条件 (例如句末的'.', 或'. '后的空格),can_split 保持 True,执行分割 + elif char == ' ' or char == '\xa0': # 处理空格/NBSP if 0 < i < len(text) - 1: prev_char = text[i - 1] next_char = text[i + 1] - # 检查前后字符是否都是“相关词语字符”(非汉字字母或数字) - # 如果是,则不应在此处分割,因为这可能是一个单词内部的空格(例如 "word1 word2") if is_relevant_word_char(prev_char) and is_relevant_word_char(next_char): - can_split = False - + can_split = False # 非中文单词内部的空格不分割 + if can_split: if current_segment: segments.append((current_segment, char)) @@ -281,81 +301,91 @@ def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: segments.append(("", char)) current_segment = "" else: - # 如果不能分割 (can_split is False),则将当前字符(空格/NBSP)加入到当前段落 - current_segment += char + current_segment += char # 不分割,将当前分隔符加入到当前段落 else: current_segment += char i += 1 - + if current_segment: segments.append((current_segment, "")) filtered_segments = [] for content, sep in segments: - if content.strip(): - filtered_segments.append((content, sep)) - elif sep and sep not in [' ', '\xa0']: + stripped_content = content.strip() + if stripped_content: + filtered_segments.append((stripped_content, sep)) + elif sep and (sep not in [' ', '\xa0'] or sep == '\n'): filtered_segments.append(("", sep)) segments = filtered_segments - + if not segments: - return [text] if text.strip() else [] + return [text.strip()] if text.strip() else [] preliminary_final_sentences = [] current_sentence_build = "" - for content, sep in segments: - current_sentence_build += content - - if sep and sep not in [' ', '\xa0']: - current_sentence_build += sep - if current_sentence_build.strip(): + for k, (content, sep) in enumerate(segments): + current_sentence_build += content + + is_strong_separator = sep in {"。", ".", "!", "?", "\n", "—"} + + if content: + if sep and sep not in [' ', '\xa0']: + current_sentence_build += sep + if current_sentence_build.strip(): + preliminary_final_sentences.append(current_sentence_build.strip()) + current_sentence_build = "" + elif sep: + if current_sentence_build.strip() and not content.endswith(sep): + preliminary_final_sentences.append(current_sentence_build.strip()) + current_sentence_build = "" + elif sep: + if current_sentence_build.strip() and is_strong_separator: + current_sentence_build += sep preliminary_final_sentences.append(current_sentence_build.strip()) - current_sentence_build = "" - elif sep: - if current_sentence_build.strip(): - preliminary_final_sentences.append(current_sentence_build.strip()) - current_sentence_build = "" + current_sentence_build = "" + elif not current_sentence_build.strip() and sep not in [' ', '\xa0']: + preliminary_final_sentences.append(sep) if current_sentence_build.strip(): preliminary_final_sentences.append(current_sentence_build.strip()) - logger.debug(f"初步分割(未合并,已strip)后的句子: {preliminary_final_sentences}") + preliminary_final_sentences = [s for s in preliminary_final_sentences if s.strip()] + # print(f"DEBUG: 初步分割(未合并,已strip)后的句子: {preliminary_final_sentences}") if not preliminary_final_sentences: return [] - if len_text < 12: - split_strength = 0.5 - elif len_text < 32: - split_strength = 0.7 - else: - split_strength = 0.9 + if len_text < 12: split_strength = 0.2 + elif len_text < 32: split_strength = 0.5 + else: split_strength = 0.7 merge_probability = 1.0 - split_strength - if merge_probability == 1.0 and len(preliminary_final_sentences) > 1 : # 只有多个句子才合并 + if merge_probability == 1.0 and len(preliminary_final_sentences) > 1: merged_text = ",".join(preliminary_final_sentences).strip() - # 移除末尾的逗号(中英文) if merged_text.endswith(',') or merged_text.endswith(','): merged_text = merged_text[:-1].strip() return [merged_text] if merged_text else [] - elif len(preliminary_final_sentences) == 1: # 如果只有一个初步句子,直接返回 + elif len(preliminary_final_sentences) == 1: s = preliminary_final_sentences[0].strip() if s.endswith(',') or s.endswith(','): s = s[:-1].strip() return [s] if s else [] - final_sentences_merged = [] temp_sentence = "" if preliminary_final_sentences: temp_sentence = preliminary_final_sentences[0] - for i in range(1, len(preliminary_final_sentences)): - if random.random() < merge_probability and temp_sentence: - temp_sentence += " " + preliminary_final_sentences[i] + for i_merge in range(1, len(preliminary_final_sentences)): + should_merge_based_on_punctuation = True + if temp_sentence and temp_sentence[-1] in {"。", ".", "!", "?"}: + should_merge_based_on_punctuation = False + + if random.random() < merge_probability and temp_sentence and should_merge_based_on_punctuation: + temp_sentence += " " + preliminary_final_sentences[i_merge] else: if temp_sentence: final_sentences_merged.append(temp_sentence) - temp_sentence = preliminary_final_sentences[i] + temp_sentence = preliminary_final_sentences[i_merge] if temp_sentence: final_sentences_merged.append(temp_sentence)