From daa0642142373065012b8a83c315d5ecaf8cc3c2 Mon Sep 17 00:00:00 2001 From: bruce Date: Sat, 6 Jun 2026 19:45:13 +0800 Subject: [PATCH] =?UTF-8?q?fix(agent):=20=E5=A2=9E=E5=BC=BA=20LLM=20?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E5=9B=9E=E5=A4=8D=E5=85=9C=E5=BA=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- review_agent/llm.py | 10 ++++++++- review_agent/services.py | 38 ++++++++++++++++++++++++++++------ tests/test_llm_streaming.py | 41 +++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 7 deletions(-) create mode 100644 tests/test_llm_streaming.py diff --git a/review_agent/llm.py b/review_agent/llm.py index 6c7def7..92e79c1 100644 --- a/review_agent/llm.py +++ b/review_agent/llm.py @@ -1,4 +1,5 @@ import json +import logging from urllib import error, request from django.conf import settings @@ -12,6 +13,9 @@ class LLMRequestError(RuntimeError): """Raised when the remote LLM provider call fails.""" +logger = logging.getLogger(__name__) + + def generate_reply(conversation, user_message: str) -> str: """Calls the SiliconFlow OpenAI-compatible chat endpoint and returns assistant text.""" @@ -130,7 +134,11 @@ def stream_reply(conversation, user_message: str): data = line[5:].strip() if data == "[DONE]": break - payload = json.loads(data) + try: + payload = json.loads(data) + except json.JSONDecodeError: + logger.warning("Skipping malformed LLM stream data", extra={"data": data[:200]}) + continue delta = ( payload.get("choices", [{}])[0] .get("delta", {}) diff --git a/review_agent/services.py b/review_agent/services.py index 3d3f720..376d3c5 100644 --- a/review_agent/services.py +++ b/review_agent/services.py @@ -219,26 +219,52 @@ def stream_message(conversation: Conversation, content: str): ) return + stream_failed = False + stream_error = "" try: for chunk in stream_reply(conversation, content): assistant_parts.append(chunk) yield sse_event("chunk", {"delta": chunk}) except (LLMConfigurationError, LLMRequestError) as exc: - fallback = f"模型调用失败:{exc}" - assistant_parts = [fallback] + stream_failed = True + stream_error = str(exc) logger.warning( "LLM stream failed", extra={"conversation_id": conversation.pk, "error": str(exc)}, ) - yield sse_event("error", {"message": fallback}) except Exception as exc: - fallback = f"回复生成中断:{exc}" - assistant_parts.append("\n\n" + fallback) + stream_failed = True + stream_error = str(exc) logger.exception( "Unexpected stream failure", extra={"conversation_id": conversation.pk, "error": str(exc)}, ) - yield sse_event("error", {"message": fallback}) + + if stream_failed: + try: + fallback_reply = generate_reply(conversation, content) + assistant_parts = [fallback_reply] + logger.info( + "Non-stream fallback reply succeeded", + extra={"conversation_id": conversation.pk, "content_length": len(fallback_reply)}, + ) + yield sse_event("replace", {"content": fallback_reply}) + except (LLMConfigurationError, LLMRequestError) as exc: + fallback = f"模型调用失败:{exc}" + assistant_parts = [fallback] + logger.warning( + "Non-stream fallback reply failed", + extra={"conversation_id": conversation.pk, "error": str(exc), "stream_error": stream_error}, + ) + yield sse_event("error", {"message": fallback}) + except Exception as exc: + fallback = f"回复生成中断:{stream_error or exc}" + assistant_parts.append("\n\n" + fallback) + logger.exception( + "Non-stream fallback crashed", + extra={"conversation_id": conversation.pk, "error": str(exc), "stream_error": stream_error}, + ) + yield sse_event("error", {"message": fallback}) assistant_message = append_assistant_message(conversation, "".join(assistant_parts).strip()) diff --git a/tests/test_llm_streaming.py b/tests/test_llm_streaming.py new file mode 100644 index 0000000..dae4f91 --- /dev/null +++ b/tests/test_llm_streaming.py @@ -0,0 +1,41 @@ +import io +from urllib import request + +import pytest + +from review_agent.llm import stream_reply +from review_agent.models import Conversation + + +pytestmark = pytest.mark.django_db + + +class FakeStreamingResponse: + def __iter__(self): + return iter( + [ + b'data: {"choices":[{"delta":{"content":"A"}}]}\n\n', + b"data: not-json\n\n", + b'data: {"choices":[{"delta":{"content":"B"}}]}\n\n', + b"data: [DONE]\n\n", + ] + ) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, traceback): + return False + + +def test_stream_reply_skips_malformed_sse_data(monkeypatch, settings, django_user_model): + settings.LLM_API_KEY = "key" + settings.LLM_MODEL = "model" + settings.LLM_BASE_URL = "https://example.test/v1" + monkeypatch.setattr(request, "urlopen", lambda req, timeout: FakeStreamingResponse()) + user = django_user_model.objects.create_user(username="owner", password="pass") + conversation = Conversation.objects.create(user=user, title="会话") + + chunks = list(stream_reply(conversation, "你好")) + + assert chunks == ["A", "B"]