# File: DEMO-AGENT/review_agent/regulatory_review/services/info_extract.py
#
# NOTE: The code viewer this file was copied from flags it as containing
# "ambiguous Unicode characters", i.e. characters that might be confused with
# other characters (presumably the full-width CJK punctuation used in the
# string literals and regular expressions below).
from __future__ import annotations
import re
from pathlib import Path
from django.conf import settings
from review_agent.models import FileSummaryBatch, RegulatoryReviewBatch
from review_agent.regulatory_review.services.llm_review import review_condition_fields
from review_agent.regulatory_review.services.text_extract import extract_text
# Allowed choices for every select-type review condition, keyed by field name.
OPTION_FIELDS = {
    "product_category": [
        "体外诊断试剂",
        "医疗器械",
        "其他",
    ],
    "registration_type": [
        "首次注册",
        "变更注册",
        "延续注册",
    ],
    "clinical_evaluation_path": [
        "临床试验",
        "免临床",
        "同品种比对",
        "待确认",
    ],
}
def ensure_regulatory_condition_candidates(batch: RegulatoryReviewBatch) -> dict[str, dict[str, object]]:
    """Return the batch's condition candidates, re-detecting them if needed.

    The stored candidates are returned as-is unless the batch status is
    ``WAITING_USER`` and the stored candidates look incomplete. In that case
    detection is re-run on ``batch.source_summary_batch``, the result is
    merged with the stored candidates, written back to
    ``batch.condition_json`` (saving only that field) and returned.
    """
    stored_json = batch.condition_json or {}
    stored_candidates = stored_json.get("candidates") or {}

    not_waiting = batch.status != RegulatoryReviewBatch.Status.WAITING_USER
    if not_waiting or not _condition_candidates_incomplete(stored_candidates):
        return stored_candidates

    detected = detect_regulatory_condition_candidates(batch.source_summary_batch)
    merged = _merge_condition_candidates(stored_candidates, detected)
    batch.condition_json = {**stored_json, "candidates": merged}
    batch.save(update_fields=["condition_json"])
    return merged
def detect_regulatory_condition_candidates(summary_batch: FileSummaryBatch) -> dict[str, dict[str, object]]:
    """Infer review-scope conditions from the summary batch and its files.

    Every item of the batch (ordered by ``file_index``) contributes its
    directory level, file name, relative path, extracted field values and
    front text to a text corpus. The select-type conditions are suggested by
    keyword detection over that corpus. The text-type conditions take the
    first non-empty value extracted for the matching field, and the product
    name falls back to the batch's own product name.

    Returns:
        A mapping from condition key to its form configuration (``label``,
        ``input_type``, ``suggested`` and, depending on the type, ``options``
        or ``source``).
    """
    corpus_parts: list[object] = [summary_batch.product_name or ""]
    field_candidates: dict[str, str] = {}
    field_sources: dict[str, str] = {}
    for item in summary_batch.items.order_by("file_index"):
        corpus_parts.extend([item.directory_level, item.file_name, item.relative_path])
        review = _extract_item_fields(item)
        # ``or {}`` also covers keys that are present but set to None.
        extracted = review.get("selected_fields") or {}
        sources = review.get("selected_sources") or {}
        for key, value in extracted.items():
            if not value or key in field_candidates:
                continue
            field_candidates[key] = value
            # Record the source together with the value it describes, so a
            # value taken from one file never carries another file's label.
            source = sources.get(key)
            if source:
                field_sources[key] = source
        corpus_parts.extend(extracted.values())
        if review.get("front_text"):
            corpus_parts.append(str(review["front_text"]))
    # str() keeps the join from failing on non-string field values.
    corpus = "\n".join(str(part) for part in corpus_parts if part)
    product_name = field_candidates.get("产品名称") or _safe_summary_product_name(summary_batch.product_name)
    return {
        "product_category": {
            "label": "产品类别",
            "input_type": "select",
            "options": OPTION_FIELDS["product_category"],
            "suggested": _detect_product_category(corpus),
        },
        "registration_type": {
            "label": "注册类型",
            "input_type": "select",
            "options": OPTION_FIELDS["registration_type"],
            "suggested": _detect_registration_type(corpus),
        },
        "clinical_evaluation_path": {
            "label": "临床评价路径",
            "input_type": "select",
            "options": OPTION_FIELDS["clinical_evaluation_path"],
            "suggested": _detect_clinical_path(corpus),
        },
        "product_name": {
            "label": "产品名称",
            "input_type": "text",
            "suggested": product_name,
            "source": field_sources.get("产品名称", "summary" if product_name else ""),
        },
        "model_spec": {
            "label": "型号规格",
            "input_type": "text",
            "suggested": field_candidates.get("型号规格", ""),
            "source": field_sources.get("型号规格", ""),
        },
        "intended_use": {
            "label": "预期用途",
            "input_type": "text",
            "suggested": field_candidates.get("预期用途", ""),
            "source": field_sources.get("预期用途", ""),
        },
    }
