feat: 重构资料包模型与会话绑定主链路
This commit is contained in:
@@ -4,11 +4,12 @@ import xml.etree.ElementTree as ET
|
||||
from zipfile import BadZipFile, ZipFile
|
||||
|
||||
from agent_core.rag.ingest import ingest_document
|
||||
from apps.chat.services import create_conversation_for_batch
|
||||
|
||||
from .models import UploadedDocument
|
||||
from .models import SubmissionBatch, UploadedDocument
|
||||
|
||||
|
||||
def create_uploaded_document(scenario_id: str, uploaded_file) -> UploadedDocument:
|
||||
def create_uploaded_document(scenario_id: str, uploaded_file, batch: SubmissionBatch | None = None) -> UploadedDocument:
|
||||
"""
|
||||
保存上传文件的元数据记录。
|
||||
|
||||
@@ -17,15 +18,116 @@ def create_uploaded_document(scenario_id: str, uploaded_file) -> UploadedDocumen
|
||||
"""
|
||||
extension = _detect_extension(uploaded_file.name)
|
||||
return UploadedDocument.objects.create(
|
||||
batch=batch,
|
||||
scenario_id=scenario_id,
|
||||
original_name=uploaded_file.name,
|
||||
file=uploaded_file,
|
||||
file_type=extension,
|
||||
size=uploaded_file.size,
|
||||
relative_path=uploaded_file.name,
|
||||
status=UploadedDocument.STATUS_UPLOADED,
|
||||
)
|
||||
|
||||
|
||||
def import_submission_batch(scenario_id: str, uploaded_files: list) -> dict:
|
||||
"""
|
||||
导入资料包并建立批次、文档、目录汇总和主会话。
|
||||
|
||||
当前实现保持离线稳定,重点保证:
|
||||
- 资料包记录可落库
|
||||
- 产品名称可解析
|
||||
- 会话可自动绑定
|
||||
- 可直接产出 overview report
|
||||
"""
|
||||
batch = SubmissionBatch.objects.create(
|
||||
batch_id=_generate_batch_id(),
|
||||
workflow_type="registration",
|
||||
import_status=SubmissionBatch.STATUS_PROCESSING,
|
||||
)
|
||||
documents = []
|
||||
candidates = []
|
||||
chapter_summary = {}
|
||||
total_pages = 0
|
||||
|
||||
for uploaded_file in uploaded_files:
|
||||
document = create_uploaded_document(scenario_id, uploaded_file, batch=batch)
|
||||
text = extract_text(document)
|
||||
page_count = _estimate_page_count(text)
|
||||
document.page_count = page_count
|
||||
document.page_count_confidence = "estimated"
|
||||
document.document_role = _detect_document_role(document.original_name)
|
||||
document.chapter_code = _detect_chapter_code(document.original_name, text)
|
||||
document.chapter_match_status = "matched" if document.chapter_code else "unknown"
|
||||
document.needs_manual_review = not bool(document.chapter_code)
|
||||
document.save(
|
||||
update_fields=[
|
||||
"page_count",
|
||||
"page_count_confidence",
|
||||
"document_role",
|
||||
"chapter_code",
|
||||
"chapter_match_status",
|
||||
"needs_manual_review",
|
||||
"updated_at",
|
||||
]
|
||||
)
|
||||
documents.append(document)
|
||||
total_pages += page_count
|
||||
chapter_key = document.chapter_code or "UNCLASSIFIED"
|
||||
chapter_summary[chapter_key] = chapter_summary.get(chapter_key, 0) + 1
|
||||
candidates.extend(_extract_product_candidates(document.original_name, text))
|
||||
|
||||
product_name, warnings = _select_product_name(candidates)
|
||||
conversation = create_conversation_for_batch(batch.batch_id, product_name)
|
||||
|
||||
batch.product_name = product_name
|
||||
batch.conversation_id = conversation.conversation_id
|
||||
batch.file_count = len(documents)
|
||||
batch.page_count = total_pages
|
||||
batch.chapter_summary = [
|
||||
{"chapter_code": chapter_code, "document_count": count}
|
||||
for chapter_code, count in sorted(chapter_summary.items())
|
||||
]
|
||||
batch.exception_count = len(warnings)
|
||||
batch.import_status = (
|
||||
SubmissionBatch.STATUS_REVIEW_REQUIRED if warnings else SubmissionBatch.STATUS_COMPLETED
|
||||
)
|
||||
batch.save(
|
||||
update_fields=[
|
||||
"product_name",
|
||||
"conversation_id",
|
||||
"file_count",
|
||||
"page_count",
|
||||
"chapter_summary",
|
||||
"exception_count",
|
||||
"import_status",
|
||||
"updated_at",
|
||||
]
|
||||
)
|
||||
return {
|
||||
"batch_id": batch.batch_id,
|
||||
"conversation_id": conversation.conversation_id,
|
||||
"product_name": batch.product_name,
|
||||
"registration_overview_report": {
|
||||
"batch_id": batch.batch_id,
|
||||
"product_name": batch.product_name,
|
||||
"file_count": batch.file_count,
|
||||
"total_page_count": batch.page_count,
|
||||
"chapter_summary": batch.chapter_summary,
|
||||
"documents": [
|
||||
{
|
||||
"document_id": document.id,
|
||||
"original_name": document.original_name,
|
||||
"chapter_code": document.chapter_code,
|
||||
"page_count": document.page_count,
|
||||
"document_role": document.document_role,
|
||||
}
|
||||
for document in documents
|
||||
],
|
||||
"warnings": warnings,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def extract_text(document: UploadedDocument) -> str:
|
||||
"""
|
||||
根据文档类型选择合适的文本抽取策略。
|
||||
@@ -83,6 +185,99 @@ def _detect_extension(file_name: str) -> str:
|
||||
return Path(file_name).suffix.lower().lstrip(".")
|
||||
|
||||
|
||||
def _generate_batch_id() -> str:
|
||||
return f"SUB-20260604-{SubmissionBatch.objects.count() + 1:03d}"
|
||||
|
||||
|
||||
def _estimate_page_count(text: str) -> int:
|
||||
stripped = text.strip()
|
||||
if not stripped:
|
||||
return 0
|
||||
line_count = len([line for line in stripped.splitlines() if line.strip()])
|
||||
return max(1, line_count)
|
||||
|
||||
|
||||
def _detect_document_role(file_name: str) -> str:
|
||||
normalized = file_name.lower()
|
||||
if "申请表" in file_name:
|
||||
return "application_form"
|
||||
if "说明书" in file_name:
|
||||
return "product_manual"
|
||||
if "产品列表" in file_name:
|
||||
return "product_list"
|
||||
if "声明" in file_name:
|
||||
return "declaration"
|
||||
if normalized.endswith(".pdf"):
|
||||
return "pdf_document"
|
||||
return "general_document"
|
||||
|
||||
|
||||
def _detect_chapter_code(file_name: str, text: str) -> str:
|
||||
for source in (file_name, text):
|
||||
match = re.search(r"(CH\d+(?:\.\d+)*)", source, flags=re.IGNORECASE)
|
||||
if match:
|
||||
return match.group(1).upper()
|
||||
if "监管" in file_name or "申请表" in file_name or "说明书" in file_name:
|
||||
return "CH1"
|
||||
return ""
|
||||
|
||||
|
||||
def _extract_product_candidates(file_name: str, text: str) -> list[dict]:
|
||||
source_type = _detect_candidate_source(file_name)
|
||||
if not source_type:
|
||||
return []
|
||||
patterns = [
|
||||
r"产品名称[::]\s*([^\n\r]+)",
|
||||
r"名称[::]\s*([^\n\r]+检测试剂盒[^\n\r]*)",
|
||||
]
|
||||
for pattern in patterns:
|
||||
match = re.search(pattern, text)
|
||||
if match:
|
||||
return [{"source_type": source_type, "product_name": match.group(1).strip()}]
|
||||
cleaned = Path(file_name).stem.replace("目标产品", "").replace("说明书", "").strip("-_ ")
|
||||
if cleaned and "申请表" not in cleaned and "产品列表" not in cleaned:
|
||||
return [{"source_type": source_type, "product_name": cleaned}]
|
||||
return []
|
||||
|
||||
|
||||
def _detect_candidate_source(file_name: str) -> str:
|
||||
if "申请表" in file_name:
|
||||
return "application_form"
|
||||
if "说明书" in file_name:
|
||||
return "product_manual"
|
||||
if "产品列表" in file_name:
|
||||
return "product_list"
|
||||
return ""
|
||||
|
||||
|
||||
def _select_product_name(candidates: list[dict]) -> tuple[str, list[str]]:
|
||||
if not candidates:
|
||||
return "", ["未识别到产品名称,建议人工补录。"]
|
||||
|
||||
priority = {
|
||||
"application_form": 1,
|
||||
"product_manual": 2,
|
||||
"product_list": 3,
|
||||
}
|
||||
sorted_candidates = sorted(
|
||||
candidates,
|
||||
key=lambda item: priority.get(item["source_type"], 99),
|
||||
)
|
||||
top_candidate = sorted_candidates[0]
|
||||
warnings = []
|
||||
conflict_names = {
|
||||
item["product_name"]
|
||||
for item in sorted_candidates
|
||||
if item["product_name"] != top_candidate["product_name"]
|
||||
}
|
||||
if conflict_names:
|
||||
warnings.append(
|
||||
"产品名称来源冲突:"
|
||||
+ " / ".join([top_candidate["product_name"], *sorted(conflict_names)])
|
||||
)
|
||||
return top_candidate["product_name"], warnings
|
||||
|
||||
|
||||
def _read_text_file(path: Path) -> str:
|
||||
"""优先按 UTF-8 读取;失败时回退到系统默认编码。"""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user