Skip to content
Merged
360 changes: 248 additions & 112 deletions src/bin/ui.rs

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,21 @@ pub struct Config {
/// Setup walkthrough at `assets/exit_node/README.md`. Default off.
#[serde(default)]
pub exit_node: ExitNodeConfig,

/// Daily request quota per account bucket. Each configured script_id is
/// treated as one separate account. Default 20_000 matches the free-tier
/// Apps Script UrlFetchApp limit. Set to 100_000 for Workspace accounts.
#[serde(default = "default_quota_daily_limit")]
pub quota_daily_limit: u64,

/// Per-account safety buffer. An account is considered effectively
/// exhausted when its remaining requests for the current 24-hour window
/// drop below this value. The reserve intentionally keeps calls away from
/// Google's hard quota edge to avoid triggering anti-abuse heuristics.
/// Aggregate hard-stop reserve = account_count × quota_safety_buffer.
/// Default 500.
#[serde(default = "default_quota_safety_buffer")]
pub quota_safety_buffer: u64,
}

/// Configuration for the optional second-hop exit node.
Expand Down Expand Up @@ -526,6 +541,8 @@ fn default_block_doh() -> bool { true }
fn default_auto_blacklist_strikes() -> u32 { 3 }
fn default_auto_blacklist_window_secs() -> u64 { 30 }
fn default_auto_blacklist_cooldown_secs() -> u64 { 120 }
fn default_quota_daily_limit() -> u64 { 20_000 }
fn default_quota_safety_buffer() -> u64 { 500 }

/// Default for `request_timeout_secs`: 30s, matching the historical
/// hard-coded `BATCH_TIMEOUT` and Apps Script's typical response cliff.
Expand Down Expand Up @@ -920,6 +937,8 @@ impl From<TomlConfig> for Config {
auto_blacklist_cooldown_secs: t.relay.auto_blacklist_cooldown_secs,
request_timeout_secs: t.relay.request_timeout_secs,
exit_node: t.exit_node,
quota_daily_limit: default_quota_daily_limit(),
quota_safety_buffer: default_quota_safety_buffer(),
}
}
}
Expand Down
198 changes: 188 additions & 10 deletions src/domain_fronter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use rustls::{ClientConfig, DigitallySignedStruct, SignatureScheme};

use crate::cache::{cache_key, is_cacheable_method, parse_ttl, ResponseCache};
use crate::config::Config;
use crate::quota_tracker::{QuotaSummary, QuotaTracker};

