Files
DEMO-AGENT/apps/chat/services.py

206 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from collections.abc import Callable
from django.utils import timezone
from agent_core.orchestrator import run_agent
from agent_core.results import AgentResult
from apps.audit.services import create_audit_log, create_notification_record
from apps.scenarios.services import get_scenario
from .models import Conversation
def create_conversation_for_batch(batch_id: str, product_name: str) -> Conversation:
    """
    Create the main conversation for a document package.

    The title prefers the parsed product name and falls back to a
    placeholder containing the batch id when the name is empty, so the
    front end always has a stable title to display.

    Args:
        batch_id: Identifier of the batch the conversation is bound to.
        product_name: Product name parsed from the package; may be empty.

    Returns:
        The newly created ``Conversation``, in pending status and seeded
        with the initial node list.
    """
    fallback_title = f"未命名资料包-{batch_id}"
    return Conversation.objects.create(
        conversation_id=_generate_conversation_id(),
        title=product_name if product_name else fallback_title,
        product_name=product_name,
        batch_id=batch_id,
        task_status=Conversation.STATUS_PENDING,
        node_results=_build_initial_node_results(),
    )
def create_knowledge_conversation() -> Conversation:
    """
    Create a knowledge-base Q&A conversation that is not bound to a package.

    This conversation lets a user query the RAG knowledge base before any
    documents have been uploaded, so ``batch_id`` and ``product_name`` are
    left empty; per the original author's note, Agent Core treats the empty
    scope as a global retrieval.

    Returns:
        The newly created ``Conversation``, in pending status and seeded
        with the knowledge-base node list.
    """
    knowledge_conversation = Conversation.objects.create(
        conversation_id=_generate_conversation_id(),
        title="知识库问答会话",
        product_name="",
        batch_id="",
        task_status=Conversation.STATUS_PENDING,
        node_results=_build_knowledge_node_results(),
    )
    return knowledge_conversation
