fix(file-summary): 补强 Office 页数统计
This commit is contained in:
@@ -1,10 +1,14 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from xml.etree import ElementTree
|
||||||
|
from zipfile import ZipFile
|
||||||
|
|
||||||
|
|
||||||
SUPPORTED_EXTENSIONS = {"pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx"}
|
SUPPORTED_EXTENSIONS = {"pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx"}
|
||||||
|
logger = logging.getLogger("review_agent.file_summary.page_count")
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
@@ -26,34 +30,233 @@ def count_document_pages(path: str | Path) -> PageCountResult:
|
|||||||
|
|
||||||
return PageCountResult(status="success", page_count=len(PdfReader(str(file_path)).pages))
|
return PageCountResult(status="success", page_count=len(PdfReader(str(file_path)).pages))
|
||||||
if ext == "docx":
|
if ext == "docx":
|
||||||
from docx import Document
|
pages = _count_docx_pages_from_extended_properties(file_path)
|
||||||
|
if pages:
|
||||||
properties = Document(str(file_path)).core_properties
|
return PageCountResult(status="success", page_count=pages)
|
||||||
pages = getattr(properties, "pages", None)
|
pages = _count_word_pages_with_com(file_path)
|
||||||
if pages:
|
if pages:
|
||||||
return PageCountResult(status="success", page_count=pages)
|
return PageCountResult(status="success", page_count=pages)
|
||||||
return PageCountResult(status="uncertain")
|
return PageCountResult(status="uncertain")
|
||||||
if ext == "xlsx":
|
if ext == "xlsx":
|
||||||
from openpyxl import load_workbook
|
pages = _count_xlsx_sheets(file_path) or _count_excel_sheets_with_com(file_path)
|
||||||
|
if pages:
|
||||||
workbook = load_workbook(str(file_path), read_only=True, data_only=True)
|
return PageCountResult(status="success", page_count=pages)
|
||||||
return PageCountResult(status="success", page_count=len(workbook.sheetnames))
|
return PageCountResult(status="uncertain")
|
||||||
if ext == "xls":
|
if ext == "xls":
|
||||||
import xlrd
|
pages = _count_xls_sheets(file_path) or _count_excel_sheets_with_com(file_path)
|
||||||
|
if pages:
|
||||||
workbook = xlrd.open_workbook(str(file_path), on_demand=True)
|
return PageCountResult(status="success", page_count=pages)
|
||||||
return PageCountResult(status="success", page_count=workbook.nsheets)
|
return PageCountResult(status="uncertain")
|
||||||
if ext == "pptx":
|
if ext == "pptx":
|
||||||
from pptx import Presentation
|
pages = _count_pptx_slides(file_path) or _count_powerpoint_slides_with_com(file_path)
|
||||||
|
if pages:
|
||||||
return PageCountResult(status="success", page_count=len(Presentation(str(file_path)).slides))
|
return PageCountResult(status="success", page_count=pages)
|
||||||
if ext in {"doc", "ppt"}:
|
return PageCountResult(status="uncertain")
|
||||||
import olefile
|
if ext == "doc":
|
||||||
|
pages = _count_word_pages_with_com(file_path)
|
||||||
if olefile.isOleFile(str(file_path)):
|
if pages:
|
||||||
return PageCountResult(status="uncertain")
|
return PageCountResult(status="success", page_count=pages)
|
||||||
return PageCountResult(status="failed", error_message="不是有效的 OLE 文件。")
|
return _ole_uncertain_or_failed(file_path)
|
||||||
|
if ext == "ppt":
|
||||||
|
pages = _count_powerpoint_slides_with_com(file_path)
|
||||||
|
if pages:
|
||||||
|
return PageCountResult(status="success", page_count=pages)
|
||||||
|
return _ole_uncertain_or_failed(file_path)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
return PageCountResult(status="failed", error_message=str(exc))
|
return PageCountResult(status="failed", error_message=str(exc))
|
||||||
|
|
||||||
return PageCountResult(status="uncertain")
|
return PageCountResult(status="uncertain")
|
||||||
|
|
||||||
|
|
||||||
|
def _count_docx_pages_from_extended_properties(path: Path) -> int | None:
    """Read the page count recorded in a DOCX package's ``docProps/app.xml``.

    The number is taken verbatim from the extended-properties ``Pages``
    element stored in the file; nothing is re-paginated here.

    Args:
        path: Location of the ``.docx`` file (a ZIP package).

    Returns:
        The positive page count, or ``None`` when the part is missing, the
        archive cannot be read, the XML cannot be parsed, or ``Pages`` is
        absent, empty, or not a positive integer.
    """
    try:
        with ZipFile(path) as archive:
            app_entries = [
                item for item in archive.infolist() if item.filename == "docProps/app.xml"
            ]
            if not app_entries:
                return None
            # When the archive holds several entries with this name, use the
            # last one listed. Keep the raw bytes so the XML parser can honour
            # the encoding the part itself declares (e.g. UTF-16).
            raw = archive.read(app_entries[-1])
    except Exception as exc:
        logger.warning("DOCX extended properties read failed", extra={"path": str(path), "error": str(exc)})
        return None

    try:
        root = ElementTree.fromstring(raw)
    except (ElementTree.ParseError, LookupError, ValueError):
        # Byte-level parsing can fail on stray invalid bytes or on an
        # unknown/unsupported declared encoding. Fall back to the lenient
        # UTF-8 decoding so slightly damaged parts are still readable.
        try:
            root = ElementTree.fromstring(raw.decode("utf-8", errors="replace"))
        except ElementTree.ParseError as exc:
            logger.warning("DOCX extended properties parse failed", extra={"path": str(path), "error": str(exc)})
            return None

    pages_node = root.find("{http://schemas.openxmlformats.org/officeDocument/2006/extended-properties}Pages")
    if pages_node is None or not pages_node.text:
        return None
    return _positive_int(pages_node.text)
