diff --git a/src/llm_models/model_client/base_client.py b/src/llm_models/model_client/base_client.py index eb74b0df..dcb710fe 100644 --- a/src/llm_models/model_client/base_client.py +++ b/src/llm_models/model_client/base_client.py @@ -72,8 +72,8 @@ class BaseClient(ABC): model_info: ModelInfo, message_list: list[Message], tool_options: list[ToolOption] | None = None, - max_tokens: int = 1024, - temperature: float = 0.7, + max_tokens: Optional[int] = None, + temperature: Optional[float] = None, response_format: RespFormat | None = None, stream_response_handler: Optional[ Callable[[Any, asyncio.Event | None], tuple[APIResponse, tuple[int, int, int]]] @@ -117,6 +117,7 @@ class BaseClient(ABC): self, model_info: ModelInfo, audio_base64: str, + max_tokens: Optional[int] = None, extra_params: dict[str, Any] | None = None, ) -> APIResponse: """ diff --git a/src/llm_models/model_client/gemini_client.py b/src/llm_models/model_client/gemini_client.py index c23ba90a..0713a33a 100644 --- a/src/llm_models/model_client/gemini_client.py +++ b/src/llm_models/model_client/gemini_client.py @@ -182,7 +182,15 @@ def _process_delta( if delta.text: fc_delta_buffer.write(delta.text) - + + # 处理 thought(Gemini 的特殊字段) + for c in getattr(delta, "candidates", []): + if c.content and getattr(c.content, "parts", None): + for p in c.content.parts: + if getattr(p, "thought", False) and getattr(p, "text", None): + # 把 thought 写入 buffer,避免 resp.content 永远为空 + fc_delta_buffer.write(p.text) + if delta.function_calls: # 为什么不用hasattr呢,是因为这个属性一定有,即使是个空的 for call in delta.function_calls: try: @@ -204,6 +212,7 @@ def _process_delta( def _build_stream_api_resp( _fc_delta_buffer: io.StringIO, _tool_calls_buffer: list[tuple[str, str, dict]], + last_resp: GenerateContentResponse | None = None, # 传入 last_resp ) -> APIResponse: # sourcery skip: simplify-len-comparison, use-assigned-variable resp = APIResponse() @@ -228,6 +237,24 @@ def _build_stream_api_resp( resp.tool_calls.append(ToolCall(call_id, function_name, arguments)) + # 检查是否因为 max_tokens 截断 + reason = None + if last_resp and getattr(last_resp, "candidates", None): + c0 = last_resp.candidates[0] + reason = getattr(c0, "finish_reason", None) or getattr(c0, "finishReason", None) + + if str(reason).endswith("MAX_TOKENS"): + if resp.content and resp.content.strip(): + logger.warning( + "⚠ Gemini 响应因达到 max_tokens 限制被部分截断,\n" + " 可能会对回复内容造成影响,建议修改模型 max_tokens 配置!" + ) + else: + logger.warning( + "⚠ Gemini 响应因达到 max_tokens 限制被截断,\n" + " 请修改模型 max_tokens 配置!" + ) + if not resp.content and not resp.tool_calls: raise EmptyResponseException() @@ -246,12 +273,14 @@ async def _default_stream_response_handler( _fc_delta_buffer = io.StringIO() # 正式内容缓冲区,用于存储接收到的正式内容 _tool_calls_buffer: list[tuple[str, str, dict]] = [] # 工具调用缓冲区,用于存储接收到的工具调用 _usage_record = None # 使用情况记录 + last_resp: GenerateContentResponse | None = None # 保存最后一个 chunk def _insure_buffer_closed(): if _fc_delta_buffer and not _fc_delta_buffer.closed: _fc_delta_buffer.close() async for chunk in resp_stream: + last_resp = chunk # 保存最后一个响应 # 检查是否有中断量 if interrupt_flag and interrupt_flag.is_set(): # 如果中断量被设置,则抛出ReqAbortException @@ -270,10 +299,12 @@ async def _default_stream_response_handler( (chunk.usage_metadata.candidates_token_count or 0) + (chunk.usage_metadata.thoughts_token_count or 0), chunk.usage_metadata.total_token_count or 0, ) + try: return _build_stream_api_resp( _fc_delta_buffer, _tool_calls_buffer, + last_resp=last_resp, ), _usage_record except Exception: # 确保缓冲区被关闭 @@ -333,6 +364,38 @@ def _default_normal_response_parser( api_response.raw_data = resp + # 检查是否因为 max_tokens 截断 + try: + if resp.candidates: + c0 = resp.candidates[0] + reason = getattr(c0, "finish_reason", None) or getattr(c0, "finishReason", None) + if reason and "MAX_TOKENS" in str(reason): + # 检查第二个及之后的 parts 是否有内容 + has_real_output = False + if getattr(c0, "content", None) and getattr(c0.content, "parts", None): + for p in c0.content.parts[1:]: # 跳过第一个 thought + if getattr(p, "text", None) and p.text.strip(): + has_real_output = True + break + + if not has_real_output and getattr(resp, "text", None): + has_real_output = True + + if has_real_output: + logger.warning( + "⚠ Gemini 响应因达到 max_tokens 限制被部分截断,\n" + " 可能会对回复内容造成影响,建议修改模型 max_tokens 配置!" + ) + else: + logger.warning( + "⚠ Gemini 响应因达到 max_tokens 限制被截断,\n" + " 请修改模型 max_tokens 配置!" + ) + + return api_response, _usage_record + except Exception as e: + logger.debug(f"检查 MAX_TOKENS 截断时异常: {e}") + # 最终的、唯一的空响应检查 if not api_response.content and not api_response.tool_calls: raise EmptyResponseException("响应中既无文本内容也无工具调用") @@ -362,18 +425,29 @@ class GeminiClient(BaseClient): http_options_kwargs["api_version"] = parts[1] else: http_options_kwargs["base_url"] = api_provider.base_url + http_options_kwargs["api_version"] = None self.client = genai.Client( http_options=HttpOptions(**http_options_kwargs), api_key=api_provider.api_key, ) # 这里和openai不一样,gemini会自己决定自己是否需要retry @staticmethod - def clamp_thinking_budget(tb: int, model_id: str) -> int: + def clamp_thinking_budget(extra_params: dict[str, Any] | None, model_id: str) -> int: """ 按模型限制思考预算范围,仅支持指定的模型(支持带数字后缀的新版本) """ limits = None + # 参数传入处理 + tb = THINKING_BUDGET_AUTO + if extra_params and "thinking_budget" in extra_params: + try: + tb = int(extra_params["thinking_budget"]) + except (ValueError, TypeError): + logger.warning( + f"无效的 thinking_budget 值 {extra_params['thinking_budget']},将使用模型自动预算模式 {tb}" + ) + # 优先尝试精确匹配 if model_id in THINKING_BUDGET_LIMITS: limits = THINKING_BUDGET_LIMITS[model_id] @@ -416,8 +490,8 @@ class GeminiClient(BaseClient): model_info: ModelInfo, message_list: list[Message], tool_options: list[ToolOption] | None = None, - max_tokens: int = 1024, - temperature: float = 0.4, + max_tokens: Optional[int] = 1024, + temperature: Optional[float] = 0.4, response_format: RespFormat | None = None, stream_response_handler: Optional[ Callable[ @@ -456,19 +530,9 @@ class GeminiClient(BaseClient): messages = _convert_messages(message_list) # 将tool_options转换为Gemini API所需的格式 tools = _convert_tool_options(tool_options) if tool_options else None - - tb = THINKING_BUDGET_AUTO - # 空处理 - if extra_params and "thinking_budget" in extra_params: - try: - tb = int(extra_params["thinking_budget"]) - except (ValueError, TypeError): - logger.warning( - f"无效的 thinking_budget 值 {extra_params['thinking_budget']},将使用模型自动预算模式 {tb}" - ) - # 裁剪到模型支持的范围 - tb = self.clamp_thinking_budget(tb, model_info.model_identifier) - + # 解析并裁剪 thinking_budget + tb = self.clamp_thinking_budget(extra_params, model_info.model_identifier) + # 将response_format转换为Gemini API所需的格式 generation_config_dict = { "max_output_tokens": max_tokens, @@ -526,15 +590,20 @@ class GeminiClient(BaseClient): resp, usage_record = async_response_parser(req_task.result()) except (ClientError, ServerError) as e: - # 重封装ClientError和ServerError为RespNotOkException + # 重封装 ClientError 和 ServerError 为 RespNotOkException raise RespNotOkException(e.code, e.message) from None except ( UnknownFunctionCallArgumentError, UnsupportedFunctionError, FunctionInvocationError, ) as e: - raise ValueError(f"工具类型错误:请检查工具选项和参数:{str(e)}") from None + # 工具调用相关错误 + raise RespParseException(None, f"工具调用参数错误: {str(e)}") from None + except EmptyResponseException as e: + # 保持原始异常,便于区分“空响应”和网络异常 + raise e except Exception as e: + # 其他未预料的错误,才归为网络连接类 raise NetworkConnectionError() from e if usage_record: @@ -590,41 +659,51 @@ class GeminiClient(BaseClient): return response - def get_audio_transcriptions( - self, model_info: ModelInfo, audio_base64: str, extra_params: dict[str, Any] | None = None + async def get_audio_transcriptions( + self, + model_info: ModelInfo, + audio_base64: str, + max_tokens: Optional[int] = 2048, + extra_params: dict[str, Any] | None = None, ) -> APIResponse: """ 获取音频转录 :param model_info: 模型信息 :param audio_base64: 音频文件的Base64编码字符串 + :param max_tokens: 最大输出token数(默认2048) :param extra_params: 额外参数(可选) :return: 转录响应 """ + # 解析并裁剪 thinking_budget + tb = self.clamp_thinking_budget(extra_params, model_info.model_identifier) + + # 构造 prompt + 音频输入 + prompt = "Generate a transcript of the speech. The language of the transcript should **match the language of the speech**." + contents = [ + Content( + role="user", + parts=[ + Part.from_text(text=prompt), + Part.from_bytes(data=base64.b64decode(audio_base64), mime_type="audio/wav"), + ], + ) + ] + generation_config_dict = { - "max_output_tokens": 2048, + "max_output_tokens": max_tokens, "response_modalities": ["TEXT"], "thinking_config": ThinkingConfig( include_thoughts=True, - thinking_budget=( - extra_params["thinking_budget"] if extra_params and "thinking_budget" in extra_params else 1024 - ), + thinking_budget=tb, ), "safety_settings": gemini_safe_settings, } generate_content_config = GenerateContentConfig(**generation_config_dict) - prompt = "Generate a transcript of the speech. The language of the transcript should **match the language of the speech**." + try: - raw_response: GenerateContentResponse = self.client.models.generate_content( + raw_response: GenerateContentResponse = await self.client.aio.models.generate_content( model=model_info.model_identifier, - contents=[ - Content( - role="user", - parts=[ - Part.from_text(text=prompt), - Part.from_bytes(data=base64.b64decode(audio_base64), mime_type="audio/wav"), - ], - ) - ], + contents=contents, config=generate_content_config, ) resp, usage_record = _default_normal_response_parser(raw_response) diff --git a/src/llm_models/model_client/openai_client.py b/src/llm_models/model_client/openai_client.py index ffee2ad7..92632222 100644 --- a/src/llm_models/model_client/openai_client.py +++ b/src/llm_models/model_client/openai_client.py @@ -403,8 +403,8 @@ class OpenaiClient(BaseClient): model_info: ModelInfo, message_list: list[Message], tool_options: list[ToolOption] | None = None, - max_tokens: int = 1024, - temperature: float = 0.7, + max_tokens: Optional[int] = 1024, + temperature: Optional[float] = 0.7, response_format: RespFormat | None = None, stream_response_handler: Optional[ Callable[