diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index a262e306..e6c22a89 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -23,7 +23,18 @@ logger = get_module_logger("chat_utils") _L_REGEX = regex.compile(r"\p{L}") # 匹配任何Unicode字母 _HAN_CHAR_REGEX = regex.compile(r"\p{Han}") # 匹配汉字 (Unicode属性) _Nd_REGEX = regex.compile(r'\p{Nd}') # 新增:匹配Unicode数字 (Nd = Number, decimal digit) - +SEPARATORS = {"。", ",", ",", " ", ";", "\xa0", "\n", ".", "—", "!", "?"} +KNOWN_ABBREVIATIONS_ENDING_WITH_DOT = { + "Mr.", "Mrs.", "Ms.", "Dr.", "Prof.", "St.", "Messrs.", "Mmes.", "Capt.", "Gov.", + "Inc.", "Ltd.", "Corp.", "Co.", "PLC", # PLC通常不带点,但有些可能 + "vs.", "etc.", "i.e.", "e.g.", "viz.", "al.", "et al.", "ca.", "cf.", + "No.", "Vol.", "pp.", "fig.", "figs.", "ed.", "Ph.D.", "M.D.", "B.A.", "M.A.", + "Jan.", "Feb.", "Mar.", "Apr.", "Jun.", "Jul.", "Aug.", "Sep.", "Oct.", "Nov.", "Dec.", # May. 通常不用点 + "Mon.", "Tue.", "Wed.", "Thu.", "Fri.", "Sat.", "Sun.", + "U.S.", "U.K.", "E.U.", "U.S.A.", "U.S.S.R.", + "Ave.", "Blvd.", "Rd.", "Ln.", # Street suffixes + "approx.", "dept.", "appt.", "श्री." # Hindi Shri. +} def is_letter_not_han(char_str: str) -> bool: """ @@ -234,81 +245,80 @@ def get_recent_group_speaker(chat_stream_id: int, sender, limit: int = 12) -> li def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: - """将文本分割成句子,并根据概率合并 - Args: - text: 要分割的文本字符串 - Returns: - List[str]: 分割和合并后的句子列表 - """ + """将文本分割成句子,并根据概率合并""" # print(f"DEBUG: 输入文本 (repr): {repr(text)}") # 预处理 - text = regex.sub(r"\n\s*\n+", "\n", text) - text = regex.sub(r"\n\s*([—。.,,;\s\xa0!?])", r"\1", text) - text = regex.sub(r"([—。.,,;\s\xa0!?])\s*\n", r"\1", text) + text = regex.sub(r"\n\s*\n+", "\n", text) # 合并多个换行符 + text = regex.sub(r"\n\s*([—。.,,;\s\xa0!?])", r"\1", text) + text = regex.sub(r"([—。.,,;\s\xa0!?])\s*\n", r"\1", text) def replace_han_newline(match): char1 = match.group(1) char2 = match.group(2) if is_han_character(char1) and is_han_character(char2): - return char1 + "," + char2 + return char1 + "," + char2 # 汉字间的换行符替换为逗号 return match.group(0) text = regex.sub(r"(.)\n(.)", replace_han_newline, text) - # print(f"DEBUG: 预处理后文本 (repr): {repr(text)}") len_text = len(text) if len_text < 3: stripped_text = text.strip() if not stripped_text: return [] - if len(stripped_text) == 1 and stripped_text in {"。", ",", ",", ".", ";", "!", "?"}: + if len(stripped_text) == 1 and stripped_text in SEPARATORS: return [] return [stripped_text] - separators = {"。", ",", ",", " ", ";", "\xa0", "\n", ".", "—", "!", "?"} segments = [] current_segment = "" - i = 0 while i < len(text): char = text[i] - if char in separators: - can_split = True # 默认情况下,分隔符会导致分割 + if char in SEPARATORS: + can_split_current_char = True if char == '.': - # 检查 '.' 是否处于需要特殊处理的上下文中 (例如,小数点或缩写词) - # 只有当 '.' 同时拥有前一个和后一个字符时,这些上下文检查才有意义 - if 0 < i < len(text) - 1: - prev_char_val = text[i-1] - next_char_val = text[i+1] - # 规则1: 小数点 (数字.数字) -> 不分割 - if is_digit(prev_char_val) and is_digit(next_char_val): - can_split = False - # 规则2: 西文缩写/域名 (西文字母.西文字母) -> 不分割 - # 例如 U.S.A., example.com - elif is_letter_not_han(prev_char_val) and is_letter_not_han(next_char_val): - can_split = False - # 如果不满足上述不分割的条件 (例如句末的'.', 或'. '后的空格),can_split 保持 True,执行分割 + can_split_this_dot = True + # 规则1: 小数点 (数字.数字) + if 0 < i < len_text - 1 and is_digit(text[i-1]) and is_digit(text[i+1]): + can_split_this_dot = False + # 规则2: 西文缩写/域名内部的点 (西文字母.西文字母) + elif 0 < i < len_text - 1 and is_letter_not_han(text[i-1]) and is_letter_not_han(text[i+1]): + can_split_this_dot = False + # 规则3: 已知缩写词的末尾点 (例如 "e.g. ", "U.S.A. ") + else: + potential_abbreviation_word = current_segment + char + is_followed_by_space = (i + 1 < len_text and text[i+1] == ' ') + is_at_end_of_text = (i + 1 == len_text) + + if potential_abbreviation_word in KNOWN_ABBREVIATIONS_ENDING_WITH_DOT and \ + (is_followed_by_space or is_at_end_of_text): + can_split_this_dot = False + can_split_current_char = can_split_this_dot elif char == ' ' or char == '\xa0': # 处理空格/NBSP - if 0 < i < len(text) - 1: + if 0 < i < len_text - 1: prev_char = text[i - 1] next_char = text[i + 1] + # 非中文单词内部的空格不分割 (例如 "hello world", "слово1 слово2") if is_relevant_word_char(prev_char) and is_relevant_word_char(next_char): - can_split = False # 非中文单词内部的空格不分割 - - if can_split: - if current_segment: + can_split_current_char = False + + if can_split_current_char: + if current_segment: # 如果当前段落有内容,则添加 (内容, 分隔符) segments.append((current_segment, char)) + # 如果当前段落为空,但分隔符不是简单的排版空格 (除非是换行符这种有意义的空行分隔) elif char not in [' ', '\xa0'] or char == '\n': - segments.append(("", char)) - current_segment = "" + segments.append(("", char)) # 添加 ("", 分隔符) + current_segment = "" # 重置当前段落 else: current_segment += char # 不分割,将当前分隔符加入到当前段落 else: - current_segment += char + current_segment += char # 非分隔符,加入当前段落 i += 1 - - if current_segment: + + if current_segment: # 处理末尾剩余的段落 segments.append((current_segment, "")) + # 过滤掉仅由空格组成的segment,但保留其后的有效分隔符 filtered_segments = [] for content, sep in segments: stripped_content = content.strip() @@ -317,40 +327,41 @@ def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: elif sep and (sep not in [' ', '\xa0'] or sep == '\n'): filtered_segments.append(("", sep)) segments = filtered_segments - + if not segments: return [text.strip()] if text.strip() else [] preliminary_final_sentences = [] current_sentence_build = "" for k, (content, sep) in enumerate(segments): - current_sentence_build += content - - is_strong_separator = sep in {"。", ".", "!", "?", "\n", "—"} - - if content: - if sep and sep not in [' ', '\xa0']: + current_sentence_build += content # 先添加内容部分 + + # 判断分隔符类型 + is_strong_terminator = sep in {"。", ".", "!", "?", "\n", "—"} + is_space_separator = sep in [' ', '\xa0'] + + if is_strong_terminator: + current_sentence_build += sep # 将强终止符加入 + if current_sentence_build.strip(): + preliminary_final_sentences.append(current_sentence_build.strip()) + current_sentence_build = "" # 开始新的句子构建 + elif is_space_separator: + # 如果是空格,并且当前构建的句子不以空格结尾,则添加空格并继续构建 + if not current_sentence_build.endswith(sep): current_sentence_build += sep - if current_sentence_build.strip(): - preliminary_final_sentences.append(current_sentence_build.strip()) - current_sentence_build = "" - elif sep: - if current_sentence_build.strip() and not content.endswith(sep): - preliminary_final_sentences.append(current_sentence_build.strip()) - current_sentence_build = "" - elif sep: - if current_sentence_build.strip() and is_strong_separator: - current_sentence_build += sep + elif sep: # 其他分隔符 (如 ',', ';') + current_sentence_build += sep # 加入并继续构建,这些通常不独立成句 + # 如果这些弱分隔符后紧跟的就是文本末尾,则它们可能结束一个句子 + if k == len(segments) -1 and current_sentence_build.strip(): preliminary_final_sentences.append(current_sentence_build.strip()) current_sentence_build = "" - elif not current_sentence_build.strip() and sep not in [' ', '\xa0']: - preliminary_final_sentences.append(sep) - if current_sentence_build.strip(): + + if current_sentence_build.strip(): # 处理最后一个构建中的句子 preliminary_final_sentences.append(current_sentence_build.strip()) - preliminary_final_sentences = [s for s in preliminary_final_sentences if s.strip()] - # print(f"DEBUG: 初步分割(未合并,已strip)后的句子: {preliminary_final_sentences}") + preliminary_final_sentences = [s for s in preliminary_final_sentences if s.strip()] # 清理空字符串 + # print(f"DEBUG: 初步分割(优化组装后)的句子: {preliminary_final_sentences}") if not preliminary_final_sentences: return [] @@ -361,7 +372,7 @@ def split_into_sentences_w_remove_punctuation(text: str) -> list[str]: merge_probability = 1.0 - split_strength if merge_probability == 1.0 and len(preliminary_final_sentences) > 1: - merged_text = ",".join(preliminary_final_sentences).strip() + merged_text = " ".join(preliminary_final_sentences).strip() if merged_text.endswith(',') or merged_text.endswith(','): merged_text = merged_text[:-1].strip() return [merged_text] if merged_text else []