Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .ai/skills/check-upstream/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,17 @@ The user may specify an area via `$ARGUMENTS`. If no area is specified or "all"
- Python API: `python/datafusion/functions.py` — each function wraps a call to `datafusion._internal.functions`
- Rust bindings: `crates/core/src/functions.rs` — `#[pyfunction]` definitions registered via `init_module()`

**Evaluated and not requiring separate Python exposure:**
- `get_field_path` — already covered by `get_field(expr, *names)`, which takes a
variadic field path and dispatches to the same underlying
`functions::core::get_field` UDF as the upstream `get_field_path` helper.

**How to check:**
1. Fetch the upstream scalar function documentation page
2. Compare against functions listed in `python/datafusion/functions.py` (check the `__all__` list and function definitions)
3. A function is covered if it exists in the Python API — it does NOT need a dedicated Rust `#[pyfunction]`. Many functions are aliases that reuse another function's Rust binding.
4. Only report functions that are missing from the Python `__all__` list / function definitions
4. Check against the "evaluated and not requiring exposure" list before flagging as a gap
5. Only report functions that are missing from the Python `__all__` list / function definitions

### 2. Aggregate Functions

Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

repos:
- repo: https://github.com/rhysd/actionlint
rev: v1.7.6
rev: v1.7.12
hooks:
- id: actionlint-docker
- repo: https://github.com/astral-sh/ruff-pre-commit
Expand Down
51 changes: 50 additions & 1 deletion crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::execution::TaskContextProvider;
use datafusion::execution::context::{
DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext,
};
Expand All @@ -44,6 +43,7 @@ use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, Unboun
use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::execution::{FunctionRegistry, TaskContextProvider};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions,
};
Expand Down Expand Up @@ -847,6 +847,13 @@ impl PySessionContext {
Ok(())
}

