Leon/dev#2

Open
zluudg wants to merge 2 commits into main from leon/dev

Conversation

@zluudg
Collaborator

@zluudg zluudg commented Apr 14, 2026

Summary by CodeRabbit

  • Tests

    • Enhanced test coverage with new observation validation helpers and expanded test scenarios for improved system reliability verification.
  • Chores

    • Updated service configurations to enable improved debugging and monitoring capabilities, along with refined bucket management settings for better system observability.

@coderabbitai

coderabbitai Bot commented Apr 14, 2026

Walkthrough

The NATS bucket configuration is refactored to use observation bucket tables, debug flags are added, and the API settings are simplified. The test file introduces async helpers for NATS observation validation and new test functions for event-to-observation flows.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Configuration Updates: sut/tapir-analyse-looptest/config.toml, sut/tapir-analyse-new-qname/config.toml | NATS configuration restructured: simple bucket string settings replaced with structured [[nats.observation_buckets]] tables; debug = true added to NATS section; seen_domains_bucket converted to table format; API section loses explicit address and port; standalone [libtapir] section removed. Ignore list updated to use DNS suffix instead of edge-specific patterns. |
| Test Helpers & Validation: test_basic.py | Added async helpers check_observation() and handle_observation() to subscribe to NATS observation buckets, consume messages, and validate domain/tag matching. Introduced two test_looptest() test functions (with the same name) that send events and validate observations; replaced prior test_send() function. |

Sequence Diagram

sequenceDiagram
    participant Test as Test Runner
    participant API as NATS API
    participant Bucket as Observation Bucket
    participant Handler as Observation Handler

    Test->>API: send_event(event)
    activate API
    API->>Bucket: publish observation
    deactivate API
    
    Test->>Bucket: subscribe(subject)
    activate Bucket
    Bucket->>Test: message received
    deactivate Bucket
    
    Test->>Handler: handle_observation(obsJSON)
    activate Handler
    Handler->>Handler: parse JSON
    Handler->>Handler: assert domain match
    Handler->>Handler: assert tag_mask match
    Note over Handler: validation complete
    deactivate Handler

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~15 minutes

Poem

🐰 Hoppy hops through NATS so bright,
Observation buckets structured just right!
Async helpers dance with validation's grace,
Events and domains find their place! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Title check | ❓ Inconclusive | The title 'Leon/dev' is generic and does not describe the actual changes in the pull request, which involve refactoring NATS configuration, restructuring bucket settings, and updating test helpers. | Replace the branch name with a descriptive title that summarizes the main changes, such as 'Refactor NATS configuration and add observation bucket helpers' or 'Update NATS bucket configuration and test utilities'. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
test_basic.py (1)

71-75: Add a precondition assert for obs["added"] before indexing.

This makes failures clearer than a raw KeyError/IndexError when no additions are present.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test_basic.py` around lines 71 - 75, The test helper handle_observation
should assert the presence and non-emptiness of the "added" field before
indexing into it; add a precondition in handle_observation that "added" is a key
in obs and that obs["added"] is a non-empty list (e.g., assert
isinstance(obs.get("added"), list) and obs["added"]), then proceed with the
existing assertions on obs["added"][0]["name"] and obs["added"][0]["tag_mask"]
to make failures clearer than KeyError/IndexError.
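
A minimal sketch of the precondition described in this nitpick. The real handle_observation body is not shown in this thread, so everything below is a hypothetical reconstruction: only the field names "added", "name", and "tag_mask" come from the review, and the exact-equality comparisons and the assumption that the payload is a JSON object are guesses.

```python
import asyncio
import json


async def handle_observation(obs_json: str, domain: str, expected: int) -> None:
    """Validate one observation message (hypothetical reconstruction)."""
    # Assumption: the payload is a JSON object, so .get() is available.
    obs = json.loads(obs_json)

    # Precondition: fail with a readable message instead of KeyError/IndexError.
    added = obs.get("added")
    assert isinstance(added, list) and added, (
        f"no 'added' entries in observation: {obs!r}"
    )

    # Assumption: the first entry is the one under test, compared by equality.
    first = added[0]
    assert first["name"] == domain, f"unexpected domain: {first['name']!r}"
    assert first["tag_mask"] == expected, f"unexpected tag_mask: {first['tag_mask']!r}"
```

With this shape, an observation that carries no additions fails with a message that includes the whole payload, which tends to be easier to debug in CI than a bare IndexError.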
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e6bcc04f-0004-4b6f-b140-3f13566afd4d

📥 Commits

Reviewing files that changed from the base of the PR and between 88081c4 and d3d73d4.

📒 Files selected for processing (3)
  • sut/tapir-analyse-looptest/config.toml
  • sut/tapir-analyse-new-qname/config.toml
  • test_basic.py

Comment thread test_basic.py
Comment on lines +59 to +70
async def check_observation(subject, domain, expected):
    nc = await nats.connect(servers="localhost:4222")

    sub = await nc.subscribe(subject)

    try:
        async for msg in sub.messages:
            await handle_observation(msg.data.decode(), domain, expected)
            await sub.unsubscribe()
    except Exception as e:
        raise e


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify that check_observation currently lacks timeout and close/drain cleanup.
python - <<'PY'
import ast, pathlib
p = pathlib.Path("test_basic.py")
src = p.read_text()
tree = ast.parse(src)
for node in tree.body:
    if isinstance(node, ast.AsyncFunctionDef) and node.name == "check_observation":
        seg = "\n".join(src.splitlines()[node.lineno-1:node.end_lineno])
        print("Function lines:", node.lineno, "-", node.end_lineno)
        print("has asyncio.wait_for:", "asyncio.wait_for(" in seg)
        print("has sub.next_msg:", ".next_msg(" in seg)
        print("has nc.drain/close:", (".drain(" in seg) or (".close(" in seg))
PY

Repository: dnstapir/core-integration-test

Length of output: 175


🏁 Script executed:

#!/bin/bash
# Check if there are other NATS usage patterns in the codebase
rg "await nats\." -A 5 -B 2 test_basic.py

Repository: dnstapir/core-integration-test

Length of output: 454


🏁 Script executed:

#!/bin/bash
# Look for any other NATS subscribe/connection usage patterns
rg "\.subscribe\(|\.drain\(|\.close\(" -B 2 -A 3 test_basic.py

Repository: dnstapir/core-integration-test

Length of output: 215


🌐 Web query:

NATS Python client sub.next_msg() async timeout pattern documentation

💡 Result:

The NATS Python client (nats.py) provides an async timeout pattern for sub.next_msg() via an optional timeout parameter in seconds (default 1.0). It awaits a message from the subscription's internal queue using asyncio.wait_for(self._pending_queue.get(), timeout). If no message arrives within the timeout, it raises nats.errors.TimeoutError.

Usage example from official docs and examples:

    import asyncio
    import nats
    from nats.errors import TimeoutError

    async def main():
        nc = await nats.connect("demo.nats.io")
        sub = await nc.subscribe("foo")
        try:
            msg = await sub.next_msg(timeout=1.0)  # Wait up to 1 second
            print("Received:", msg)
        except TimeoutError:
            print("No message received within timeout")
        await nc.close()

    asyncio.run(main())

Key points:

  • Only for sync-style subscriptions (no callback in subscribe); using a callback prevents next_msg.
  • Timeout is per-call; specify None for no timeout (blocks indefinitely).
  • Handles connection closure by raising nats.errors.ConnectionClosedError.
  • Examples from NATS by Example show timeout=0.1 for quick polls.

This is for core NATS subscriptions (nats.aio.client.Client.subscribe); JetStream pull subs use fetch or iterators with similar timeouts.


Add timeout and guaranteed NATS cleanup in check_observation.

The async loop on sub.messages has no timeout and will block CI indefinitely if no message arrives. Additionally, the NATS client connection and subscription must be explicitly closed/drained in a finally block to prevent resource leakage between tests. The suggested fix uses sub.next_msg(timeout=5) with proper cleanup via sub.unsubscribe() and nc.drain().

Suggested fix
 async def check_observation(subject, domain, expected):
     nc = await nats.connect(servers="localhost:4222")
-
-    sub = await nc.subscribe(subject)
-
-    try:
-        async for msg in sub.messages:
-            await handle_observation(msg.data.decode(), domain, expected)
-            await sub.unsubscribe()
-    except Exception as e:
-        raise e
+    sub = await nc.subscribe(subject)
+    try:
+        msg = await sub.next_msg(timeout=5)
+        await handle_observation(msg.data.decode(), domain, expected)
+    finally:
+        await sub.unsubscribe()
+        await nc.drain()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-async def check_observation(subject, domain, expected):
-    nc = await nats.connect(servers="localhost:4222")
-    sub = await nc.subscribe(subject)
-    try:
-        async for msg in sub.messages:
-            await handle_observation(msg.data.decode(), domain, expected)
-            await sub.unsubscribe()
-    except Exception as e:
-        raise e
+async def check_observation(subject, domain, expected):
+    nc = await nats.connect(servers="localhost:4222")
+    sub = await nc.subscribe(subject)
+    try:
+        msg = await sub.next_msg(timeout=5)
+        await handle_observation(msg.data.decode(), domain, expected)
+    finally:
+        await sub.unsubscribe()
+        await nc.drain()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test_basic.py` around lines 59 - 70, The check_observation coroutine blocks
indefinitely on sub.messages and doesn't guarantee NATS cleanup; change it to
await a single message using sub.next_msg(timeout=5) (or similar timeout) and
call handle_observation(msg.data.decode(), domain, expected), and ensure cleanup
in a finally block by unsubscribing the subscription (sub.unsubscribe()) and
draining/closing the NATS connection (nc.drain() or nc.close()) so resources are
always released even on timeout/errors.

Comment thread test_basic.py
Comment on lines +95 to +106
def test_looptest():
    domain = "test.from-edge.looptest.dnstapir.se"
    expected_obs = 1024

    event = gen_event(domain)
    subject = "core-integration-test.events.new_qname"
    thumbprint = "thumbprint1"

    asyncio.run(send_event(event, subject, thumbprint))
    asyncio.run(check_observation("core-integration-test.out", domain, expected_obs))

def test_looptest():

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Detect duplicate top-level test function names in test_basic.py
python - <<'PY'
import ast, collections, pathlib
p = pathlib.Path("test_basic.py")
tree = ast.parse(p.read_text())
names = [n.name for n in tree.body if isinstance(n, ast.FunctionDef)]
dups = {k:v for k,v in collections.Counter(names).items() if v > 1}
print("duplicates:", dups)
for n in tree.body:
    if isinstance(n, ast.FunctionDef) and n.name in dups:
        print(f"{n.name} at line {n.lineno}")
PY

Repository: dnstapir/core-integration-test

Length of output: 157


Rename one test_looptest function; Line 106 definition overrides Line 95.

Two functions have the same name in module scope. The second definition at line 106 silently replaces the first at line 95, so only one test executes and the first test scenario is lost.

Suggested fix
-def test_looptest():
+def test_looptest_non_loop_domain():
     domain = "example.xa"
     expected_obs = 1
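
To illustrate why only one of the two tests runs, here is a small self-contained sketch. SOURCE is a made-up two-function module standing in for test_basic.py, and duplicate_functions is a hypothetical helper (not part of the repository) that reports repeated top-level function names in the same spirit as the analysis script above.

```python
import ast
from collections import Counter

# Made-up module text: two top-level functions sharing one name.
SOURCE = '''
def test_looptest():
    return "first"

def test_looptest():
    return "second"
'''


def duplicate_functions(source: str) -> dict[str, list[int]]:
    """Map each repeated top-level function name to the lines defining it."""
    tree = ast.parse(source)
    defs = [
        node for node in tree.body
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    ]
    counts = Counter(node.name for node in defs)
    return {
        name: [node.lineno for node in defs if node.name == name]
        for name, count in counts.items()
        if count > 1
    }


# Executing the module binds the name twice; the later binding replaces the earlier one.
namespace: dict = {}
exec(SOURCE, namespace)
print(namespace["test_looptest"]())  # prints "second"
print(duplicate_functions(SOURCE))   # prints {'test_looptest': [2, 5]}
```

Because a test collector that looks functions up by name in the module namespace only sees the surviving binding, the first scenario never runs unless one of the two functions is renamed.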
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test_basic.py` around lines 95 - 106, There are two top-level functions both
named test_looptest which causes the second definition to override the first;
rename one of them (for example rename the second test_looptest to
test_looptest_observation or the first to test_looptest_send) so both tests run,
and update any test-suite references if present; locate the duplicate function
definitions by the symbol name test_looptest in the file and change one function
name accordingly.

Comment thread test_basic.py
Comment on lines +103 to +105
    asyncio.run(send_event(event, subject, thumbprint))
    asyncio.run(check_observation("core-integration-test.out", domain, expected_obs))


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify publish happens before observation subscription in both tests.
python - <<'PY'
import ast, pathlib
src = pathlib.Path("test_basic.py").read_text()
tree = ast.parse(src)
for fn in tree.body:
    if isinstance(fn, ast.FunctionDef) and fn.name.startswith("test_"):
        calls = []
        for n in ast.walk(fn):
            if isinstance(n, ast.Call) and isinstance(n.func, ast.Attribute) and n.func.attr == "run":
                if n.args and isinstance(n.args[0], ast.Call) and isinstance(n.args[0].func, ast.Name):
                    calls.append((n.lineno, n.args[0].func.id))
        if calls:
            print(fn.name, calls)
PY

Repository: dnstapir/core-integration-test

Length of output: 201


🏁 Script executed:

cat -n test_basic.py | head -150

Repository: dnstapir/core-integration-test

Length of output: 4213


🏁 Script executed:

# Look for conftest.py and any fixtures
find . -name "conftest.py" -o -name "pytest.ini" | head -20

# Check if there are other test files or service definitions
ls -la

Repository: dnstapir/core-integration-test

Length of output: 886


🏁 Script executed:

cat run_test.sh

echo "=== SUT Directory Structure ==="
find sut -type f | head -20

echo "=== Check for NATS configuration ==="
grep -r "JetStream\|durable\|subscribe" sut/ --include="*.py" --include="*.go" | head -20

Repository: dnstapir/core-integration-test

Length of output: 795


🏁 Script executed:

cat sut/docker-compose.yaml

echo "=== NATS Configuration ==="
cat sut/nats/nats.conf

Repository: dnstapir/core-integration-test

Length of output: 1650


🏁 Script executed:

# Search for stream creation or JetStream configuration
grep -r "stream\|Stream\|add_stream" . --include="*.py" --include="*.toml" --include="*.json" --include="*.conf" 2>/dev/null

echo "=== Check observation-encoder config ==="
cat sut/observation-encoder/config.toml

echo "=== Check analysis service configs ==="
cat sut/tapir-analyse-new-qname/config.toml
cat sut/tapir-analyse-looptest/config.toml

Repository: dnstapir/core-integration-test

Length of output: 1470


🏁 Script executed:

# Verify the test subject names one more time and check if there's any other way observations are captured
grep -n "core-integration-test.out\|check_observation" test_basic.py

# Check if there's any pre-subscription setup or message waiting logic
grep -n "subscribe\|Consumer\|durable" test_basic.py

Repository: dnstapir/core-integration-test

Length of output: 392


Subscribe before publishing to avoid race in both loop tests.

Lines 103–105 and 114–115 publish the event before subscribing to the observation output. The processing pipeline runs concurrently in Docker containers and can emit observations before check_observation() establishes its subscription, causing the message to be lost since regular NATS subscriptions receive only messages published after subscription. Reorder calls so check_observation() subscribes first, or use a JetStream durable consumer to persist the observation until consumed.
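
One way to express the subscribe-first ordering is sketched below. observe_after_subscribing is a hypothetical helper, not code from this PR. It assumes nc is an already-connected nats-py client (only subscribe, flush, next_msg, and unsubscribe are used) and that publish is any zero-argument coroutine function that triggers the event, for example a thin wrapper around the existing send_event.

```python
import asyncio
from typing import Awaitable, Callable


async def observe_after_subscribing(
    nc,
    subject: str,
    publish: Callable[[], Awaitable[None]],
    timeout: float = 5.0,
) -> bytes:
    """Subscribe, then trigger the event, then wait for one message.

    Hypothetical helper: `nc` is assumed to be a connected nats-py client.
    """
    sub = await nc.subscribe(subject)
    try:
        # A flush round-trip should ensure the server has processed the
        # subscription before anything is published.
        await nc.flush()
        await publish()
        # Bounded wait so a missing observation fails the test instead of hanging.
        msg = await sub.next_msg(timeout=timeout)
        return msg.data
    finally:
        await sub.unsubscribe()
```

In the test this might be driven from a single asyncio.run(...) call that connects once, passes `lambda: send_event(event, subject, thumbprint)` as publish, hands the returned bytes to handle_observation, and drains the connection afterwards. Whether send_event can share the same event loop this way depends on its implementation, which is not shown here.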

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test_basic.py` around lines 103 - 105, The test currently calls
send_event(...) before check_observation(...), which can miss messages; change
the test so check_observation(...) subscribes first (e.g., start
check_observation as a task or await it to establish the subscription) and only
then call send_event(...), referencing the existing send_event and
check_observation functions; alternatively replace the transient subscription
with a JetStream durable consumer in check_observation to persist observations
until consumed.