|
||||||
|
|
||||||
|
|
||||||
|
def _count_xlsx_sheets(path: Path) -> int | None:
    """Count the sheets of an ``.xlsx`` workbook using openpyxl.

    Args:
        path: Location of the workbook.

    Returns:
        The number of sheet names when it is positive; ``None`` when the
        workbook has no sheets or anything goes wrong (the error is logged).
    """
    try:
        from openpyxl import load_workbook

        book = load_workbook(str(path), read_only=True, data_only=True)
        try:
            sheet_total = len(book.sheetnames)
        finally:
            # Close the workbook whether or not the sheet names could be read.
            book.close()
        return _positive_int(sheet_total)
    except Exception as failure:
        logger.warning("XLSX sheet count failed", extra={"path": str(path), "error": str(failure)})
        return None
|
||||||
|
|
||||||
|
|
||||||
|
def _count_xls_sheets(path: Path) -> int | None:
    """Count the sheets of a legacy ``.xls`` workbook using xlrd.

    Args:
        path: Location of the workbook.

    Returns:
        ``nsheets`` when it is positive; ``None`` when it is not or when
        opening/reading the workbook fails (the error is logged).
    """
    try:
        import xlrd

        book = xlrd.open_workbook(str(path), on_demand=True)
        try:
            sheet_total = book.nsheets
        finally:
            # Release xlrd's resources whether or not the count was read.
            book.release_resources()
        return _positive_int(sheet_total)
    except Exception as failure:
        logger.warning("XLS sheet count failed", extra={"path": str(path), "error": str(failure)})
        return None
|
||||||
|
|
||||||
|
|
||||||
|
def _count_pptx_slides(path: Path) -> int | None:
    """Count the slides of a ``.pptx`` presentation using python-pptx.

    Args:
        path: Location of the presentation.

    Returns:
        The slide count when it is positive; ``None`` for an empty deck or
        when loading the file fails (the error is logged).
    """
    try:
        from pptx import Presentation

        deck = Presentation(str(path))
        slide_total = len(deck.slides)
        return _positive_int(slide_total)
    except Exception as failure:
        logger.warning("PPTX slide count failed", extra={"path": str(path), "error": str(failure)})
        return None
