Audit fixes: rebalance-flush timeout, batch-subscriber guard, backpressure + docs/test/refactor #32
Merged
Spec for the deep-read audit: rebalance-flush timeout (#1), batch-subscriber rejection (#2), backpressure ceiling (#3), README/license/wording fixes (#4-#7), test rewrites + coverage (#8-#9), batch_committer split and cleanup (#10-#12). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ng (#3)
The blocked handler is always cancelled before the 30s sleep returns, so the completion-append line is never reached; the `completed_phase1 == []` assertion already proves the handler does not complete. Restores the 100% coverage gate.
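The reasoning in this commit message can be reproduced in isolation. The snippet below is a minimal sketch, not the project's test: `blocked_handler` and `main` are hypothetical names, and only the 30s sleep and the `completed_phase1 == []` check come from the message above. It shows that a task cancelled while parked on `asyncio.sleep` never executes the line after the sleep.

```python
import asyncio


async def blocked_handler(completed: list) -> None:
    # Hypothetical stand-in for the test's blocked handler: it sleeps far
    # longer than the test runs.
    await asyncio.sleep(30)
    # Never reached when the task is cancelled during the sleep above.
    completed.append("done")


async def main() -> list:
    completed: list = []
    task = asyncio.create_task(blocked_handler(completed))
    await asyncio.sleep(0)  # yield once so the handler starts and parks on its sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # cancellation is the expected outcome here
    return completed


completed_phase1 = asyncio.run(main())
```

Because the append is unreachable under cancellation, a coverage tool would flag it as uncovered, which is consistent with the commit dropping that line rather than excluding it.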
Per PR #32 review: document the per-task memory footprint (commit metadata only, not payload; roughly a few MB at the 10000 default) and note that values below commit_batch_size fall back to timeout/flush-driven commits.
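A ceiling of this kind can be sketched with an `asyncio.Semaphore`. This is an illustration under stated assumptions, not the library's code: the class `UncommittedCeiling` and its methods `admit` and `committed` are hypothetical names, and only the `max_uncommitted_tasks` parameter, its 10_000 default, and the `None` = unbounded convention are taken from this PR.

```python
import asyncio
from typing import Optional


class UncommittedCeiling:
    """Hypothetical admission gate; not the project's implementation."""

    def __init__(self, max_uncommitted_tasks: Optional[int] = 10_000) -> None:
        # None disables the ceiling entirely (unbounded admission).
        self._slots = (
            asyncio.Semaphore(max_uncommitted_tasks)
            if max_uncommitted_tasks is not None
            else None
        )

    async def admit(self) -> None:
        # Blocks the caller (the consume path) while the ceiling is reached.
        if self._slots is not None:
            await self._slots.acquire()

    def committed(self, count: int = 1) -> None:
        # Called once offsets are committed; frees slots for new tasks.
        if self._slots is not None:
            for _ in range(count):
                self._slots.release()


async def demo() -> list:
    events: list = []
    gate = UncommittedCeiling(max_uncommitted_tasks=2)
    await gate.admit()
    await gate.admit()  # ceiling of 2 is now reached

    async def third() -> None:
        await gate.admit()
        events.append("third admitted")

    task = asyncio.create_task(third())
    await asyncio.sleep(0.01)  # give the third admission a chance to run
    events.append("not blocked" if task.done() else "still blocked")
    gate.committed()  # one commit frees one slot
    await task
    return events


events = asyncio.run(demo())
```

Only a slot count is held per admitted task, so the gate itself adds no payload memory. With a ceiling smaller than the batch size, admission would stall before a full batch accumulates, which is presumably why such values rely on timeout or flush to trigger commits.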
Summary
A deep audit of code, docs, and tests surfaced three production-impacting behavioral risks plus documentation drift, a misleading test, and cleanup opportunities. This branch fixes all of them. The at-least-once contract and `committed = processed_max + 1` offset semantics are unchanged.

Behavioral fixes
- `commit_all()` previously did an unbounded `messages_queue.join()`, blocking `on_partitions_revoked` until every in-flight handler finished; a slow or hung handler could stall the rebalance past `max.poll.interval.ms` and trigger a rebalance storm. It is now bounded by `flush_timeout_sec` (default 10s, plumbed through `ConsumerRebalanceListener`/`create_rebalance_listener`); on timeout it commits completed offsets and lets the rest be redelivered.
- `@broker.subscriber(batch=True, ack_policy=MANUAL)` previously crashed with an opaque `AttributeError` deep in the commit path. `consume_scope` now raises a clear `RuntimeError` (after the `FakeConsumer` and non-MANUAL pass-throughs, so those are unaffected).
- `max_uncommitted_tasks` (default 10_000, `None` = unbounded) caps tasks admitted-but-not-committed; when the ceiling is hit, `send_task` blocks so the consume path stalls until commits catch up, preventing unbounded memory growth during a commit/broker outage.

Docs
- `LICENSE` (MIT, matching pyproject + the README link).
- `max_uncommitted_tasks`.

Tests
- Rewrote `test_real_kafka_graceful_shutdown_waits_for_tasks` (asserted shutdown waits for tasks, but `stop()` cancels them) into `test_real_kafka_shutdown_cancels_in_flight_tasks`, which genuinely asserts cancellation + at-least-once redelivery against real Redpanda.

Cleanup
- Moved `insert_sorted`, `extract_ready_prefixes`, `map_offsets_per_partition`, and `KafkaCommitTask` into a new `_pending_state.py`, leaving `batch_committer.py` as the loop driver (thin delegators preserve all call sites, so existing tests pass unmodified).
- `dead_letter_queue.pyc`.

Design spec and task-by-task plan are included under `planning/`.

Test Plan
- `just test`: 118 tests pass (unit + integration against Redpanda) at 100% coverage
- `just lint-ci` clean (eof-fixer, ruff format, ruff check, ty)
- `send_task` lost-wakeup, `commit_all` timeout queue-counter safety, refactor class-identity, and batch-guard placement all independently verified
- `flush_timeout_sec` (10s) and `max_uncommitted_tasks` (10_000) defaults

🤖 Generated with Claude Code
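The bounded flush listed under Behavioral fixes can be illustrated with `asyncio.wait_for` wrapped around `Queue.join()`. This is a minimal sketch assuming an asyncio queue; `bounded_flush` and `demo` are hypothetical names, and the actual `commit_all()` may be structured differently. Only the `flush_timeout_sec` name and its 10s default come from this PR.

```python
import asyncio


async def bounded_flush(queue: asyncio.Queue, flush_timeout_sec: float = 10.0) -> bool:
    """Wait for in-flight work, but never longer than flush_timeout_sec.

    Returns True if the queue drained, False if the wait timed out.
    """
    try:
        await asyncio.wait_for(queue.join(), timeout=flush_timeout_sec)
        return True
    except asyncio.TimeoutError:
        # A caller would commit whatever has completed and leave the rest
        # to be redelivered, instead of blocking the rebalance callback.
        return False


async def demo() -> tuple:
    drained: asyncio.Queue = asyncio.Queue()
    drained.put_nowait("msg")
    drained.get_nowait()
    drained.task_done()  # the handler finished, so join() returns at once

    stuck: asyncio.Queue = asyncio.Queue()
    stuck.put_nowait("msg")  # simulates a hung handler: task_done() never runs

    return (
        await bounded_flush(drained, flush_timeout_sec=0.05),
        await bounded_flush(stuck, flush_timeout_sec=0.05),
    )


results = asyncio.run(demo())
```

Returning a flag rather than raising keeps the timeout a normal outcome for the caller, which matches the described behavior of committing completed offsets and allowing redelivery of the remainder.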