diff --git a/review_agent/regulatory_review/services/info_extract.py b/review_agent/regulatory_review/services/info_extract.py index 1a48820..90d17f2 100644 --- a/review_agent/regulatory_review/services/info_extract.py +++ b/review_agent/regulatory_review/services/info_extract.py @@ -1,10 +1,11 @@ from __future__ import annotations +import re from pathlib import Path from django.conf import settings -from review_agent.models import FileSummaryBatch +from review_agent.models import FileSummaryBatch, RegulatoryReviewBatch from review_agent.regulatory_review.services.llm_review import review_condition_fields from review_agent.regulatory_review.services.text_extract import extract_text @@ -16,6 +17,18 @@ OPTION_FIELDS = { } +def ensure_regulatory_condition_candidates(batch: RegulatoryReviewBatch) -> dict[str, dict[str, object]]: + condition_json = batch.condition_json or {} + candidates = condition_json.get("candidates") or {} + if batch.status != RegulatoryReviewBatch.Status.WAITING_USER or not _condition_candidates_incomplete(candidates): + return candidates + refreshed = detect_regulatory_condition_candidates(batch.source_summary_batch) + refreshed = _merge_condition_candidates(candidates, refreshed) + batch.condition_json = {**condition_json, "candidates": refreshed} + batch.save(update_fields=["condition_json"]) + return refreshed + + def detect_regulatory_condition_candidates(summary_batch: FileSummaryBatch) -> dict[str, dict[str, object]]: """Infers review-scope conditions from the summary batch and file names.""" @@ -30,6 +43,8 @@ def detect_regulatory_condition_candidates(summary_batch: FileSummaryBatch) -> d field_candidates.update({key: value for key, value in extracted.items() if value and key not in field_candidates}) field_sources.update({key: value for key, value in sources.items() if value and key not in field_sources}) corpus_parts.extend(extracted.values()) + if review.get("front_text"): + corpus_parts.append(str(review["front_text"])) corpus = 
"\n".join(part for part in corpus_parts if part) product_name = field_candidates.get("产品名称") or _safe_summary_product_name(summary_batch.product_name) @@ -80,13 +95,22 @@ def _extract_item_fields(item) -> dict[str, object]: if not path.exists(): return {} result = extract_text(path) - if result.status != "success" or not result.field_candidates: + if result.status != "success" or not result.text: return {} - return review_condition_fields( + inferred_fields = _infer_fields_from_text(result.front_text or result.text) + rule_fields = {**inferred_fields, **(result.field_candidates or {})} + review = review_condition_fields( text=result.front_text or result.text, - rule_fields=result.field_candidates, + rule_fields=rule_fields, file_context=f"{item.directory_level}\n{item.file_name}\n{item.relative_path}", ) + selected_sources = dict(review.get("selected_sources") or {}) + for key in inferred_fields: + if selected_sources.get(key) == "rule" and key not in (result.field_candidates or {}): + selected_sources[key] = "inferred" + review["selected_sources"] = selected_sources + review["front_text"] = result.front_text or result.text[:1200] + return review def _safe_summary_product_name(product_name: str) -> str: @@ -98,6 +122,99 @@ def _safe_summary_product_name(product_name: str) -> str: return value +def _infer_fields_from_text(text: str) -> dict[str, str]: + normalized = _normalize_text_for_inference(text) + fields = {} + product_name = _infer_product_name(normalized) + if product_name: + fields["产品名称"] = product_name + model_spec = _infer_model_spec(normalized) + if model_spec: + fields["型号规格"] = model_spec + return fields + + +def _normalize_text_for_inference(text: str) -> str: + value = re.sub(r"\s+", "", text or "") + value = value.replace("(", "(").replace(")", ")") + return value + + +def _infer_product_name(text: str) -> str: + patterns = [ + r"体外诊断试剂(?P<name>[^。;;,,]{4,120}?试剂盒\([^()]{2,30}\))产品注册", + r"(?P<name>[^。;;,,]{4,120}?试剂盒\([^()]{2,30}\))", + ] + for pattern in 
patterns: + match = re.search(pattern, text) + if match: + return _restore_chinese_parentheses(_trim_product_name(match.group("name"))) + return "" + + +def _trim_product_name(value: str) -> str: + prefixes = ["申请境内第三类体外诊断试剂", "申请境内第二类体外诊断试剂", "境内第三类体外诊断试剂", "境内第二类体外诊断试剂"] + result = value + for prefix in prefixes: + if prefix in result: + result = result.split(prefix, 1)[-1] + return result + + +def _infer_model_spec(text: str) -> str: + specs = sorted(set(re.findall(r"规格[A-ZA-Z]", text))) + if specs: + return "、".join(specs) + match = re.search(r"产品的包装规格(?P<spec>.{1,80}?(?:人份/盒|测试/盒|反应/盒)(?:[、,,].{1,30}?(?:人份/盒|测试/盒|反应/盒))*)", text) + if not match: + return "" + return _restore_chinese_parentheses(match.group("spec").strip("::,,。;;")) + + +def _restore_chinese_parentheses(value: str) -> str: + return value.replace("(", "(").replace(")", ")") + + +def _condition_candidates_incomplete(candidates: dict[str, dict[str, object]]) -> bool: + if not candidates: + return True + product_name = str((candidates.get("product_name") or {}).get("suggested") or "").strip() + product_category = str((candidates.get("product_category") or {}).get("suggested") or "").strip() + return not product_name or "�" in product_name or product_category == "其他" + + +def _merge_condition_candidates( + current: dict[str, dict[str, object]], + refreshed: dict[str, dict[str, object]], +) -> dict[str, dict[str, object]]: + merged = {**(current or {})} + for field, config in (refreshed or {}).items(): + current_config = merged.get(field) or {} + current_value = str(current_config.get("suggested") or "").strip() + refreshed_value = str((config or {}).get("suggested") or "").strip() + if _is_better_condition_value(current_value, refreshed_value): + merged[field] = config + elif field not in merged: + merged[field] = config + return merged + + +def _is_better_condition_value(current_value: str, refreshed_value: str) -> bool: + if not refreshed_value: + return False + if "�" in refreshed_value: + return False 
+ if "�" in current_value: + return True + if not current_value: + return True + if current_value == "其他" and refreshed_value != "其他": + return True + if current_value == "待确认" and refreshed_value != "待确认": + return True + return len(refreshed_value) > len(current_value) and current_value in refreshed_value + + def _detect_product_category(corpus: str) -> str: if any(keyword in corpus for keyword in ["体外诊断", "检测试剂", "试剂盒", "IVD"]): return "体外诊断试剂" diff --git a/review_agent/regulatory_review/services/llm_review.py b/review_agent/regulatory_review/services/llm_review.py index 62357b2..b74fd94 100644 --- a/review_agent/regulatory_review/services/llm_review.py +++ b/review_agent/regulatory_review/services/llm_review.py @@ -156,7 +156,7 @@ def _clean_fields(fields: dict[str, Any]) -> dict[str, str]: value = fields.get(label) if not isinstance(value, str): continue - normalized = " ".join(value.strip().split()) + normalized = " ".join(value.strip().split()).replace("(", "(").replace(")", ")") if normalized: clean[label] = normalized return clean @@ -200,4 +200,6 @@ def _better_product_name(candidate: str, current: str) -> bool: def _invalid_field_value(value: str) -> bool: if not value: return True + if "�" in value: + return True return any(keyword in value for keyword in ["第1章", "第2章", "第3章", "监管信息", "综述资料", "章节目录"]) diff --git a/review_agent/regulatory_review/views.py b/review_agent/regulatory_review/views.py index b244421..ff52236 100644 --- a/review_agent/regulatory_review/views.py +++ b/review_agent/regulatory_review/views.py @@ -9,6 +9,7 @@ from django.contrib.auth.decorators import login_required from review_agent.models import FileSummaryBatch, RegulatoryReviewBatch, WorkflowNodeRun from review_agent.regulatory_review.events import record_event +from review_agent.regulatory_review.services.info_extract import ensure_regulatory_condition_candidates from review_agent.regulatory_review.services.rectification_review import review_missing_issues from 
review_agent.regulatory_review.workflow import create_regulatory_review_batch, start_regulatory_review_workflow @@ -19,6 +20,7 @@ def batch_status(request, batch_id: int): batch = RegulatoryReviewBatch.objects.filter(pk=batch_id, user=request.user).first() if not batch: raise Http404("批次不存在。") + condition_candidates = ensure_regulatory_condition_candidates(batch) nodes = WorkflowNodeRun.objects.filter( workflow_type="regulatory_review", workflow_batch_id=batch.pk, @@ -45,12 +47,12 @@ def batch_status(request, batch_id: int): for node in nodes ], } - if batch.status == RegulatoryReviewBatch.Status.WAITING_USER and (batch.condition_json or {}).get("candidates"): + if batch.status == RegulatoryReviewBatch.Status.WAITING_USER and condition_candidates: payload["condition_confirmation"] = { "batch_id": batch.pk, "batch_no": batch.batch_no, "confirm_url": f"/api/review-agent/regulatory-review/{batch.pk}/conditions/", - "candidates": batch.condition_json["candidates"], + "candidates": condition_candidates, } return JsonResponse(payload) diff --git a/review_agent/views.py b/review_agent/views.py index 5decbdb..2f78b2b 100644 --- a/review_agent/views.py +++ b/review_agent/views.py @@ -12,6 +12,7 @@ from .services import ( stream_message, ) from .models import Conversation, FileAttachment, FileSummaryBatch, RegulatoryReviewBatch, WorkflowNodeRun +from .regulatory_review.services.info_extract import ensure_regulatory_condition_candidates @login_required @@ -132,6 +133,7 @@ def build_workflow_cards(conversation: Conversation) -> list[dict[str, object]]: ) regulatory_batches = RegulatoryReviewBatch.objects.filter(conversation=conversation) for batch in regulatory_batches: + condition_candidates = ensure_regulatory_condition_candidates(batch) cards.append( { "id": batch.pk, @@ -141,7 +143,7 @@ def build_workflow_cards(conversation: Conversation) -> list[dict[str, object]]: "error_message": batch.error_message, "risk_label": _format_risk_label(batch.risk_summary or {}), 
"condition_json": batch.condition_json or {}, - "condition_candidates": (batch.condition_json or {}).get("candidates") or {}, + "condition_candidates": condition_candidates, "notification_count": batch.notifications.count(), "review_record_count": batch.artifacts.filter(metadata__artifact="review_record").count(), "created_at": batch.created_at, diff --git a/tests/test_regulatory_condition.py b/tests/test_regulatory_condition.py index 334ba4a..e397f83 100644 --- a/tests/test_regulatory_condition.py +++ b/tests/test_regulatory_condition.py @@ -161,6 +161,61 @@ def test_detect_regulatory_condition_uses_llm_review_for_better_product_name( assert candidates["product_name"]["source"] == "llm" +def test_detect_regulatory_condition_infers_fields_from_unlabeled_attachment_text( + settings, tmp_path, django_user_model +): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-COND", + status=FileSummaryBatch.Status.SUCCESS, + product_name="第1章 监管信息", + ) + standard_list = tmp_path / "standard_list.txt" + standard_list.write_text( + "国家药品监督管理局:\n" + "卡尤迪生物科技宜兴有限公司申请境内第三类体外诊断试剂" + "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)产品注册。\n", + encoding="utf-8", + ) + product_list = tmp_path / "product_list.txt" + product_list.write_text( + "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒\n" + "(荧光PCR法)\n" + "产品的包装规格\n" + "24人份/盒、48人份/盒\n", + encoding="utf-8", + ) + FileSummaryItem.objects.create( + batch=summary, + file_index=1, + directory_level="第1章 监管信息", + file_name="符合标准的清单.txt", + file_type="txt", + relative_path="第1章 监管信息/符合标准的清单.txt", + storage_path=str(standard_list), + ) + FileSummaryItem.objects.create( + batch=summary, + file_index=2, + directory_level="第1章 监管信息", + file_name="产品列表.txt", + file_type="txt", + relative_path="第1章 监管信息/产品列表.txt", + storage_path=str(product_list), + ) + + 
candidates = detect_regulatory_condition_candidates(summary) + + assert candidates["product_category"]["suggested"] == "体外诊断试剂" + assert candidates["product_name"]["suggested"] == "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)" + assert candidates["product_name"]["source"] == "inferred" + assert candidates["model_spec"]["suggested"] == "24人份/盒、48人份/盒" + + def test_workflow_pauses_before_rule_scope_until_conditions_confirmed(settings, tmp_path, django_user_model): settings.MEDIA_ROOT = tmp_path user = django_user_model.objects.create_user(username="owner", password="pass") diff --git a/tests/test_regulatory_frontend.py b/tests/test_regulatory_frontend.py index 9fbef5f..013920e 100644 --- a/tests/test_regulatory_frontend.py +++ b/tests/test_regulatory_frontend.py @@ -4,6 +4,7 @@ from django.urls import reverse from review_agent.models import ( Conversation, FileSummaryBatch, + FileSummaryItem, RegulatoryArtifact, RegulatoryNotificationRecord, RegulatoryReviewBatch, @@ -108,6 +109,55 @@ def test_workspace_renders_condition_confirmation_form(client, django_user_model assert "data-condition-confirm-form" not in content[summary_index:] +def test_workspace_refreshes_incomplete_condition_confirmation_candidates(client, settings, tmp_path, django_user_model): + settings.MEDIA_ROOT = tmp_path + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-OK", + status=FileSummaryBatch.Status.SUCCESS, + product_name="第1章 监管信息", + ) + application = tmp_path / "application.txt" + application.write_text( + "卡尤迪生物科技宜兴有限公司申请境内第三类体外诊断试剂" + "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)产品注册。", + encoding="utf-8", + ) + FileSummaryItem.objects.create( + batch=summary, + file_index=1, + directory_level="第1章 监管信息", + file_name="符合标准的清单.txt", + file_type="txt", + relative_path="第1章 监管信息/符合标准的清单.txt", + storage_path=str(application), + 
) + RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="RR-WAIT-EMPTY", + status=RegulatoryReviewBatch.Status.WAITING_USER, + condition_json={ + "confirmed": False, + "candidates": { + "product_category": {"label": "产品类别", "input_type": "select", "options": ["其他"], "suggested": "其他"}, + "product_name": {"label": "产品名称", "input_type": "text", "suggested": ""}, + }, + }, + ) + client.force_login(user) + + response = client.get(f"{reverse('home')}?conversation={conversation.pk}") + + content = response.content.decode("utf-8") + assert "体外诊断试剂" in content + assert "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)" in content + + def test_workspace_renders_rectification_actions_and_summaries(client, tmp_path, django_user_model): user = django_user_model.objects.create_user(username="owner", password="pass") conversation = Conversation.objects.create(user=user, title="会话") diff --git a/tests/test_regulatory_llm_review.py b/tests/test_regulatory_llm_review.py index 0d5ad6e..b35a037 100644 --- a/tests/test_regulatory_llm_review.py +++ b/tests/test_regulatory_llm_review.py @@ -40,3 +40,18 @@ def test_review_condition_fields_falls_back_when_llm_returns_chapter_title(): assert result["selected_fields"]["产品名称"] == "甲胎蛋白检测试剂盒" assert result["selected_sources"]["产品名称"] == "rule" + + +def test_review_condition_fields_rejects_garbled_llm_product_name(): + def completion(messages, temperature=0.0): + return json.dumps({"fields": {"产品名称": "呼吸道合胞病毒、 �肺炎支原体核酸检测试剂盒 (荧光PCR法)"}}, ensure_ascii=False) + + result = review_condition_fields( + text="呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)", + rule_fields={"产品名称": "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)"}, + file_context="产品列表.txt", + completion_func=completion, + ) + + assert result["selected_fields"]["产品名称"] == "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)" + assert result["selected_sources"]["产品名称"] == "rule" diff --git a/tests/test_regulatory_views.py b/tests/test_regulatory_views.py index 3636f39..4b507b2 100644 --- 
a/tests/test_regulatory_views.py +++ b/tests/test_regulatory_views.py @@ -80,3 +80,57 @@ def test_regulatory_batch_status_exposes_condition_confirmation(client, django_u assert payload["batch"]["status"] == RegulatoryReviewBatch.Status.WAITING_USER assert payload["condition_confirmation"]["batch_id"] == batch.pk assert payload["condition_confirmation"]["candidates"]["product_category"]["suggested"] == "体外诊断试剂" + + +def test_regulatory_batch_status_refreshes_incomplete_condition_candidates( + client, settings, tmp_path, django_user_model +): + settings.MEDIA_ROOT = tmp_path + owner = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=owner, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=owner, + batch_no="FS-OK", + status=FileSummaryBatch.Status.SUCCESS, + product_name="第1章 监管信息", + ) + application = tmp_path / "application.txt" + application.write_text( + "卡尤迪生物科技宜兴有限公司申请境内第三类体外诊断试剂" + "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)产品注册。", + encoding="utf-8", + ) + from review_agent.models import FileSummaryItem + + FileSummaryItem.objects.create( + batch=summary, + file_index=1, + directory_level="第1章 监管信息", + file_name="符合标准的清单.txt", + file_type="txt", + relative_path="第1章 监管信息/符合标准的清单.txt", + storage_path=str(application), + ) + batch = RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=owner, + source_summary_batch=summary, + batch_no="RR-WAIT-EMPTY", + status=RegulatoryReviewBatch.Status.WAITING_USER, + condition_json={ + "confirmed": False, + "candidates": { + "product_category": {"suggested": "其他"}, + "product_name": {"suggested": ""}, + }, + }, + ) + client.force_login(owner) + + response = client.get(reverse("regulatory_review_batch_status", args=[batch.pk])) + + payload = response.json() + candidates = payload["condition_confirmation"]["candidates"] + assert candidates["product_category"]["suggested"] == "体外诊断试剂" + assert 
candidates["product_name"]["suggested"] == "呼吸道合胞病毒、肺炎支原体核酸检测试剂盒(荧光PCR法)"