|
||||||
|
|
||||||
|
|
||||||
|
def _ole_uncertain_or_failed(path: Path) -> PageCountResult:
    """Classify a legacy Office file whose pages could not be counted.

    Args:
        path: Location of the file to inspect.

    Returns:
        ``status="uncertain"`` when the file is an OLE container or when the
        check itself raises (the error is logged); ``status="failed"`` with an
        error message when ``olefile`` reports it is not an OLE file.
    """
    try:
        import olefile

        if not olefile.isOleFile(str(path)):
            # The message reads "Not a valid OLE file."
            return PageCountResult(status="failed", error_message="不是有效的 OLE 文件。")
        return PageCountResult(status="uncertain")
    except Exception as failure:
        logger.warning("OLE validation failed", extra={"path": str(path), "error": str(failure)})
        return PageCountResult(status="uncertain")
|
||||||
|
|
||||||
|
|
||||||
|
def _count_word_pages_with_com(path: Path) -> int | None:
    """Ask Microsoft Word, through COM automation, how many pages a file has.

    Needs ``pywin32`` and an installed Word; anywhere else the imports or the
    COM calls fail and the function reports ``None``.

    Args:
        path: Location of the ``.doc``/``.docx`` file; it is resolved to an
            absolute path before being handed to Word.

    Returns:
        The positive page count reported by Word, or ``None`` when COM is
        unavailable, cannot be initialised, or any automation call fails.
        Failures are logged and never raised to the caller.
    """
    try:
        import pythoncom
        import win32com.client
    except Exception as exc:
        logger.info("Word COM page count unavailable", extra={"path": str(path), "error": str(exc)})
        return None

    # Value of wdStatisticPages in Word's WdStatistic enumeration.
    wd_statistic_pages = 2

    # Initialise COM inside its own guard: a failure here must yield None like
    # every other COM failure, and must not be paired with CoUninitialize().
    try:
        pythoncom.CoInitialize()
    except Exception as exc:
        logger.warning("Word COM page count failed", extra={"path": str(path), "error": str(exc)})
        return None

    word = None
    document = None
    try:
        word = win32com.client.DispatchEx("Word.Application")
        word.Visible = False
        word.DisplayAlerts = 0
        document = word.Documents.Open(
            str(path.resolve()),
            ReadOnly=True,
            AddToRecentFiles=False,
            ConfirmConversions=False,
        )
        # Repaginate before asking for statistics.
        document.Repaginate()
        return _positive_int(document.ComputeStatistics(wd_statistic_pages))
    except Exception as exc:
        logger.warning("Word COM page count failed", extra={"path": str(path), "error": str(exc)})
        return None
    finally:
        # Tear down in reverse order of creation; each step is best-effort so
        # one failure cannot prevent the next from running.
        try:
            if document is not None:
                document.Close(False)
        except Exception as exc:
            logger.debug("Word document close failed", extra={"path": str(path), "error": str(exc)})
        try:
            if word is not None:
                word.Quit()
        except Exception as exc:
            logger.debug("Word application quit failed", extra={"path": str(path), "error": str(exc)})
        pythoncom.CoUninitialize()
|
||||||
|
|
||||||
|
|
||||||
|
def _count_powerpoint_slides_with_com(path: Path) -> int | None:
    """Ask Microsoft PowerPoint, through COM automation, for a slide count.

    Needs ``pywin32`` and an installed PowerPoint; anywhere else the imports
    or the COM calls fail and the function reports ``None``.

    Args:
        path: Location of the ``.ppt``/``.pptx`` file; it is resolved to an
            absolute path before being handed to PowerPoint.

    Returns:
        The positive slide count, or ``None`` when COM is unavailable, cannot
        be initialised, or any automation call fails. Failures are logged and
        never raised to the caller.
    """
    try:
        import pythoncom
        import win32com.client
    except Exception as exc:
        logger.info("PowerPoint COM slide count unavailable", extra={"path": str(path), "error": str(exc)})
        return None

    # Initialise COM inside its own guard: a failure here must yield None like
    # every other COM failure, and must not be paired with CoUninitialize().
    try:
        pythoncom.CoInitialize()
    except Exception as exc:
        logger.warning("PowerPoint COM slide count failed", extra={"path": str(path), "error": str(exc)})
        return None

    powerpoint = None
    presentation = None
    try:
        powerpoint = win32com.client.DispatchEx("PowerPoint.Application")
        presentation = powerpoint.Presentations.Open(
            str(path.resolve()),
            ReadOnly=True,
            Untitled=False,
            WithWindow=False,
        )
        return _positive_int(presentation.Slides.Count)
    except Exception as exc:
        logger.warning("PowerPoint COM slide count failed", extra={"path": str(path), "error": str(exc)})
        return None
    finally:
        # Tear down in reverse order of creation; each step is best-effort so
        # one failure cannot prevent the next from running.
        try:
            if presentation is not None:
                presentation.Close()
        except Exception as exc:
            logger.debug("PowerPoint presentation close failed", extra={"path": str(path), "error": str(exc)})
        try:
            if powerpoint is not None:
                powerpoint.Quit()
        except Exception as exc:
            logger.debug("PowerPoint application quit failed", extra={"path": str(path), "error": str(exc)})
        pythoncom.CoUninitialize()
