Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ futures-channel = "0.3.31"
futures = "0.3.31"
regex = "1.11.1"
once_cell = "1.20.3"
dashmap = "6"
10 changes: 6 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ ignore = [
[tool.ruff.lint.per-file-ignores]
"python/psqlpy/*" = ["PYI021"]
"python/tests/*" = [
"S101", # Use of assert detected
"S608", # Possible SQL injection vector through string-based query construction
"D103", # Missing docstring in public function
"S311", # Standard pseudo-random generators are not suitable for security/cryptographic purposes
"S101", # Use of assert detected
"S608", # Possible SQL injection via string-based query construction
"D103", # Missing docstring in public function
"S311", # Standard pseudo-random generators not suitable for security
"PLR2004", # Magic value in comparison (common in test assertions)
"D205", # 1 blank line required between summary and description
]
"python/psqlpy/_internal/exceptions.pyi" = [
"D205",
Expand Down
36 changes: 36 additions & 0 deletions python/psqlpy/_internal/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,33 @@ _RowFactoryRV = TypeVar(

ParamsT: TypeAlias = Sequence[Any] | Mapping[str, Any] | None

class Record:
"""An asyncpg-compatible row type with eagerly decoded column values.

Supports positional indexing (`row[0]`), by-name indexing (`row["col"]`),
slicing, iteration, and dict-like access methods.
"""

def __len__(self) -> int: ...
@typing.overload
def __getitem__(self, key: int) -> Any: ...
@typing.overload
def __getitem__(self, key: str) -> Any: ...
@typing.overload
def __getitem__(self, key: slice) -> list[Any]: ...
def __iter__(self) -> typing.Iterator[Any]: ...
def get(self, key: str, default: Any = None) -> Any:
"""Return column value by name, or `default` if the column does not exist."""

def keys(self) -> list[str]:
"""Return ordered list of column names."""

def values(self) -> list[Any]:
"""Return ordered list of column values."""

def items(self) -> list[tuple[str, Any]]:
"""Return ordered list of (column_name, value) pairs."""

class QueryResult:
"""Result."""

Expand Down Expand Up @@ -107,6 +134,15 @@ class QueryResult:
List of type that return passed `row_factory`.
"""

def records(self: Self) -> list[Record]:
"""Return result as a list of Record instances.

Each Record shares column metadata with others from the same result set.
Supports positional/by-name indexing and dict-like access.
Unlike `result()`, the column-name lookup table is shared, not
re-created per row.
"""

class SingleQueryResult:
"""Single result."""

Expand Down
141 changes: 140 additions & 1 deletion python/tests/test_copy_records.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import typing
from datetime import datetime, timezone
import uuid
from datetime import date, datetime, timezone
from decimal import Decimal

import pytest
from psqlpy import ConnectionPool
Expand Down Expand Up @@ -172,3 +174,140 @@ async def test_copy_records_to_table_uses_schema_qualifier(
finally:
async with psql_pool.acquire() as connection:
await connection.execute(f"DROP SCHEMA IF EXISTS {schema} CASCADE")


async def test_copy_records_heterogeneous_types(
psql_pool: ConnectionPool,
) -> None:
"""Characterization test: covers int, float, text, bytea, UUID, numeric,
date, timestamp, NULL, and array column types (AC-3.4).
"""
target: typing.Final = "copy_records_hetero"

async with psql_pool.acquire() as connection:
await connection.execute(f"DROP TABLE IF EXISTS {target}")
await connection.execute(
f"""
CREATE TABLE {target} (
col_int INTEGER,
col_float DOUBLE PRECISION,
col_text TEXT,
col_bytea BYTEA,
col_uuid UUID,
col_numeric NUMERIC,
col_date DATE,
col_ts TIMESTAMPTZ,
col_null TEXT,
col_arr INTEGER[]
)
""",
)

try:
sample_uuid = uuid.uuid4()
records = [
(
42,
3.14,
"hello",
b"\x00\x01\x02",
sample_uuid,
Decimal("12345.6789"),
date(2024, 6, 1),
datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc),
None,
[1, 2, 3],
),
]

async with psql_pool.acquire() as connection:
inserted = await connection.copy_records_to_table(
table_name=target,
records=records,
)

assert inserted == 1

async with psql_pool.acquire() as connection:
result = await connection.execute(f"SELECT * FROM {target}")
row = result.result()[0]
assert row["col_int"] == 42
assert abs(row["col_float"] - 3.14) < 1e-9
assert row["col_text"] == "hello"
assert bytes(row["col_bytea"]) == b"\x00\x01\x02"
assert row["col_uuid"] == str(sample_uuid)
assert row["col_numeric"] == Decimal("12345.6789")
assert row["col_date"] == date(2024, 6, 1)
assert row["col_null"] is None
assert row["col_arr"] == [1, 2, 3]
finally:
async with psql_pool.acquire() as connection:
await connection.execute(f"DROP TABLE IF EXISTS {target}")


async def test_copy_records_introspection_cache(
psql_pool: ConnectionPool,
) -> None:
"""Second call to copy_records_to_table against the same table should not
issue a new column-type introspection PREPARE (AC-4.3).
"""
target: typing.Final = "copy_records_cache_test"
records = [(1, "first"), (2, "second")]

async with psql_pool.acquire() as connection:
await connection.execute(f"DROP TABLE IF EXISTS {target}")
await connection.execute(
f"CREATE TABLE {target} (id INTEGER, label TEXT)",
)

# Snapshot introspection query count before — use pg_stat_statements if available.
introspect_pattern = f"%{target}%WHERE false%"
pre_calls: int | None = None
try:
async with psql_pool.acquire() as connection:
res = await connection.execute(
"SELECT COALESCE(SUM(calls), 0) AS n FROM pg_stat_statements "
"WHERE query ILIKE $1",
parameters=[introspect_pattern],
)
pre_calls = res.result()[0]["n"]
except Exception: # noqa: BLE001, S110
pass # pg_stat_statements not available — skip count check

try:
async with psql_pool.acquire() as connection:
# First call — populates the cache.
await connection.copy_records_to_table(
table_name=target,
records=records[:1],
)
# Second call on the same connection — must hit the type cache.
await connection.copy_records_to_table(
table_name=target,
records=records[1:],
)

# Verify both rows were written correctly.
async with psql_pool.acquire() as connection:
result = await connection.execute(
f"SELECT id, label FROM {target} ORDER BY id",
)
rows = [(r["id"], r["label"]) for r in result.result()]
assert rows == [(1, "first"), (2, "second")]

# Verify only one introspection query was issued (cache hit on second call).
if pre_calls is not None:
async with psql_pool.acquire() as connection:
res = await connection.execute(
"SELECT COALESCE(SUM(calls), 0) AS n FROM pg_stat_statements "
"WHERE query ILIKE $1",
parameters=[introspect_pattern],
)
post_calls = res.result()[0]["n"]
# At most one introspection PREPARE should be issued (cache hit on call 2).
assert post_calls - pre_calls <= 1, (
f"Expected at most 1 introspection call, got {post_calls - pre_calls}"
)
finally:
async with psql_pool.acquire() as connection:
await connection.execute(f"DROP TABLE IF EXISTS {target}")
Loading