Skip to content

Commit e9cf755

Browse files
committed
HIVE-29242: Add catalog for transaction module
1 parent 8d63caf commit e9cf755

File tree

247 files changed

+5853
-1160
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

247 files changed

+5853
-1160
lines changed

hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGen
668668
return;
669669
}
670670
CommitTxnMessage msg =
671-
MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId(), commitTxnEvent.getDatabases(), commitTxnEvent.getWriteId());
671+
MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId(), commitTxnEvent.getCatalogs(), commitTxnEvent.getDatabases(), commitTxnEvent.getWriteId());
672672

673673
NotificationEvent event =
674674
new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(),
@@ -840,11 +840,12 @@ public void onAddCheckConstraint(AddCheckConstraintEvent addCheckConstraintEvent
840840
*/
841841
@Override
842842
public void onDropConstraint(DropConstraintEvent dropConstraintEvent) throws MetaException {
843+
String catName = dropConstraintEvent.getCatName();
843844
String dbName = dropConstraintEvent.getDbName();
844845
String tableName = dropConstraintEvent.getTableName();
845846
String constraintName = dropConstraintEvent.getConstraintName();
846847
DropConstraintMessage msg = MessageBuilder.getInstance()
847-
.buildDropConstraintMessage(dbName, tableName, constraintName);
848+
.buildDropConstraintMessage(catName, dbName, tableName, constraintName);
848849
NotificationEvent event =
849850
new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(),
850851
msgEncoder.getSerializer().serialize(msg));
@@ -863,8 +864,9 @@ public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent, Connection dbCon
863864
throws MetaException {
864865
String tableName = allocWriteIdEvent.getTableName();
865866
String dbName = allocWriteIdEvent.getDbName();
867+
String catName = allocWriteIdEvent.getCatName();
866868
AllocWriteIdMessage msg = MessageBuilder.getInstance()
867-
.buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName);
869+
.buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), catName, dbName, tableName);
868870
NotificationEvent event =
869871
new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(),
870872
msgEncoder.getSerializer().serialize(msg)
@@ -914,6 +916,7 @@ public void onBatchAcidWrite(BatchAcidWriteEvent batchAcidWriteEvent, Connection
914916
NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(),
915917
msgEncoder.getSerializer().serialize(msg));
916918
event.setMessageFormat(msgEncoder.getMessageFormat());
919+
event.setCatName(batchAcidWriteEvent.getCatalog(i));
917920
event.setDbName(batchAcidWriteEvent.getDatabase(i));
918921
event.setTableName(batchAcidWriteEvent.getTable(i));
919922
eventBatch.add(event);
@@ -946,8 +949,8 @@ public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumn
946949
@Override
947950
public void onDeleteTableColumnStat(DeleteTableColumnStatEvent deleteTableColumnStatEvent) throws MetaException {
948951
DeleteTableColumnStatMessage msg = MessageBuilder.getInstance()
949-
.buildDeleteTableColumnStatMessage(deleteTableColumnStatEvent.getDBName(),
950-
deleteTableColumnStatEvent.getColName());
952+
.buildDeleteTableColumnStatMessage(deleteTableColumnStatEvent.getCatName(),
953+
deleteTableColumnStatEvent.getDBName(), deleteTableColumnStatEvent.getColName());
951954
NotificationEvent event = new NotificationEvent(0, now(), EventType.DELETE_TABLE_COLUMN_STAT.toString(),
952955
msgEncoder.getSerializer().serialize(msg));
953956
event.setCatName(deleteTableColumnStatEvent.getCatName());
@@ -1008,9 +1011,9 @@ public void onUpdatePartitionColumnStatInBatch(UpdatePartitionColumnStatEventBat
10081011
@Override
10091012
public void onDeletePartitionColumnStat(DeletePartitionColumnStatEvent deletePartColStatEvent) throws MetaException {
10101013
DeletePartitionColumnStatMessage msg = MessageBuilder.getInstance()
1011-
.buildDeletePartitionColumnStatMessage(deletePartColStatEvent.getDBName(),
1012-
deletePartColStatEvent.getColName(), deletePartColStatEvent.getPartName(),
1013-
deletePartColStatEvent.getPartVals());
1014+
.buildDeletePartitionColumnStatMessage(deletePartColStatEvent.getCatName(),
1015+
deletePartColStatEvent.getDBName(), deletePartColStatEvent.getColName(),
1016+
deletePartColStatEvent.getPartName(), deletePartColStatEvent.getPartVals());
10141017
NotificationEvent event = new NotificationEvent(0, now(), EventType.DELETE_PARTITION_COLUMN_STAT.toString(),
10151018
msgEncoder.getSerializer().serialize(msg));
10161019
event.setCatName(deletePartColStatEvent.getCatName());
@@ -1205,25 +1208,28 @@ private void addWriteNotificationLog(List<NotificationEvent> eventBatch, List<Ac
12051208
ResultSet rs = null;
12061209
String select = sqlGenerator.addForUpdateClause("select \"WNL_ID\", \"WNL_FILES\" from" +
12071210
" \"TXN_WRITE_NOTIFICATION_LOG\" " +
1208-
"where \"WNL_DATABASE\" = ? " +
1211+
"where \"WNL_CATALOG\" = ? " +
1212+
"and \"WNL_DATABASE\" = ? " +
12091213
"and \"WNL_TABLE\" = ? " + " and (\"WNL_PARTITION\" = ? OR (? IS NULL AND \"WNL_PARTITION\" IS NULL)) " +
12101214
"and \"WNL_TXNID\" = ? ");
12111215
List<Integer> insertList = new ArrayList<>();
12121216
Map<Integer, Pair<Long, String>> updateMap = new HashMap<>();
12131217
try (PreparedStatement pst = dbConn.prepareStatement(select)) {
12141218
for (int i = 0; i < acidWriteEventList.size(); i++) {
1219+
String catName = acidWriteEventList.get(i).getCatalog();
12151220
String dbName = acidWriteEventList.get(i).getDatabase();
12161221
String tblName = acidWriteEventList.get(i).getTable();
12171222
String partition = acidWriteEventList.get(i).getPartition();
12181223
Long txnId = acidWriteEventList.get(i).getTxnId();
12191224

12201225
LOG.debug("Going to execute query <" + select.replaceAll("\\?", "{}") + ">",
1221-
quoteString(dbName), quoteString(tblName), quoteString(partition));
1222-
pst.setString(1, dbName);
1223-
pst.setString(2, tblName);
1224-
pst.setString(3, partition);
1226+
quoteString(catName), quoteString(dbName), quoteString(tblName), quoteString(partition));
1227+
pst.setString(1, catName);
1228+
pst.setString(2, dbName);
1229+
pst.setString(3, tblName);
12251230
pst.setString(4, partition);
1226-
pst.setLong(5, txnId);
1231+
pst.setString(5, partition);
1232+
pst.setLong(6, txnId);
12271233
rs = pst.executeQuery();
12281234
if (!rs.next()) {
12291235
insertList.add(i);
@@ -1244,15 +1250,16 @@ private void addWriteNotificationLog(List<NotificationEvent> eventBatch, List<Ac
12441250
"org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog", insertList.size());
12451251

12461252
String insert = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " +
1247-
"(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
1253+
"(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_CATALOG\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
12481254
"\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " +
1249-
"\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?)";
1255+
"\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?,?)";
12501256
try (PreparedStatement pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(insert))) {
12511257
numRows = 0;
12521258
for (int idx : insertList) {
12531259
String tableObj = msgBatch.get(idx).getTableObjStr();
12541260
String partitionObj = msgBatch.get(idx).getPartitionObjStr();
12551261
String files = ReplChangeManager.joinWithSeparator(msgBatch.get(idx).getFiles());
1262+
String catName = acidWriteEventList.get(idx).getCatalog();
12561263
String dbName = acidWriteEventList.get(idx).getDatabase();
12571264
String tblName = acidWriteEventList.get(idx).getTable();
12581265
String partition = acidWriteEventList.get(idx).getPartition();
@@ -1261,16 +1268,17 @@ private void addWriteNotificationLog(List<NotificationEvent> eventBatch, List<Ac
12611268
pst.setLong(1, nextNLId++);
12621269
pst.setLong(2, acidWriteEventList.get(idx).getTxnId());
12631270
pst.setLong(3, acidWriteEventList.get(idx).getWriteId());
1264-
pst.setString(4, dbName);
1265-
pst.setString(5, tblName);
1266-
pst.setString(6, partition);
1267-
pst.setString(7, tableObj);
1268-
pst.setString(8, partitionObj);
1269-
pst.setString(9, files);
1270-
pst.setInt(10, currentTime);
1271+
pst.setString(4, catName);
1272+
pst.setString(5, dbName);
1273+
pst.setString(6, tblName);
1274+
pst.setString(7, partition);
1275+
pst.setString(8, tableObj);
1276+
pst.setString(9, partitionObj);
1277+
pst.setString(10, files);
1278+
pst.setInt(11, currentTime);
12711279
LOG.debug("Going to execute insert <" + insert.replaceAll("\\?", "{}") + ">", nextNLId
12721280
, acidWriteEventList.get(idx).getTxnId(), acidWriteEventList.get(idx).getWriteId()
1273-
, quoteString(dbName), quoteString(tblName),
1281+
, quoteString(catName), quoteString(dbName), quoteString(tblName),
12741282
quoteString(partition), quoteString(tableObj), quoteString(partitionObj), quoteString(files), currentTime);
12751283
pst.addBatch();
12761284
numRows++;

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
139139
Set<CompactionInfo> compactions, ShowCompactResponse currentCompactions, Set<String> skipDBs,
140140
Set<String> skipTables) {
141141

142-
CompactionInfo ci = new CompactionInfo(table.getDbName(), table.getTableName(), partitionName,
142+
CompactionInfo ci = new CompactionInfo(table.getCatName(), table.getDbName(), table.getTableName(), partitionName,
143143
CompactionType.SMART_OPTIMIZE);
144144

145145
// Common Hive compaction eligibility checks

itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,13 +814,14 @@ private List<Long> allocateTxns(int numTxns) throws Throwable {
814814

815815
private List<TxnToWriteId> allocateWriteIds(List<Long> txnIds, String dbName, String tblName) throws Throwable {
816816
AllocateTableWriteIdsRequest allocateTableWriteIdsRequest = new AllocateTableWriteIdsRequest(dbName, tblName);
817+
allocateTableWriteIdsRequest.setCatName(Warehouse.DEFAULT_CATALOG_NAME);
817818
allocateTableWriteIdsRequest.setTxnIds(txnIds);
818819
return hmsHandler.allocate_table_write_ids(allocateTableWriteIdsRequest).getTxnToWriteIds();
819820
}
820821

821822
private String getValidWriteIds(String dbName, String tblName) throws Throwable {
822823
GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(
823-
Collections.singletonList(TableName.getDbTable(dbName, tblName)));
824+
Collections.singletonList(TableName.getQualified(Warehouse.DEFAULT_CATALOG_NAME, dbName, tblName)));
824825
GetValidWriteIdsResponse validWriteIdsResponse = hmsHandler.get_valid_write_ids(validWriteIdsRequest);
825826
return TxnCommonUtils.createValidReaderWriteIdList(validWriteIdsResponse.
826827
getTblValidWriteIds().get(0)).writeToString();

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryCatName, Strin
355355
List<Long> txns, HiveConf primaryConf) throws Throwable {
356356
AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest();
357357
rqst.setDbName(primaryDbName);
358+
rqst.setCatName(primaryCatName);
358359
List<Long> lockIds = new ArrayList<>();
359360
for(Map.Entry<String, Long> entry : tables.entrySet()) {
360361
rqst.setTableName(entry.getKey());
@@ -373,17 +374,17 @@ List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryCatName, Strin
373374
lockIds.add(txnHandler.lock(lockRequest).getLockid());
374375
}
375376
}
376-
verifyWriteIdsForTables(tables, primaryConf, primaryDbName);
377+
verifyWriteIdsForTables(tables, primaryConf, PRIMARY_CAT_NAME, primaryDbName);
377378
return lockIds;
378379
}
379380

380-
void verifyWriteIdsForTables(Map<String, Long> tables, HiveConf conf, String dbName)
381+
void verifyWriteIdsForTables(Map<String, Long> tables, HiveConf conf, String catName, String dbName)
381382
throws Throwable {
382383
for(Map.Entry<String, Long> entry : tables.entrySet()) {
383384
Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from TXN_TO_WRITE_ID"),
384385
entry.getValue().longValue(),
385386
TestTxnDbUtil.countQueryAgent(conf,
386-
"select count(*) from TXN_TO_WRITE_ID where t2w_database = '"
387+
"select count(*) from TXN_TO_WRITE_ID where t2w_catalog = '" + catName.toLowerCase() + "' and t2w_database = '"
387388
+ dbName.toLowerCase()
388389
+ "' and t2w_table = '" + entry.getKey() + "'"));
389390
}
@@ -411,25 +412,26 @@ void verifyAllOpenTxnsNotAborted(List<Long> txns, HiveConf primaryConf) throws T
411412
"select count(*) from TXNS where txn_state = 'a' and " + txnIdRange));
412413
}
413414

414-
void verifyNextId(Map<String, Long> tables, String dbName, HiveConf conf) throws Throwable {
415+
void verifyNextId(Map<String, Long> tables, String catName, String dbName, HiveConf conf) throws Throwable {
415416
// Verify the next write id
416417
for(Map.Entry<String, Long> entry : tables.entrySet()) {
417418
String[] nextWriteId =
418419
TestTxnDbUtil.queryToString(conf,
419-
"select nwi_next from NEXT_WRITE_ID where nwi_database = '"
420+
"select nwi_next from NEXT_WRITE_ID where nwi_catalog = '"
421+
+ catName.toLowerCase() + "' and nwi_database = '"
420422
+ dbName.toLowerCase() + "' and nwi_table = '"
421423
+ entry.getKey() + "'").split("\n");
422424
Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), entry.getValue() + 1);
423425
}
424426
}
425427

426-
void verifyCompactionQueue(Map<String, Long> tables, String dbName, HiveConf conf)
428+
void verifyCompactionQueue(Map<String, Long> tables, String catName, String dbName, HiveConf conf)
427429
throws Throwable {
428430
for(Map.Entry<String, Long> entry : tables.entrySet()) {
429431
Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
430432
entry.getValue().longValue(),
431433
TestTxnDbUtil.countQueryAgent(conf,
432-
"select count(*) from COMPACTION_QUEUE where cq_database = '" + dbName
434+
"select count(*) from COMPACTION_QUEUE where cq_catalog = '" + catName + "' and cq_database = '" + dbName
433435
+ "' and cq_table = '" + entry.getKey() + "'"));
434436
}
435437
}

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ public boolean validate(Task task) {
528528
return validator.hasTask(rootTask);
529529
}
530530

531-
private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIncrementalDump,
531+
private Task getReplLoadRootTask(String sourceCat, String sourceDb, String replicadb, boolean isIncrementalDump,
532532
Tuple tuple) throws Throwable {
533533
HiveConf confTemp = driverMirror.getConf();
534534
Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
@@ -544,7 +544,7 @@ private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIn
544544
run only database creation task, and only in next iteration of Repl Load Task execution, remaining tasks will be
545545
executed. Hence disabling this to perform the test on task optimization. */
546546
confTemp.setBoolVar(HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET, false);
547-
ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb,
547+
ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceCat, sourceDb, replicadb,
548548
null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
549549
0L, metricCollector, false);
550550
Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
@@ -565,7 +565,7 @@ public void testTaskCreationOptimization() throws Throwable {
565565
Tuple dump = replDumpDb(dbName);
566566

567567
//bootstrap load should not have move task
568-
Task task = getReplLoadRootTask(dbName, dbNameReplica, false, dump);
568+
Task task = getReplLoadRootTask(Warehouse.DEFAULT_CATALOG_NAME, dbName, dbNameReplica, false, dump);
569569
assertEquals(false, hasMoveTask(task));
570570
assertEquals(true, hasPartitionTask(task));
571571

@@ -579,7 +579,7 @@ public void testTaskCreationOptimization() throws Throwable {
579579

580580
// Partition level statistics gets updated as part of the INSERT above. So we see a partition
581581
// task corresponding to an ALTER_PARTITION event.
582-
task = getReplLoadRootTask(dbName, dbNameReplica, true, dump);
582+
task = getReplLoadRootTask(Warehouse.DEFAULT_CATALOG_NAME, dbName, dbNameReplica, true, dump);
583583
assertEquals(true, hasMoveTask(task));
584584
assertEquals(true, hasPartitionTask(task));
585585

@@ -592,7 +592,7 @@ public void testTaskCreationOptimization() throws Throwable {
592592
dump = replDumpDb(dbName);
593593

594594
//no move task should be added as the operation is adding a dynamic partition
595-
task = getReplLoadRootTask(dbName, dbNameReplica, true, dump);
595+
task = getReplLoadRootTask(Warehouse.DEFAULT_CATALOG_NAME, dbName, dbNameReplica, true, dump);
596596
assertEquals(false, hasMoveTask(task));
597597
assertEquals(true, hasPartitionTask(task));
598598
}

0 commit comments

Comments (0)