24 changes: 15 additions & 9 deletions apisix/plugins/ai-providers/base.lua
@@ -294,6 +294,7 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf)
res._httpc:close()
res._httpc = nil
end
res._upstream_bytes = total
return nil, "max_response_bytes exceeded", 502
end
parts[#parts + 1] = chunk
@@ -309,6 +310,7 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf)
core.log.warn("failed to read response body: ", err)
return nil, err
end
res._upstream_bytes = #raw_res_body
ngx.status = res.status
ctx.var.llm_time_to_first_token = math.floor((ngx_now() - ctx.llm_request_start_time) * 1000)
ctx.var.apisix_upstream_response_time = ctx.var.llm_time_to_first_token
@@ -377,31 +379,33 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
-- uncommitted and causing nginx to fall through to the balancer phase.
local output_sent = false

-- Runaway-upstream safeguards. Both are opt-in; unset means no cap.
local max_duration_ms = conf and conf.max_stream_duration_ms
local max_bytes = conf and conf.max_response_bytes
local deadline
if max_duration_ms then
deadline = ctx.llm_request_start_time + max_duration_ms / 1000
end
local bytes_read = 0

local function abort_on_disconnect(flush_err)
core.log.info("client disconnected during AI streaming, ",
"aborting upstream read: ", flush_err)
if res._httpc then
res._httpc:close()
res._httpc = nil
end
res._upstream_bytes = bytes_read
ctx.var.llm_request_done = true
end

-- Runaway-upstream safeguards. Both are opt-in; unset means no cap.
local max_duration_ms = conf and conf.max_stream_duration_ms
local max_bytes = conf and conf.max_response_bytes
local deadline
if max_duration_ms then
deadline = ctx.llm_request_start_time + max_duration_ms / 1000
end
local bytes_read = 0

while true do
local chunk, err = body_reader()
ctx.var.apisix_upstream_response_time = math.floor((ngx_now() -
ctx.llm_request_start_time) * 1000)
if err then
core.log.warn("failed to read response chunk: ", err)
res._upstream_bytes = bytes_read
return transport_http.handle_error(err)
end
if not chunk then
@@ -410,6 +414,7 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
#sse_buf)
end

res._upstream_bytes = bytes_read
if converter and not output_sent then
local msg = "streaming response completed without producing "
.. "any output; the upstream likely returned a "
@@ -521,6 +526,7 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
-- Signal downstream filters (e.g. moderation plugins that defer
-- work until request completion) that no more content is coming.
ctx.var.llm_request_done = true
res._upstream_bytes = bytes_read
if output_sent then
-- Client has already received partial SSE; stop feeding chunks.
-- nginx will close the downstream connection at end of content
1 change: 1 addition & 0 deletions apisix/plugins/ai-proxy/base.lua
@@ -303,6 +303,7 @@ function _M.before_proxy(conf, ctx, on_error)
if res._t0 then
apisix_upstream.update_upstream_state({
response_time = (ngx_now() - res._t0) * 1000,
response_length = res._upstream_bytes or 0,
})
end

1 change: 1 addition & 0 deletions docs/en/latest/plugins/ai-proxy.md
@@ -2062,6 +2062,7 @@ In addition, the following standard nginx upstream variables are automatically p
* `upstream_response_time`: Total time spent receiving the response from the upstream LLM service, in seconds (e.g., `2.858`).
* `upstream_connect_time`: Time spent establishing the connection to the upstream LLM service, in seconds.
* `upstream_header_time`: Time spent receiving the response headers from the upstream LLM service, in seconds.
* `upstream_response_length`: Total number of bytes received from the upstream LLM service response body (e.g., `1024`).
* `upstream_host`: Hostname of the upstream LLM service as configured in the endpoint (e.g., `api.openai.com`).
* `upstream_scheme`: Scheme used to connect to the upstream LLM service (e.g., `https`).
* `upstream_uri`: Request URI path sent to the upstream LLM service (e.g., `/v1/chat/completions`).
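
For illustration, a minimal sketch of reading these variables once the request finishes, using the same log-phase `serverless-post-function` approach as the tests added below (the exact log message format here is only an example, not part of this change):

```lua
-- Log-phase function body, as it would appear in the "functions" array of a
-- serverless-post-function plugin (see TEST 5 below). It reads the upstream
-- variables populated by ai-proxy after the LLM response has completed.
return function(_, ctx)
    ngx.log(ngx.WARN, "llm upstream metrics: host=", ngx.var.upstream_host,
            ", response_time=", ngx.var.upstream_response_time,
            ", response_length=", ngx.var.upstream_response_length)
end
```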
72 changes: 72 additions & 0 deletions t/plugin/ai-proxy-upstream-vars.t
@@ -116,3 +116,75 @@ X-AI-Fixture: openai/chat-basic.json
--- error_code: 200
--- access_log eval
qr{http://127\.0\.0\.1/v1/chat/completions}



=== TEST 5: set route with serverless plugin to log upstream_response_length
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"uri": "/anything",
"plugins": {
"ai-proxy": {
"provider": "openai",
"auth": {
"header": {
"Authorization": "Bearer test-key"
}
},
"options": {
"model": "gpt-4"
},
"override": {
"endpoint": "http://127.0.0.1:1980"
},
"ssl_verify": false
},
"serverless-post-function": {
"phase": "log",
"functions": ["return function(_, ctx) ngx.log(ngx.WARN, 'upstream_response_length: ', ngx.var.upstream_response_length) end"]
}
}
}]]
)

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed



=== TEST 6: non-streaming request has non-zero upstream_response_length
--- request
POST /anything
{"model":"gpt-4","messages":[{"role":"user","content":"hello"}]}
--- more_headers
X-AI-Fixture: openai/chat-basic.json
--- error_code: 200
--- error_log eval
qr/upstream_response_length: [1-9]\d*/
--- no_error_log
upstream_response_length: 0



=== TEST 7: streaming request has non-zero upstream_response_length
--- request
POST /anything
{"model":"gpt-4","messages":[{"role":"user","content":"hello"}],"stream":true}
--- more_headers
X-AI-Fixture: openai/chat-streaming.sse
--- error_code: 200
--- error_log eval
qr/upstream_response_length: [1-9]\d*/
--- no_error_log
upstream_response_length: 0