Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1927,6 +1927,10 @@ public enum ConfVars {
"The pattern to extract a user name. This is effective when you use RegexPrincipalMapper. For example, if " +
"you want to extract a user name from the local part of the email claim, set this to (.*)@example.com."
),
CATALOG_VENDED_CREDENTIALS_PROVIDERS("metastore.catalog.vended-credentials.providers",
"hive.metastore.catalog.vended-credentials.providers", "",
"List of comma-separated credential-vending provider IDs"
),
ICEBERG_CATALOG_SERVLET_PATH("metastore.iceberg.catalog.servlet.path",
"hive.metastore.iceberg.catalog.servlet.path", "iceberg",
"HMS Iceberg Catalog servlet path component of URL endpoint."
Expand All @@ -1935,6 +1939,10 @@ public enum ConfVars {
"hive.metastore.iceberg.catalog.cache.expiry", -1,
"HMS Iceberg Catalog cache expiry."
),
ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED("metastore.iceberg.catalog.vended-credentials.enabled",
"hive.metastore.iceberg.catalog.vended-credentials.enabled", false,
"Boolean flag to enable credential vending on Iceberg REST Catalog"
),
HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min",
"hive.metastore.httpserver.threadpool.min", 8,
"HMS embedded HTTP server minimum number of threads."
Expand Down
24 changes: 20 additions & 4 deletions standalone-metastore/metastore-rest-catalog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
<iceberg.version>1.10.1</iceberg.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>core</classifier>
<scope>provided</scope>
Comment on lines +29 to +34
Copy link
Copy Markdown
Member

@deniskuzZ deniskuzZ Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rest-catalog now depends on a hive-exec-core? scope was test-only before

</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-standalone-metastore-server</artifactId>
Expand Down Expand Up @@ -72,10 +79,8 @@
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>core</classifier>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bundle</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -99,6 +104,17 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.rest;

/**
* Possible values for the X-Iceberg-Access-Delegation header.
*/
public enum AccessDelegationMode {
VENDED_CREDENTIALS, REMOTE_SIGNING
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BaseTransaction;
Expand Down Expand Up @@ -72,12 +74,15 @@
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Original @ <a href="https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java">RESTCatalogAdapter.java</a>
* Adaptor class to translate REST requests into {@link Catalog} API calls.
*/
public class HMSCatalogAdapter implements RESTClient {
private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogAdapter.class);
private static final Splitter SLASH = Splitter.on('/');

private static final Map<Class<? extends Exception>, Integer> EXCEPTION_ERROR_CODES =
Expand All @@ -102,14 +107,15 @@ public class HMSCatalogAdapter implements RESTClient {
private final Catalog catalog;
private final SupportsNamespaces asNamespaceCatalog;
private final ViewCatalog asViewCatalog;
private final IcebergVendedCredentialProvider credentialProvider;


public HMSCatalogAdapter(Catalog catalog) {
HMSCatalogAdapter(Catalog catalog, IcebergVendedCredentialProvider credentialProvider) {
Preconditions.checkArgument(catalog instanceof SupportsNamespaces);
Preconditions.checkArgument(catalog instanceof ViewCatalog);
this.catalog = catalog;
this.asNamespaceCatalog = (SupportsNamespaces) catalog;
this.asViewCatalog = (ViewCatalog) catalog;
this.credentialProvider = credentialProvider;
}

enum Route {
Expand Down Expand Up @@ -263,18 +269,21 @@ private ListTablesResponse listTables(Map<String, String> vars) {
return castResponse(ListTablesResponse.class, CatalogHandlers.listTables(catalog, namespace));
}

private LoadTableResponse createTable(Map<String, String> vars, Object body) {
private LoadTableResponse createTable(Set<AccessDelegationMode> accessDelegationModes, Map<String, String> vars,
Object body) {
final Class<LoadTableResponse> responseType = LoadTableResponse.class;
Namespace namespace = namespaceFromPathVars(vars);
CreateTableRequest request = castRequest(CreateTableRequest.class, body);
request.validate();
LoadTableResponse response;
if (request.stageCreate()) {
return castResponse(
responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request));
response = castResponse(
responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request));
} else {
return castResponse(
responseType, CatalogHandlers.createTable(catalog, namespace, request));
response = castResponse(
responseType, CatalogHandlers.createTable(catalog, namespace, request));
}
return attachCredentials(accessDelegationModes, TableIdentifier.of(namespace, request.name()), response);
}

private RESTResponse dropTable(Map<String, String> vars) {
Expand All @@ -292,21 +301,33 @@ private RESTResponse tableExists(Map<String, String> vars) {
return null;
}

private LoadTableResponse loadTable(Map<String, String> vars) {
private LoadTableResponse loadTable(Set<AccessDelegationMode> delegationModes, Map<String, String> vars) {
TableIdentifier ident = identFromPathVars(vars);
return castResponse(LoadTableResponse.class, CatalogHandlers.loadTable(catalog, ident));
LoadTableResponse response =
castResponse(LoadTableResponse.class, CatalogHandlers.loadTable(catalog, ident));
return attachCredentials(delegationModes, ident, response);
}

private LoadTableResponse registerTable(Map<String, String> vars, Object body) {
Namespace namespace = namespaceFromPathVars(vars);
RegisterTableRequest request = castRequest(RegisterTableRequest.class, body);
return castResponse(LoadTableResponse.class, CatalogHandlers.registerTable(catalog, namespace, request));
private LoadTableResponse registerTable(
Set<AccessDelegationMode> delegationModes,
Map<String, String> vars,
Object body) {
Namespace namespace = namespaceFromPathVars(vars);
RegisterTableRequest request = castRequest(RegisterTableRequest.class, body);
LoadTableResponse response =
castResponse(LoadTableResponse.class, CatalogHandlers.registerTable(catalog, namespace, request));
return attachCredentials(delegationModes, TableIdentifier.of(namespace, request.name()), response);
}

private LoadTableResponse updateTable(Map<String, String> vars, Object body) {
private LoadTableResponse updateTable(
Set<AccessDelegationMode> delegationModes,
Map<String, String> vars,
Object body) {
TableIdentifier ident = identFromPathVars(vars);
UpdateTableRequest request = castRequest(UpdateTableRequest.class, body);
return castResponse(LoadTableResponse.class, CatalogHandlers.updateTable(catalog, ident, request));
LoadTableResponse response =
castResponse(LoadTableResponse.class, CatalogHandlers.updateTable(catalog, ident, request));
return attachCredentials(delegationModes, ident, response);
}

private RESTResponse renameTable(Object body) {
Expand Down Expand Up @@ -377,6 +398,23 @@ private RESTResponse dropView(Map<String, String> vars) {
return null;
}

private LoadTableResponse attachCredentials(Set<AccessDelegationMode> accessDelegationModes, TableIdentifier ident,
LoadTableResponse response) {
if (credentialProvider == null) {
return response;
}

if (accessDelegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we do small refactor

private LoadTableResponse attachCredentials(
    Set<AccessDelegationMode> modes,
    TableIdentifier ident,
    LoadTableResponse response) {

  if (credentialProvider == null) {
    return response;
  }

  if (modes.contains(AccessDelegationMode.VENDED_CREDENTIALS)) {
    return withVendedCredentials(ident, response);
  }

  if (modes.contains(AccessDelegationMode.REMOTE_SIGNING)) {
    LOG.warn("Remote signing is not supported. Ignoring...");
  }

  return response;
}

private LoadTableResponse withVendedCredentials(
    TableIdentifier ident,
    LoadTableResponse response) {

  var credentials = credentialProvider.vend(
      ident,
      response.tableMetadata().location());

  return LoadTableResponse.builder()
      .withTableMetadata(response.tableMetadata())
      .addAllConfig(response.config())
      .addAllCredentials(credentials)
      .build();
}

final var credentials = credentialProvider.vend(ident, response.tableMetadata().location());
return LoadTableResponse.builder().withTableMetadata(response.tableMetadata()).addAllConfig(response.config())
.addAllCredentials(credentials).build();
}
if (accessDelegationModes.contains(AccessDelegationMode.REMOTE_SIGNING)) {
LOG.warn("Remote signing is not supported. Ignoring...");
}
return response;
}

/**
* This is a very simplistic approach that only validates the requirements for each table and does
* not do any other conflict detection. Therefore, it does not guarantee true transactional
Expand Down Expand Up @@ -408,7 +446,10 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest

@SuppressWarnings({"MethodLength", "unchecked"})
private <T extends RESTResponse> T handleRequest(
Route route, Map<String, String> vars, Object body) {
Route route,
Set<AccessDelegationMode> accessDelegationModes,
Map<String, String> vars,
Object body) {
switch (route) {
case CONFIG:
return (T) config();
Expand All @@ -435,7 +476,7 @@ private <T extends RESTResponse> T handleRequest(
return (T) listTables(vars);

case CREATE_TABLE:
return (T) createTable(vars, body);
return (T) createTable(accessDelegationModes, vars, body);

case DROP_TABLE:
return (T) dropTable(vars);
Expand All @@ -444,13 +485,13 @@ private <T extends RESTResponse> T handleRequest(
return (T) tableExists(vars);

case LOAD_TABLE:
return (T) loadTable(vars);
return (T) loadTable(accessDelegationModes, vars);

case REGISTER_TABLE:
return (T) registerTable(vars, body);
return (T) registerTable(accessDelegationModes, vars, body);

case UPDATE_TABLE:
return (T) updateTable(vars, body);
return (T) updateTable(accessDelegationModes, vars, body);

case RENAME_TABLE:
return (T) renameTable(body);
Expand Down Expand Up @@ -491,9 +532,9 @@ private <T extends RESTResponse> T handleRequest(
<T extends RESTResponse> T execute(
HTTPMethod method,
String path,
Set<AccessDelegationMode> accessDelegationModes,
Map<String, String> queryParams,
Object body,
Consumer<ErrorResponse> errorHandler) {
Object body, Consumer<ErrorResponse> errorHandler) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move errorHandler to a new line

ErrorResponse.Builder errorBuilder = ErrorResponse.builder();
Pair<Route, Map<String, String>> routeAndVars = Route.from(method, path);
if (routeAndVars != null) {
Expand All @@ -503,7 +544,7 @@ <T extends RESTResponse> T execute(
vars.putAll(queryParams);
}
vars.putAll(routeAndVars.second());
return handleRequest(routeAndVars.first(), vars.build(), body);
return handleRequest(routeAndVars.first(), accessDelegationModes, vars.build(), body);
} catch (RuntimeException e) {
configureResponseFromException(e, errorBuilder);
}
Expand All @@ -519,6 +560,15 @@ <T extends RESTResponse> T execute(
throw new RESTException("Unhandled error: %s", error);
}

<T extends RESTResponse> T execute(
HTTPMethod method,
String path,
Map<String, String> queryParams,
Object body,
Consumer<ErrorResponse> errorHandler) {
return execute(method, path, EnumSet.noneOf(AccessDelegationMode.class), queryParams, body, errorHandler);
}

@Override
public <T extends RESTResponse> T delete(
String path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,13 @@ private HttpServlet createServlet(Catalog catalog) {
// Iceberg REST client uses "catalog" by default
List<String> scopes = Collections.singletonList("catalog");
ServletSecurity security = new ServletSecurity(AuthType.fromString(authType), configuration, req -> scopes);
return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog)));
final IcebergVendedCredentialProvider vendedCredentialProvider;
if (MetastoreConf.getBoolVar(configuration, ConfVars.ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED)) {
vendedCredentialProvider = new IcebergVendedCredentialProvider(configuration);
} else {
vendedCredentialProvider = null;
Copy link
Copy Markdown
Member

@deniskuzZ deniskuzZ Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need else?

}
return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog, vendedCredentialProvider)));
}

/**
Expand Down
Loading
Loading