Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 115 additions & 3 deletions apisix/plugins/ai-providers/base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ local log_sanitize = require("apisix.utils.log-sanitize")
local protocols = require("apisix.plugins.ai-protocols")
local ngx = ngx
local ngx_now = ngx.now
local tonumber = tonumber

local table = table
local pairs = pairs
Expand Down Expand Up @@ -196,11 +197,70 @@ end
-- using the client protocol module.
-- @param client_proto table The protocol module for the client's protocol
-- @param converter table|nil The converter module (if protocol conversion needed)
-- @param conf table|nil Plugin configuration (used for response size limits)
-- @return table|nil Parsed and optionally converted response body
-- @return string|nil Error
function _M.parse_response(self, ctx, res, client_proto, converter)
function _M.parse_response(self, ctx, res, client_proto, converter, conf)
local headers = res.headers
local raw_res_body, err = res:read_body()

-- Pre-check Content-Length against max_response_bytes when the upstream
-- advertises it. For responses without Content-Length (chunked), we read
-- the body in bounded chunks below and enforce the cap incrementally.
local max_bytes = conf and conf.max_response_bytes
if max_bytes then
local content_length = tonumber(headers["Content-Length"])
if content_length and content_length > max_bytes then
core.log.warn("aborting AI response: Content-Length ", content_length,
" exceeds max_response_bytes ", max_bytes)
if res._httpc then
res._httpc:close()
res._httpc = nil
end
return nil, "max_response_bytes exceeded", 502
end
end

local raw_res_body, err
if max_bytes then
-- Read in chunks so a runaway chunked upstream cannot force the
-- worker to buffer arbitrarily many bytes before the cap trips.
local body_reader = res.body_reader
if not body_reader then
-- Defensive: if no reader, fall back to read_body() and accept
-- that the cap is only post-facto for this path.
raw_res_body, err = res:read_body()
else
local parts = {}
local total = 0
while true do
local chunk, read_err = body_reader()
if read_err then
err = read_err
break
end
if not chunk then
break
end
total = total + #chunk
if total > max_bytes then
core.log.warn("aborting AI response: body size exceeds",
" max_response_bytes ", max_bytes,
" (read ", total, " bytes)")
if res._httpc then
res._httpc:close()
res._httpc = nil
end
return nil, "max_response_bytes exceeded", 502
end
parts[#parts + 1] = chunk
end
if not err then
raw_res_body = table.concat(parts)
end
end
else
raw_res_body, err = res:read_body()
end
if not raw_res_body then
core.log.warn("failed to read response body: ", err)
return nil, err
Expand Down Expand Up @@ -261,7 +321,8 @@ end
-- transforming events to client format.
-- @param target_proto table The protocol module for the provider's native protocol
-- @param converter table|nil The converter module (if protocol conversion needed)
function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
-- @param conf table|nil Plugin configuration (used for stream duration and size limits)
function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf)
local body_reader = res.body_reader
local contents = {}
local sse_state = { is_first = true }
Expand All @@ -272,6 +333,15 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
-- uncommitted and causing nginx to fall through to the balancer phase.
local output_sent = false

-- Runaway-upstream safeguards. Both are opt-in; unset means no cap.
local max_duration_ms = conf and conf.max_stream_duration_ms
local max_bytes = conf and conf.max_response_bytes
local deadline
if max_duration_ms then
deadline = ctx.llm_request_start_time + max_duration_ms / 1000
end
local bytes_read = 0