def execute_conversation_agent(
    *,
    conversation: Conversation,
    message: str,
    document_ids: list[int],
    detail_url_builder: Callable[[int], str] | None = None,
) -> tuple[AgentResult, object]:
    """
    Run the agent for a conversation and record the audit log and notifications.

    This keeps the orchestration in the service layer: the view only
    collects request parameters and renders the result, it does not drive
    Agent Core directly.

    Args:
        conversation: Conversation whose id, batch id and product name are
            passed to the agent as options.
        message: User message forwarded to the agent.
        document_ids: Document ids forwarded to the agent as options.
        detail_url_builder: Optional callable that maps the audit log id to
            a detail URL; when omitted an empty URL is used.

    Returns:
        A ``(result, audit_log)`` tuple. If the agent raised, ``result`` is
        a failed ``AgentResult`` carrying the error text rather than the
        exception propagating to the caller.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    scenario = get_scenario("document_review")
    try:
        result = run_agent(
            scenario,
            message,
            options={
                "conversation_id": conversation.conversation_id,
                "batch_id": conversation.batch_id,
                "product_name": conversation.product_name,
                "document_ids": document_ids,
            },
        )
    except Exception as exc:
        # Service boundary: the failure is converted into a failed result so
        # it can still be audited, but the traceback must not be lost.
        logging.getLogger(__name__).exception(
            "Agent run failed for conversation %s",
            conversation.conversation_id,
        )
        # Some exceptions stringify to "", so fall back to the class name
        # to keep the recorded error non-empty.
        result = AgentResult(
            status="failed",
            error=str(exc) or type(exc).__name__,
            answer="",
        )

    audit_log = create_audit_log(
        "document_review",
        "注册审核智能体",
        message,
        result,
        batch_id=conversation.batch_id,
        conversation_id=conversation.conversation_id,
        product_name=conversation.product_name,
    )
    _apply_agent_result_to_conversation(conversation, result)

    detail_url = detail_url_builder(audit_log.id) if detail_url_builder else ""
    _persist_notification_records(result, web_detail_url=detail_url)
    return result, audit_log
def execute_conversation_export(*, batch, conversation: Conversation) -> dict:
    """
    Generate the Word export, update the conversation and write an audit log.

    This keeps the export orchestration in the service layer: the view only
    reports a success or failure message.

    Args:
        batch: Batch object handed through to the export generator.
        conversation: Conversation whose latest summary supplies the
            upstream structured output and which receives the export report.

    Returns:
        A dict with the keys ``"export_report"`` and ``"audit_log"``.
    """
    # Imported lazily inside the function, as in the original code.
    from .export_service import (
        generate_registration_export,
        update_conversation_with_export_report,
    )

    latest_summary = conversation.latest_summary or {}
    # Prefer the upstream structured output, then the plain structured
    # output, and finally an empty mapping.
    upstream_summary = latest_summary.get("upstream_structured_output")
    if not upstream_summary:
        upstream_summary = latest_summary.get("structured_output") or {}

    export_report = generate_registration_export(
        batch=batch,
        conversation=conversation,
        upstream_summary=upstream_summary,
    )
    update_conversation_with_export_report(conversation, export_report)

    export_result = AgentResult(
        answer=export_report.get("summary", ""),
        structured_output=export_report,
        status="success",
        conversation_id=conversation.conversation_id,
        batch_id=conversation.batch_id,
        product_name=conversation.product_name,
        node_results=conversation.node_results,
    )
    audit_log = create_audit_log(
        "document_review",
        "Word 回填导出",
        "生成 Word 导出文件",
        export_result,
        batch_id=conversation.batch_id,
        conversation_id=conversation.conversation_id,
        product_name=conversation.product_name,
    )
    return {"export_report": export_report, "audit_log": audit_log}
def _generate_conversation_id() -> str:
    """
    Return an unused conversation id of the form ``conv-NNN``.

    The sequence starts at the current row count plus one. Because a plain
    count repeats an existing id once any conversation has been deleted,
    the candidate is checked against stored ids and advanced until it is
    free.

    NOTE(review): two concurrent callers can still pick the same id between
    this check and the insert; a database-level unique constraint or a
    sequence would be needed to close that window.
    """
    sequence = Conversation.objects.count() + 1
    candidate = f"conv-{sequence:03d}"
    while Conversation.objects.filter(conversation_id=candidate).exists():
        sequence += 1
        candidate = f"conv-{sequence:03d}"
    return candidate
def _build_initial_node_results() -> list[dict]:
return [
{"code": "package_import", "label": "资料包导入", "status": "已完成"},
{"code": "overview", "label": "目录汇总", "status": "处理中"},
{"code": "completeness", "label": "法规完整性检查", "status": "待处理"},
{"code": "field_extraction", "label": "字段抽取", "status": "待处理"},
{"code": "consistency", "label": "一致性核查", "status": "待处理"},
{"code": "risk", "label": "风险预警", "status": "待处理"},
{"code": "word_export", "label": "Word 回填导出", "status": "待处理"},
{"code": "feishu_notify", "label": "飞书通知", "status": "待处理"},
]
def _build_knowledge_node_results() -> list[dict]:
return [
{"code": "knowledge_retrieval", "label": "知识库检索", "status": "待处理"},
{"code": "answer_generation", "label": "问答生成", "status": "待处理"},
{"code": "risk", "label": "风险预警", "status": "待处理"},
]
def _persist_notification_records(result: AgentResult, *, web_detail_url: str = "") -> None:
    """
    Store one notification record per owner listed in the result payload.

    Nothing is written when the result has no notification payload or the
    payload lists no owners.

    Args:
        result: Agent result whose ``notification_payload`` and ``status``
            are read.
        web_detail_url: Detail URL used when the payload does not carry its
            own ``web_detail_url``.
    """
    payload = result.notification_payload or {}
    recipients = payload.get("owners") or []
    if not recipients:
        return

    # Values from the payload win; otherwise they are derived from the result.
    message_status = payload.get("message_status")
    if not message_status:
        message_status = "sent" if result.status == "success" else "failed"

    # Fields that are identical for every owner are resolved once.
    shared_fields = {
        "batch_id": payload.get("batch_id", ""),
        "conversation_id": payload.get("conversation_id", ""),
        "product_name": payload.get("product_name", ""),
        "trigger_source": "agent_execution",
        "notify_reason": payload.get("notify_reason", "task_completed"),
        "message_status": message_status,
        "web_detail_url": payload.get("web_detail_url") or web_detail_url,
        "receipt": payload.get("receipt") or {"status": result.status},
    }
    for recipient in recipients:
        create_notification_record(
            owner_role=recipient.get("owner_role", ""),
            feishu_user_id=recipient.get("feishu_user_id", ""),
            **shared_fields,
        )
def _apply_agent_result_to_conversation(conversation: Conversation, result: AgentResult) -> None:
    """
    Copy an agent result onto the conversation and save the changed fields.

    The task status, latest summary and last-run timestamp are always
    overwritten. The node list is replaced only when the result carries a
    non-empty one, so an empty result keeps the existing nodes.

    Args:
        conversation: Conversation instance to update and save.
        result: Agent result supplying status, answer, error, structured
            output, notification payload and node results.
    """
    summary = {
        "answer": result.answer,
        "status": result.status,
        "error": result.error,
        "structured_output": result.structured_output,
        "notification_payload": result.notification_payload,
    }

    conversation.task_status = result.status
    if result.node_results:
        conversation.node_results = result.node_results
    conversation.latest_summary = summary
    conversation.last_run_at = timezone.now()

    # "updated_at" is listed although it is not assigned here; presumably
    # the model refreshes it on save (e.g. auto_now) — verify against the model.
    changed_fields = [
        "task_status",
        "node_results",
        "latest_summary",
        "last_run_at",
        "updated_at",
    ]
    conversation.save(update_fields=changed_fields)