📝 Walkthrough

Configuration files for NATS bucket management are refactored to use observation bucket tables, debug flags are added, and API settings are simplified. The test file introduces async helpers for NATS observation validation and new test functions for event-to-observation flows.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Test as Test Runner
    participant API as NATS API
    participant Bucket as Observation Bucket
    participant Handler as Observation Handler
    Test->>API: send_event(event)
    activate API
    API->>Bucket: publish observation
    deactivate API
    Test->>Bucket: subscribe(subject)
    activate Bucket
    Bucket->>Test: message received
    deactivate Bucket
    Test->>Handler: handle_observation(obsJSON)
    activate Handler
    Handler->>Handler: parse JSON
    Handler->>Handler: assert domain match
    Handler->>Handler: assert tag_mask match
    Note over Handler: validation complete
    deactivate Handler
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~15 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 3
🧹 Nitpick comments (1)
test_basic.py (1)
71-75: Add a precondition assert for `obs["added"]` before indexing.

This makes failures clearer than a raw `KeyError`/`IndexError` when no additions are present.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test_basic.py` around lines 71 - 75, The test helper handle_observation should assert the presence and non-emptiness of the "added" field before indexing into it; add a precondition in handle_observation that "added" is a key in obs and that obs["added"] is a non-empty sequence (e.g., assert "added" in obs and len(obs["added"]) > 0 or isinstance(obs["added"], list) and obs["added"]), then proceed with the existing assertions on obs["added"][0]["name"] and obs["added"][0]["tag_mask"] to make failures clearer than KeyError/IndexError.
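As a hedged illustration, the precondition this comment describes could look like the sketch below. The field names `"added"`, `"name"`, and `"tag_mask"` are taken from the comment itself; the real `handle_observation` helper in `test_basic.py` may differ in signature and shape.

```python
import json

def handle_observation(obs_json: str, domain: str, expected_tag_mask: int) -> None:
    # Parse the observation payload published on the NATS subject.
    obs = json.loads(obs_json)
    # Precondition: fail with a clear message instead of a raw
    # KeyError/IndexError when no additions are present.
    assert "added" in obs and len(obs["added"]) > 0, "observation has no additions"
    # Existing assertions on the first added entry.
    assert obs["added"][0]["name"] == domain
    assert obs["added"][0]["tag_mask"] == expected_tag_mask
```

With the precondition in place, an empty `"added"` list fails with a descriptive assertion message rather than an `IndexError` deep inside the helper.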
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e6bcc04f-0004-4b6f-b140-3f13566afd4d
📒 Files selected for processing (3)
- sut/tapir-analyse-looptest/config.toml
- sut/tapir-analyse-new-qname/config.toml
- test_basic.py
```python
async def check_observation(subject, domain, expected):
    nc = await nats.connect(servers="localhost:4222")

    sub = await nc.subscribe(subject)

    try:
        async for msg in sub.messages:
            await handle_observation(msg.data.decode(), domain, expected)
            await sub.unsubscribe()
    except Exception as e:
        raise e
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Verify that check_observation currently lacks timeout and close/drain cleanup.
python - <<'PY'
import ast, pathlib
p = pathlib.Path("test_basic.py")
src = p.read_text()
tree = ast.parse(src)
for node in tree.body:
    if isinstance(node, ast.AsyncFunctionDef) and node.name == "check_observation":
        seg = "\n".join(src.splitlines()[node.lineno-1:node.end_lineno])
        print("Function lines:", node.lineno, "-", node.end_lineno)
        print("has asyncio.wait_for:", "asyncio.wait_for(" in seg)
        print("has sub.next_msg:", ".next_msg(" in seg)
        print("has nc.drain/close:", (".drain(" in seg) or (".close(" in seg))
PY
```

Repository: dnstapir/core-integration-test
Length of output: 175
🏁 Script executed:
```shell
#!/bin/bash
# Check if there are other NATS usage patterns in the codebase
rg "await nats\." -A 5 -B 2 test_basic.py
```

Repository: dnstapir/core-integration-test
Length of output: 454
🏁 Script executed:
```shell
#!/bin/bash
# Look for any other NATS subscribe/connection usage patterns
rg "\.subscribe\(|\.drain\(|\.close\(" -B 2 -A 3 test_basic.py
```

Repository: dnstapir/core-integration-test
Length of output: 215
🌐 Web query:
NATS Python client sub.next_msg() async timeout pattern documentation
💡 Result:
The NATS Python client (nats.py) provides an async timeout pattern for `sub.next_msg` via an optional `timeout` parameter in seconds (default 1.0). It awaits a message from the subscription's internal queue using `asyncio.wait_for(self._pending_queue.get, timeout)`. If no message arrives within the timeout, it raises `nats.errors.TimeoutError`. Usage example from official docs and examples:

```python
import asyncio
import nats
from nats.errors import TimeoutError

async def main():
    nc = await nats.connect("demo.nats.io")
    sub = await nc.subscribe("foo")
    try:
        msg = await sub.next_msg(timeout=1.0)  # Wait up to 1 second
        print("Received:", msg)
    except TimeoutError:
        print("No message received within timeout")
    await nc.close()

asyncio.run(main())
```

Key points:
- Only for sync-style subscriptions (no callback in subscribe); using a callback prevents next_msg.
- Timeout is per-call; specify None for no timeout (blocks indefinitely).
- Handles connection closure by raising nats.errors.ConnectionClosedError.
- Examples from NATS by Example show timeout=0.1 for quick polls.

This applies to core NATS subscriptions (nats.aio.client.Client.subscribe); JetStream pull subscriptions use fetch or iterators with similar timeouts.
Citations:
- 1: https://nats-io.github.io/nats.py/_modules/nats/aio/subscription.html
- 2: https://nats-io.github.io/nats.py/index.html
- 3: https://nats-io.github.io/nats.py/
- 4: https://github.com/nats-io/nats.py
- 5: https://natsbyexample.com/examples/messaging/pub-sub/python
Add timeout and guaranteed NATS cleanup in check_observation.
The async loop on sub.messages has no timeout and will block CI indefinitely if no message arrives. Additionally, the NATS client connection and subscription must be explicitly closed/drained in a finally block to prevent resource leakage between tests. The suggested fix uses sub.next_msg(timeout=5) with proper cleanup via sub.unsubscribe() and nc.drain().
Suggested fix

```diff
 async def check_observation(subject, domain, expected):
     nc = await nats.connect(servers="localhost:4222")
-
-    sub = await nc.subscribe(subject)
-
-    try:
-        async for msg in sub.messages:
-            await handle_observation(msg.data.decode(), domain, expected)
-            await sub.unsubscribe()
-    except Exception as e:
-        raise e
+    sub = await nc.subscribe(subject)
+    try:
+        msg = await asyncio.wait_for(sub.next_msg(), timeout=5)
+        await handle_observation(msg.data.decode(), domain, expected)
+    finally:
+        await sub.unsubscribe()
+        await nc.drain()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
async def check_observation(subject, domain, expected):
    nc = await nats.connect(servers="localhost:4222")
    sub = await nc.subscribe(subject)
    try:
        msg = await asyncio.wait_for(sub.next_msg(), timeout=5)
        await handle_observation(msg.data.decode(), domain, expected)
    finally:
        await sub.unsubscribe()
        await nc.drain()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@test_basic.py` around lines 59 - 70, The check_observation coroutine blocks
indefinitely on sub.messages and doesn't guarantee NATS cleanup; change it to
await a single message using sub.next_msg(timeout=5) (or similar timeout) and
call handle_observation(msg.data.decode(), domain, expected), and ensure cleanup
in a finally block by unsubscribing the subscription (sub.unsubscribe()) and
draining/closing the NATS connection (nc.drain() or nc.close()) so resources are
always released even on timeout/errors.
```python
def test_looptest():
    domain = "test.from-edge.looptest.dnstapir.se"
    expected_obs = 1024

    event = gen_event(domain)
    subject = "core-integration-test.events.new_qname"
    thumbprint = "thumbprint1"

    asyncio.run(send_event(event, subject, thumbprint))
    asyncio.run(check_observation("core-integration-test.out", domain, expected_obs))


def test_looptest():
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Detect duplicate top-level test function names in test_basic.py
python - <<'PY'
import ast, collections, pathlib
p = pathlib.Path("test_basic.py")
tree = ast.parse(p.read_text())
names = [n.name for n in tree.body if isinstance(n, ast.FunctionDef)]
dups = {k:v for k,v in collections.Counter(names).items() if v > 1}
print("duplicates:", dups)
for n in tree.body:
    if isinstance(n, ast.FunctionDef) and n.name in dups:
        print(f"{n.name} at line {n.lineno}")
PY
```

Repository: dnstapir/core-integration-test
Length of output: 157
Rename one test_looptest function; Line 106 definition overrides Line 95.
Two functions have the same name in module scope. The second definition at line 106 silently replaces the first at line 95, so only one test executes and the first test scenario is lost.
Suggested fix

```diff
-def test_looptest():
+def test_looptest_non_loop_domain():
     domain = "example.xa"
     expected_obs = 1
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@test_basic.py` around lines 95 - 106, There are two top-level functions both
named test_looptest which causes the second definition to override the first;
rename one of them (for example rename the second test_looptest to
test_looptest_observation or the first to test_looptest_send) so both tests run,
and update any test-suite references if present; locate the duplicate function
definitions by the symbol name test_looptest in the file and change one function
name accordingly.
```python
    asyncio.run(send_event(event, subject, thumbprint))
    asyncio.run(check_observation("core-integration-test.out", domain, expected_obs))
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Verify publish happens before observation subscription in both tests.
python - <<'PY'
import ast, pathlib
src = pathlib.Path("test_basic.py").read_text()
tree = ast.parse(src)
for fn in tree.body:
    if isinstance(fn, ast.FunctionDef) and fn.name.startswith("test_"):
        calls = []
        for n in ast.walk(fn):
            if isinstance(n, ast.Call) and isinstance(n.func, ast.Attribute) and n.func.attr == "run":
                if n.args and isinstance(n.args[0], ast.Call) and isinstance(n.args[0].func, ast.Name):
                    calls.append((n.lineno, n.args[0].func.id))
        if calls:
            print(fn.name, calls)
PY
```

Repository: dnstapir/core-integration-test
Length of output: 201
🏁 Script executed:
```shell
cat -n test_basic.py | head -150
```

Repository: dnstapir/core-integration-test
Length of output: 4213
🏁 Script executed:
```shell
# Look for conftest.py and any fixtures
find . -name "conftest.py" -o -name "pytest.ini" | head -20
# Check if there are other test files or service definitions
ls -la
```

Repository: dnstapir/core-integration-test
Length of output: 886
🏁 Script executed:
```shell
cat run_test.sh
echo "=== SUT Directory Structure ==="
find sut -type f | head -20
echo "=== Check for NATS configuration ==="
grep -r "JetStream\|durable\|subscribe" sut/ --include="*.py" --include="*.go" | head -20
```

Repository: dnstapir/core-integration-test
Length of output: 795
🏁 Script executed:
```shell
cat sut/docker-compose.yaml
echo "=== NATS Configuration ==="
cat sut/nats/nats.conf
```

Repository: dnstapir/core-integration-test
Length of output: 1650
🏁 Script executed:
```shell
# Search for stream creation or JetStream configuration
grep -r "stream\|Stream\|add_stream" . --include="*.py" --include="*.toml" --include="*.json" --include="*.conf" 2>/dev/null
echo "=== Check observation-encoder config ==="
cat sut/observation-encoder/config.toml
echo "=== Check analysis service configs ==="
cat sut/tapir-analyse-new-qname/config.toml
cat sut/tapir-analyse-looptest/config.toml
```

Repository: dnstapir/core-integration-test
Length of output: 1470
🏁 Script executed:
```shell
# Verify the test subject names one more time and check if there's any other way observations are captured
grep -n "core-integration-test.out\|check_observation" test_basic.py
# Check if there's any pre-subscription setup or message waiting logic
grep -n "subscribe\|Consumer\|durable" test_basic.py
```

Repository: dnstapir/core-integration-test
Length of output: 392
Subscribe before publishing to avoid race in both loop tests.
Lines 103–105 and 114–115 publish the event before subscribing to the observation output. The processing pipeline runs concurrently in Docker containers and can emit observations before check_observation() establishes its subscription, causing the message to be lost since regular NATS subscriptions receive only messages published after subscription. Reorder calls so check_observation() subscribes first, or use a JetStream durable consumer to persist the observation until consumed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@test_basic.py` around lines 103 - 105, The test currently calls
send_event(...) before check_observation(...), which can miss messages; change
the test so check_observation(...) subscribes first (e.g., start
check_observation as a task or await it to establish the subscription) and only
then call send_event(...), referencing the existing send_event and
check_observation functions; alternatively replace the transient subscription
with a JetStream durable consumer in check_observation to persist observations
until consumed.
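The subscribe-before-publish ordering this comment asks for can be demonstrated with plain asyncio primitives. This is a sketch, not the project's code: an `asyncio.Queue` stands in for the NATS subject and an `asyncio.Event` signals that the "subscription" exists before anything is published; all names here are illustrative.

```python
import asyncio

async def subscriber(ready: asyncio.Event, inbox: asyncio.Queue) -> str:
    # "Subscribe" first, then tell the publisher it is safe to send.
    ready.set()
    # Bounded wait, mirroring the timeout the review suggests for next_msg.
    return await asyncio.wait_for(inbox.get(), timeout=5)

async def publisher(ready: asyncio.Event, inbox: asyncio.Queue) -> None:
    # Publish only after the subscriber is listening, so nothing is lost.
    await ready.wait()
    await inbox.put("observation")

async def main() -> str:
    ready = asyncio.Event()
    inbox: asyncio.Queue = asyncio.Queue()
    sub_task = asyncio.create_task(subscriber(ready, inbox))
    await publisher(ready, inbox)
    return await sub_task

print(asyncio.run(main()))  # → observation
```

In the real test this corresponds to establishing the subscription on the observation output subject (or a JetStream durable consumer) before `send_event(...)` fires the event.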
Summary by CodeRabbit
Tests
Chores