while true do
local chunk, err = body_reader()
ctx.var.apisix_upstream_response_time = math.floor((ngx_now() -
Expand All @@ -296,6 +366,8 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
return
end

bytes_read = bytes_read + #chunk

if ctx.var.llm_time_to_first_token == "0" then
ctx.var.llm_time_to_first_token = math.floor(
(ngx_now() - ctx.llm_request_start_time) * 1000)
Expand Down Expand Up @@ -360,6 +432,46 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
end
else
plugin.lua_response_filter(ctx, res.headers, chunk)
output_sent = true
end

-- Enforce runaway-upstream safeguards after processing the chunk.
-- Checked post-flush so clients still see any bytes we already emitted.
local limit_hit
if deadline and ngx_now() >= deadline then
limit_hit = "max_stream_duration_ms"
elseif max_bytes and bytes_read > max_bytes then
limit_hit = "max_response_bytes"
end
if limit_hit then
local duration_ms = math.floor((ngx_now() -
ctx.llm_request_start_time) * 1000)
core.log.warn("aborting AI stream: ", limit_hit, " exceeded;",
" bytes=", bytes_read,
" duration_ms=", duration_ms,
" route_id=", ctx.var.route_id or "")
-- Force-close upstream so we don't pool a half-drained connection.
if res._httpc then
res._httpc:close()
res._httpc = nil
end
-- Signal downstream filters (e.g. moderation plugins that defer
-- work until request completion) that no more content is coming.
ctx.var.llm_request_done = true
if output_sent then
-- Client has already received partial SSE; stop feeding chunks.
-- nginx will close the downstream connection at end of content
-- phase. Clients detect incomplete responses via the absence
-- of a protocol-specific terminator (e.g. OpenAI [DONE],
-- Anthropic message_stop, Responses response.completed).
return
end
-- No bytes flushed yet (e.g. converter skipped all events so far).
-- Surface as 504 for duration (timeout-like) or 502 for size-limit
-- (bad gateway response), so on_error / fallback policies can
-- distinguish the failure modes.
local status = limit_hit == "max_stream_duration_ms" and 504 or 502
return status, limit_hit .. " exceeded"
end
end
end
Expand Down
10 changes: 5 additions & 5 deletions apisix/plugins/ai-proxy/base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ function _M.before_proxy(conf, ctx, on_error)
core.log.error("no protocol module for streaming target: ", target_proto)
return 500
end
code = ai_provider:parse_streaming_response(
ctx, res, target_proto_module, converter)
code, body = ai_provider:parse_streaming_response(
ctx, res, target_proto_module, converter, conf)
else
local _, parse_err = ai_provider:parse_response(
ctx, res, client_proto, converter)
local _, parse_err, parse_status = ai_provider:parse_response(
ctx, res, client_proto, converter, conf)
if parse_err then
code = 500
code = parse_status or 500
body = parse_err
end
end
Expand Down
32 changes: 32 additions & 0 deletions apisix/plugins/ai-proxy/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,22 @@ _M.ai_proxy_schema = {
default = 30000,
description = "timeout in milliseconds",
},
max_stream_duration_ms = {
type = "integer",
minimum = 1,
description = "Maximum wall-clock duration (in milliseconds) for a "
.. "streaming AI response. If the upstream keeps sending "
.. "data past this deadline, the connection is closed. "
.. "Unset means no cap. Use this to protect the gateway "
.. "from upstream bugs that produce tokens indefinitely.",
},
max_response_bytes = {
type = "integer",
minimum = 1,
description = "Maximum total bytes read from the upstream for a "
.. "single AI response (streaming or non-streaming). If "
.. "exceeded, the connection is closed. Unset means no cap.",
},
keepalive = {type = "boolean", default = true},
keepalive_timeout = {
type = "integer",
Expand Down Expand Up @@ -258,6 +274,22 @@ _M.ai_proxy_multi_schema = {
default = 30000,
description = "timeout in milliseconds",
},
max_stream_duration_ms = {
type = "integer",
minimum = 1,
description = "Maximum wall-clock duration (in milliseconds) for a "
.. "streaming AI response. If the upstream keeps sending "
.. "data past this deadline, the connection is closed. "
.. "Unset means no cap. Use this to protect the gateway "
.. "from upstream bugs that produce tokens indefinitely.",
},
max_response_bytes = {
type = "integer",
minimum = 1,
description = "Maximum total bytes read from the upstream for a "
.. "single AI response (streaming or non-streaming). If "
.. "exceeded, the connection is closed. Unset means no cap.",
},
keepalive = {type = "boolean", default = true},
keepalive_timeout = {
type = "integer",
Expand Down
4 changes: 3 additions & 1 deletion docs/en/latest/plugins/ai-proxy-multi.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ In addition, the Plugin also supports logging LLM request information in the acc
| instances.checks.active.unhealthy.http_statuses | array[integer] | False | [429,404,500,501,502,503,504,505] | status code between 200 and 599 inclusive | An array of HTTP status codes that defines an unhealthy node. |
| instances.checks.active.unhealthy.http_failures | integer | False | 5 | between 1 and 254 inclusive | Number of HTTP failures to define an unhealthy node. |
| instances.checks.active.unhealthy.timeout | integer | False | 3 | between 1 and 254 inclusive | Number of probe timeouts to define an unhealthy node. |
| timeout | integer | False | 30000 | greater than or equal to 1 | Request timeout in milliseconds when requesting the LLM service. |
| timeout | integer | False | 30000 | greater than or equal to 1 | Request timeout in milliseconds when requesting the LLM service. Applied per socket operation (connect / send / read block); does not cap the total duration of a streaming response. |
| max_stream_duration_ms | integer | False | | greater than or equal to 1 | Maximum wall-clock duration (in milliseconds) for a streaming AI response. If the upstream keeps sending data past this deadline, the gateway closes the connection. Unset means no cap. Use this to protect the gateway from upstream bugs that produce tokens indefinitely. When the limit is hit mid-stream, the downstream SSE stream is truncated (no protocol-specific terminator such as `[DONE]`, `message_stop`, or `response.completed`); well-behaved clients should treat a missing terminator as an incomplete response. |
| max_response_bytes | integer | False | | greater than or equal to 1 | Maximum total bytes read from the upstream for a single AI response (streaming or non-streaming). If exceeded, the gateway closes the connection. For non-streaming responses with `Content-Length`, the check is performed before reading the body; for chunked (no-`Content-Length`) non-streaming responses and for streaming responses, the cap is enforced incrementally as bytes are received. Unset means no cap. |
| keepalive | boolean | False | true | | If true, keep the connection alive when requesting the LLM service. |
| keepalive_timeout | integer | False | 60000 | greater than or equal to 1000 | Keepalive timeout in milliseconds when connecting to the LLM service. |
| keepalive_pool | integer | False | 30 | | Keepalive pool size for when connecting with the LLM service. |
Expand Down
4 changes: 3 additions & 1 deletion docs/en/latest/plugins/ai-proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ In addition, the Plugin also supports logging LLM request information in the acc
| logging | object | False | | | Logging configurations. Does not affect `error.log`. |
| logging.summaries | boolean | False | false | | If true, logs request LLM model, duration, request, and response tokens. |
| logging.payloads | boolean | False | false | | If true, logs request and response payload. |
| timeout | integer | False | 30000 | ≥ 1 | Request timeout in milliseconds when requesting the LLM service. |
| timeout | integer | False | 30000 | ≥ 1 | Request timeout in milliseconds when requesting the LLM service. Applied per socket operation (connect / send / read block); does not cap the total duration of a streaming response. |
| max_stream_duration_ms | integer | False | | ≥ 1 | Maximum wall-clock duration (in milliseconds) for a streaming AI response. If the upstream keeps sending data past this deadline, the gateway closes the connection. Unset means no cap. Use this to protect the gateway from upstream bugs that produce tokens indefinitely. When the limit is hit mid-stream, the downstream SSE stream is truncated (no protocol-specific terminator such as `[DONE]`, `message_stop`, or `response.completed`); well-behaved clients should treat a missing terminator as an incomplete response. |
| max_response_bytes | integer | False | | ≥ 1 | Maximum total bytes read from the upstream for a single AI response (streaming or non-streaming). If exceeded, the gateway closes the connection. For non-streaming responses with `Content-Length`, the check is performed before reading the body; for chunked (no-`Content-Length`) non-streaming responses and for streaming responses, the cap is enforced incrementally as bytes are received. Unset means no cap. |
| keepalive | boolean | False | true | | If true, keeps the connection alive when requesting the LLM service. |
| keepalive_timeout | integer | False | 60000 | ≥ 1000 | Keepalive timeout in milliseconds when connecting to the LLM service. |
| keepalive_pool | integer | False | 30 | | Keepalive pool size for the LLM service connection. |
Expand Down
4 changes: 3 additions & 1 deletion docs/zh/latest/plugins/ai-proxy-multi.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ import TabItem from '@theme/TabItem';
| instances.checks.active.unhealthy.http_statuses | array[integer] | 否 | [429,404,500,501,502,503,504,505] | 200 到 599 之间的状态码(包含) | 定义不健康节点的 HTTP 状态码数组。 |
| instances.checks.active.unhealthy.http_failures | integer | 否 | 5 | 1 到 254(包含) | 定义不健康节点的 HTTP 失败次数。 |
| instances.checks.active.unhealthy.timeout | integer | 否 | 3 | 1 到 254(包含) | 定义不健康节点的探测超时次数。 |
| timeout | integer | 否 | 30000 | 大于或等于 1 | 请求 LLM 服务时的请求超时时间(毫秒)。 |
| timeout | integer | 否 | 30000 | 大于或等于 1 | 请求 LLM 服务时的请求超时时间(毫秒)。应用于单次 socket 操作(连接 / 发送 / 读取块),不限制流式响应的总时长。 |
| max_stream_duration_ms | integer | 否 | | 大于或等于 1 | 流式 AI 响应的总墙钟时长上限(毫秒)。若上游在此时间后仍持续发送数据,网关将关闭连接。未设置时不限制。用于防护上游持续输出 token 导致网关 CPU 被打满的异常情况。中途触发上限时,下游 SSE 流会被截断(不再发送协议特定的终止标记,例如 `[DONE]`、`message_stop` 或 `response.completed`),客户端应将缺失的终止标记视为响应未完成。 |
| max_response_bytes | integer | 否 | | 大于或等于 1 | 单次 AI 响应(流式或非流式)允许从上游读取的最大总字节数。超出时关闭连接。非流式响应若存在 `Content-Length`,在读取 body 之前预检;否则(chunked 传输)与流式响应一样在接收字节的过程中增量检查。未设置时不限制。 |
| keepalive | boolean | 否 | true | | 如果为 true,在请求 LLM 服务时保持连接活跃。 |
| keepalive_timeout | integer | 否 | 60000 | 大于或等于 1000 | 连接 LLM 服务时的保活超时时间(毫秒)。 |
| keepalive_pool | integer | 否 | 30 | | 连接 LLM 服务时的保活池大小。 |
Expand Down
Loading
Loading