feat: add feishu question preview services

This commit is contained in:
2026-06-07 22:16:36 +08:00
parent be7fbab0a0
commit bd9b2e872e
7 changed files with 288 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Reserved Feishu question services."""

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
import re
WORKFLOW_KEYWORDS = {
"regulatory_review": ("法规核查", "风险", "整改", "RR-"),
"application_form_fill": ("自动填表", "填表", "申报文件", "AFF-"),
"file_summary": ("自动汇总", "文件汇总", "目录", "页数", "FS-"),
}
def parse_question_intent(text: str) -> dict[str, object]:
normalized = (text or "").strip()
batch_no = _extract_batch_no(normalized)
workflow_type = _detect_workflow_type(normalized, batch_no)
latest = bool(re.search(r"(最新|最近|上一个|最后一个)", normalized))
intent = "batch_status" if batch_no or latest else "unknown"
if workflow_type == "regulatory_review" and any(keyword in normalized for keyword in ["风险", "阻断", "整改"]):
intent = "risk_summary"
if workflow_type == "application_form_fill" and any(keyword in normalized for keyword in ["导出", "文件", "word", "Word"]):
intent = "export_summary"
if workflow_type == "file_summary" and any(keyword in normalized for keyword in ["缺失", "目录", "页数"]):
intent = "missing_summary"
return {
"intent": intent,
"workflow_type": workflow_type,
"batch_no": batch_no,
"latest": latest or not batch_no,
}
def _extract_batch_no(text: str) -> str:
match = re.search(r"\b(?:RR|AFF|FS)-[A-Za-z0-9-]+", text, flags=re.IGNORECASE)
return match.group(0).upper() if match else ""
def _detect_workflow_type(text: str, batch_no: str = "") -> str:
source = f"{text} {batch_no}"
for workflow_type, keywords in WORKFLOW_KEYWORDS.items():
if any(keyword in source for keyword in keywords):
return workflow_type
return ""

View File

@@ -0,0 +1,9 @@
from __future__ import annotations
def can_access_batch(user, batch) -> bool:
if not user or not getattr(user, "is_authenticated", False):
return False
if getattr(user, "is_staff", False) or getattr(user, "is_superuser", False):
return True
return getattr(batch, "user_id", None) == user.pk

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
from review_agent.models import ApplicationFormFillBatch, ExportedSummaryFile, FileSummaryBatch, RegulatoryReviewBatch
from .permissions import can_access_batch
WORKFLOW_MODELS = {
"file_summary": FileSummaryBatch,
"regulatory_review": RegulatoryReviewBatch,
"application_form_fill": ApplicationFormFillBatch,
}
def query_batch_summary(user, *, workflow_type: str | None = None, batch_no: str | None = None, latest: bool = False) -> dict:
candidates = _candidate_batches(workflow_type)
if batch_no:
for current_workflow_type, model in candidates:
batch = model.objects.filter(batch_no=batch_no).first()
if batch:
return _serialize_allowed_batch(user, current_workflow_type, batch)
return {"ok": False, "permission_result": "not_found", "answer_summary": "未找到对应批次。"}
if latest:
for current_workflow_type, model in candidates:
queryset = model.objects.all().order_by("-finished_at", "-created_at", "-id")
for batch in queryset:
if can_access_batch(user, batch):
return _serialize_batch(current_workflow_type, batch, permission_result="allowed")
return {"ok": False, "permission_result": "not_found", "answer_summary": "未找到可访问的批次。"}
return {"ok": False, "permission_result": "not_found", "answer_summary": "请提供批次号,或询问最新/最近批次。"}
def _candidate_batches(workflow_type: str | None):
if workflow_type and workflow_type in WORKFLOW_MODELS:
return [(workflow_type, WORKFLOW_MODELS[workflow_type])]
return list(WORKFLOW_MODELS.items())
def _serialize_allowed_batch(user, workflow_type: str, batch) -> dict:
if not can_access_batch(user, batch):
return {"ok": False, "permission_result": "denied", "answer_summary": "无权限访问该批次。"}
return _serialize_batch(workflow_type, batch, permission_result="allowed")
def _serialize_batch(workflow_type: str, batch, *, permission_result: str) -> dict:
summary = _summary_for_batch(workflow_type, batch)
result_url = _result_url(workflow_type, batch.pk)
answer = f"{batch.batch_no} 状态 {batch.status}{summary}"
return {
"ok": True,
"permission_result": permission_result,
"workflow_type": workflow_type,
"batch_id": batch.pk,
"batch_no": batch.batch_no,
"status": batch.status,
"summary": summary,
"result_url": result_url,
"answer_summary": answer,
}
def _summary_for_batch(workflow_type: str, batch) -> str:
if workflow_type == "file_summary":
return f"文件 {batch.total_files} 个,成功 {batch.success_files} 个,失败 {batch.failed_files} 个。"
if workflow_type == "regulatory_review":
risk = batch.risk_summary or {}
return f"阻断项 {int(risk.get('blocking') or 0)} 个,高风险 {int(risk.get('high') or 0)} 个。"
if workflow_type == "application_form_fill":
export_count = ExportedSummaryFile.objects.filter(
workflow_type="application_form_fill",
workflow_batch_id=batch.pk,
).count()
return f"导出文件 {export_count} 个,冲突字段 {len(batch.conflict_summary or [])} 个。"
return ""
def _result_url(workflow_type: str, batch_id: int) -> str:
paths = {
"file_summary": f"/api/review-agent/file-summary/{batch_id}/status/",
"regulatory_review": f"/api/review-agent/regulatory-review/{batch_id}/status/",
"application_form_fill": f"/api/review-agent/application-form-fill/{batch_id}/status/",
}
return paths.get(workflow_type, "/")

View File

@@ -0,0 +1,37 @@
from __future__ import annotations
from django.utils import timezone
from review_agent.models import FeishuQuestionLog
from .intent import parse_question_intent
from .query import query_batch_summary
def answer_question(user, text: str, *, source_type: str = FeishuQuestionLog.SourceType.SIMULATE) -> dict:
parsed = parse_question_intent(text)
result = query_batch_summary(
user,
workflow_type=parsed.get("workflow_type") or None,
batch_no=parsed.get("batch_no") or None,
latest=bool(parsed.get("latest")),
)
status = FeishuQuestionLog.Status.SUCCESS if result.get("ok") else FeishuQuestionLog.Status.FAILED
answer_summary = str(result.get("answer_summary") or "")
log = FeishuQuestionLog.objects.create(
system_user=user if getattr(user, "is_authenticated", False) else None,
source_type=source_type,
question_text=text,
intent=str(parsed.get("intent") or "unknown"),
query_object={
"workflow_type": parsed.get("workflow_type") or "",
"batch_no": parsed.get("batch_no") or "",
"latest": bool(parsed.get("latest")),
},
answer_summary=answer_summary[:500],
permission_result=str(result.get("permission_result") or ""),
status=status,
error_message="" if result.get("ok") else answer_summary,
processed_at=timezone.now(),
)
return {**result, "intent": parsed.get("intent"), "log_id": log.pk}

View File

@@ -0,0 +1,21 @@
from __future__ import annotations
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand, CommandError
from review_agent.feishu_questions.service import answer_question
class Command(BaseCommand):
help = "Simulate a reserved Feishu question against local workflow data."
def add_arguments(self, parser):
parser.add_argument("--username", required=True, help="System username used as asker.")
parser.add_argument("question", help="Question text, for example: 查最新法规核查")
def handle(self, *args, **options):
user = get_user_model().objects.filter(username=options["username"]).first()
if not user:
raise CommandError(f"用户不存在:{options['username']}")
result = answer_question(user, options["question"])
self.stdout.write(result.get("answer_summary") or "无可返回摘要。")

View File

@@ -0,0 +1,92 @@
from io import StringIO
from django.core.management import call_command
import pytest
from review_agent.feishu_questions.intent import parse_question_intent
from review_agent.feishu_questions.query import query_batch_summary
from review_agent.feishu_questions.service import answer_question
from review_agent.models import Conversation, FeishuQuestionLog, FileSummaryBatch, RegulatoryReviewBatch
pytestmark = pytest.mark.django_db
def test_query_latest_regulatory_batch_for_owner(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-001")
RegulatoryReviewBatch.objects.create(
conversation=conversation,
user=user,
source_summary_batch=summary,
batch_no="RR-001",
status=RegulatoryReviewBatch.Status.SUCCESS,
risk_summary={"blocking": 0, "high": 1},
)
result = query_batch_summary(user, workflow_type="regulatory_review", latest=True)
assert result["ok"]
assert result["batch_no"] == "RR-001"
assert "高风险 1" in result["answer_summary"]
def test_query_denies_other_users_batch(django_user_model):
owner = django_user_model.objects.create_user(username="owner", password="pass")
other = django_user_model.objects.create_user(username="other", password="pass")
conversation = Conversation.objects.create(user=owner, title="会话")
batch = FileSummaryBatch.objects.create(conversation=conversation, user=owner, batch_no="FS-PRIVATE")
result = query_batch_summary(other, batch_no=batch.batch_no)
assert not result["ok"]
assert result["permission_result"] == "denied"
def test_query_admin_can_access_other_users_batch(django_user_model):
owner = django_user_model.objects.create_user(username="owner", password="pass")
admin = django_user_model.objects.create_user(username="admin", password="pass", is_staff=True)
conversation = Conversation.objects.create(user=owner, title="会话")
FileSummaryBatch.objects.create(conversation=conversation, user=owner, batch_no="FS-ADMIN")
result = query_batch_summary(admin, batch_no="FS-ADMIN")
assert result["ok"]
assert result["permission_result"] == "allowed"
def test_parse_question_intent_recognizes_batch_latest_and_workflow():
parsed = parse_question_intent("查最新法规核查")
assert parsed["workflow_type"] == "regulatory_review"
assert parsed["latest"] is True
parsed = parse_question_intent("AFF-20260607-001 的 Word 在哪里")
assert parsed["workflow_type"] == "application_form_fill"
assert parsed["batch_no"] == "AFF-20260607-001"
assert parsed["intent"] == "export_summary"
def test_answer_question_records_log_without_full_answer(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-LOG")
result = answer_question(user, "查最新自动汇总")
log = FeishuQuestionLog.objects.get(pk=result["log_id"])
assert log.intent == "batch_status"
assert log.query_object["workflow_type"] == "file_summary"
assert log.answer_summary
assert len(log.answer_summary) <= 500
def test_feishu_question_simulate_command_outputs_summary(django_user_model):
user = django_user_model.objects.create_user(username="owner", password="pass")
conversation = Conversation.objects.create(user=user, title="会话")
FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-CMD")
output = StringIO()
call_command("feishu_question_simulate", "--username", user.username, "查最新自动汇总", stdout=output)
assert "FS-CMD" in output.getvalue()