diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java index 678c76153c5c..5604b7c0837f 100644 --- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java +++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.sql.TableUtils; import org.apache.beam.sdk.extensions.sql.impl.TableName; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; @@ -60,12 +61,14 @@ public void createTable(Table table) { getProvider(table.getType()).createTable(table); } else { String identifier = getIdentifier(table); + Map props = + TableUtils.getObjectMapper() + .convertValue(table.getProperties(), new TypeReference>() {}) + .entrySet().stream() + .filter(p -> !p.getKey().startsWith("beam.")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); try { - Map properties = - TableUtils.getObjectMapper() - .convertValue(table.getProperties(), new TypeReference>() {}); - catalogConfig.createTable( - identifier, table.getSchema(), table.getPartitionFields(), properties); + catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields(), props); } catch (TableAlreadyExistsException e) { LOG.info( "Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier); diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java 
b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java index b68aa34a1777..2cdcaa9b63a2 100644 --- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java +++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.HashMap; import java.util.List; @@ -56,6 +57,8 @@ class IcebergTable extends SchemaBaseBeamTable { @VisibleForTesting static final String CATALOG_PROPERTIES_FIELD = "catalog_properties"; @VisibleForTesting static final String HADOOP_CONFIG_PROPERTIES_FIELD = "config_properties"; @VisibleForTesting static final String CATALOG_NAME_FIELD = "catalog_name"; + static final String BEAM_WRITE_PROPERTY = "beam.write."; + static final String BEAM_READ_PROPERTY = "beam.read."; @VisibleForTesting static final String TRIGGERING_FREQUENCY_FIELD = "triggering_frequency_seconds"; @@ -71,9 +74,21 @@ class IcebergTable extends SchemaBaseBeamTable { this.tableIdentifier = tableIdentifier; this.catalogConfig = catalogConfig; ObjectNode properties = table.getProperties(); - if (properties.has(TRIGGERING_FREQUENCY_FIELD)) { - this.triggeringFrequency = properties.get(TRIGGERING_FREQUENCY_FIELD).asInt(); + for (Map.Entry property : properties.properties()) { + String name = property.getKey().toLowerCase(); + if (name.startsWith(BEAM_WRITE_PROPERTY)) { + String prop = name.substring(BEAM_WRITE_PROPERTY.length()); + if (prop.equalsIgnoreCase(TRIGGERING_FREQUENCY_FIELD)) { + this.triggeringFrequency = property.getValue().asInt(); + } else { + throw new IllegalArgumentException("Unknown Beam write property: " + name); + } + } else if (name.startsWith(BEAM_READ_PROPERTY)) 
{ + // none supported yet + throw new IllegalArgumentException("Unknown Beam read property: " + name); + } } + this.partitionFields = table.getPartitionFields(); } diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java index 900fdae743a1..8b250af2754a 100644 --- a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java +++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java @@ -156,7 +156,7 @@ public void testSimpleInsertWithPartitionedFields() throws Exception { + ") \n" + "TYPE 'iceberg' \n" + "PARTITIONED BY('id', 'truncate(name, 3)') \n" - + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'"; + + "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'"; String insertStatement = format("INSERT INTO %s \n", tableIdentifier) + "SELECT \n" @@ -211,7 +211,7 @@ public void testSimpleInsertFlat() throws Exception { + " name VARCHAR \n " + ") \n" + "TYPE 'iceberg' \n" - + "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'"; + + "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'"; String insertStatement = format("INSERT INTO %s \n", tableIdentifier) + "SELECT \n" diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java index e01b174decb0..6fb0a3480a0f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java @@ -154,7 +154,7 @@ public void createTable( String tableIdentifier, Schema 
tableSchema, @Nullable List partitionFields, - Map properties) { + @Nullable Map properties) { TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier); org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema); PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema); @@ -164,7 +164,13 @@ public void createTable( icebergIdentifier, icebergSchema, icebergSpec); - catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties); + Catalog.TableBuilder builder = + catalog().buildTable(icebergIdentifier, icebergSchema).withPartitionSpec(icebergSpec); + if (properties != null) { + builder = builder.withProperties(properties); + } + builder.create(); + LOG.info("Successfully created table '{}'.", icebergIdentifier); } catch (AlreadyExistsException e) { throw new TableAlreadyExistsException(e); diff --git a/website/www/site/assets/js/language-switch-v2.js b/website/www/site/assets/js/language-switch-v2.js index ff6fdb0253a8..e3ee3e41f58a 100644 --- a/website/www/site/assets/js/language-switch-v2.js +++ b/website/www/site/assets/js/language-switch-v2.js @@ -290,5 +290,6 @@ $(document).ready(function() { Switcher({"name": "runner", "default": "direct"}).render(); Switcher({"name": "tab"}).render(); Switcher({"name": "shell", "default": "unix"}).render(); + Switcher({"name": "tab"}).render(); Switcher({"name": "version"}).render(); }); diff --git a/website/www/site/content/en/documentation/dsls/sql/ddl.md b/website/www/site/content/en/documentation/dsls/sql/ddl.md new file mode 100644 index 000000000000..76cde33d8d3b --- /dev/null +++ b/website/www/site/content/en/documentation/dsls/sql/ddl.md @@ -0,0 +1,343 @@ +--- +type: languages +title: "Beam SQL DDL" +--- + + +# Beam SQL DDL + +Beam SQL provides a standard three-level hierarchy to manage metadata across external data sources, +enabling structured discovery and cross-source interoperability. +1. 
Catalog: The top-level container representing an external metadata provider. Examples include a Hive Metastore, AWS Glue, or a BigLake Catalog. +2. Database: A logical grouping within a Catalog. This typically maps to a "Schema" in traditional RDBMS or a "Namespace" in systems like Apache Iceberg +3. Table: The leaf node containing the schema definition and the underlying data. + +This structure enables Federated Querying. Because Beam can resolve multiple Catalogs simultaneously, +you can execute complex pipelines that bridge disparate environments within a single SQL statement (e.g. +joining a production BigQuery table with a developmental Iceberg dataset in GCS). + +By using fully qualified names (e.g., catalog.database.table), you can perform cross-catalog joins or +migrate data between clouds without manual schema mapping or intermediate storage. + +Below are details about metadata management at each level: + +## Catalogs +The Catalog is the entry point for external metadata. When you initialize Beam SQL, you start off with a `default` Catalog that contains a `default` Database. +You can register new Catalogs, switch between them, and modify their configurations. + +{{< tab CREATE >}} +

Registers a new Catalog instance

+

Note: Creating a Catalog does not automatically switch to it. Remember +to run USE CATALOG afterwards to set it.

+ +{{< highlight >}} +CREATE CATALOG [ IF NOT EXISTS ] catalog_name +TYPE 'type_name' +[ PROPERTIES ( 'key' = 'value' [, ...] ) ] +{{< /highlight >}} + +

Example: Creating a Hadoop Catalog (Local Storage)

+{{< highlight >}} +CREATE CATALOG local_catalog +TYPE iceberg +PROPERTIES ( + 'type' = 'hadoop', + 'warehouse' = 'file:///tmp/iceberg-warehouse' +) +{{< /highlight >}} + +

Example: Registering a BigLake Catalog (GCS)

+{{< highlight >}} +CREATE CATALOG prod_iceberg +TYPE iceberg +PROPERTIES ( + 'type' = 'rest', + 'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse' = 'gs://my-company-bucket/warehouse', + 'header.x-goog-user-project' = 'my_prod_project', + 'rest.auth.type' = 'google', + 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'header.X-Iceberg-Access-Delegation' = 'vended-credentials' +); +{{< /highlight >}} +{{< /tab >}} +{{< tab USE >}} +

Sets the active Catalog for the current session. This simplifies queries by allowing you +to reference Databases directly without their fully-qualified names (e.g. my_db instead of my_catalog.my_db)

+ +

Tip: run SHOW CURRENT CATALOG to view the currently active Catalog.

+

Note: All subsequent DATABASE and TABLE commands will be executed under this Catalog, unless fully qualified.

+ +{{< highlight >}} +USE CATALOG prod_iceberg; +{{< /highlight >}} +{{< /tab >}} +{{< tab ALTER >}} +Modifies the properties of a registered Catalog. +{{< highlight >}} +ALTER CATALOG catalog_name +[ SET ( 'key' = 'val', ... ) ] +[ RESET ( 'key', ... ) ] +{{< /highlight >}} +
    +
  1. SET: Adds new properties or updates existing ones.
  2. +
  3. RESET / UNSET: Removes properties.
  4. +
+
+{{< /tab >}} +{{< tab SHOW >}} +

Can be used to either:

+
    +
  1. List Catalogs registered in this Beam SQL session.
  2. +
  3. View the currently active Catalog.
  4. +
+ +{{< highlight >}} +SHOW CATALOGS [ LIKE regex_pattern ] +{{< /highlight >}} + +

Example: List all Catalogs

+{{< highlight >}} +SHOW CATALOGS; +{{< /highlight >}} + +

Example: List Catalogs matching a pattern

+{{< highlight >}} +SHOW CATALOGS LIKE 'prod_%'; +{{< /highlight >}} + +

Example: Verify which Catalog is currently active

+{{< highlight >}} +SHOW CURRENT CATALOG; +{{< /highlight >}} +{{< /tab >}} +{{< tab DROP >}} +

Unregisters a Catalog from the current Beam SQL session. This does not destroy external data.

+ +{{< highlight >}} +DROP CATALOG [ IF EXISTS ] catalog_name; +{{< /highlight >}} +{{< /tab >}} + +## Databases +A Database lives inside a Catalog and may contain a number of Tables. + +{{< tab CREATE >}} +

Creates a new Database within the current Catalog (default), or the specified Catalog.

+

Note: Creating a Database does not automatically switch to it. Remember +to run USE DATABASE afterwards to set it.

+ +{{< highlight >}} +CREATE DATABASE [ IF NOT EXISTS ] [ catalog_name. ]database_name; +{{< /highlight >}} + +

Example: Create a Database in the current active Catalog

+{{< highlight >}} +USE CATALOG my_catalog; +CREATE DATABASE sales_data; +{{< /highlight >}} + +

Example: Create a Database in a specified registered Catalog

+{{< highlight >}} +CREATE DATABASE other_catalog.sales_data; +{{< /highlight >}} +{{< /tab >}} +{{< tab USE >}} +

Sets the active Database for the current session. This simplifies queries by allowing you +to reference Tables directly without their fully-qualified names (e.g. my_table instead of my_db.my_table)

+ +

Note: All subsequent TABLE commands will be executed under this Database, unless fully qualified.

+ +{{< highlight >}} +USE DATABASE sales_data; +{{< /highlight >}} + +

Switch to a Database in a specified Catalog. Note: this also switches the active Catalog to that Catalog

+{{< highlight >}} +USE DATABASE other_catalog.sales_data; +{{< /highlight >}} +{{< /tab >}} +{{< tab SHOW >}} +

Can be used to either:

+
    +
  1. List Databases within the currently active Catalog, or a specified Catalog.
  2. +
  3. View the currently active Database.
  4. +
+ +{{< highlight >}} +SHOW DATABASES [ ( FROM | IN )? catalog_name ] [LIKE regex_pattern ] +{{< /highlight >}} + +

Example: List Databases in the currently active Catalog

+{{< highlight >}} +SHOW DATABASES; +{{< /highlight >}} + +

Example: List Databases in a specified Catalog

+{{< highlight >}} +SHOW DATABASES IN my_catalog; +{{< /highlight >}} + +

Example: List Databases matching a pattern

+{{< highlight >}} +SHOW DATABASES IN my_catalog LIKE '%geo%'; +{{< /highlight >}} + +

Example: Verify which Database is currently active

+{{< highlight >}} +SHOW CURRENT DATABASE; +{{< /highlight >}} +{{< /tab >}} +{{< tab DROP >}} +

Unregisters a Database from the current session. For some connectors, this +will also delete the Database from the external data source.

+ +{{< highlight >}} +DROP DATABASE [ IF EXISTS ] database_name [ RESTRICT | CASCADE ]; +{{< /highlight >}} + +
    +
  1. RESTRICT (default): Fails if the Database is not empty.
  2. +
  3. CASCADE: Drops the Database and all tables contained within it. Use with caution.
  4. +
+
+{{< /tab >}} + +## Tables +The actual entity containing data, and is described by a schema. Some +data sources also support applying a partition spec and attaching table-specific properties. + +{{< tab CREATE >}} +

Creates a new Table within the current Catalog and Database (default), or the specified Catalog and Database.

+ +{{< highlight >}} +CREATE EXTERNAL TABLE [ IF NOT EXISTS ] [ catalog. ][ db. ]table_name ( + col_name col_type [ NOT NULL ] [ COMMENT 'col_comment' ], + ... + ) + TYPE 'type_name' + [ PARTITIONED BY ( 'partition_field' [, ... ] ) ] + [ COMMENT 'table_comment' ] + [ LOCATION 'location_uri' ] + [ TBLPROPERTIES 'properties_json_string' ]; +{{< /highlight >}} +
    +
  • TYPE: the table type (e.g. 'iceberg', 'text', 'kafka').
  • +
  • PARTITIONED BY: an ordered list of fields describing the partition spec.
  • +
  • LOCATION: explicitly sets the location of the table (overriding the inferred catalog.db.table_name location)
  • +
  • TBLPROPERTIES: configuration properties used when creating the table or setting up its IO connection.
  • +
+
+ +

Example: Creating an Iceberg Table

+{{< highlight >}} +CREATE EXTERNAL TABLE prod_iceberg.sales_data.orders ( + order_id BIGINT NOT NULL COMMENT 'Unique order identifier', + amount DECIMAL(10, 2), + order_date TIMESTAMP, + region_id VARCHAR +) +TYPE 'iceberg' +PARTITIONED BY ( 'region_id', 'day(order_date)' ) +COMMENT 'Daily sales transactions' +TBLPROPERTIES '{ + "write.format.default": "parquet", + "read.split.target-size": 268435456, + "beam.write.triggering_frequency_seconds": 60 +}'; +{{< /highlight >}} + 
    +
  • This creates an Iceberg table named orders under the namespace sales_data, within the prod_iceberg catalog.
  • +
  • The table is partitioned by region_id, then by the day value of order_date (using Iceberg's hidden partitioning).
  • +
  • The table is created with the appropriate properties "write.format.default" and "read.split.target-size". The Beam property "beam.write.triggering_frequency_seconds" configures the Beam sink's triggering frequency and is not stored as an Iceberg table property.
  • +
  • Beam properties (prefixed with "beam.write." and "beam.read.") are intended for the relevant IOs
  • +
+{{< /tab >}} +{{< tab ALTER >}} +Modifies an existing Table's properties and evolves its partition and schema. +{{< highlight >}} +ALTER TABLE table_name + [ ADD COLUMNS ( col_def, ... ) ] + [ DROP COLUMNS ( col_name, ... ) ] + [ ADD PARTITIONS ( partition_field, ... ) ] + [ DROP PARTITIONS ( partition_field, ... ) ] + [ SET ( 'key' = 'val', ... ) ] + [ ( RESET | UNSET ) ( 'key', ... ) ]; +{{< /highlight >}} + +

Example: Add or remove columns

+{{< highlight >}} +-- Add columns +ALTER TABLE orders ADD COLUMNS ( + customer_email VARCHAR, + shipping_region VARCHAR +); + +-- Drop columns +ALTER TABLE orders DROP COLUMNS ( customer_email ); +{{< /highlight >}} + +

Example: Modify partition spec

+{{< highlight >}} +-- Add a partition field +ALTER TABLE orders ADD PARTITIONS ( 'year(order_date)' ); + +-- Remove a partition field +ALTER TABLE orders DROP PARTITIONS ( 'region_id' ); +{{< /highlight >}} + +

Example: Modify table properties

+{{< highlight >}} +ALTER TABLE orders SET ( + 'write.format.default' = 'orc', + 'write.metadata.metrics.default' = 'full' ); + +ALTER TABLE orders RESET ( 'write.target-file-size-bytes' ); +{{< /highlight >}} + +{{< /tab >}} +{{< tab SHOW >}} +

Lists tables under the currently active database, or a specified database.

+ +{{< highlight >}} +SHOW TABLES [ ( FROM | IN )? [ catalog_name '.' ] database_name ] [ LIKE regex_pattern ] +{{< /highlight >}} + +

Example: List tables in the currently active database and catalog

+{{< highlight >}} +SHOW TABLES; +{{< /highlight >}} + +

Example: List tables in a specified database

+{{< highlight >}} +SHOW TABLES IN my_db; +SHOW TABLES IN my_catalog.my_db; +{{< /highlight >}} + +

Example: List tables matching a pattern

+{{< highlight >}} +SHOW TABLES IN my_db LIKE '%orders%'; +{{< /highlight >}} + +{{< /tab >}} +{{< tab DROP >}} +

Unregisters a table from the current session. For supported connectors, this +will also delete the table from the external data source.

+ +{{< highlight >}} +DROP TABLE [ IF EXISTS ] table_name; +{{< /highlight >}} +{{< /tab >}} diff --git a/website/www/site/layouts/partials/section-menu/en/sdks.html b/website/www/site/layouts/partials/section-menu/en/sdks.html index 45fc937ac1f0..633d14c23288 100644 --- a/website/www/site/layouts/partials/section-menu/en/sdks.html +++ b/website/www/site/layouts/partials/section-menu/en/sdks.html @@ -112,6 +112,7 @@
  • Overview
  • Walkthrough
  • Shell
  • +
  • DDL
  • Apache Calcite dialect diff --git a/website/www/site/layouts/shortcodes/tab.html b/website/www/site/layouts/shortcodes/tab.html index a5d6ecd607a4..4f329832c672 100644 --- a/website/www/site/layouts/shortcodes/tab.html +++ b/website/www/site/layouts/shortcodes/tab.html @@ -10,6 +10,28 @@ limitations under the License. See accompanying LICENSE file. */}} +{{ if eq .Ordinal 0 }} + +{{ end }} + +{{/* --- 2. Your Normal Content Box --- */}} {{ $content := (trim .Inner "\n\r") | htmlUnescape | safeHTML }} {{ $ctx := . }} {{ $language := .Get 0 }}