|
||||||
|
|
||||||
|
|
||||||
|
def _count_excel_sheets_with_com(path: Path) -> int | None:
    """Ask Microsoft Excel, through COM automation, for a worksheet count.

    Needs ``pywin32`` and an installed Excel; anywhere else the imports or
    the COM calls fail and the function reports ``None``.

    Args:
        path: Location of the ``.xls``/``.xlsx`` file; it is resolved to an
            absolute path before being handed to Excel.

    Returns:
        The positive value of ``Worksheets.Count``, or ``None`` when COM is
        unavailable, cannot be initialised, or any automation call fails.
        Failures are logged and never raised to the caller.
    """
    try:
        import pythoncom
        import win32com.client
    except Exception as exc:
        logger.info("Excel COM sheet count unavailable", extra={"path": str(path), "error": str(exc)})
        return None

    # Initialise COM inside its own guard: a failure here must yield None like
    # every other COM failure, and must not be paired with CoUninitialize().
    try:
        pythoncom.CoInitialize()
    except Exception as exc:
        logger.warning("Excel COM sheet count failed", extra={"path": str(path), "error": str(exc)})
        return None

    excel = None
    workbook = None
    try:
        excel = win32com.client.DispatchEx("Excel.Application")
        excel.Visible = False
        excel.DisplayAlerts = False
        workbook = excel.Workbooks.Open(str(path.resolve()), ReadOnly=True)
        return _positive_int(workbook.Worksheets.Count)
    except Exception as exc:
        logger.warning("Excel COM sheet count failed", extra={"path": str(path), "error": str(exc)})
        return None
    finally:
        # Tear down in reverse order of creation; each step is best-effort so
        # one failure cannot prevent the next from running.
        try:
            if workbook is not None:
                workbook.Close(False)
        except Exception as exc:
            logger.debug("Excel workbook close failed", extra={"path": str(path), "error": str(exc)})
        try:
            if excel is not None:
                excel.Quit()
        except Exception as exc:
            logger.debug("Excel application quit failed", extra={"path": str(path), "error": str(exc)})
        pythoncom.CoUninitialize()
|
||||||
|
|
||||||
|
|
||||||
|
def _positive_int(value) -> int | None:
|
||||||
|
try:
|
||||||
|
number = int(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
return number if number > 0 else None
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
import shutil
|
||||||
|
from zipfile import ZipFile
|
||||||
from docx import Document
|
from docx import Document
|
||||||
from openpyxl import Workbook
|
from openpyxl import Workbook
|
||||||
from pptx import Presentation
|
from pptx import Presentation
|
||||||
@@ -31,6 +33,89 @@ def test_count_document_pages_for_office_formats(tmp_path):
|
|||||||
assert count_document_pages(pptx_path).page_count == 1
|
assert count_document_pages(pptx_path).page_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_count_docx_pages_from_extended_properties(tmp_path):
    """A ``Pages`` value stored in ``docProps/app.xml`` is reported as-is."""
    package = tmp_path / "with-pages.docx"
    Document().save(package)

    properties_xml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<Properties xmlns="http://schemas.openxmlformats.org/officeDocument/2006/extended-properties">'
        "<Pages>7</Pages>"
        "</Properties>"
    )

    # Rebuild the package with every original part except app.xml, then append
    # the handcrafted app.xml and swap the result into place.
    staging = tmp_path / "rewritten.docx"
    with ZipFile(package) as source, ZipFile(staging, "w") as target:
        kept = [info for info in source.infolist() if info.filename != "docProps/app.xml"]
        for info in kept:
            target.writestr(info, source.read(info.filename))
        target.writestr("docProps/app.xml", properties_xml)
    shutil.move(staging, package)

    outcome = count_document_pages(package)

    assert outcome.status == "success"
    assert outcome.page_count == 7
