From 8fc14f75b6ef8a45552c55841b85764069a2c032 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 21 May 2026 18:10:58 +0530 Subject: [PATCH 1/3] fix: custom_datasource example ignores projection pushdown in execute() --- .../custom_data_source/custom_datasource.rs | 42 +++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index 701a886d2a140..e349fadc6d5c1 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -53,6 +53,20 @@ pub async fn custom_datasource() -> Result<()> { search_accounts(db.clone(), Some(col("bank_account").gt(lit(8000u64))), 1).await?; search_accounts(db.clone(), Some(col("bank_account").gt(lit(200u64))), 2).await?; + // exercise SQL paths that push down non-trivial projections: + // - `SELECT 1 ...` requests no source columns (projection: Some([])) + // - `SELECT COUNT(id) ...` requests a single column (projection: Some([0])) + let ctx = SessionContext::new(); + ctx.register_table("accounts", Arc::new(db))?; + ctx.sql("SELECT 1 AS a FROM accounts") + .await? + .collect() + .await?; + ctx.sql("SELECT COUNT(id) FROM accounts") + .await? 
+ .collect() + .await?; + Ok(()) } @@ -187,6 +201,7 @@ impl TableProvider for CustomDataSource { #[derive(Debug, Clone)] struct CustomExec { db: CustomDataSource, + projection: Option<Vec<usize>>, projected_schema: SchemaRef, cache: Arc<PlanProperties>, } @@ -202,6 +217,7 @@ impl CustomExec { let cache = Self::compute_properties(projected_schema.clone()); Self { db, + projection: projections.cloned(), projected_schema, cache: Arc::new(cache), } @@ -263,15 +279,25 @@ impl ExecutionPlan for CustomExec { account_array.append_value(user.bank_account); } + // Build a batch holding every column the table can produce, then let + // Arrow drop the columns the query didn't ask for. `RecordBatch::project` + // preserves the row count, which matters when the projection selects + // zero columns (e.g. `SELECT 1 FROM t`). + let full_batch = RecordBatch::try_new( + self.db.schema(), + vec![ + Arc::new(id_array.finish()), + Arc::new(account_array.finish()), + ], + )?; + let batch = match &self.projection { + Some(indices) => full_batch.project(indices)?, + None => full_batch, + }; + Ok(Box::pin(MemoryStream::try_new( - vec![RecordBatch::try_new( - self.projected_schema.clone(), - vec![ - Arc::new(id_array.finish()), - Arc::new(account_array.finish()), - ], - )?], - self.schema(), + vec![batch], + self.projected_schema.clone(), None, )?)) } From 2aa32bcc45e71ac0d3d7905e1af54f62469ee332 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Fri, 22 May 2026 20:43:35 +0530 Subject: [PATCH 2/3] show shape of the result and values --- .../custom_data_source/custom_datasource.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index e349fadc6d5c1..e78ea86ce36ee 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -26,7 +26,7 @@ use
async_trait::async_trait; use datafusion::arrow::array::{UInt8Builder, UInt64Builder}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::tree_node::TreeNodeRecursion; +use datafusion::common::{assert_batches_eq, tree_node::TreeNodeRecursion}; use datafusion::datasource::{TableProvider, TableType, provider_as_source}; use datafusion::error::Result; use datafusion::execution::context::TaskContext; @@ -58,14 +58,27 @@ pub async fn custom_datasource() -> Result<()> { // - `SELECT COUNT(id) ...` requests a single column (projection: Some([0])) let ctx = SessionContext::new(); ctx.register_table("accounts", Arc::new(db))?; - ctx.sql("SELECT 1 AS a FROM accounts") + let constant_batches = ctx + .sql("SELECT 1 AS a FROM accounts") .await? .collect() .await?; - ctx.sql("SELECT COUNT(id) FROM accounts") + assert_batches_eq!( + [ + "+---+", "| a |", "+---+", "| 1 |", "| 1 |", "| 1 |", "+---+", + ], + &constant_batches + ); + + let count_batches = ctx + .sql("SELECT COUNT(id) AS cnt FROM accounts") .await? 
.collect() .await?; + assert_batches_eq!( + ["+-----+", "| cnt |", "+-----+", "| 3   |", "+-----+",], + &count_batches + ); Ok(()) } From 6e1c90beeb19cd05366380fbf1fb6b5f663ff5c2 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Sat, 23 May 2026 15:45:45 +0530 Subject: [PATCH 3/3] fix clippy --- .../examples/custom_data_source/custom_datasource.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index 9fb4b5684123c..a67738520b010 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -26,7 +26,7 @@ use async_trait::async_trait; use datafusion::arrow::array::{UInt8Builder, UInt64Builder}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::{assert_batches_eq, tree_node::TreeNodeRecursion}; +use datafusion::common::assert_batches_eq; use datafusion::datasource::{TableProvider, TableType, provider_as_source}; use datafusion::error::Result; use datafusion::execution::context::TaskContext;