Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[![License: MIT/Apache-2.0](https://img.shields.io/badge/License-MIT%2FApache--2.0-blue.svg)](LICENSE)
[![Status: Beta](https://img.shields.io/badge/Status-Beta-brightgreen.svg)](#current-status)
[![Version](https://img.shields.io/badge/Version-0.1.5-blue.svg)](https://github.com/anaslimem/CortexaDB/releases)
[![PyPI Downloads](https://static.pepy.tech/personalized-badge/cortexadb?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=GREEN&left_text=downloads)](https://pepy.tech/projects/cortexadb)

**CortexaDB** is a simple, fast, and hard-durable embedded database designed specifically for AI agent memory. It provides a single-file-like experience (no server required) but with native support for vectors, graphs, and temporal search.

Expand Down Expand Up @@ -214,8 +215,10 @@ db.load("document.pdf", strategy="recursive")
| `.delete_memory(id)` | Permanently removes a memory and updates all indexes. |
| `.compact()` | Reclaims space by removing deleted entries from disk. |
| `.checkpoint()` | Truncates the WAL and snapshots the current state for fast startup. |
| `.export_replay(path)` | Exports current state to a replay log (NDJSON). |
| `CortexaDB.replay(log_path, db_path)` | Rebuilds a database from a replay log. |
| `.export_replay(path)` | Exports current state as a snapshot replay log (NDJSON). |
| `CortexaDB.replay(log_path, db_path, ...)` | Rebuilds a database from a replay log. Supports `strict` mode. |
| `.last_replay_report` | Diagnostic report dict from the most recent `replay()` call. |
| `.last_export_replay_report` | Diagnostic report dict from the most recent `export_replay()` call. |

### Configuration Options
When calling `CortexaDB.open()`, you can tune the behavior:
Expand All @@ -225,6 +228,36 @@ When calling `CortexaDB.open()`, you can tune the behavior:
- `index_mode`: `"exact"`, `"hnsw"`, or an HNSW config dict.
- `record`: Path to a log file for capturing the entire session for replay.

### Replay Notes

`CortexaDB.replay()` accepts a `strict` flag to control error handling:

| Mode | Behavior |
|------|----------|
| `strict=False` (default) | Skips malformed/failed operations and continues |
| `strict=True` | Raises `CortexaDBError` immediately on the first bad operation |

After a `replay()` or `export_replay()` call, a diagnostic report is available:

```python
db = CortexaDB.replay("session.log", "restored.mem", strict=False)
report = db.last_replay_report
print(report["total_ops"]) # total operations in the log
print(report["applied"]) # successfully applied
print(report["skipped"]) # skipped (malformed but non-fatal)
print(report["failed"]) # failed (execution error, non-fatal)
print(report["op_counts"]) # per-type counts: remember, connect, delete, ...
print(report["failures"]) # list of up to 50 failure details

# After export_replay:
db.export_replay("snapshot.log")
export_report = db.last_export_replay_report
print(export_report["exported"]) # memories written
print(export_report["skipped_missing_embedding"]) # entries without vectors
print(export_report["skipped_missing_id"]) # gaps in ID space
print(export_report["errors"]) # unexpected errors
```

---

## Technical Essentials: How it's built
Expand Down
214 changes: 192 additions & 22 deletions crates/cortexadb-py/cortexadb/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import typing as t
import copy

from ._cortexadb import (
CortexaDBError,
Expand Down Expand Up @@ -159,6 +160,8 @@ def __init__(
):
self._embedder = embedder
self._recorder = _recorder
self._last_replay_report: t.Optional[t.Dict[str, t.Any]] = None
self._last_export_replay_report: t.Optional[t.Dict[str, t.Any]] = None
try:
try:
self._inner = _cortexadb.CortexaDB.open(
Expand Down Expand Up @@ -260,6 +263,7 @@ def replay(
db_path: str,
*,
sync: str = "strict",
strict: bool = False,
) -> "CortexaDB":
"""
Replay a log file into a fresh database, returning the populated instance.
Expand All @@ -271,6 +275,8 @@ def replay(
log_path: Path to the NDJSON replay log file.
db_path: Directory path for the new database.
sync: Sync policy for the replayed database.
strict: If True, fail fast on malformed/failed operations.
If False (default), skip invalid operations and continue.

Returns:
A :class:`CortexaDB` instance with all recorded operations applied.
Expand All @@ -296,51 +302,187 @@ def replay(

# old_id → new_id mapping (connect ops use original IDs)
id_map: t.Dict[int, int] = {}
report: t.Dict[str, t.Any] = {
"strict": strict,
"total_ops": 0,
"applied": 0,
"skipped": 0,
"failed": 0,
"op_counts": {
"remember": 0,
"connect": 0,
"delete": 0,
"checkpoint": 0,
"compact": 0,
"unknown": 0,
},
"failures": [],
}

def add_failure(
    *,
    index: int,
    op: str,
    reason: str,
    record: t.Dict[str, t.Any],
    counts_as_failed: bool = False,
) -> None:
    """Record one problematic replay operation in the shared report.

    Bumps the ``failed`` counter when ``counts_as_failed`` is set,
    otherwise the ``skipped`` counter, and appends a detail entry as
    long as the failure list is below its cap.
    """
    bucket = "failed" if counts_as_failed else "skipped"
    report[bucket] += 1
    # Cap the detail list at 50 so a badly corrupted log cannot
    # inflate the report without bound.
    if len(report["failures"]) < 50:
        detail = {
            "index": index,
            "op": op,
            "reason": reason,
            "record": record,
        }
        report["failures"].append(detail)

for record in reader.operations():
def validate_record(op: str, record: t.Dict[str, t.Any]) -> t.Optional[str]:
if op == "remember":
if "embedding" not in record:
return "remember record missing 'embedding'"
if not isinstance(record["embedding"], list):
return "remember record 'embedding' must be a list"
elif op == "connect":
for field in ("from_id", "to_id", "relation"):
if field not in record:
return f"connect record missing '{field}'"
elif op == "delete":
if "id" not in record:
return "delete record missing 'id'"
Comment on lines +348 to +356
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate_record() only checks field presence for connect/delete but not that values are non-null and of the expected types (e.g., id, from_id, to_id should be ints; relation should be a string). As written, records like {"op":"delete","id":null} pass validation and then fail later as an execution error (counted as failed) rather than being treated as malformed input (skipped) or failing fast in strict mode. Consider extending validation to enforce basic types/non-null (and optionally check embedding length against the header dimension) so strict/non-strict behavior matches the documented intent.

Suggested change
if not isinstance(record["embedding"], list):
return "remember record 'embedding' must be a list"
elif op == "connect":
for field in ("from_id", "to_id", "relation"):
if field not in record:
return f"connect record missing '{field}'"
elif op == "delete":
if "id" not in record:
return "delete record missing 'id'"
embedding = record["embedding"]
if not isinstance(embedding, list):
return "remember record 'embedding' must be a list"
# Optional: enforce embedding dimension from header if available
if hdr.dimension and len(embedding) != hdr.dimension:
return (
f"remember record 'embedding' length {len(embedding)} does not "
f"match header dimension {hdr.dimension}"
)
elif op == "connect":
for field in ("from_id", "to_id", "relation"):
if field not in record:
return f"connect record missing '{field}'"
from_id = record.get("from_id")
to_id = record.get("to_id")
relation = record.get("relation")
if from_id is None or not isinstance(from_id, int):
return "connect record 'from_id' must be a non-null integer"
if to_id is None or not isinstance(to_id, int):
return "connect record 'to_id' must be a non-null integer"
if relation is None or not isinstance(relation, str):
return "connect record 'relation' must be a non-null string"
elif op == "delete":
if "id" not in record:
return "delete record missing 'id'"
delete_id = record.get("id")
if delete_id is None or not isinstance(delete_id, int):
return "delete record 'id' must be a non-null integer"

Copilot uses AI. Check for mistakes.
elif op in ("checkpoint", "compact"):
return None
else:
return f"unknown replay op '{op}'"
return None

for index, record in enumerate(reader.operations(), start=1):
report["total_ops"] += 1
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReplayReader.operations() yields the result of json.loads(line), which can be any JSON type (list/int/str), not guaranteed to be a dict. The replay loop assumes a dict and calls record.get(...), which will raise AttributeError and bypass the intended strict/non-strict handling and reporting. Add a isinstance(record, dict) check and treat non-dict records as malformed (raise in strict mode; record a skipped failure otherwise).

Suggested change
report["total_ops"] += 1
report["total_ops"] += 1
# Ensure each replay record is a JSON object (dict). Non-dict records are malformed.
if not isinstance(record, dict):
if strict:
raise CortexaDBConfigError(
f"Replay op #{index} has invalid format: expected object, "
f"got {type(record).__name__}: {record!r}"
)
report["op_counts"]["unknown"] += 1
add_failure(
index=index,
op="unknown",
reason="non-dict replay record (expected JSON object)",
record=record,
)
continue

Copilot uses AI. Check for mistakes.
op = record.get("op")
if not isinstance(op, str):
if strict:
raise CortexaDBConfigError(
f"Replay op #{index} has invalid/missing 'op': {record!r}"
)
report["op_counts"]["unknown"] += 1
add_failure(index=index, op="unknown", reason="missing/invalid 'op'", record=record)
continue

if op in report["op_counts"]:
report["op_counts"][op] += 1
else:
report["op_counts"]["unknown"] += 1

validation_error = validate_record(op, record)
if validation_error is not None:
if strict:
raise CortexaDBConfigError(
f"Replay op #{index} ({op}) invalid: {validation_error}"
)
add_failure(index=index, op=op, reason=validation_error, record=record)
continue

if op == "remember":
embedding: t.List[float] = record["embedding"]
new_id = db._inner.remember_embedding(
embedding=embedding,
metadata=record.get("metadata"),
namespace=record.get("namespace", "default"),
content=record.get("text", ""),
)
old_id = record.get("id")
if old_id is not None:
id_map[old_id] = new_id
try:
embedding: t.List[float] = record["embedding"]
new_id = db._inner.remember_embedding(
embedding=embedding,
metadata=record.get("metadata"),
namespace=record.get("namespace", "default"),
content=record.get("text", ""),
)
old_id = record.get("id")
if old_id is not None:
id_map[old_id] = new_id
report["applied"] += 1
except Exception as e:
if strict:
raise CortexaDBError(
f"Replay op #{index} (remember) failed: {e}"
)
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In strict mode, the remember failure path re-raises a new CortexaDBError but does not chain the original exception (raise ... from e). Chaining would preserve the traceback and make replay failures significantly easier to diagnose.

Suggested change
)
) from e

Copilot uses AI. Check for mistakes.
add_failure(
index=index,
op=op,
reason=f"remember failed: {e}",
record=record,
counts_as_failed=True,
)

elif op == "connect":
old_from = record["from_id"]
old_to = record["to_id"]
new_from = id_map.get(old_from, old_from)
new_to = id_map.get(old_to, old_to)
try:
old_from = record["from_id"]
old_to = record["to_id"]
new_from = id_map.get(old_from, old_from)
new_to = id_map.get(old_to, old_to)
db._inner.connect(new_from, new_to, record["relation"])
report["applied"] += 1
except Exception:
pass # non-fatal: IDs may not exist if log is partial
if strict:
raise CortexaDBError(
f"Replay op #{index} (connect) failed for ids "
f"{record.get('from_id')}->{record.get('to_id')}"
)
Comment on lines 423 to +428
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception handlers for connect/delete/checkpoint/compact drop the original exception (except Exception:) and, in strict mode, re-raise without chaining. This makes strict-mode failures and non-strict diagnostics harder to debug compared to the remember branch which includes the exception message. Capture the exception as e, include e in the raised message / failure reason, and use exception chaining (raise ... from e) to preserve the root cause.

Copilot uses AI. Check for mistakes.
add_failure(
index=index,
op=op,
reason="connect failed (possibly unresolved IDs)",
record=record,
counts_as_failed=True,
)

elif op == "delete":
old_id = record.get("id")
new_id = id_map.get(old_id, old_id)
try:
old_id = record.get("id")
new_id = id_map.get(old_id, old_id)
db._inner.delete_memory(new_id)
report["applied"] += 1
except Exception:
pass
if strict:
raise CortexaDBError(
f"Replay op #{index} (delete) failed for id {record.get('id')}"
)
Comment on lines 443 to +447
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delete exception handler uses except Exception: and then re-raises in strict mode without including/chaining the original exception. Capture the exception (as e), include it in the strict-mode error, and use raise ... from e so callers can see the underlying failure reason.

Copilot uses AI. Check for mistakes.
add_failure(
index=index,
op=op,
reason="delete failed (possibly missing ID)",
record=record,
counts_as_failed=True,
)

elif op == "checkpoint":
try:
db._inner.checkpoint()
report["applied"] += 1
except Exception:
pass # non-fatal on fresh DB
if strict:
raise CortexaDBError(f"Replay op #{index} (checkpoint) failed")
add_failure(
Comment on lines 460 to +463
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkpoint exception handler drops the original exception (except Exception:) and the strict-mode error message has no underlying details. Capture the exception as e and chain it (raise ... from e) so failures are diagnosable.

Copilot uses AI. Check for mistakes.
index=index,
op=op,
reason="checkpoint failed",
record=record,
counts_as_failed=True,
)
elif op == "compact":
try:
db._inner.compact()
report["applied"] += 1
except Exception:
pass # non-fatal on fresh DB
if strict:
raise CortexaDBError(f"Replay op #{index} (compact) failed")
add_failure(
Comment on lines 474 to +477
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compact exception handler drops the original exception (except Exception:) and re-raises without chaining in strict mode. Capture the exception as e and use raise ... from e (and optionally include e in the message) to preserve the root cause for diagnostics.

Copilot uses AI. Check for mistakes.
index=index,
op=op,
reason="compact failed",
record=record,
counts_as_failed=True,
)

db._last_replay_report = report
return db

# ------------------------------------------------------------------
Expand Down Expand Up @@ -716,6 +858,14 @@ def export_replay(self, log_path: str) -> None:
if os.path.exists(log_path):
os.remove(log_path)

report: t.Dict[str, t.Any] = {
"checked": 0,
"exported": 0,
"skipped_missing_id": 0,
"skipped_missing_embedding": 0,
"errors": 0,
}

with ReplayWriter(log_path, dimension=dim, sync="strict") as writer:
# Iterate memories by scanning IDs 1..entries range.
# We use a generous upper bound and skip gaps.
Expand All @@ -729,6 +879,7 @@ def export_replay(self, log_path: str) -> None:
mem = self._inner.get(candidate)
embedding = getattr(mem, "embedding", None)
if not embedding:
report["skipped_missing_embedding"] += 1
candidate += 1
checked += 1
continue
Expand All @@ -746,10 +897,15 @@ def export_replay(self, log_path: str) -> None:
metadata=metadata,
)
found += 1
report["exported"] += 1
except CortexaDBNotFoundError:
report["skipped_missing_id"] += 1
except Exception:
pass
report["errors"] += 1
candidate += 1
checked += 1
report["checked"] = checked
self._last_export_replay_report = report

def get(self, mid: int) -> Memory:
"""Retrieve a full memory by ID."""
Expand Down Expand Up @@ -781,6 +937,20 @@ def stats(self) -> Stats:
"""Get database statistics."""
return self._inner.stats()

@property
def last_replay_report(self) -> t.Optional[t.Dict[str, t.Any]]:
    """Diagnostic report from the most recent replay() call.

    Returns ``None`` when no replay has produced a report yet;
    otherwise a deep copy, so callers cannot mutate the stored state.
    """
    stored = self._last_replay_report
    return None if stored is None else copy.deepcopy(stored)

@property
def last_export_replay_report(self) -> t.Optional[t.Dict[str, t.Any]]:
    """Diagnostic report from the most recent export_replay() call.

    Returns ``None`` when no export has produced a report yet;
    otherwise a deep copy, so callers cannot mutate the stored state.
    """
    stored = self._last_export_replay_report
    return None if stored is None else copy.deepcopy(stored)

def __repr__(self) -> str:
s = self._inner.stats()
embedder_name = type(self._embedder).__name__ if self._embedder else "none"
Expand Down
Loading