Skip to content

Audit fixes: rebalance-flush timeout, batch-subscriber guard, backpressure + docs/test/refactor#32

Merged
lesnik512 merged 12 commits into
mainfrom
audit/robustness-docs-test-fixes
Jun 13, 2026
Merged

Audit fixes: rebalance-flush timeout, batch-subscriber guard, backpressure + docs/test/refactor#32
lesnik512 merged 12 commits into
mainfrom
audit/robustness-docs-test-fixes

Conversation

@lesnik512

Copy link
Copy Markdown
Member

Summary

A deep audit of code, docs, and tests surfaced three production-impacting behavioral risks plus documentation drift, a misleading test, and cleanup opportunities. This branch fixes all of them. The at-least-once contract and committed = processed_max + 1 offset semantics are unchanged.

Behavioral fixes

  • update README.md #1 Rebalance flush no longer hangs. commit_all() previously did an unbounded messages_queue.join(), blocking on_partitions_revoked until every in-flight handler finished — a slow/hung handler could stall the rebalance past max.poll.interval.ms and trigger a rebalance storm. It's now bounded by flush_timeout_sec (default 10s, plumbed through ConsumerRebalanceListener / create_rebalance_listener); on timeout it commits completed offsets and lets the rest be redelivered.
  • small fixes #2 Batch subscribers rejected cleanly. @broker.subscriber(batch=True, ack_policy=MANUAL) previously crashed with an opaque AttributeError deep in the commit path. consume_scope now raises a clear RuntimeError (after the FakeConsumer and non-MANUAL pass-throughs, so those are unaffected).
  • refactor and add integration tests #3 Opt-in backpressure. New max_uncommitted_tasks (default 10_000, None = unbounded) caps tasks admitted-but-not-committed; when the ceiling is hit, send_task blocks so the consume path stalls until commits catch up, preventing unbounded memory growth during a commit/broker outage.

Docs

  • Added missing LICENSE (MIT, matching pyproject + the README link).
  • Rewrote the stale README §KafkaConcurrentHandler (claimed a counter+Event and signal handlers the code never had) and documented the rebalance flush cost + max_uncommitted_tasks.
  • Fixed "observer task" → "committer task" wording.

Tests

  • Rewrote the misleading test_real_kafka_graceful_shutdown_waits_for_tasks (asserted shutdown waits for tasks, but stop() cancels them) into test_real_kafka_shutdown_cancels_in_flight_tasks, which genuinely asserts cancellation + at-least-once redelivery against real Redpanda.
  • Added unit coverage for the flush timeout, batch rejection, and backpressure (block / unbounded / dead-while-blocked).

Cleanup

  • Extracted the pure pending/offset helpers (insert_sorted, extract_ready_prefixes, map_offsets_per_partition) and KafkaCommitTask into a new _pending_state.py, leaving batch_committer.py as the loop driver (thin delegators preserve all call sites — existing tests pass unmodified).
  • Removed an orphaned dead_letter_queue .pyc.

Design spec and task-by-task plan are included under planning/.

Test Plan

  • just test — 118 tests pass (unit + integration against Redpanda) at 100% coverage
  • just lint-ci clean (eof-fixer, ruff format, ruff check, ty)
  • Final adversarial code review: no Critical/Important issues; backpressure accounting conservation, send_task lost-wakeup, commit_all timeout queue-counter safety, refactor class-identity, and batch-guard placement all independently verified
  • Reviewer to sanity-check the flush_timeout_sec (10s) and max_uncommitted_tasks (10_000) defaults

🤖 Generated with Claude Code

lesnik512 and others added 12 commits June 13, 2026 17:54
Spec for the deep-read audit: rebalance-flush timeout (#1), batch-subscriber
rejection (#2), backpressure ceiling (#3), README/license/wording fixes
(#4-#7), test rewrites + coverage (#8-#9), batch_committer split and cleanup
(#10-#12).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ng (#3)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…or (#4,#6)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The blocked handler is always cancelled before the 30s sleep returns, so the
completion-append line is never reached; the `completed_phase1 == []` assertion
already proves the handler does not complete. Restores the 100% coverage gate.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Per PR #32 review: document the per-task memory footprint (commit metadata
only, not payload — ~few MB at the 10000 default) and note that values below
commit_batch_size fall back to timeout/flush-driven commits.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@lesnik512 lesnik512 merged commit a4355ee into main Jun 13, 2026
5 checks passed
@lesnik512 lesnik512 deleted the audit/robustness-docs-test-fixes branch June 13, 2026 16:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant