Skip to content

Commit e2ea57d

Browse files
committed
row lineage compaction
1 parent 25d4a85 commit e2ea57d

21 files changed

+601
-365
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.hive.ql.metadata.Hive;
3838
import org.apache.hadoop.hive.ql.metadata.HiveException;
3939
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
40+
import org.apache.hadoop.hive.ql.metadata.RowLineageUtils;
4041
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
4142
import org.apache.hadoop.hive.ql.parse.TransformSpec;
4243
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -69,27 +70,39 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
6970

7071
HiveConf conf = new HiveConf(context.getConf());
7172
CompactionInfo ci = context.getCompactionInfo();
72-
String compactionQuery = buildCompactionQuery(context, compactTableName, conf);
73+
org.apache.hadoop.hive.ql.metadata.Table hiveTable =
74+
new org.apache.hadoop.hive.ql.metadata.Table(context.getTable());
75+
boolean rowLineageEnabled = RowLineageUtils.supportsRowLineage(hiveTable);
76+
String compactionQuery = buildCompactionQuery(context, compactTableName, conf, rowLineageEnabled);
7377

7478
SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
79+
80+
if (rowLineageEnabled) {
81+
RowLineageUtils.enableRowLineage(sessionState);
82+
LOG.debug("Row lineage flag set for compaction of table {}", compactTableName);
83+
}
84+
7585
String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) +
7686
(ci.partName != null ? ", partition " + HiveUtils.unparseIdentifier(ci.partName) : "");
7787

7888
try {
79-
DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
89+
DriverUtils.runOnDriver(sessionState.getConf(), sessionState, compactionQuery);
8090
LOG.info("Completed compaction for {}", compactionTarget);
8191
return true;
8292
} catch (HiveException e) {
8393
LOG.error("Failed compacting {}", compactionTarget, e);
8494
throw e;
8595
} finally {
96+
RowLineageUtils.disableRowLineage(sessionState);
8697
sessionState.setCompaction(false);
8798
}
8899
}
89100

90-
private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf)
101+
private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf,
102+
boolean rowLineageEnabled)
91103
throws HiveException {
92104
CompactionInfo ci = context.getCompactionInfo();
105+
String rowLineageColumns = RowLineageUtils.getRowLineageSelectColumns(rowLineageEnabled);
93106
org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
94107
context.getTable().getTableName());
95108
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
@@ -108,18 +121,21 @@ private String buildCompactionQuery(CompactorContext context, String compactTabl
108121
}
109122

