diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d1ebbbbe746..3e3ab3429a2fb 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1151,8 +1151,13 @@ config_namespace! { /// in parallel using the provided `target_partitions` level pub repartition_aggregations: bool, default = true - /// Minimum total files size in bytes to perform file scan repartitioning. - pub repartition_file_min_size: usize, default = 10 * 1024 * 1024 + /// Minimum total file size in bytes for file-group byte-range + /// splitting to fire. Files (or merged file groups) smaller than this + /// stay as one partition. Lower values produce more, smaller + /// partitions — better at filling `target_partitions` worth of cores + /// when files are modestly sized, at the cost of slightly more + /// per-partition open / metadata-load overhead. + pub repartition_file_min_size: usize, default = 1024 * 1024 /// Should DataFusion repartition data using the join keys to execute joins in parallel /// using the provided `target_partitions` level diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index d980e802c83cb..af2c6d41af42e 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -376,7 +376,7 @@ id3 value3 # Reset repartition_file_min_size to default value statement ok -SET datafusion.optimizer.repartition_file_min_size = 10485760; +RESET datafusion.optimizer.repartition_file_min_size; statement ok drop table stored_table_with_cr_terminator; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 1aa9bc79e5bbe..a9c4b1b1e4db9 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -325,7 +325,7 @@ datafusion.optimizer.prefer_existing_union false datafusion.optimizer.prefer_hash_join true datafusion.optimizer.preserve_file_partitions 0 datafusion.optimizer.repartition_aggregations true -datafusion.optimizer.repartition_file_min_size 10485760 +datafusion.optimizer.repartition_file_min_size 1048576 datafusion.optimizer.repartition_file_scans true datafusion.optimizer.repartition_joins true datafusion.optimizer.repartition_sorts true @@ -475,7 +475,7 @@ datafusion.optimizer.prefer_existing_union false When set to true, the optimizer datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory datafusion.optimizer.preserve_file_partitions 0 Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions. datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level -datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. +datafusion.optimizer.repartition_file_min_size 1048576 Minimum total file size in bytes for file-group byte-range splitting to fire. Files (or merged file groups) smaller than this stay as one partition. Lower values produce more, smaller partitions — better at filling `target_partitions` worth of cores when files are modestly sized, at the cost of slightly more per-partition open / metadata-load overhead. datafusion.optimizer.repartition_file_scans true When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. datafusion.optimizer.repartition_joins true Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_sorts true Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below ```text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ``` would turn into the plan below which performs better in multithreaded environments ```text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ``` @@ -895,7 +895,7 @@ show functions statement ok reset datafusion.catalog.information_schema; -# The SLT runner sets `target_partitions` to 4 instead of using the default, so +# The SLT runner sets `target_partitions` to 4 instead of using the default, so # reset it explicitly. statement ok set datafusion.execution.target_partitions = 4; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 576137bda29d1..6087ea855e4af 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -149,7 +149,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | +| datafusion.optimizer.repartition_file_min_size | 1048576 | Minimum total file size in bytes for file-group byte-range splitting to fire. Files (or merged file groups) smaller than this stay as one partition. Lower values produce more, smaller partitions — better at filling `target_partitions` worth of cores when files are modestly sized, at the cost of slightly more per-partition open / metadata-load overhead. | | datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | | datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | | datafusion.optimizer.repartition_file_scans | true | When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. |