def _extract_item_fields(item) -> dict[str, object]:
    """Extract and review the condition fields of one summary item's file.

    ``item.storage_path`` is resolved against ``settings.MEDIA_ROOT`` when it
    is relative. An empty dict is returned when the file does not exist, or
    when text extraction does not succeed or yields no text.

    Otherwise the fields inferred from the text are combined with the
    extractor's own field candidates (the latter take precedence) and passed
    to ``review_condition_fields``. The returned review dict is augmented
    in place: a source of ``"rule"`` is relabelled ``"inferred"`` for fields
    that only the local inference supplied, and ``front_text`` is added.
    """
    file_path = Path(item.storage_path)
    if not file_path.is_absolute():
        file_path = Path(settings.MEDIA_ROOT) / item.storage_path
    if not file_path.exists():
        return {}

    extraction = extract_text(file_path)
    if extraction.status != "success" or not extraction.text:
        return {}

    leading_text = extraction.front_text or extraction.text
    extractor_fields = extraction.field_candidates or {}
    inferred = _infer_fields_from_text(leading_text)
    review = review_condition_fields(
        text=leading_text,
        rule_fields={**inferred, **extractor_fields},
        file_context=f"{item.directory_level}\n{item.file_name}\n{item.relative_path}",
    )

    source_labels = dict(review.get("selected_sources") or {})
    for name in inferred:
        # Only the local inference produced this field, so "rule" is relabelled.
        if source_labels.get(name) == "rule" and name not in extractor_fields:
            source_labels[name] = "inferred"
    review["selected_sources"] = source_labels
    # Without a dedicated front text, fall back to the first 1200 characters.
    review["front_text"] = extraction.front_text or extraction.text[:1200]
    return review
def _safe_summary_product_name(product_name: str) -> str:
value = (product_name or "").strip()
if not value:
return ""
if any(keyword in value for keyword in ["第1章", "第2章", "监管信息", "综述资料", "非临床资料", "章节目录"]):
return ""
return value
def _infer_fields_from_text(text: str) -> dict[str, str]:
    """Infer the product name and model spec from raw document text.

    The text is normalized first. Only fields for which a non-empty value was
    inferred appear in the result, keyed by their Chinese field label.
    """
    normalized = _normalize_text_for_inference(text)
    inferred = (
        ("产品名称", _infer_product_name(normalized)),
        ("型号规格", _infer_model_spec(normalized)),
    )
    return {label: value for label, value in inferred if value}
def _normalize_text_for_inference(text: str) -> str:
value = re.sub(r"\s+", "", text or "")
value = value.replace("", "(").replace("", ")")
return value
def _infer_product_name(text: str) -> str:
patterns = [
r"体外诊断试剂(?P<name>[^。;;,]{4,120}?试剂盒\([^()]{2,30}\))产品注册",
r"(?P<name>[^。;;,]{4,120}?试剂盒\([^()]{2,30}\))",
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return _restore_chinese_parentheses(_trim_product_name(match.group("name")))
return ""
def _trim_product_name(value: str) -> str:
prefixes = ["申请境内第三类体外诊断试剂", "申请境内第二类体外诊断试剂", "境内第三类体外诊断试剂", "境内第二类体外诊断试剂"]
result = value
for prefix in prefixes:
if prefix in result:
result = result.split(prefix, 1)[-1]
return result
def _infer_model_spec(text: str) -> str:
specs = sorted(set(re.findall(r"规格[A-Z-]", text)))
if specs:
return "".join(specs)
match = re.search(r"产品的包装规格(?P<spec>.{1,80}?(?:人份/盒|测试/盒|反应/盒)(?:[、,].{1,30}?(?:人份/盒|测试/盒|反应/盒))*)", text)
if not match:
return ""
return _restore_chinese_parentheses(match.group("spec").strip(":,。;;"))
def _restore_chinese_parentheses(value: str) -> str:
return value.replace("(", "").replace(")", "")
def _condition_candidates_incomplete(candidates: dict[str, dict[str, object]]) -> bool:
if not candidates:
return True
product_name = str((candidates.get("product_name") or {}).get("suggested") or "").strip()
product_category = str((candidates.get("product_category") or {}).get("suggested") or "").strip()
return not product_name or "<EFBFBD>" in product_name or product_category == "其他"
def _merge_condition_candidates(
    current: dict[str, dict[str, object]],
    refreshed: dict[str, dict[str, object]],
) -> dict[str, dict[str, object]]:
    """Merge freshly detected candidates into the stored ones.

    Starting from a shallow copy of ``current``, a refreshed field
    configuration replaces the stored one when its suggested value is judged
    better, and is added when the field is not stored yet. Neither input
    mapping is modified.
    """
    result = dict(current or {})
    for name, new_config in (refreshed or {}).items():
        old_value = str((result.get(name) or {}).get("suggested") or "").strip()
        new_value = str((new_config or {}).get("suggested") or "").strip()
        if _is_better_condition_value(old_value, new_value) or name not in result:
            result[name] = new_config
    return result
def _is_better_condition_value(current_value: str, refreshed_value: str) -> bool:
if not refreshed_value:
return False
if "<EFBFBD>" in refreshed_value:
return False
if "<EFBFBD>" in current_value:
return True
if not current_value:
return True
if current_value == "其他" and refreshed_value != "其他":
return True
if current_value == "待确认" and refreshed_value != "待确认":
return True
return len(refreshed_value) > len(current_value) and current_value in refreshed_value
def _detect_product_category(corpus: str) -> str:
if any(keyword in corpus for keyword in ["体外诊断", "检测试剂", "试剂盒", "IVD"]):
return "体外诊断试剂"
if "医疗器械" in corpus:
return "医疗器械"
return "其他"
def _detect_registration_type(corpus: str) -> str:
if "延续" in corpus:
return "延续注册"
if "变更" in corpus:
return "变更注册"
return "首次注册"
def _detect_clinical_path(corpus: str) -> str:
if "免临床" in corpus or "免于临床" in corpus:
return "免临床"
if "同品种" in corpus or "同类" in corpus:
return "同品种比对"
if "临床试验" in corpus:
return "临床试验"
return "待确认"