110123
if (ci.partName == null) {
124+
String selectColumns = buildSelectColumnList(icebergTable, conf, rowLineageEnabled);
111125
if (!icebergTable.spec().isPartitioned()) {
112126
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
113-
compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s", compactTableName,
127+
compactionQuery = String.format("insert overwrite table %1$s select %2$s%3$s from %1$s %4$s %5$s",
128+
compactTableName, selectColumns, rowLineageColumns,
114129
fileSizePredicate == null ? "" : "where " + fileSizePredicate, orderBy);
115130
} else if (icebergTable.specs().size() > 1) {
116131
// Compacting partitions of old partition specs on a partitioned table with partition evolution
117132
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
118133
// A single filter on a virtual column causes errors during compilation,
119134
// added another filter on file_path as a workaround.
120-
compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
121-
"where %2$s != %3$d and %4$s is not null %5$s %6$s",
122-
compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
135+
compactionQuery = String.format("insert overwrite table %1$s select %2$s%3$s from %1$s " +
136+
"where %4$s != %5$d and %6$s is not null %7$s %8$s",
137+
compactTableName, selectColumns, rowLineageColumns,
138+
VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
123139
VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
124140
} else {
125141
// Partitioned table without partition evolution with partition spec as null in the compaction request - this
@@ -141,14 +157,25 @@ private String buildCompactionQuery(CompactorContext context, String compactTabl
141157
throw new HiveException(e);
142158
}
143159

144-
compactionQuery = String.format("INSERT OVERWRITE TABLE %1$s SELECT * FROM %1$s WHERE %2$s IN " +
145-
"(SELECT FILE_PATH FROM %1$s.FILES WHERE %3$s AND SPEC_ID = %4$d) %5$s %6$s",
146-
compactTableName, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
160+
compactionQuery = String.format("INSERT OVERWRITE TABLE %1$s SELECT *%2$s FROM %1$s WHERE %3$s IN " +
161+
"(SELECT FILE_PATH FROM %1$s.FILES WHERE %4$s AND SPEC_ID = %5$d) %6$s %7$s",
162+
compactTableName, rowLineageColumns, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
147163
fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
148164
}
165+
LOG.info("Compaction query: {}", compactionQuery);
149166
return compactionQuery;
150167
}
151168

169+
/**
170+
* Builds a comma-separated SELECT list from the Iceberg table schema.
171+
*/
172+
private static String buildSelectColumnList(Table icebergTable, HiveConf conf, boolean rowLineageEnabled) {
173+
return icebergTable.schema().columns().stream()
174+
.map(Types.NestedField::name)
175+
.map(col -> HiveUtils.unparseIdentifier(col, conf))
176+
.collect(Collectors.joining(", "));
177+
}
178+
152179
private String buildPartitionPredicate(CompactionInfo ci, PartitionSpec spec) throws MetaException {
153180
Map<String, String> partSpecMap = Warehouse.makeSpecFromName(ci.partName);
154181
Map<String, PartitionField> partitionFieldMap = spec.fields().stream()

iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution.q

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ create table ice_orc (
3131
team_id bigint
3232
)
3333
partitioned by (company_id bigint)
34-
stored by iceberg stored as orc
35-
tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg','compactor.threshold.target.size'='1500');
34+
stored by iceberg
35+
tblproperties ('format-version'='3', 'hive.compactor.worker.pool'='iceberg','compactor.threshold.target.size'='1500');
3636

3737
insert into ice_orc VALUES ('fn1','ln1', 1, 10, 100);
3838
insert into ice_orc VALUES ('fn2','ln2', 1, 10, 100);
@@ -89,10 +89,17 @@ explain select * from default.ice_orc.tag_v4 where company_id is not null;
8989
explain select * from ice_orc where company_id is not null;
9090
set hive.fetch.task.conversion=more;
9191

92+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
93+
FROM ice_orc
94+
ORDER BY ROW__LINEAGE__ID;
95+
9296
explain alter table ice_orc COMPACT 'major' and wait;
9397
alter table ice_orc COMPACT 'major' and wait;
9498

95-
select * from ice_orc;
99+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
100+
FROM ice_orc
101+
ORDER BY ROW__LINEAGE__ID;
102+
96103
describe formatted ice_orc;
97104
show compactions order by 'partition';
98105

iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ create table ice_orc (
3030
event_src string
3131
)
3232
partitioned by spec(truncate(3, event_src))
33-
stored by iceberg stored as orc
34-
tblproperties ('compactor.threshold.target.size'='1500');
33+
stored by iceberg
34+
tblproperties ('format-version'='3', 'compactor.threshold.target.size'='2000');
3535

3636
insert into ice_orc values
3737
(1, cast('2023-07-20 00:00:00' as timestamp with local time zone), 'AAA_1'),
@@ -76,14 +76,18 @@ insert into ice_orc values
7676
(25, cast('2024-09-05 00:00:00' as timestamp with local time zone), 'BBB_1'),
7777
(26, cast('2024-09-06 00:00:00' as timestamp with local time zone), 'BBB_2');
7878

79-
select * from ice_orc;
79+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
80+
FROM ice_orc
81+
ORDER BY ROW__LINEAGE__ID;
8082
describe formatted ice_orc;
8183

8284
alter table ice_orc COMPACT 'major' and wait
8385
where (event_src in ('BBB_1', 'BBB_2') and event_time < '2024-09-01 00:00:00') or
8486
(event_src in ('AAA_1', 'AAA_2') and event_time > '2024-08-01 00:00:00');
8587

86-
select * from ice_orc;
88+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
89+
FROM ice_orc
90+
ORDER BY ROW__LINEAGE__ID;
8791
describe formatted ice_orc;
8892

8993
show compactions order by 'partition';

iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partitioned.q

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ create table ice_orc (
2828
last_name string
2929
)
3030
partitioned by (dept_id bigint)
31-
stored by iceberg stored as orc
32-
tblproperties ('format-version'='2', 'compactor.threshold.target.size'='1500');
31+
stored by iceberg
32+
tblproperties ('format-version'='3', 'compactor.threshold.target.size'='1500');
3333

3434
insert into ice_orc VALUES ('fn1','ln1', 1);
3535
insert into ice_orc VALUES ('fn2','ln2', 1);
@@ -49,13 +49,17 @@ update ice_orc set last_name = 'ln7a' where first_name='fn7';
4949

5050
delete from ice_orc where last_name in ('ln1a', 'ln2a', 'ln7a');
5151

52-
select * from ice_orc;
52+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
53+
FROM ice_orc
54+
ORDER BY ROW__LINEAGE__ID;
5355
describe formatted ice_orc;
5456

5557
explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
5658
alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
5759

58-
select * from ice_orc;
60+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
61+
FROM ice_orc
62+
ORDER BY ROW__LINEAGE__ID;
5963
describe formatted ice_orc;
6064
show compactions order by 'partition';
6165

@@ -82,12 +86,16 @@ update ice_orc set last_name = 'ln18a' where first_name='fn18';
8286

8387
delete from ice_orc where last_name in ('ln11a', 'ln12a', 'ln17a', 'ln18a');
8488

85-
select * from ice_orc;
89+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
90+
FROM ice_orc
91+
ORDER BY ROW__LINEAGE__ID;
8692
describe formatted ice_orc;
8793

8894
explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
8995
alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
9096

91-
select * from ice_orc;
97+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
98+
FROM ice_orc
99+
ORDER BY ROW__LINEAGE__ID;
92100
describe formatted ice_orc;
93101
show compactions order by 'partition';

iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_schema_evolution.q

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ create table ice_orc (
2828
last_name string
2929
)
3030
partitioned by (dept_id bigint)
31-
stored by iceberg stored as orc
32-
tblproperties ('format-version'='2', 'compactor.threshold.target.size'='1500');
31+
stored by iceberg
32+
tblproperties ('format-version'='3', 'compactor.threshold.target.size'='1500');
3333

3434
insert into ice_orc (first_name, last_name, dept_id) VALUES ('fn1','ln1', 1);
3535
insert into ice_orc (first_name, last_name, dept_id) VALUES ('fn2','ln2', 1);
@@ -57,12 +57,16 @@ update ice_orc set last_name = 'ln8a' where fname='fn8';
5757

5858
delete from ice_orc where fname in ('fn1', 'fn3', 'fn7');
5959

60-
select * from ice_orc;
60+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
61+
FROM ice_orc
62+
ORDER BY ROW__LINEAGE__ID;
6163
describe formatted ice_orc;
6264

6365
explain alter table ice_orc COMPACT 'major' and wait;
6466
alter table ice_orc COMPACT 'major' and wait;
6567

66-
select * from ice_orc;
68+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
69+
FROM ice_orc
70+
ORDER BY ROW__LINEAGE__ID;
6771
describe formatted ice_orc;
6872
show compactions order by 'partition';

iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition.q

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ create table ice_orc_wo_evo (
3232
partitioned by (dept_id bigint,
3333
city string,
3434
registration_date date)
35-
stored by iceberg stored as orc
36-
tblproperties ('format-version'='2', 'compactor.threshold.target.size'='1500');
35+
stored by iceberg
36+
tblproperties ('format-version'='3', 'compactor.threshold.target.size'='1500');
3737

3838
insert into ice_orc_wo_evo VALUES ('fn1','ln1',1,'London','2024-03-11');
3939
insert into ice_orc_wo_evo VALUES ('fn2','ln2',1,'London','2024-03-11');
@@ -53,19 +53,25 @@ update ice_orc_wo_evo set last_name = 'ln7a' where first_name='fn7';
5353

5454
delete from ice_orc_wo_evo where last_name in ('ln1a', 'ln2a', 'ln7a');
5555

56-
select * from ice_orc_wo_evo;
56+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
57+
FROM ice_orc_wo_evo
58+
ORDER BY ROW__LINEAGE__ID;
5759
describe formatted ice_orc_wo_evo;
5860

5961
explain alter table ice_orc_wo_evo PARTITION (dept_id=1, city='London', registration_date='2024-03-11') COMPACT 'major' and wait;
6062
alter table ice_orc_wo_evo PARTITION (dept_id=1, city='London', registration_date='2024-03-11') COMPACT 'major' and wait;
6163

62-
select * from ice_orc_wo_evo;
64+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
65+
FROM ice_orc_wo_evo
66+
ORDER BY ROW__LINEAGE__ID;
6367
describe formatted ice_orc_wo_evo;
6468
show compactions order by 'partition';
6569

6670
explain alter table ice_orc_wo_evo PARTITION (dept_id=2, city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait;
6771
alter table ice_orc_wo_evo PARTITION (dept_id=2, city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait;
6872

69-
select * from ice_orc_wo_evo;
73+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
74+
FROM ice_orc_wo_evo
75+
ORDER BY ROW__LINEAGE__ID;
7076
describe formatted ice_orc_wo_evo;
7177
show compactions order by 'partition';

iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned.q

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ create table ice_orc (
2828
first_name string,
2929
last_name string
3030
)
31-
stored by iceberg stored as orc
32-
tblproperties ('format-version'='2', 'compactor.threshold.target.size'='1500');
31+
stored by iceberg
32+
tblproperties ('format-version'='3', 'compactor.threshold.target.size'='1500');
3333

3434
insert into ice_orc VALUES ('fn1','ln1');
3535
insert into ice_orc VALUES ('fn2','ln2');
@@ -49,14 +49,18 @@ update ice_orc set last_name = 'ln7a' where first_name='fn7';
4949

5050
delete from ice_orc where last_name in ('ln5a', 'ln6a', 'ln7a');
5151

52-
select * from ice_orc;
52+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
53+
FROM ice_orc
54+
ORDER BY ROW__LINEAGE__ID;
5355
describe formatted ice_orc;
5456

5557
explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
5658
explain optimize table ice_orc rewrite data pool 'iceberg';
5759

5860
alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
5961

60-
select * from ice_orc;
62+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
63+
FROM ice_orc
64+
ORDER BY ROW__LINEAGE__ID;
6165
describe formatted ice_orc;
6266
show compactions;

iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_unpartitioned_w_filter.q

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ create table ice_orc (
2828
first_name string,
2929
last_name string
3030
)
31-
stored by iceberg stored as orc
32-
tblproperties ('format-version'='2', 'compactor.threshold.target.size'='1500');
31+
stored by iceberg
32+
tblproperties ('format-version'='3', 'compactor.threshold.target.size'='1500');
3333

3434
insert into ice_orc VALUES ('fn1','ln1');
3535
insert into ice_orc VALUES ('fn2','ln2');
@@ -49,12 +49,16 @@ update ice_orc set last_name = 'ln7a' where first_name='fn7';
4949

5050
delete from ice_orc where last_name in ('ln5a', 'ln6a', 'ln7a');
5151

52-
select * from ice_orc;
52+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
53+
FROM ice_orc
54+
ORDER BY ROW__LINEAGE__ID;
5355
describe formatted ice_orc;
5456

5557
explain alter table ice_orc COMPACT 'major' and wait where last_name in ('ln1a', 'ln2a');
5658
alter table ice_orc COMPACT 'major' and wait where last_name in ('ln1a', 'ln2a');
5759

58-
select * from ice_orc;
60+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
61+
FROM ice_orc
62+
ORDER BY ROW__LINEAGE__ID;
5963
describe formatted ice_orc;
6064
show compactions;

iceberg/iceberg-handler/src/test/queries/positive/iceberg_minor_compaction_bucket.q

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ set hive.convert.join.bucket.mapjoin.tez=true;
2626

2727
CREATE TABLE srcbucket_big(id string, key int, value string)
2828
PARTITIONED BY SPEC(bucket(4, key)) STORED BY ICEBERG
29-
TBLPROPERTIES ('compactor.threshold.min.input.files'='1');
29+
TBLPROPERTIES ('format-version'='3', 'compactor.threshold.min.input.files'='1');
3030

3131
INSERT INTO srcbucket_big VALUES
3232
('a', 101, 'val_101'),
@@ -47,7 +47,9 @@ INSERT INTO srcbucket_big VALUES
4747
('l', null, null);
4848

4949
desc formatted default.srcbucket_big;
50-
SELECT * FROM default.srcbucket_big ORDER BY id;
50+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
51+
FROM default.srcbucket_big
52+
ORDER BY id;
5153

5254
select `partition`, spec_id, record_count
5355
from default.srcbucket_big.partitions
@@ -57,7 +59,9 @@ alter table srcbucket_big compact 'minor' and wait;
5759
show compactions order by 'partition';
5860

5961
desc formatted default.srcbucket_big;
60-
SELECT * FROM default.srcbucket_big ORDER BY id;
62+
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
63+
FROM default.srcbucket_big
64+
ORDER BY id;
6165

6266
select `partition`, spec_id, record_count
6367
from default.srcbucket_big.partitions

0 commit comments

Comments
 (0)