|
||||||
|
|
||||||
|
|
||||||
|
def test_count_docx_pages_uses_word_com_fallback(monkeypatch, tmp_path):
    """Without a stored page count, the Word COM result is used for DOCX."""

    def no_stored_pages(path):
        return None

    def word_reports_pages(path):
        return 22

    package = tmp_path / "without-pages.docx"
    Document().save(package)
    monkeypatch.setattr(
        "review_agent.file_summary.services.page_count._count_docx_pages_from_extended_properties",
        no_stored_pages,
    )
    monkeypatch.setattr(
        "review_agent.file_summary.services.page_count._count_word_pages_with_com",
        word_reports_pages,
    )

    outcome = count_document_pages(package)

    assert outcome.status == "success"
    assert outcome.page_count == 22
|
||||||
|
|
||||||
|
|
||||||
|
def test_count_doc_pages_uses_word_com_fallback(monkeypatch, tmp_path):
    """A ``.doc`` file is counted with the (stubbed) Word COM helper."""

    def word_reports_pages(path):
        return 5

    legacy_file = tmp_path / "legacy.doc"
    legacy_file.write_bytes(b"legacy-doc-placeholder")
    monkeypatch.setattr(
        "review_agent.file_summary.services.page_count._count_word_pages_with_com",
        word_reports_pages,
    )

    outcome = count_document_pages(legacy_file)

    assert outcome.status == "success"
    assert outcome.page_count == 5
|
||||||
|
|
||||||
|
|
||||||
|
def test_count_ppt_pages_uses_powerpoint_com_fallback(monkeypatch, tmp_path):
    """A ``.ppt`` file is counted with the (stubbed) PowerPoint COM helper."""

    def powerpoint_reports_slides(path):
        return 9

    legacy_file = tmp_path / "legacy.ppt"
    legacy_file.write_bytes(b"legacy-ppt-placeholder")
    monkeypatch.setattr(
        "review_agent.file_summary.services.page_count._count_powerpoint_slides_with_com",
        powerpoint_reports_slides,
    )

    outcome = count_document_pages(legacy_file)

    assert outcome.status == "success"
    assert outcome.page_count == 9
|
||||||
|
|
||||||
|
|
||||||
|
def test_count_excel_pages_uses_excel_com_fallback(monkeypatch, tmp_path):
    """A placeholder ``.xls`` file is counted with the (stubbed) Excel COM helper."""

    def excel_reports_sheets(path):
        return 3

    legacy_file = tmp_path / "legacy.xls"
    # Placeholder bytes rather than a real workbook; the Excel COM helper is
    # stubbed below to supply the count.
    legacy_file.write_bytes(b"legacy-xls-placeholder")
    monkeypatch.setattr(
        "review_agent.file_summary.services.page_count._count_excel_sheets_with_com",
        excel_reports_sheets,
    )

    outcome = count_document_pages(legacy_file)

    assert outcome.status == "success"
    assert outcome.page_count == 3
|
||||||
|
|
||||||
|
|
||||||
def test_document_page_count_skill_marks_unsupported_and_success(tmp_path, django_user_model):
|
def test_document_page_count_skill_marks_unsupported_and_success(tmp_path, django_user_model):
|
||||||
xlsx_path = tmp_path / "a.xlsx"
|
xlsx_path = tmp_path / "a.xlsx"
|
||||||
workbook = Workbook()
|
workbook = Workbook()
|
||||||
|
|||||||
Reference in New Issue
Block a user