diff --git a/config/settings.py b/config/settings.py index ad63757..4cb4de2 100644 --- a/config/settings.py +++ b/config/settings.py @@ -115,6 +115,9 @@ REGULATORY_RAG_COLLECTION = os.environ.get( "nmpa_ivd_registration_v1", ) REGULATORY_REVIEW_ASYNC = os.environ.get("REGULATORY_REVIEW_ASYNC", "true").lower() == "true" +REGULATORY_LLM_REVIEW_MAX_ATTEMPTS = int(os.environ.get("REGULATORY_LLM_REVIEW_MAX_ATTEMPTS", "3")) +REGULATORY_LLM_REVIEW_RETRY_DELAY_SECONDS = float(os.environ.get("REGULATORY_LLM_REVIEW_RETRY_DELAY_SECONDS", "0.5")) +REGULATORY_LLM_REVIEW_TIMEOUT_SECONDS = float(os.environ.get("REGULATORY_LLM_REVIEW_TIMEOUT_SECONDS", "15")) SILICONFLOW_BASE_URL = os.environ.get("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1") SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "") SILICONFLOW_EMBEDDING_MODEL = os.environ.get( @@ -126,10 +129,16 @@ SILICONFLOW_EMBEDDING_DIMENSIONS = int(os.environ.get("SILICONFLOW_EMBEDDING_DIM LOGGING = { "version": 1, "disable_existing_loggers": False, + "filters": { + "suppress_workflow_status_poll": { + "()": "review_agent.logging_filters.SuppressWorkflowStatusPollFilter", + }, + }, "handlers": { "console": { "class": "logging.StreamHandler", "formatter": "verbose", + "filters": ["suppress_workflow_status_poll"], }, }, "formatters": { @@ -143,5 +152,10 @@ LOGGING = { "level": os.environ.get("REVIEW_AGENT_LOG_LEVEL", "INFO"), "propagate": True, }, + "django.server": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + }, }, } diff --git a/review_agent/llm.py b/review_agent/llm.py index 92e79c1..9057536 100644 --- a/review_agent/llm.py +++ b/review_agent/llm.py @@ -57,7 +57,7 @@ def generate_reply(conversation, user_message: str) -> str: raise LLMRequestError("模型接口返回格式不符合预期。") from exc -def generate_completion(messages: list[dict[str, str]], *, temperature: float = 0.0) -> str: +def generate_completion(messages: list[dict[str, str]], *, temperature: float = 0.0, timeout: float = 60) -> str: """Calls the configured chat endpoint with explicit messages and returns assistant text.""" if not settings.LLM_API_KEY: @@ -84,7 +84,7 @@ def generate_completion(messages: list[dict[str, str]], *, temperature: float = ) try: - with request.urlopen(http_request, timeout=60) as response: + with request.urlopen(http_request, timeout=timeout) as response: data = json.loads(response.read().decode("utf-8")) except error.HTTPError as exc: details = exc.read().decode("utf-8", errors="ignore") diff --git a/review_agent/logging_filters.py b/review_agent/logging_filters.py new file mode 100644 index 0000000..b340ea7 --- /dev/null +++ b/review_agent/logging_filters.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +import logging +import re + + +class SuppressWorkflowStatusPollFilter(logging.Filter): + """Hides noisy workflow status polling access logs from runserver output.""" + + STATUS_POLL_PATTERN = re.compile( + r'"GET /api/review-agent/(?:file-summary|regulatory-review)/\d+/status/ HTTP/[0-9.]+" 200 ' + ) + + def filter(self, record: logging.LogRecord) -> bool: + return not self.STATUS_POLL_PATTERN.search(record.getMessage()) diff --git a/review_agent/regulatory_review/services/llm_review.py b/review_agent/regulatory_review/services/llm_review.py index 4e5666d..9988c60 100644 --- a/review_agent/regulatory_review/services/llm_review.py +++ b/review_agent/regulatory_review/services/llm_review.py @@ -4,6 +4,7 @@ import json import os import re import time +import inspect from collections.abc import Callable from typing import Any @@ -152,9 +153,13 @@ def _parse_json_object(raw: str) -> dict[str, Any]: def _call_completion_with_retries(completion_func: Callable[..., str], messages: list[dict[str, str]]) -> str: attempts = max(1, int(getattr(settings, "REGULATORY_LLM_REVIEW_MAX_ATTEMPTS", 3) or 3)) delay_seconds = float(getattr(settings, "REGULATORY_LLM_REVIEW_RETRY_DELAY_SECONDS", 0.5) or 0) + timeout_seconds = float(getattr(settings, "REGULATORY_LLM_REVIEW_TIMEOUT_SECONDS", 15) or 15) + accepts_timeout = _accepts_timeout(completion_func) last_error: Exception | None = None for attempt in range(1, attempts + 1): try: + if accepts_timeout: + return completion_func(messages, temperature=0.0, timeout=timeout_seconds) return completion_func(messages, temperature=0.0) except (LLMRequestError, OSError, TimeoutError) as exc: last_error = exc @@ -167,6 +172,14 @@ def _call_completion_with_retries(completion_func: Callable[..., str], messages: raise LLMRequestError("LLM复核调用失败。") +def _accepts_timeout(completion_func: Callable[..., str]) -> bool: + try: + signature = inspect.signature(completion_func) + except (TypeError, ValueError): + return True + return "timeout" in signature.parameters + + def _should_call_llm(completion_func: Callable[..., str] | None) -> bool: if completion_func is not None: return True diff --git a/review_agent/regulatory_review/workflow.py b/review_agent/regulatory_review/workflow.py index 09499d2..8e3c62c 100644 --- a/review_agent/regulatory_review/workflow.py +++ b/review_agent/regulatory_review/workflow.py @@ -125,6 +125,7 @@ class RegulatoryWorkflowExecutor: self.llm_reviews: dict[str, dict[str, object]] = {} def run(self) -> None: + logger.info("法规核查工作流开始 batch_no=%s batch_id=%s", self.batch.batch_no, self.batch.pk) self.batch.status = RegulatoryReviewBatch.Status.RUNNING self.batch.started_at = timezone.now() self.batch.save(update_fields=["status", "started_at"]) @@ -136,6 +137,7 @@ class RegulatoryWorkflowExecutor: continue self._run_node(node) except WorkflowPausedForUser: + logger.info("法规核查工作流等待用户 batch_no=%s node=condition_confirm", self.batch.batch_no) return except Exception as exc: logger.exception("Regulatory workflow failed", extra={"batch_id": self.batch.pk}) @@ -150,6 +152,7 @@ class RegulatoryWorkflowExecutor: self.batch.finished_at = timezone.now() self.batch.save(update_fields=["status", "finished_at"]) record_event(self.batch, "workflow_completed", {"batch_id": self.batch.pk}) + logger.info("法规核查工作流完成 batch_no=%s findings=%s", self.batch.batch_no, len(self.findings)) def _nodes(self): return WorkflowNodeRun.objects.filter( @@ -158,6 +161,12 @@ class RegulatoryWorkflowExecutor: ).order_by("id") def _run_node(self, node: WorkflowNodeRun) -> None: + logger.info( + "节点开始 batch_no=%s node=%s name=%s", + self.batch.batch_no, + node.node_code, + node.node_name, + ) node.status = WorkflowNodeRun.Status.RUNNING node.progress = 10 node.started_at = timezone.now() @@ -181,6 +190,13 @@ class RegulatoryWorkflowExecutor: "node_progress", {"node_code": node.node_code, "status": node.status, "progress": node.progress, "message": node.message}, ) + logger.info( + "节点完成 batch_no=%s node=%s name=%s progress=%s", + self.batch.batch_no, + node.node_code, + node.node_name, + node.progress, + ) def _execute_node(self, node_code: str) -> None: if node_code == "condition_confirm": @@ -188,10 +204,22 @@ class RegulatoryWorkflowExecutor: return if node_code == "rule_scope": self.rule_set = apply_rule_scope(load_rule_file(), self.batch.condition_json.get("rule_scope") or {}) + logger.info( + "方法执行 batch_no=%s method=apply_rule_scope requirements=%s scope=%s", + self.batch.batch_no, + len(self.rule_set.get("requirements", [])), + self.batch.condition_json.get("rule_scope") or {}, + ) return if node_code == "completeness_check": findings = run_completeness_check(self.batch.source_summary_batch, self._rules()) self.findings.extend(findings) + logger.info( + "方法执行 batch_no=%s method=run_completeness_check findings=%s source_summary=%s", + self.batch.batch_no, + len(findings), + self.batch.source_summary_batch.batch_no, + ) self._save_llm_review( "completeness_check", { @@ -202,6 +230,12 @@ class RegulatoryWorkflowExecutor: return if node_code == "text_extract": self.document_texts = self._extract_source_texts() + logger.info( + "方法执行 batch_no=%s method=_extract_source_texts success_docs=%s total_files=%s", + self.batch.batch_no, + len(self.document_texts), + len(self.text_extract_status), + ) self._save_llm_review("text_extract", {"files": self.text_extract_status}) save_artifact( self.batch, @@ -214,17 +248,35 @@ class RegulatoryWorkflowExecutor: if node_code == "structure_check": findings = run_structure_check(self.document_texts, self._rules()) self.findings.extend(findings) + logger.info( + "方法执行 batch_no=%s method=run_structure_check findings=%s docs=%s", + self.batch.batch_no, + len(findings), + len(self.document_texts), + ) self._save_llm_review("structure_check", {"findings": [finding.to_dict() for finding in findings]}) return if node_code == "consistency_check": findings = run_consistency_check(self.document_texts) self.findings.extend(findings) + logger.info( + "方法执行 batch_no=%s method=run_consistency_check findings=%s docs=%s", + self.batch.batch_no, + len(findings), + len(self.document_texts), + ) self._save_llm_review("consistency_check", {"findings": [finding.to_dict() for finding in findings]}) return if node_code == "risk_assess": self._save_llm_review("risk_assess", {"findings": [finding.to_dict() for finding in self.findings]}) issues = persist_findings(self.batch, self.findings) create_mock_notifications(self.batch) + logger.info( + "方法执行 batch_no=%s method=persist_findings issues=%s findings=%s", + self.batch.batch_no, + len(issues), + len(self.findings), + ) save_artifact( self.batch, name="rag_result_json.json", @@ -251,6 +303,11 @@ class RegulatoryWorkflowExecutor: return if node_code == "report_export": exports = export_review_results(self.batch) + logger.info( + "方法执行 batch_no=%s method=export_review_results exports=%s", + self.batch.batch_no, + len(exports), + ) Message.objects.create( conversation=self.batch.conversation, role=Message.Role.ASSISTANT, @@ -261,6 +318,12 @@ class RegulatoryWorkflowExecutor: if self.batch.condition_json.get("confirmed"): return candidates = detect_regulatory_condition_candidates(self.batch.source_summary_batch) + logger.info( + "方法执行 batch_no=%s method=detect_regulatory_condition_candidates product_category=%s product_name=%s", + self.batch.batch_no, + (candidates.get("product_category") or {}).get("suggested"), + (candidates.get("product_name") or {}).get("suggested"), + ) self.batch.condition_json = { **(self.batch.condition_json or {}), "confirmed": False, @@ -297,6 +360,7 @@ class RegulatoryWorkflowExecutor: if not path.is_absolute(): path = Path(settings.MEDIA_ROOT) / item.storage_path if not path.exists(): + logger.info("文本抽取跳过 batch_no=%s file=%s reason=missing", self.batch.batch_no, item.file_name) self.text_extract_status[item.file_name] = { "status": "missing", "path": str(path), @@ -324,11 +388,25 @@ class RegulatoryWorkflowExecutor: } if result.status == "success" and result.text: texts[item.file_name] = result.text + logger.info( + "文本抽取文件 batch_no=%s file=%s status=%s fields=%s chars=%s", + self.batch.batch_no, + item.file_name, + result.status, + len((field_review.get("selected_fields") or {})), + len(result.text or ""), + ) return texts def _save_llm_review(self, stage: str, payload: dict[str, object]) -> dict[str, object]: review = review_workflow_payload(stage=stage, payload=payload) self.llm_reviews[stage] = review + logger.info( + "方法执行 batch_no=%s method=review_workflow_payload stage=%s status=%s", + self.batch.batch_no, + stage, + review.get("status"), + ) save_artifact( self.batch, name=f"llm_review_{stage}.json", diff --git a/tests/test_logging_filters.py b/tests/test_logging_filters.py new file mode 100644 index 0000000..629ecd3 --- /dev/null +++ b/tests/test_logging_filters.py @@ -0,0 +1,31 @@ +import logging + +from review_agent.logging_filters import SuppressWorkflowStatusPollFilter + + +def test_suppress_workflow_status_poll_filter_hides_status_poll_requests(): + record = logging.LogRecord( + name="django.server", + level=logging.INFO, + pathname="", + lineno=1, + msg='"GET /api/review-agent/regulatory-review/7/status/ HTTP/1.1" 200 1660', + args=(), + exc_info=None, + ) + + assert SuppressWorkflowStatusPollFilter().filter(record) is False + + +def test_suppress_workflow_status_poll_filter_keeps_other_requests(): + record = logging.LogRecord( + name="django.server", + level=logging.INFO, + pathname="", + lineno=1, + msg='"POST /api/review-agent/regulatory-review/7/conditions/ HTTP/1.1" 200 256', + args=(), + exc_info=None, + ) + + assert SuppressWorkflowStatusPollFilter().filter(record) is True diff --git a/tests/test_regulatory_llm_review.py b/tests/test_regulatory_llm_review.py index 85c2dd6..b67c762 100644 --- a/tests/test_regulatory_llm_review.py +++ b/tests/test_regulatory_llm_review.py @@ -91,3 +91,21 @@ def test_review_workflow_payload_retries_timeout_before_success(settings): assert attempts["count"] == 3 assert result["status"] == "success" assert result["result"]["reviewed"] is True + + +def test_review_workflow_payload_passes_configured_timeout(settings): + settings.REGULATORY_LLM_REVIEW_RETRY_DELAY_SECONDS = 0 + settings.REGULATORY_LLM_REVIEW_TIMEOUT_SECONDS = 7 + observed = {} + + def completion(messages, temperature=0.0, timeout=None): + observed["timeout"] = timeout + return json.dumps({"reviewed": True}) + + review_workflow_payload( + stage="completeness_check", + payload={"findings": []}, + completion_func=completion, + ) + + assert observed["timeout"] == 7 diff --git a/tests/test_regulatory_workflow.py b/tests/test_regulatory_workflow.py index 886ed60..9230357 100644 --- a/tests/test_regulatory_workflow.py +++ b/tests/test_regulatory_workflow.py @@ -1,3 +1,5 @@ +import logging + import pytest from review_agent.models import ( @@ -147,6 +149,33 @@ def test_workflow_continues_when_llm_review_times_out(monkeypatch, settings, dja assert batch.error_message == "" +def test_regulatory_workflow_logs_node_and_method_details(caplog, django_user_model): + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + summary = FileSummaryBatch.objects.create( + conversation=conversation, + user=user, + batch_no="FS-OK", + status=FileSummaryBatch.Status.SUCCESS, + ) + batch = create_regulatory_review_batch( + conversation=conversation, + user=user, + source_summary_batch=summary, + ) + batch.condition_json = {"confirmed": True, "confirmed_conditions": {"product_category": "体外诊断试剂"}} + batch.save(update_fields=["condition_json"]) + + with caplog.at_level(logging.INFO, logger="review_agent.regulatory_review.workflow"): + start_regulatory_review_workflow(batch, async_run=False) + + messages = [record.getMessage() for record in caplog.records] + assert any("法规核查工作流开始" in message and batch.batch_no in message for message in messages) + assert any("节点开始" in message and "完整性核查" in message for message in messages) + assert any("方法执行" in message and "run_completeness_check" in message for message in messages) + assert any("节点完成" in message and "完整性核查" in message for message in messages) + + def test_stream_message_prompts_for_summary_when_missing(monkeypatch, django_user_model): user = django_user_model.objects.create_user(username="owner", password="pass") conversation = Conversation.objects.create(user=user, title="会话")