@@ -107,69 +107,99 @@ private String buildCompactionQuery(CompactorContext context, String compactTabl
107107 context .getTable ().getTableName ());
108108 Table icebergTable = IcebergTableUtil .getTable (conf , table .getTTable ());
109109 String orderBy = ci .orderByClause == null ? "" : ci .orderByClause ;
110- String fileSizePredicate = null ;
111- String compactionQuery ;
112-
113- if (ci .type == CompactionType .MINOR ) {
114- long fileSizeInBytesThreshold = CompactionEvaluator .getFragmentSizeBytes (table .getParameters ());
115- fileSizePredicate = String .format ("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)" ,
116- VirtualColumn .FILE_PATH .getName (), compactTableName , fileSizeInBytesThreshold );
117- conf .setLong (CompactorContext .COMPACTION_FILE_SIZE_THRESHOLD , fileSizeInBytesThreshold );
118- // IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format
119- // doesn't support vectorization, hence disabling it in this case.
120- conf .setBoolVar (ConfVars .HIVE_VECTORIZATION_ENABLED , false );
110+ String fileSizePredicate = buildMinorFileSizePredicate (ci , compactTableName , conf , table );
111+
112+ String compactionQuery = (ci .partName == null )
113+ ? buildFullTableCompactionQuery (
114+ compactTableName , conf , icebergTable , rowLineageColumns , fileSizePredicate , orderBy )
115+ : buildPartitionCompactionQuery (
116+ ci , compactTableName , conf , icebergTable , rowLineageColumns , fileSizePredicate , orderBy );
117+
118+ LOG .info ("Compaction query: {}" , compactionQuery );
119+ return compactionQuery ;
120+ }
121+
122+ private static String buildMinorFileSizePredicate (
123+ CompactionInfo ci , String compactTableName , HiveConf conf , org .apache .hadoop .hive .ql .metadata .Table table ) {
124+ if (ci .type != CompactionType .MINOR ) {
125+ return null ;
121126 }
122127
123- if (ci .partName == null ) {
124- String selectColumns = buildSelectColumnList (icebergTable , conf , rowLineageEnabled );
125- if (!icebergTable .spec ().isPartitioned ()) {
126- HiveConf .setVar (conf , ConfVars .REWRITE_POLICY , RewritePolicy .FULL_TABLE .name ());
127- compactionQuery = String .format ("insert overwrite table %1$s select %2$s%3$s from %1$s %4$s %5$s" ,
128- compactTableName , selectColumns , rowLineageColumns ,
129- fileSizePredicate == null ? "" : "where " + fileSizePredicate , orderBy );
130- } else if (icebergTable .specs ().size () > 1 ) {
131- // Compacting partitions of old partition specs on a partitioned table with partition evolution
132- HiveConf .setVar (conf , ConfVars .REWRITE_POLICY , RewritePolicy .PARTITION .name ());
133- // A single filter on a virtual column causes errors during compilation,
134- // added another filter on file_path as a workaround.
135- compactionQuery = String .format ("insert overwrite table %1$s select %2$s%3$s from %1$s " +
136- "where %4$s != %5$d and %6$s is not null %7$s %8$s" ,
137- compactTableName , selectColumns , rowLineageColumns ,
138- VirtualColumn .PARTITION_SPEC_ID .getName (), icebergTable .spec ().specId (),
139- VirtualColumn .FILE_PATH .getName (), fileSizePredicate == null ? "" : "and " + fileSizePredicate , orderBy );
140- } else {
141- // Partitioned table without partition evolution with partition spec as null in the compaction request - this
142- // code branch is not supposed to be reachable
143- throw new HiveException (ErrorMsg .COMPACTION_NO_PARTITION );
144- }
145- } else {
146- HiveConf .setBoolVar (conf , ConfVars .HIVE_CONVERT_JOIN , false );
147- conf .setBoolVar (ConfVars .HIVE_VECTORIZATION_ENABLED , false );
128+ long fileSizeInBytesThreshold = CompactionEvaluator .getFragmentSizeBytes (table .getParameters ());
129+ conf .setLong (CompactorContext .COMPACTION_FILE_SIZE_THRESHOLD , fileSizeInBytesThreshold );
130+ // IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format
131+ // doesn't support vectorization, hence disabling it in this case.
132+ conf .setBoolVar (ConfVars .HIVE_VECTORIZATION_ENABLED , false );
133+
134+ return String .format ("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)" ,
135+ VirtualColumn .FILE_PATH .getName (), compactTableName , fileSizeInBytesThreshold );
136+ }
137+
138+ private String buildFullTableCompactionQuery (
139+ String compactTableName ,
140+ HiveConf conf ,
141+ Table icebergTable ,
142+ String rowLineageColumns ,
143+ String fileSizePredicate ,
144+ String orderBy ) throws HiveException {
145+ String selectColumns = buildSelectColumnList (icebergTable , conf );
146+
147+ if (!icebergTable .spec ().isPartitioned ()) {
148+ HiveConf .setVar (conf , ConfVars .REWRITE_POLICY , RewritePolicy .FULL_TABLE .name ());
149+ return String .format ("insert overwrite table %1$s select %2$s%3$s from %1$s %4$s %5$s" ,
150+ compactTableName , selectColumns , rowLineageColumns ,
151+ fileSizePredicate == null ? "" : "where " + fileSizePredicate , orderBy );
152+ }
153+
154+ if (icebergTable .specs ().size () > 1 ) {
155+ // Compacting partitions of old partition specs on a partitioned table with partition evolution
148156 HiveConf .setVar (conf , ConfVars .REWRITE_POLICY , RewritePolicy .PARTITION .name ());
149- conf .set (IcebergCompactionService .PARTITION_PATH , new Path (ci .partName ).toString ());
150-
151- PartitionSpec spec ;
152- String partitionPredicate ;
153- try {
154- spec = IcebergTableUtil .getPartitionSpec (icebergTable , ci .partName );
155- partitionPredicate = buildPartitionPredicate (ci , spec );
156- } catch (MetaException e ) {
157- throw new HiveException (e );
158- }
157+ // A single filter on a virtual column causes errors during compilation,
158+ // added another filter on file_path as a workaround.
159+ return String .format ("insert overwrite table %1$s select %2$s%3$s from %1$s " +
160+ "where %4$s != %5$d and %6$s is not null %7$s %8$s" ,
161+ compactTableName , selectColumns , rowLineageColumns ,
162+ VirtualColumn .PARTITION_SPEC_ID .getName (), icebergTable .spec ().specId (),
163+ VirtualColumn .FILE_PATH .getName (), fileSizePredicate == null ? "" : "and " + fileSizePredicate , orderBy );
164+ }
165+
166+ // Partitioned table without partition evolution with partition spec as null in the compaction request - this
167+ // code branch is not supposed to be reachable
168+ throw new HiveException (ErrorMsg .COMPACTION_NO_PARTITION );
169+ }
159170
160- compactionQuery = String .format ("INSERT OVERWRITE TABLE %1$s SELECT *%2$s FROM %1$s WHERE %3$s IN " +
161- "(SELECT FILE_PATH FROM %1$s.FILES WHERE %4$s AND SPEC_ID = %5$d) %6$s %7$s" ,
162- compactTableName , rowLineageColumns , VirtualColumn .FILE_PATH .getName (), partitionPredicate , spec .specId (),
163- fileSizePredicate == null ? "" : "AND " + fileSizePredicate , orderBy );
171+ private String buildPartitionCompactionQuery (
172+ CompactionInfo ci ,
173+ String compactTableName ,
174+ HiveConf conf ,
175+ Table icebergTable ,
176+ String rowLineageColumns ,
177+ String fileSizePredicate ,
178+ String orderBy ) throws HiveException {
179+ HiveConf .setBoolVar (conf , ConfVars .HIVE_CONVERT_JOIN , false );
180+ conf .setBoolVar (ConfVars .HIVE_VECTORIZATION_ENABLED , false );
181+ HiveConf .setVar (conf , ConfVars .REWRITE_POLICY , RewritePolicy .PARTITION .name ());
182+ conf .set (IcebergCompactionService .PARTITION_PATH , new Path (ci .partName ).toString ());
183+
184+ PartitionSpec spec ;
185+ String partitionPredicate ;
186+ try {
187+ spec = IcebergTableUtil .getPartitionSpec (icebergTable , ci .partName );
188+ partitionPredicate = buildPartitionPredicate (ci , spec );
189+ } catch (MetaException e ) {
190+ throw new HiveException (e );
164191 }
165- LOG .info ("Compaction query: {}" , compactionQuery );
166- return compactionQuery ;
192+
193+ return String .format ("INSERT OVERWRITE TABLE %1$s SELECT *%2$s FROM %1$s WHERE %3$s IN " +
194+ "(SELECT FILE_PATH FROM %1$s.FILES WHERE %4$s AND SPEC_ID = %5$d) %6$s %7$s" ,
195+ compactTableName , rowLineageColumns , VirtualColumn .FILE_PATH .getName (), partitionPredicate , spec .specId (),
196+ fileSizePredicate == null ? "" : "AND " + fileSizePredicate , orderBy );
167197 }
168198
169199 /**
170200 * Builds a comma-separated SELECT list from the Iceberg table schema.
171201 */
172- private static String buildSelectColumnList (Table icebergTable , HiveConf conf , boolean rowLineageEnabled ) {
202+ private static String buildSelectColumnList (Table icebergTable , HiveConf conf ) {
173203 return icebergTable .schema ().columns ().stream ()
174204 .map (Types .NestedField ::name )
175205 .map (col -> HiveUtils .unparseIdentifier (col , conf ))