pub fn read_batches(
&self,
batches: PyArrowType<Vec<RecordBatch>>,
) -> PyDataFusionResult<PyDataFrame> {
Ok(PyDataFrame::new(self.ctx.read_batches(batches.0)?))
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (name, path, table_partition_cols=vec![],
parquet_pruning=true,
Expand Down Expand Up @@ -1065,6 +1072,48 @@ impl PySessionContext {
self.ctx.deregister_udwf(name);
}

pub fn udf(&self, name: &str) -> PyResult<PyScalarUDF> {
if !self.ctx.udfs().contains(name) {
return Err(PyKeyError::new_err(format!("no UDF named '{name}'")));
}
let function = (*self.ctx.udf(name).map_err(py_datafusion_err)?).clone();
Ok(PyScalarUDF { function })
}

pub fn udaf(&self, name: &str) -> PyResult<PyAggregateUDF> {
if !self.ctx.udafs().contains(name) {
return Err(PyKeyError::new_err(format!("no UDAF named '{name}'")));
}
let function = (*self.ctx.udaf(name).map_err(py_datafusion_err)?).clone();
Ok(PyAggregateUDF { function })
}

pub fn udwf(&self, name: &str) -> PyResult<PyWindowUDF> {
if !self.ctx.udwfs().contains(name) {
return Err(PyKeyError::new_err(format!("no UDWF named '{name}'")));
}
let function = (*self.ctx.udwf(name).map_err(py_datafusion_err)?).clone();
Ok(PyWindowUDF { function })
}

pub fn udfs(&self) -> Vec<String> {
let mut names: Vec<String> = self.ctx.udfs().into_iter().collect();
names.sort();
names
}

pub fn udafs(&self) -> Vec<String> {
let mut names: Vec<String> = self.ctx.udafs().into_iter().collect();
names.sort();
names
}

pub fn udwfs(&self) -> Vec<String> {
let mut names: Vec<String> = self.ctx.udwfs().into_iter().collect();
names.sort();
names
}

#[pyo3(signature = (name="datafusion"))]
pub fn catalog(&self, py: Python, name: &str) -> PyResult<Py<PyAny>> {
let catalog = self.ctx.catalog(name).ok_or(PyKeyError::new_err(format!(
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,10 @@ expr_fn!(union_tag, arg1);
expr_fn!(random);

#[pyfunction]
fn get_field(expr: PyExpr, name: PyExpr) -> PyExpr {
functions::core::get_field()
.call(vec![expr.into(), name.into()])
.into()
fn get_field(expr: PyExpr, names: Vec<PyExpr>) -> PyExpr {
let mut args = vec![expr.into()];
args.extend(names.into_iter().map(Into::into));
functions::core::get_field().call(args).into()
}

#[pyfunction]
Expand Down
5 changes: 2 additions & 3 deletions examples/datafusion-ffi-example/src/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

use std::sync::Arc;

use datafusion_catalog::{TableFunctionImpl, TableProvider};
use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider};
use datafusion_common::error::Result as DataFusionResult;
use datafusion_expr::Expr;
use datafusion_ffi::udtf::FFI_TableFunction;
use datafusion_python_util::ffi_logical_codec_from_pycapsule;
use pyo3::types::PyCapsule;
Expand Down Expand Up @@ -59,7 +58,7 @@ impl MyTableFunction {
}

impl TableFunctionImpl for MyTableFunction {
fn call(&self, _args: &[Expr]) -> DataFusionResult<Arc<dyn TableProvider>> {
fn call_with_args(&self, _args: TableFunctionArgs) -> DataFusionResult<Arc<dyn TableProvider>> {
let provider = MyTableProvider::new(4, 3, 2).create_table()?;
Ok(Arc::new(provider))
}
Expand Down
208 changes: 201 additions & 7 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,20 @@

if TYPE_CHECKING:
import pathlib
from collections.abc import Sequence
from collections.abc import Iterable, Sequence

import pandas as pd
import polars as pl # type: ignore[import]
from _typeshed import CapsuleType as _PyCapsule

from datafusion.catalog import CatalogProvider, Table
from datafusion.common import DFSchema
from datafusion.expr import Expr, SortKey
from datafusion.plan import ExecutionPlan, LogicalPlan
from datafusion.user_defined import (
AggregateUDF,
LogicalExtensionCodecExportable,
PhysicalExtensionCodecExportable,
ScalarUDF,
TableFunction,
WindowUDF,
Expand Down Expand Up @@ -959,6 +962,52 @@ def register_record_batches(
"""
self.ctx.register_record_batches(name, partitions)

def read_batch(self, batch: pa.RecordBatch) -> DataFrame:
Comment thread
timsaucer marked this conversation as resolved.
"""Return a :py:class:`~datafusion.DataFrame` reading a single batch.

Convenience wrapper around :py:meth:`read_batches` for the single-batch
case. Unlike :py:meth:`register_batch`, this does not register the
batch as a named table; it returns an anonymous
:py:class:`~datafusion.DataFrame` directly.

Args:
batch: Record batch to wrap as a DataFrame.

Examples:
>>> ctx = dfn.SessionContext()
>>> batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
>>> ctx.read_batch(batch).to_pydict()
{'a': [1, 2, 3]}
"""
return self.read_batches([batch])

def read_batches(self, batches: Iterable[pa.RecordBatch]) -> DataFrame:
"""Return a :py:class:`~datafusion.DataFrame` reading the given batches.

All batches must share the same schema. Any iterable of
:py:class:`pa.RecordBatch` is accepted (list, tuple, generator);
it is materialized into a list before being handed to the
underlying Rust binding. Unlike :py:meth:`register_record_batches`,
this does not register the batches as a named table; it returns
an anonymous :py:class:`~datafusion.DataFrame` directly.

Args:
batches: Record batches to wrap as a DataFrame.

Examples:
>>> ctx = dfn.SessionContext()
>>> b1 = pa.RecordBatch.from_pydict({"a": [1, 2]})
>>> b2 = pa.RecordBatch.from_pydict({"a": [3, 4]})
>>> ctx.read_batches([b1, b2]).to_pydict()
{'a': [1, 2, 3, 4]}

A generator works too:

>>> ctx.read_batches(b for b in [b1, b2]).to_pydict()
{'a': [1, 2, 3, 4]}
"""
return DataFrame(self.ctx.read_batches(list(batches)))

def register_parquet(
self,
name: str,
Expand Down Expand Up @@ -1268,6 +1317,145 @@ def deregister_udwf(self, name: str) -> None:
"""
self.ctx.deregister_udwf(name)

def udf(self, name: str) -> ScalarUDF:
"""Look up a registered scalar UDF by name.

Returns the same ``ScalarUDF`` wrapper that :py:meth:`register_udf`
accepts, so it can be invoked as an expression in the DataFrame API
or re-registered into a different :py:class:`SessionContext`.
Built-in scalar functions from the session's function registry are
also looked up.

Args:
name: Name of the registered scalar UDF.

Raises:
KeyError: If no scalar UDF is registered under ``name``.

Examples:
Register a UDF, then look it up by name and use it in the
DataFrame API:

>>> ctx = dfn.SessionContext()
>>> nullcheck = dfn.udf(
... lambda x: x.is_null(),
... [pa.int64()],
... pa.bool_(),
... volatility="immutable",
... name="nullcheck",
... )
>>> ctx.register_udf(nullcheck)
>>> fn = ctx.udf("nullcheck")
>>> df = ctx.from_pydict({"a": [1, None, 3]})
>>> df.select(fn(col("a")).alias("is_null")).to_pydict()
{'is_null': [False, True, False]}

Late-binding: the function name can come from configuration
rather than an imported symbol, which is useful when the set
of UDFs is plugin-driven or chosen at runtime:

>>> config = {"null_check": "nullcheck"}
>>> fn = ctx.udf(config["null_check"])
>>> df.select(fn(col("a")).alias("is_null")).to_pydict()
{'is_null': [False, True, False]}
"""
from datafusion.user_defined import ScalarUDF as _ScalarUDF # noqa: PLC0415

return _ScalarUDF._from_internal(self.ctx.udf(name))

def udaf(self, name: str) -> AggregateUDF:
"""Look up a registered aggregate UDF by name.

Returns the same ``AggregateUDF`` wrapper that :py:meth:`register_udaf`
accepts. Built-in aggregate functions such as ``sum`` or ``avg`` are
also discoverable through this lookup. See :py:meth:`udf` for a worked
late-binding example; the pattern is identical for aggregates.

Args:
name: Name of the registered aggregate UDF.

Raises:
KeyError: If no aggregate UDF is registered under ``name``.

Examples:
Look up a built-in aggregate by name and use it in
:py:meth:`~datafusion.DataFrame.aggregate`:

>>> ctx = dfn.SessionContext()
>>> sum_fn = ctx.udaf("sum")
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> df.aggregate([], [sum_fn(col("a")).alias("total")]).to_pydict()
{'total': [6]}
"""
from datafusion.user_defined import ( # noqa: PLC0415
AggregateUDF as _AggregateUDF,
)

return _AggregateUDF._from_internal(self.ctx.udaf(name))

def udwf(self, name: str) -> WindowUDF:
"""Look up a registered window UDF by name.

Returns the same ``WindowUDF`` wrapper that :py:meth:`register_udwf`
accepts. Built-in window functions such as ``row_number`` or ``rank``
are also discoverable through this lookup. See :py:meth:`udf` for a
worked late-binding example; the pattern is identical for window
functions.

Args:
name: Name of the registered window UDF.

Raises:
KeyError: If no window UDF is registered under ``name``.

Examples:
Look up a built-in window function by name and use it in
``select``:

>>> ctx = dfn.SessionContext()
>>> rn = ctx.udwf("row_number")
>>> df = ctx.from_pydict({"a": [10, 20, 30]})
>>> df.select(col("a"), rn().alias("rn")).to_pydict()
{'a': [10, 20, 30], 'rn': [1, 2, 3]}
"""
from datafusion.user_defined import WindowUDF as _WindowUDF # noqa: PLC0415

return _WindowUDF._from_internal(self.ctx.udwf(name))

def udfs(self) -> list[str]:
"""Return the sorted names of all registered scalar UDFs.

Includes both user-registered and built-in scalar functions. Pair
with :py:meth:`udf` to drive discovery, validation, or config-based
dispatch.

Examples:
>>> ctx = dfn.SessionContext()
>>> "abs" in ctx.udfs()
True
"""
return self.ctx.udfs()

def udafs(self) -> list[str]:
"""Return the sorted names of all registered aggregate UDFs.

Examples:
>>> ctx = dfn.SessionContext()
>>> "sum" in ctx.udafs()
True
"""
return self.ctx.udafs()

def udwfs(self) -> list[str]:
"""Return the sorted names of all registered window UDFs.

Examples:
>>> ctx = dfn.SessionContext()
>>> "row_number" in ctx.udwfs()
True
"""
return self.ctx.udwfs()

def catalog(self, name: str = "datafusion") -> Catalog:
"""Retrieve a catalog by name."""
return Catalog(self.ctx.catalog(name))
Expand Down Expand Up @@ -1744,11 +1932,14 @@ def __datafusion_logical_extension_codec__(self) -> Any:
"""Access the PyCapsule FFI_LogicalExtensionCodec."""
return self.ctx.__datafusion_logical_extension_codec__()

def with_logical_extension_codec(self, codec: Any) -> SessionContext:
def with_logical_extension_codec(
self, codec: LogicalExtensionCodecExportable | _PyCapsule
) -> SessionContext:
"""Create a new session context with specified codec.

This only supports codecs that have been implemented using the
FFI interface.
Only FFI codecs are supported. Pass any object implementing
``__datafusion_logical_extension_codec__`` (see
:py:class:`~datafusion.user_defined.LogicalExtensionCodecExportable`).
"""
new_internal = self.ctx.with_logical_extension_codec(codec)
new = SessionContext.__new__(SessionContext)
Expand All @@ -1759,11 +1950,14 @@ def __datafusion_physical_extension_codec__(self) -> Any:
"""Access the PyCapsule FFI_PhysicalExtensionCodec."""
return self.ctx.__datafusion_physical_extension_codec__()

def with_physical_extension_codec(self, codec: Any) -> SessionContext:
def with_physical_extension_codec(
self, codec: PhysicalExtensionCodecExportable | _PyCapsule
) -> SessionContext:
"""Create a new session context with the specified physical codec.

This only supports codecs that have been implemented using the
FFI interface.
Only FFI codecs are supported. Pass any object implementing
``__datafusion_physical_extension_codec__`` (see
:py:class:`~datafusion.user_defined.PhysicalExtensionCodecExportable`).
"""
new_internal = self.ctx.with_physical_extension_codec(codec)
new = SessionContext.__new__(SessionContext)
Expand Down
Loading
Loading