#[derive(Debug, thiserror::Error)]
pub enum FronterError {
Expand Down Expand Up @@ -420,6 +421,10 @@ pub struct DomainFronter {
auto_blacklist_strikes: u32,
auto_blacklist_window: Duration,
auto_blacklist_cooldown: Duration,
/// Per-account quota tracker. One bucket per configured script_id,
/// each treated as a separate Google account per the model assumption.
/// Persists to quota_state.json so quota state survives restarts.
quota_tracker: Arc<QuotaTracker>,
/// Per-batch HTTP timeout. Mirrors `Config::request_timeout_secs`
/// (#430, masterking32 PR #25). Read by `tunnel_client::fire_batch`
/// so a single config field tunes the timeout used everywhere.
Expand Down Expand Up @@ -594,6 +599,13 @@ impl DomainFronter {
tls_h1.alpn_protocols = vec![b"http/1.1".to_vec()];
let tls_connector_h1 = TlsConnector::from(Arc::new(tls_h1));

// Build quota tracker before script_ids is moved into the struct.
let quota_tracker_arc = Arc::new(QuotaTracker::load(
&script_ids,
config.quota_daily_limit,
config.quota_safety_buffer,
));

Ok(Self {
connect_host: config.google_ip.clone(),
sni_hosts: build_sni_pool_for(
Expand Down Expand Up @@ -639,6 +651,7 @@ impl DomainFronter {
auto_blacklist_cooldown: Duration::from_secs(
config.auto_blacklist_cooldown_secs.clamp(1, 86400),
),
quota_tracker: quota_tracker_arc,
batch_timeout: Duration::from_secs(
config.request_timeout_secs.clamp(5, 300),
),
Expand Down Expand Up @@ -761,7 +774,9 @@ impl DomainFronter {
}
guard.clone()
};
let quota = self.quota_tracker.summary();
StatsSnapshot {
total_relay_calls: quota.total_relay_calls,
relay_calls: self.relay_calls.load(Ordering::Relaxed),
relay_failures: self.relay_failures.load(Ordering::Relaxed),
coalesced: self.coalesced.load(Ordering::Relaxed),
Expand All @@ -778,9 +793,15 @@ impl DomainFronter {
h2_calls: self.h2_calls.load(Ordering::Relaxed),
h2_fallbacks: self.h2_fallbacks.load(Ordering::Relaxed),
h2_disabled: self.h2_disabled.load(Ordering::Relaxed),
quota,
}
}

/// Access the quota tracker for periodic saves and startup logging.
pub fn quota_tracker(&self) -> &Arc<QuotaTracker> {
&self.quota_tracker
}

pub fn num_scripts(&self) -> usize {
self.script_ids.len()
}
Expand All @@ -806,11 +827,25 @@ impl DomainFronter {
for _ in 0..n {
let idx = self.script_idx.fetch_add(1, Ordering::Relaxed);
let sid = &self.script_ids[idx % n];
if !bl.contains_key(sid) {
if !bl.contains_key(sid) && !self.quota_tracker.is_hard_stopped(sid) {
return sid.clone();
}
}
// All blacklisted: pick whichever comes off cooldown soonest.
// Fallback: prefer a blacklisted-but-not-quota-exhausted account
// over a fully quota-exhausted one (blacklist is transient, quota
// exhaustion is per-window).
let not_exhausted: Vec<_> = bl
.iter()
.filter(|(sid, _)| !self.quota_tracker.is_hard_stopped(sid))
.collect();
if let Some((sid, _)) = not_exhausted.iter().min_by_key(|(_, t)| **t) {
let sid = sid.to_string();
bl.remove(&sid);
return sid;
}
// All accounts are either quota-exhausted or blacklisted. The global
// hard-stop check in do_relay_with_retry will handle the quota case.
// Fall back to soonest-off-blacklist cooldown as a last resort.
if let Some((sid, _)) = bl.iter().min_by_key(|(_, t)| **t) {
let sid = sid.clone();
bl.remove(&sid);
Expand Down Expand Up @@ -839,7 +874,10 @@ impl DomainFronter {
}
let idx = self.script_idx.fetch_add(1, Ordering::Relaxed);
let sid = &self.script_ids[idx % n];
if !bl.contains_key(sid) && !picked.iter().any(|p| p == sid) {
if !bl.contains_key(sid)
&& !self.quota_tracker.is_hard_stopped(sid)
&& !picked.iter().any(|p| p == sid)
{
picked.push(sid.clone());
}
}
Expand Down Expand Up @@ -1748,6 +1786,23 @@ impl DomainFronter {
headers: &[(String, String)],
body: &[u8],
) -> Vec<u8> {
self.quota_tracker.record_relay();

// Block ALL relay paths (exit node + Apps Script) when every account
// bucket is quota-exhausted. Checked here so the exit node short-circuit
// below can't bypass the global hard stop.
if self.quota_tracker.is_globally_hard_stopped() {
self.relay_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(
"[quota] global hard stop active — all Apps Script account buckets exhausted"
);
return error_response(
503,
"All Apps Script accounts quota exhausted; hard stop active. \
Quota resets on a rolling 24-hour window per account.",
);
}

// Optional URL rewrite for X/Twitter GraphQL (issue #16). Applied
// here, at the top of relay(), so it affects BOTH the cache key
// (so matching requests collapse into one entry) AND the URL that
Expand Down Expand Up @@ -1782,6 +1837,10 @@ impl DomainFronter {
bytes.len() as u64,
t0.elapsed().as_nanos() as u64,
);
self.bytes_relayed.fetch_add(
(body.len() + bytes.len()) as u64,
Ordering::Relaxed,
);
return bytes;
}
Err(e) if !e.is_retryable() => {
Expand Down Expand Up @@ -2349,6 +2408,21 @@ impl DomainFronter {
headers: &[(String, String)],
body: &[u8],
) -> Result<Vec<u8>, FronterError> {
// Refuse immediately if every configured account bucket is exhausted.
// Conservative: only triggers when all buckets are hard-stopped OR the
// aggregate remaining quota has crossed the collective safety threshold
// with confirmed quota error evidence (not random network failures).
if self.quota_tracker.is_globally_hard_stopped() {
tracing::error!(
"[quota] global hard stop active — all Apps Script account buckets exhausted"
);
return Err(FronterError::Relay(
"All Apps Script accounts quota exhausted; hard stop active. \
Quota resets on a rolling 24-hour window per account."
.into(),
));
}

// Fan-out path: fire N instances in parallel, return first Ok, cancel
// the rest. Clamps to number of available script IDs so the single-ID
// case is a no-op even if parallel_relay>1 was configured.
Expand Down Expand Up @@ -2445,6 +2519,14 @@ impl DomainFronter {
self.do_relay_once_with(script_id, method, url, headers, body).await
}

/// Quota-recording wrapper around `do_relay_once_inner`. Counts every
/// Apps Script fetch attempt (including retries) against the per-account
/// bucket, records byte metrics on success, and marks an account as
/// hard-stopped when the response carries a confirmed quota error message.
///
/// Local transport failures (Io, Tls, Timeout) are recorded as failed
/// attempts but do NOT trigger exhaustion — only quota-like Relay errors
/// qualify, keeping transient network issues from false-stopping accounts.
async fn do_relay_once_with(
&self,
script_id: String,
Expand All @@ -2453,12 +2535,68 @@ impl DomainFronter {
headers: &[(String, String)],
body: &[u8],
) -> Result<Vec<u8>, FronterError> {
// Build once, wrap in Bytes (zero-copy move). h2 takes a clone
// (Arc bump, not memcpy); h1 fallback uses the same Bytes via
// Deref<&[u8]>. Saves a full payload allocation+copy per call
// — meaningful on range-parallel fan-out where N copies fire
// in parallel for one user-facing GET.
// Defense-in-depth: if next_script_id's last-resort fallback handed us
// a hard-stopped account (all exhausted, none in the blacklist), refuse
// here before building the payload or touching the network.
if self.quota_tracker.is_hard_stopped(&script_id) {
return Err(FronterError::Relay(format!(
"account {} is quota-hard-stopped; skipping dispatch",
mask_script_id(&script_id),
)));
}

let payload: Bytes = Bytes::from(self.build_payload_json(method, url, headers, body)?);
let bytes_up = payload.len() as u64;

// Count ALL attempts, including retries. Each call here maps to one
// real UrlFetchApp.fetch() on Google's side — that's the unit Google
// bills against the daily quota.
self.quota_tracker.record_attempt(&script_id, bytes_up);

let result = self
.do_relay_once_inner(script_id.clone(), method, url, payload)
.await;

match &result {
Ok(bytes) => {
self.quota_tracker
.record_success(&script_id, bytes.len() as u64);
}
Err(e) => {
let is_quota = is_quota_like_fronter_error(e);
self.quota_tracker.record_failure(&script_id, is_quota);
if is_quota {
self.quota_tracker
.mark_exhausted(&script_id, &e.to_string());
tracing::warn!(
"[quota] account {} exhausted: {}",
mask_script_id(&script_id),
e
);
}
}
}

tracing::debug!(
"[quota] {} dispatch result: {}",
mask_script_id(&script_id),
match &result {
Ok(_) => "Ok".to_string(),
Err(e) => format!("Err({})", e),
},
);

result
}

async fn do_relay_once_inner(
&self,
script_id: String,
method: &str,
url: &str,
payload: Bytes,
) -> Result<Vec<u8>, FronterError> {
// payload already built by the caller; path derived from script_id.
let path = format!("/macros/s/{}/exec", script_id);

// h2 fast path: one shared TCP/TLS connection multiplexes all
Expand Down Expand Up @@ -4792,6 +4930,9 @@ fn decode_js_string_escapes(s: &str) -> Option<String> {

#[derive(Debug, Clone)]
pub struct StatsSnapshot {
/// Total relay() calls today (exit node + Apps Script). Sourced from the
/// persisted quota tracker so this survives proxy restarts.
pub total_relay_calls: u64,
pub relay_calls: u64,
pub relay_failures: u64,
pub coalesced: u64,
Expand Down Expand Up @@ -4831,6 +4972,9 @@ pub struct StatsSnapshot {
/// switch set, or peer refused h2 during ALPN). All traffic on the
/// h1 path.
pub h2_disabled: bool,
/// Quota state snapshot. Only meaningful in AppsScript/Full modes where
/// a DomainFronter is active; defaults to zero values in Direct mode.
pub quota: QuotaSummary,
}

impl StatsSnapshot {
Expand Down Expand Up @@ -4863,8 +5007,22 @@ impl StatsSnapshot {
)
}
};
let q = &self.quota;
let quota_seg = if q.account_count > 0 && (q.exhausted_count > 0 || q.global_hard_stop) {
format!(
" quota={}/{} remaining={} exhausted={}/{}{}",
q.requests_used_total,
q.daily_capacity_total,
q.requests_remaining_total,
q.exhausted_count,
q.account_count,
if q.global_hard_stop { " HARD-STOP" } else { "" },
)
} else {
String::new()
};
format!(
"stats: relay={} ({}KB) failures={} coalesced={} cache={}/{} ({:.0}% hit, {}KB) scripts={}/{} active{}",
"stats: relay={} ({}KB) failures={} coalesced={} cache={}/{} ({:.0}% hit, {}KB) scripts={}/{} active{}{}",
self.relay_calls,
self.bytes_relayed / 1024,
self.relay_failures,
Expand All @@ -4876,6 +5034,7 @@ impl StatsSnapshot {
self.total_scripts - self.blacklisted_scripts,
self.total_scripts,
h2_seg,
quota_seg,
)
}

Expand All @@ -4887,8 +5046,9 @@ impl StatsSnapshot {
fn esc(s: &str) -> String {
s.replace('\\', "\\\\").replace('"', "\\\"")
}
let q = &self.quota;
format!(
r#"{{"relay_calls":{},"relay_failures":{},"coalesced":{},"bytes_relayed":{},"cache_hits":{},"cache_misses":{},"cache_bytes":{},"blacklisted_scripts":{},"total_scripts":{},"today_calls":{},"today_bytes":{},"today_key":"{}","today_reset_secs":{},"h2_calls":{},"h2_fallbacks":{},"h2_disabled":{}}}"#,
r#"{{"relay_calls":{},"relay_failures":{},"coalesced":{},"bytes_relayed":{},"cache_hits":{},"cache_misses":{},"cache_bytes":{},"blacklisted_scripts":{},"total_scripts":{},"today_calls":{},"today_bytes":{},"today_key":"{}","today_reset_secs":{},"h2_calls":{},"h2_fallbacks":{},"h2_disabled":{},"quota_account_count":{},"quota_capacity":{},"quota_used":{},"quota_remaining":{},"quota_exhausted":{},"quota_hard_stop":{}}}"#,
self.relay_calls,
self.relay_failures,
self.coalesced,
Expand All @@ -4905,6 +5065,12 @@ impl StatsSnapshot {
self.h2_calls,
self.h2_fallbacks,
self.h2_disabled,
q.account_count,
q.daily_capacity_total,
q.requests_used_total,
q.requests_remaining_total,
q.exhausted_count,
q.global_hard_stop,
)
}
}
Expand All @@ -4916,6 +5082,18 @@ fn should_blacklist(status: u16, body: &str) -> bool {
looks_like_quota_error(body)
}

/// True only when the error is a Relay-level message that looks like a quota
/// signal from Apps Script. Io/Tls/Timeout errors are local transport issues
/// and must NOT trigger account exhaustion — that would false-stop accounts on
/// any network glitch.
fn is_quota_like_fronter_error(e: &FronterError) -> bool {
match e {
FronterError::Relay(msg) => looks_like_quota_error(msg),
FronterError::NonRetryable(inner) => is_quota_like_fronter_error(inner),
_ => false,
}
}

fn looks_like_quota_error(msg: &str) -> bool {
let lower = msg.to_ascii_lowercase();
lower.contains("quota")
Expand Down
Loading
Loading