diff --git a/review_agent/feishu_questions/__init__.py b/review_agent/feishu_questions/__init__.py new file mode 100644 index 0000000..c83871d --- /dev/null +++ b/review_agent/feishu_questions/__init__.py @@ -0,0 +1 @@ +"""Reserved Feishu question services.""" diff --git a/review_agent/feishu_questions/intent.py b/review_agent/feishu_questions/intent.py new file mode 100644 index 0000000..d0cadd1 --- /dev/null +++ b/review_agent/feishu_questions/intent.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import re + + +WORKFLOW_KEYWORDS = { + "regulatory_review": ("法规核查", "风险", "整改", "RR-"), + "application_form_fill": ("自动填表", "填表", "申报文件", "AFF-"), + "file_summary": ("自动汇总", "文件汇总", "目录", "页数", "FS-"), +} + + +def parse_question_intent(text: str) -> dict[str, object]: + normalized = (text or "").strip() + batch_no = _extract_batch_no(normalized) + workflow_type = _detect_workflow_type(normalized, batch_no) + latest = bool(re.search(r"(最新|最近|上一个|最后一个)", normalized)) + intent = "batch_status" if batch_no or latest else "unknown" + if workflow_type == "regulatory_review" and any(keyword in normalized for keyword in ["风险", "阻断", "整改"]): + intent = "risk_summary" + if workflow_type == "application_form_fill" and any(keyword in normalized for keyword in ["导出", "文件", "word", "Word"]): + intent = "export_summary" + if workflow_type == "file_summary" and any(keyword in normalized for keyword in ["缺失", "目录", "页数"]): + intent = "missing_summary" + return { + "intent": intent, + "workflow_type": workflow_type, + "batch_no": batch_no, + "latest": latest or not batch_no, + } + + +def _extract_batch_no(text: str) -> str: + match = re.search(r"\b(?:RR|AFF|FS)-[A-Za-z0-9-]+", text, flags=re.IGNORECASE) + return match.group(0).upper() if match else "" + + +def _detect_workflow_type(text: str, batch_no: str = "") -> str: + source = f"{text} {batch_no}" + for workflow_type, keywords in WORKFLOW_KEYWORDS.items(): + if any(keyword in source for keyword in keywords): + return workflow_type + return "" diff --git a/review_agent/feishu_questions/permissions.py b/review_agent/feishu_questions/permissions.py new file mode 100644 index 0000000..a99ea63 --- /dev/null +++ b/review_agent/feishu_questions/permissions.py @@ -0,0 +1,9 @@ +from __future__ import annotations + + +def can_access_batch(user, batch) -> bool: + if not user or not getattr(user, "is_authenticated", False): + return False + if getattr(user, "is_staff", False) or getattr(user, "is_superuser", False): + return True + return getattr(batch, "user_id", None) == user.pk diff --git a/review_agent/feishu_questions/query.py b/review_agent/feishu_questions/query.py new file mode 100644 index 0000000..91f8256 --- /dev/null +++ b/review_agent/feishu_questions/query.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from review_agent.models import ApplicationFormFillBatch, ExportedSummaryFile, FileSummaryBatch, RegulatoryReviewBatch + +from .permissions import can_access_batch + + +WORKFLOW_MODELS = { + "file_summary": FileSummaryBatch, + "regulatory_review": RegulatoryReviewBatch, + "application_form_fill": ApplicationFormFillBatch, +} + + +def query_batch_summary(user, *, workflow_type: str | None = None, batch_no: str | None = None, latest: bool = False) -> dict: + candidates = _candidate_batches(workflow_type) + if batch_no: + for current_workflow_type, model in candidates: + batch = model.objects.filter(batch_no=batch_no).first() + if batch: + return _serialize_allowed_batch(user, current_workflow_type, batch) + return {"ok": False, "permission_result": "not_found", "answer_summary": "未找到对应批次。"} + + if latest: + for current_workflow_type, model in candidates: + queryset = model.objects.all().order_by("-finished_at", "-created_at", "-id") + for batch in queryset: + if can_access_batch(user, batch): + return _serialize_batch(current_workflow_type, batch, permission_result="allowed") + return {"ok": False, "permission_result": "not_found", "answer_summary": "未找到可访问的批次。"} + + return {"ok": False, "permission_result": "not_found", "answer_summary": "请提供批次号,或询问最新/最近批次。"} + + +def _candidate_batches(workflow_type: str | None): + if workflow_type and workflow_type in WORKFLOW_MODELS: + return [(workflow_type, WORKFLOW_MODELS[workflow_type])] + return list(WORKFLOW_MODELS.items()) + + +def _serialize_allowed_batch(user, workflow_type: str, batch) -> dict: + if not can_access_batch(user, batch): + return {"ok": False, "permission_result": "denied", "answer_summary": "无权限访问该批次。"} + return _serialize_batch(workflow_type, batch, permission_result="allowed") + + +def _serialize_batch(workflow_type: str, batch, *, permission_result: str) -> dict: + summary = _summary_for_batch(workflow_type, batch) + result_url = _result_url(workflow_type, batch.pk) + answer = f"{batch.batch_no} 状态 {batch.status}。{summary}" + return { + "ok": True, + "permission_result": permission_result, + "workflow_type": workflow_type, + "batch_id": batch.pk, + "batch_no": batch.batch_no, + "status": batch.status, + "summary": summary, + "result_url": result_url, + "answer_summary": answer, + } + + +def _summary_for_batch(workflow_type: str, batch) -> str: + if workflow_type == "file_summary": + return f"文件 {batch.total_files} 个,成功 {batch.success_files} 个,失败 {batch.failed_files} 个。" + if workflow_type == "regulatory_review": + risk = batch.risk_summary or {} + return f"阻断项 {int(risk.get('blocking') or 0)} 个,高风险 {int(risk.get('high') or 0)} 个。" + if workflow_type == "application_form_fill": + export_count = ExportedSummaryFile.objects.filter( + workflow_type="application_form_fill", + workflow_batch_id=batch.pk, + ).count() + return f"导出文件 {export_count} 个,冲突字段 {len(batch.conflict_summary or [])} 个。" + return "" + + +def _result_url(workflow_type: str, batch_id: int) -> str: + paths = { + "file_summary": f"/api/review-agent/file-summary/{batch_id}/status/", + "regulatory_review": f"/api/review-agent/regulatory-review/{batch_id}/status/", + "application_form_fill": f"/api/review-agent/application-form-fill/{batch_id}/status/", + } + return paths.get(workflow_type, "/") diff --git a/review_agent/feishu_questions/service.py b/review_agent/feishu_questions/service.py new file mode 100644 index 0000000..3d36d8f --- /dev/null +++ b/review_agent/feishu_questions/service.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from django.utils import timezone + +from review_agent.models import FeishuQuestionLog + +from .intent import parse_question_intent +from .query import query_batch_summary + + +def answer_question(user, text: str, *, source_type: str = FeishuQuestionLog.SourceType.SIMULATE) -> dict: + parsed = parse_question_intent(text) + result = query_batch_summary( + user, + workflow_type=parsed.get("workflow_type") or None, + batch_no=parsed.get("batch_no") or None, + latest=bool(parsed.get("latest")), + ) + status = FeishuQuestionLog.Status.SUCCESS if result.get("ok") else FeishuQuestionLog.Status.FAILED + answer_summary = str(result.get("answer_summary") or "") + log = FeishuQuestionLog.objects.create( + system_user=user if getattr(user, "is_authenticated", False) else None, + source_type=source_type, + question_text=text, + intent=str(parsed.get("intent") or "unknown"), + query_object={ + "workflow_type": parsed.get("workflow_type") or "", + "batch_no": parsed.get("batch_no") or "", + "latest": bool(parsed.get("latest")), + }, + answer_summary=answer_summary[:500], + permission_result=str(result.get("permission_result") or ""), + status=status, + error_message="" if result.get("ok") else answer_summary, + processed_at=timezone.now(), + ) + return {**result, "intent": parsed.get("intent"), "log_id": log.pk} diff --git a/review_agent/management/commands/feishu_question_simulate.py b/review_agent/management/commands/feishu_question_simulate.py new file mode 100644 index 0000000..1220ed4 --- /dev/null +++ b/review_agent/management/commands/feishu_question_simulate.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from django.contrib.auth import get_user_model +from django.core.management.base import BaseCommand, CommandError + +from review_agent.feishu_questions.service import answer_question + + +class Command(BaseCommand): + help = "Simulate a reserved Feishu question against local workflow data." + + def add_arguments(self, parser): + parser.add_argument("--username", required=True, help="System username used as asker.") + parser.add_argument("question", help="Question text, for example: 查最新法规核查") + + def handle(self, *args, **options): + user = get_user_model().objects.filter(username=options["username"]).first() + if not user: + raise CommandError(f"用户不存在:{options['username']}") + result = answer_question(user, options["question"]) + self.stdout.write(result.get("answer_summary") or "无可返回摘要。") diff --git a/tests/test_feishu_question_reserved.py b/tests/test_feishu_question_reserved.py new file mode 100644 index 0000000..07b01e4 --- /dev/null +++ b/tests/test_feishu_question_reserved.py @@ -0,0 +1,92 @@ +from io import StringIO + +from django.core.management import call_command +import pytest + +from review_agent.feishu_questions.intent import parse_question_intent +from review_agent.feishu_questions.query import query_batch_summary +from review_agent.feishu_questions.service import answer_question +from review_agent.models import Conversation, FeishuQuestionLog, FileSummaryBatch, RegulatoryReviewBatch + + +pytestmark = pytest.mark.django_db + + +def test_query_latest_regulatory_batch_for_owner(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-001") + RegulatoryReviewBatch.objects.create( + conversation=conversation, + user=user, + source_summary_batch=summary, + batch_no="RR-001", + status=RegulatoryReviewBatch.Status.SUCCESS, + risk_summary={"blocking": 0, "high": 1}, + ) + + result = query_batch_summary(user, workflow_type="regulatory_review", latest=True) + + assert result["ok"] + assert result["batch_no"] == "RR-001" + assert "高风险 1" in result["answer_summary"] + + +def test_query_denies_other_users_batch(django_user_model): + owner = django_user_model.objects.create_user(username="owner", password="pass") + other = django_user_model.objects.create_user(username="other", password="pass") + conversation = Conversation.objects.create(user=owner, title="会话") + batch = FileSummaryBatch.objects.create(conversation=conversation, user=owner, batch_no="FS-PRIVATE") + + result = query_batch_summary(other, batch_no=batch.batch_no) + + assert not result["ok"] + assert result["permission_result"] == "denied" + + +def test_query_admin_can_access_other_users_batch(django_user_model): + owner = django_user_model.objects.create_user(username="owner", password="pass") + admin = django_user_model.objects.create_user(username="admin", password="pass", is_staff=True) + conversation = Conversation.objects.create(user=owner, title="会话") + FileSummaryBatch.objects.create(conversation=conversation, user=owner, batch_no="FS-ADMIN") + + result = query_batch_summary(admin, batch_no="FS-ADMIN") + + assert result["ok"] + assert result["permission_result"] == "allowed" + + +def test_parse_question_intent_recognizes_batch_latest_and_workflow(): + parsed = parse_question_intent("查最新法规核查") + assert parsed["workflow_type"] == "regulatory_review" + assert parsed["latest"] is True + + parsed = parse_question_intent("AFF-20260607-001 的 Word 在哪里") + assert parsed["workflow_type"] == "application_form_fill" + assert parsed["batch_no"] == "AFF-20260607-001" + assert parsed["intent"] == "export_summary" + + +def test_answer_question_records_log_without_full_answer(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-LOG") + + result = answer_question(user, "查最新自动汇总") + + log = FeishuQuestionLog.objects.get(pk=result["log_id"]) + assert log.intent == "batch_status" + assert log.query_object["workflow_type"] == "file_summary" + assert log.answer_summary + assert len(log.answer_summary) <= 500 + + +def test_feishu_question_simulate_command_outputs_summary(django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + FileSummaryBatch.objects.create(conversation=conversation, user=user, batch_no="FS-CMD") + output = StringIO() + + call_command("feishu_question_simulate", "--username", user.username, "查最新自动汇总", stdout=output) + + assert "FS-CMD" in output.getvalue()