-
Notifications
You must be signed in to change notification settings - Fork 4.8k
[WIP]HIVE-29228: Support vended credentials for S3 #6458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.rest; | ||
|
|
||
| /** | ||
| * Possible values for the X-Iceberg-Access-Delegation header. | ||
| */ | ||
| public enum AccessDelegationMode { | ||
| VENDED_CREDENTIALS, REMOTE_SIGNING | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,8 +21,10 @@ | |
|
|
||
| import com.google.common.base.Preconditions; | ||
| import java.util.Arrays; | ||
| import java.util.EnumSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.function.Consumer; | ||
| import org.apache.iceberg.BaseTable; | ||
| import org.apache.iceberg.BaseTransaction; | ||
|
|
@@ -72,12 +74,15 @@ | |
| import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; | ||
| import org.apache.iceberg.util.Pair; | ||
| import org.apache.iceberg.util.PropertyUtil; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * Original @ <a href="https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java">RESTCatalogAdapter.java</a> | ||
| * Adaptor class to translate REST requests into {@link Catalog} API calls. | ||
| */ | ||
| public class HMSCatalogAdapter implements RESTClient { | ||
| private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogAdapter.class); | ||
| private static final Splitter SLASH = Splitter.on('/'); | ||
|
|
||
| private static final Map<Class<? extends Exception>, Integer> EXCEPTION_ERROR_CODES = | ||
|
|
@@ -102,14 +107,15 @@ public class HMSCatalogAdapter implements RESTClient { | |
| private final Catalog catalog; | ||
| private final SupportsNamespaces asNamespaceCatalog; | ||
| private final ViewCatalog asViewCatalog; | ||
| private final IcebergVendedCredentialProvider credentialProvider; | ||
|
|
||
|
|
||
| public HMSCatalogAdapter(Catalog catalog) { | ||
| HMSCatalogAdapter(Catalog catalog, IcebergVendedCredentialProvider credentialProvider) { | ||
| Preconditions.checkArgument(catalog instanceof SupportsNamespaces); | ||
| Preconditions.checkArgument(catalog instanceof ViewCatalog); | ||
| this.catalog = catalog; | ||
| this.asNamespaceCatalog = (SupportsNamespaces) catalog; | ||
| this.asViewCatalog = (ViewCatalog) catalog; | ||
| this.credentialProvider = credentialProvider; | ||
| } | ||
|
|
||
| enum Route { | ||
|
|
@@ -263,18 +269,21 @@ private ListTablesResponse listTables(Map<String, String> vars) { | |
| return castResponse(ListTablesResponse.class, CatalogHandlers.listTables(catalog, namespace)); | ||
| } | ||
|
|
||
| private LoadTableResponse createTable(Map<String, String> vars, Object body) { | ||
| private LoadTableResponse createTable(Set<AccessDelegationMode> accessDelegationModes, Map<String, String> vars, | ||
| Object body) { | ||
| final Class<LoadTableResponse> responseType = LoadTableResponse.class; | ||
| Namespace namespace = namespaceFromPathVars(vars); | ||
| CreateTableRequest request = castRequest(CreateTableRequest.class, body); | ||
| request.validate(); | ||
| LoadTableResponse response; | ||
| if (request.stageCreate()) { | ||
| return castResponse( | ||
| responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request)); | ||
| response = castResponse( | ||
| responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request)); | ||
| } else { | ||
| return castResponse( | ||
| responseType, CatalogHandlers.createTable(catalog, namespace, request)); | ||
| response = castResponse( | ||
| responseType, CatalogHandlers.createTable(catalog, namespace, request)); | ||
| } | ||
| return attachCredentials(accessDelegationModes, TableIdentifier.of(namespace, request.name()), response); | ||
| } | ||
|
|
||
| private RESTResponse dropTable(Map<String, String> vars) { | ||
|
|
@@ -292,21 +301,33 @@ private RESTResponse tableExists(Map<String, String> vars) { | |
| return null; | ||
| } | ||
|
|
||
| private LoadTableResponse loadTable(Map<String, String> vars) { | ||
| private LoadTableResponse loadTable(Set<AccessDelegationMode> delegationModes, Map<String, String> vars) { | ||
| TableIdentifier ident = identFromPathVars(vars); | ||
| return castResponse(LoadTableResponse.class, CatalogHandlers.loadTable(catalog, ident)); | ||
| LoadTableResponse response = | ||
| castResponse(LoadTableResponse.class, CatalogHandlers.loadTable(catalog, ident)); | ||
| return attachCredentials(delegationModes, ident, response); | ||
| } | ||
|
|
||
| private LoadTableResponse registerTable(Map<String, String> vars, Object body) { | ||
| Namespace namespace = namespaceFromPathVars(vars); | ||
| RegisterTableRequest request = castRequest(RegisterTableRequest.class, body); | ||
| return castResponse(LoadTableResponse.class, CatalogHandlers.registerTable(catalog, namespace, request)); | ||
| private LoadTableResponse registerTable( | ||
| Set<AccessDelegationMode> delegationModes, | ||
| Map<String, String> vars, | ||
| Object body) { | ||
| Namespace namespace = namespaceFromPathVars(vars); | ||
| RegisterTableRequest request = castRequest(RegisterTableRequest.class, body); | ||
| LoadTableResponse response = | ||
| castResponse(LoadTableResponse.class, CatalogHandlers.registerTable(catalog, namespace, request)); | ||
| return attachCredentials(delegationModes, TableIdentifier.of(namespace, request.name()), response); | ||
| } | ||
|
|
||
| private LoadTableResponse updateTable(Map<String, String> vars, Object body) { | ||
| private LoadTableResponse updateTable( | ||
| Set<AccessDelegationMode> delegationModes, | ||
| Map<String, String> vars, | ||
| Object body) { | ||
| TableIdentifier ident = identFromPathVars(vars); | ||
| UpdateTableRequest request = castRequest(UpdateTableRequest.class, body); | ||
| return castResponse(LoadTableResponse.class, CatalogHandlers.updateTable(catalog, ident, request)); | ||
| LoadTableResponse response = | ||
| castResponse(LoadTableResponse.class, CatalogHandlers.updateTable(catalog, ident, request)); | ||
| return attachCredentials(delegationModes, ident, response); | ||
| } | ||
|
|
||
| private RESTResponse renameTable(Object body) { | ||
|
|
@@ -377,6 +398,23 @@ private RESTResponse dropView(Map<String, String> vars) { | |
| return null; | ||
| } | ||
|
|
||
| private LoadTableResponse attachCredentials(Set<AccessDelegationMode> accessDelegationModes, TableIdentifier ident, | ||
| LoadTableResponse response) { | ||
| if (credentialProvider == null) { | ||
| return response; | ||
| } | ||
|
|
||
| if (accessDelegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could we do small refactor |
||
| final var credentials = credentialProvider.vend(ident, response.tableMetadata().location()); | ||
| return LoadTableResponse.builder().withTableMetadata(response.tableMetadata()).addAllConfig(response.config()) | ||
| .addAllCredentials(credentials).build(); | ||
| } | ||
| if (accessDelegationModes.contains(AccessDelegationMode.REMOTE_SIGNING)) { | ||
| LOG.warn("Remote signing is not supported. Ignoring..."); | ||
| } | ||
| return response; | ||
| } | ||
|
|
||
| /** | ||
| * This is a very simplistic approach that only validates the requirements for each table and does | ||
| * not do any other conflict detection. Therefore, it does not guarantee true transactional | ||
|
|
@@ -408,7 +446,10 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest | |
|
|
||
| @SuppressWarnings({"MethodLength", "unchecked"}) | ||
| private <T extends RESTResponse> T handleRequest( | ||
| Route route, Map<String, String> vars, Object body) { | ||
| Route route, | ||
| Set<AccessDelegationMode> accessDelegationModes, | ||
| Map<String, String> vars, | ||
| Object body) { | ||
| switch (route) { | ||
| case CONFIG: | ||
| return (T) config(); | ||
|
|
@@ -435,7 +476,7 @@ private <T extends RESTResponse> T handleRequest( | |
| return (T) listTables(vars); | ||
|
|
||
| case CREATE_TABLE: | ||
| return (T) createTable(vars, body); | ||
| return (T) createTable(accessDelegationModes, vars, body); | ||
|
|
||
| case DROP_TABLE: | ||
| return (T) dropTable(vars); | ||
|
|
@@ -444,13 +485,13 @@ private <T extends RESTResponse> T handleRequest( | |
| return (T) tableExists(vars); | ||
|
|
||
| case LOAD_TABLE: | ||
| return (T) loadTable(vars); | ||
| return (T) loadTable(accessDelegationModes, vars); | ||
|
|
||
| case REGISTER_TABLE: | ||
| return (T) registerTable(vars, body); | ||
| return (T) registerTable(accessDelegationModes, vars, body); | ||
|
|
||
| case UPDATE_TABLE: | ||
| return (T) updateTable(vars, body); | ||
| return (T) updateTable(accessDelegationModes, vars, body); | ||
|
|
||
| case RENAME_TABLE: | ||
| return (T) renameTable(body); | ||
|
|
@@ -491,9 +532,9 @@ private <T extends RESTResponse> T handleRequest( | |
| <T extends RESTResponse> T execute( | ||
| HTTPMethod method, | ||
| String path, | ||
| Set<AccessDelegationMode> accessDelegationModes, | ||
| Map<String, String> queryParams, | ||
| Object body, | ||
| Consumer<ErrorResponse> errorHandler) { | ||
| Object body, Consumer<ErrorResponse> errorHandler) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please move errorHandler to a new line |
||
| ErrorResponse.Builder errorBuilder = ErrorResponse.builder(); | ||
| Pair<Route, Map<String, String>> routeAndVars = Route.from(method, path); | ||
| if (routeAndVars != null) { | ||
|
|
@@ -503,7 +544,7 @@ <T extends RESTResponse> T execute( | |
| vars.putAll(queryParams); | ||
| } | ||
| vars.putAll(routeAndVars.second()); | ||
| return handleRequest(routeAndVars.first(), vars.build(), body); | ||
| return handleRequest(routeAndVars.first(), accessDelegationModes, vars.build(), body); | ||
| } catch (RuntimeException e) { | ||
| configureResponseFromException(e, errorBuilder); | ||
| } | ||
|
|
@@ -519,6 +560,15 @@ <T extends RESTResponse> T execute( | |
| throw new RESTException("Unhandled error: %s", error); | ||
| } | ||
|
|
||
| <T extends RESTResponse> T execute( | ||
| HTTPMethod method, | ||
| String path, | ||
| Map<String, String> queryParams, | ||
| Object body, | ||
| Consumer<ErrorResponse> errorHandler) { | ||
| return execute(method, path, EnumSet.noneOf(AccessDelegationMode.class), queryParams, body, errorHandler); | ||
| } | ||
|
|
||
| @Override | ||
| public <T extends RESTResponse> T delete( | ||
| String path, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -112,7 +112,13 @@ private HttpServlet createServlet(Catalog catalog) { | |
| // Iceberg REST client uses "catalog" by default | ||
| List<String> scopes = Collections.singletonList("catalog"); | ||
| ServletSecurity security = new ServletSecurity(AuthType.fromString(authType), configuration, req -> scopes); | ||
| return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog))); | ||
| final IcebergVendedCredentialProvider vendedCredentialProvider; | ||
| if (MetastoreConf.getBoolVar(configuration, ConfVars.ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED)) { | ||
| vendedCredentialProvider = new IcebergVendedCredentialProvider(configuration); | ||
| } else { | ||
| vendedCredentialProvider = null; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need else? |
||
| } | ||
| return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog, vendedCredentialProvider))); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest-catalog now depends on a hive-exec-core? scope was test-only before