Updated with the replay features and readme #10
Conversation
Pull request overview
This PR improves the reliability and observability of replay in cortexadb-py by adding a strict mode, structured validation, and diagnostic reporting for both replay and replay export.
Changes:
- Added `strict` flag to `CortexaDB.replay()` with per-op validation, reporting, and fail-fast behavior.
- Added replay/export diagnostic report properties (`last_replay_report`, `last_export_replay_report`) and export counters.
- Added replay-focused smoke tests and updated README with strict-mode and diagnostics notes.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| crates/cortexadb-py/cortexadb/client.py | Implements strict/non-strict replay handling, report generation, export reporting, and new report properties. |
| crates/cortexadb-py/test_smoke.py | Adds smoke tests for strict vs non-strict replay behavior and export report presence. |
| README.md | Documents replay strict mode and diagnostic report usage; adds a PyPI downloads badge. |
```python
except Exception:
    pass  # non-fatal: IDs may not exist if log is partial
if strict:
    raise CortexaDBError(
        f"Replay op #{index} (connect) failed for ids "
        f"{record.get('from_id')}->{record.get('to_id')}"
    )
```
The exception handlers for connect/delete/checkpoint/compact drop the original exception (except Exception:) and, in strict mode, re-raise without chaining. This makes strict-mode failures and non-strict diagnostics harder to debug compared to the remember branch which includes the exception message. Capture the exception as e, include e in the raised message / failure reason, and use exception chaining (raise ... from e) to preserve the root cause.
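As a standalone illustration of the suggested pattern (plain Python with a stand-in `CortexaDBError`, not the project's actual handler), capturing the exception and chaining it with `raise ... from e` preserves the root cause in `__cause__`:

```python
# Minimal sketch of the suggested pattern; CortexaDBError here is a stand-in
# class, not the real cortexadb-py exception.
class CortexaDBError(Exception):
    pass

def replay_connect(strict=True):
    try:
        raise KeyError("from_id 7 not found")  # simulated underlying failure
    except Exception as e:
        if strict:
            # Include the cause in the message AND chain it with `from e`
            # so the original traceback survives for diagnostics.
            raise CortexaDBError(f"Replay op #3 (connect) failed: {e}") from e

try:
    replay_connect()
except CortexaDBError as err:
    assert isinstance(err.__cause__, KeyError)  # root cause preserved
```

In non-strict mode, the same captured `e` can be stringified into the failure reason recorded by `add_failure`, so the report carries the underlying error too.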
```python
if strict:
    raise CortexaDBError(
        f"Replay op #{index} (remember) failed: {e}"
    )
```
In strict mode, the remember failure path re-raises a new CortexaDBError but does not chain the original exception (raise ... from e). Chaining would preserve the traceback and make replay failures significantly easier to diagnose.
Suggested change:
```python
) from e
```
```python
except Exception:
    pass
if strict:
    raise CortexaDBError(
        f"Replay op #{index} (delete) failed for id {record.get('id')}"
    )
```
The delete exception handler uses except Exception: and then re-raises in strict mode without including/chaining the original exception. Capture the exception (as e), include it in the strict-mode error, and use raise ... from e so callers can see the underlying failure reason.
```python
except Exception:
    pass  # non-fatal on fresh DB
if strict:
    raise CortexaDBError(f"Replay op #{index} (checkpoint) failed")
add_failure(
```
The checkpoint exception handler drops the original exception (except Exception:) and the strict-mode error message has no underlying details. Capture the exception as e and chain it (raise ... from e) so failures are diagnosable.
```python
except Exception:
    pass  # non-fatal on fresh DB
if strict:
    raise CortexaDBError(f"Replay op #{index} (compact) failed")
add_failure(
```
The compact exception handler drops the original exception (except Exception:) and re-raises without chaining in strict mode. Capture the exception as e and use raise ... from e (and optionally include e in the message) to preserve the root cause for diagnostics.
```python
def test_replay_non_strict_unknown_op_reports_skip(cleanup_replay):
    with open(LOG_PATH, "w", encoding="utf-8") as f:
        f.write(
            json.dumps(
                {
                    "cortexadb_replay": "1.0",
                    "dimension": 3,
                    "sync": "strict",
                    "recorded_at": "2026-03-01T00:00:00Z",
                }
            )
            + "\n"
        )
        f.write(json.dumps({"op": "unknown_op", "foo": "bar"}) + "\n")

    db = CortexaDB.replay(LOG_PATH, REPLAY_DB, strict=False)
    assert len(db) == 0
    report = db.last_replay_report
    assert report is not None
    assert report["skipped"] == 1
    assert report["failed"] == 0
    assert report["op_counts"]["unknown"] == 1


def test_replay_strict_unknown_op_raises(cleanup_replay):
    with open(LOG_PATH, "w", encoding="utf-8") as f:
        f.write(
            json.dumps(
                {
                    "cortexadb_replay": "1.0",
                    "dimension": 3,
                    "sync": "strict",
                    "recorded_at": "2026-03-01T00:00:00Z",
                }
            )
            + "\n"
        )
        f.write(json.dumps({"op": "unknown_op", "foo": "bar"}) + "\n")

    with pytest.raises(CortexaDBError, match="unknown replay op"):
        CortexaDB.replay(LOG_PATH, REPLAY_DB, strict=True)
```
Replay now has explicit handling for malformed operation records (e.g., missing/invalid 'op', and other schema validation paths), but the smoke tests only cover unknown-op and missing-embedding cases. Add a test case for a malformed log line such as a non-dict JSON value (e.g., []) and/or an op record with a non-string/missing 'op', asserting strict-mode raises and non-strict reports a skip.
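A standalone sketch of what such a test would assert (the `classify` helper below is illustrative, not the project's API): `json.loads` can return any JSON type, so a non-dict log line, or a record whose `op` is missing or not a string, must be treated as malformed.

```python
import json

def classify(record):
    """Illustrative classifier mirroring the validation the new test would exercise."""
    if not isinstance(record, dict):
        return "malformed"  # e.g. a log line of "[]" parses to a list, not a dict
    op = record.get("op")
    if not isinstance(op, str) or not op:
        return "malformed"  # missing, null, or non-string 'op'
    return op

assert classify(json.loads("[]")) == "malformed"                 # non-dict record
assert classify(json.loads('{"op": null}')) == "malformed"       # null 'op'
assert classify(json.loads('{"op": "delete", "id": 3}')) == "delete"
```

In the real suite, the strict-mode variant would write such a line to `LOG_PATH` and assert that `CortexaDB.replay(..., strict=True)` raises, while the non-strict variant asserts `report["skipped"]` is incremented.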
```python
    return None

for index, record in enumerate(reader.operations(), start=1):
    report["total_ops"] += 1
```
ReplayReader.operations() yields the result of json.loads(line), which can be any JSON type (list/int/str), not guaranteed to be a dict. The replay loop assumes a dict and calls record.get(...), which will raise AttributeError and bypass the intended strict/non-strict handling and reporting. Add a isinstance(record, dict) check and treat non-dict records as malformed (raise in strict mode; record a skipped failure otherwise).
Suggested change:
```python
report["total_ops"] += 1
# Ensure each replay record is a JSON object (dict). Non-dict records are malformed.
if not isinstance(record, dict):
    if strict:
        raise CortexaDBConfigError(
            f"Replay op #{index} has invalid format: expected object, "
            f"got {type(record).__name__}: {record!r}"
        )
    report["op_counts"]["unknown"] += 1
    add_failure(
        index=index,
        op="unknown",
        reason="non-dict replay record (expected JSON object)",
        record=record,
    )
    continue
```
```python
    if not isinstance(record["embedding"], list):
        return "remember record 'embedding' must be a list"
elif op == "connect":
    for field in ("from_id", "to_id", "relation"):
        if field not in record:
            return f"connect record missing '{field}'"
elif op == "delete":
    if "id" not in record:
        return "delete record missing 'id'"
```
validate_record() only checks field presence for connect/delete but not that values are non-null and of the expected types (e.g., id, from_id, to_id should be ints; relation should be a string). As written, records like {"op":"delete","id":null} pass validation and then fail later as an execution error (counted as failed) rather than being treated as malformed input (skipped) or failing fast in strict mode. Consider extending validation to enforce basic types/non-null (and optionally check embedding length against the header dimension) so strict/non-strict behavior matches the documented intent.
Suggested change:
```python
    embedding = record["embedding"]
    if not isinstance(embedding, list):
        return "remember record 'embedding' must be a list"
    # Optional: enforce embedding dimension from header if available
    if hdr.dimension and len(embedding) != hdr.dimension:
        return (
            f"remember record 'embedding' length {len(embedding)} does not "
            f"match header dimension {hdr.dimension}"
        )
elif op == "connect":
    for field in ("from_id", "to_id", "relation"):
        if field not in record:
            return f"connect record missing '{field}'"
    from_id = record.get("from_id")
    to_id = record.get("to_id")
    relation = record.get("relation")
    if from_id is None or not isinstance(from_id, int):
        return "connect record 'from_id' must be a non-null integer"
    if to_id is None or not isinstance(to_id, int):
        return "connect record 'to_id' must be a non-null integer"
    if relation is None or not isinstance(relation, str):
        return "connect record 'relation' must be a non-null string"
elif op == "delete":
    if "id" not in record:
        return "delete record missing 'id'"
    delete_id = record.get("id")
    if delete_id is None or not isinstance(delete_id, int):
        return "delete record 'id' must be a non-null integer"
```
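To make the null-handling point concrete, here is a self-contained sketch of the stricter validation; `validate_record` is hypothetical outside the project and only mirrors the intent of the review comment:

```python
def validate_record(record, dimension=3):
    """Standalone sketch of stricter per-record validation (not the project's code).

    Returns an error string for malformed records, or None if well-formed.
    """
    if not isinstance(record, dict):
        return "record must be a JSON object"
    op = record.get("op")
    if op == "remember":
        emb = record.get("embedding")
        if not isinstance(emb, list):
            return "remember record 'embedding' must be a list"
        if len(emb) != dimension:
            return f"embedding length {len(emb)} != header dimension {dimension}"
    elif op == "delete":
        rec_id = record.get("id")
        # bool is a subclass of int in Python, so exclude it explicitly
        if not isinstance(rec_id, int) or isinstance(rec_id, bool):
            return "delete record 'id' must be a non-null integer"
    return None

# {"op": "delete", "id": null} is now classified as malformed input,
# not deferred to an execution-time failure:
assert validate_record({"op": "delete", "id": None}) is not None
assert validate_record({"op": "delete", "id": 7}) is None
assert validate_record({"op": "remember", "embedding": [0.1, 0.2]}) is not None
assert validate_record({"op": "remember", "embedding": [0.1, 0.2, 0.3]}) is None
```

With this shape, non-strict replay can count such records as skipped (malformed) rather than failed (execution error), matching the documented intent.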
Description
This PR hardens Python replay reliability in `cortexadb-py` by removing silent failure paths and adding explicit replay diagnostics.

Summary of changes

- Added `strict` mode to replay:
  - `CortexaDB.replay(..., strict=False)` keeps backward-compatible best-effort behavior.
  - `CortexaDB.replay(..., strict=True)` fails fast on malformed/failed operations.
- Per-op validation and reporting for `remember`, `connect`, `delete`, `checkpoint`, `compact`, and unknown ops.
- New diagnostic report properties: `db.last_replay_report` and `db.last_export_replay_report`.
- Export counters: `checked`, `exported`, `skipped_missing_id`, `skipped_missing_embedding`, `errors`.

Motivation / context
Replay previously used multiple `except Exception: pass` paths that could silently skip operations and hide data restoration issues. This change improves correctness and observability while preserving default compatibility.

Type of change
How Has This Been Tested?
Tests run locally:

- `python3 -m py_compile crates/cortexadb-py/cortexadb/client.py crates/cortexadb-py/test_smoke.py`
- `.venv/bin/python -m pytest -q crates/cortexadb-py/test_smoke.py -k "replay_ or export_replay_sets_report"` (10 passed, 23 deselected)
- `cargo check --workspace`

Unit Tests
Smoke Tests
Performance Benchmarks
Checklist: