Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

/**
 * Strategy for mapping a multipart SQL identifier onto a catalog when its first part names a
 * path-based data source format instead of a catalog or namespace
 * (e.g. `pathformat.\`/path/to/t\``). Only formats that implement [[SupportsCatalogOptions]]
 * are expected to be resolved this way.
 */
private[sql] trait DataSourceCatalogResolver {
  /**
   * Attempts to resolve the given name through a data source format.
   *
   * @param nameParts the parts of the multipart identifier, format name first
   * @return the name of the owning catalog together with the connector-canonicalized
   *         identifier, or `None` when the name is not claimed by such a format
   */
  def resolve(nameParts: Seq[String]): Option[(String, Identifier)]
}

private[sql] object DataSourceCatalogResolver {
  /** Resolver that never claims a name: `resolve` yields `None` for every input. */
  val NoOp: DataSourceCatalogResolver = new DataSourceCatalogResolver {
    override def resolve(nameParts: Seq[String]): Option[(String, Identifier)] = None
  }
}

/**
* A thread-safe contract for managing [[CatalogPlugin]]s. Implementations resolve catalogs by
* name and maintain the current catalog and namespace for a session.
Expand All @@ -55,6 +68,7 @@ private[sql] trait CatalogManager extends SQLConfHelper with Logging {
def defaultSessionCatalog: CatalogPlugin
def v1SessionCatalog: SessionCatalog
def tempVariableManager: TempVariableManager
def dataSourceCatalogResolver: DataSourceCatalogResolver

// ---- Catalog access ----
def catalog(name: String): CatalogPlugin
Expand All @@ -71,6 +85,14 @@ private[sql] trait CatalogManager extends SQLConfHelper with Logging {
}
}

/**
 * Looks up the owning catalog for a multipart SQL name whose first part is a
 * [[SupportsCatalogOptions]] data source format, by asking this manager's
 * [[DataSourceCatalogResolver]].
 *
 * @param nameParts the parts of the multipart identifier, format name first
 * @return the catalog name and connector-canonicalized identifier, or `None` if the format
 *         head is unknown or does not implement [[SupportsCatalogOptions]]
 */
def catalogAndIdentForDataSource(nameParts: Seq[String]): Option[(String, Identifier)] = {
  dataSourceCatalogResolver.resolve(nameParts)
}

// ---- Transactions ----
def transaction: Option[Transaction] = None

Expand Down Expand Up @@ -112,7 +134,9 @@ private[sql] trait CatalogManager extends SQLConfHelper with Logging {
*/
private[sql] class DefaultCatalogManager(
override val defaultSessionCatalog: CatalogPlugin,
override val v1SessionCatalog: SessionCatalog) extends CatalogManager {
override val v1SessionCatalog: SessionCatalog,
override val dataSourceCatalogResolver: DataSourceCatalogResolver =
DataSourceCatalogResolver.NoOp) extends CatalogManager {
import CatalogManager.SESSION_CATALOG_NAME
import CatalogV2Util._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ private[sql] trait LookupCatalog extends Logging {

private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)

/**
 * Resolves `nameParts` through the catalog manager's [[DataSourceCatalogResolver]], which
 * handles path-based v2 sources implementing `SupportsCatalogOptions`.
 *
 * When the connector claims the identifier, the result points at the catalog it designates
 * and carries the connector-provided identifier. Otherwise the result is
 * `(currentCatalog, nameParts.asIdentifier)`, leaving it to later analysis to raise
 * table-not-found.
 *
 * @param nameParts the parts of the multipart identifier, format name first
 * @return always a `Some`; the method never yields `None`
 */
private def resolveViaDataSource(
    nameParts: Seq[String]): Some[(CatalogPlugin, Identifier)] = {
  // `Option(...)` maps a null result to None before flattening.
  // NOTE(review): presumably this guards against mocked catalog managers - confirm.
  val claimed = Option(catalogManager.catalogAndIdentForDataSource(nameParts)).flatten
  val target = claimed
    .map { case (catName, providerIdent) => (catalogManager.catalog(catName), providerIdent) }
    .getOrElse((currentCatalog, nameParts.asIdentifier))
  Some(target)
}

def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
assert(nameParts.nonEmpty)
if (nameParts.length == 1) {
Expand Down Expand Up @@ -135,7 +151,18 @@ private[sql] trait LookupCatalog extends Logging {
Some((catalog, ident))
} catch {
case _: CatalogNotFoundException =>
Some((currentCatalog, nameParts.asIdentifier))
// No catalog matched the head. It could mean one of:
// (a) `head` is a namespace inside `currentCatalog` (e.g. a database
// in `spark_catalog`), or
// (b) `head` is a path-based v2 data source format whose connector
// implements `SupportsCatalogOptions` (e.g. `delta.`/path/to/t``).
// Resolve in priority order.
currentCatalog match {
case ns: SupportsNamespaces if ns.namespaceExists(Array(nameParts.head)) =>
Some((currentCatalog, nameParts.asIdentifier))
case _ if SQLConf.get.runSQLonFile => resolveViaDataSource(nameParts)
case _ => Some((currentCatalog, nameParts.asIdentifier))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ private[sql] class TransactionAwareCatalogManager(
override def defaultSessionCatalog: CatalogPlugin = delegate.defaultSessionCatalog
override def v1SessionCatalog: SessionCatalog = delegate.v1SessionCatalog
override def tempVariableManager: TempVariableManager = delegate.tempVariableManager
// Forwarded unchanged from the wrapped manager.
override def dataSourceCatalogResolver: DataSourceCatalogResolver = {
  delegate.dataSourceCatalogResolver
}

// ---- Catalog access: redirect to txn catalog when names match. ----
override def catalog(name: String): CatalogPlugin = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,23 @@ object SharedTablesInMemoryRowLevelOperationTableCatalog {

def reset(): Unit = sharedTables.clear()
}

/**
 * Counterpart of [[SharedTablesInMemoryRowLevelOperationTableCatalog]] whose tables live in a
 * separate map held by this class's own companion object, so every instance of this class
 * sees the same tables.
 *
 * Tests that use this catalog must call
 * [[SecondSharedTablesInMemoryRowLevelOperationTableCatalog.reset()]] in `afterEach`.
 */
class SecondSharedTablesInMemoryRowLevelOperationTableCatalog
  extends InMemoryRowLevelOperationTableCatalog {

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    super.initialize(name, options)
    // After the regular initialization, point this instance at the companion-level map.
    val shared = SecondSharedTablesInMemoryRowLevelOperationTableCatalog.sharedTables
    tables = shared
  }
}

object SecondSharedTablesInMemoryRowLevelOperationTableCatalog {
  // Single map handed to every catalog instance in `initialize`.
  private[catalog] val sharedTables: ConcurrentHashMap[Identifier, Table] =
    new ConcurrentHashMap()

  /** Removes every table from the shared map. */
  def reset(): Unit = {
    sharedTables.clear()
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -260,19 +259,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
// file source v2 does not support streaming yet.
classOf[FileDataSourceV2].isAssignableFrom(cls)

val optionsWithPath = if (path.isEmpty) {
extraOptions
} else {
extraOptions + ("path" -> path.get)
}
val optionsWithPath = DataSourceV2Utils.getOptionsWithPaths(extraOptions, path.toSeq: _*)

val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = ds.sparkSession.sessionState.conf)
val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val dsOptions = DataSourceV2Utils.buildDsOptions(
provider, ds.sparkSession.sessionState.conf, optionsWithPath)
// If the source accepts external table metadata, here we pass the schema of input query
// to `getTable`. This is for avoiding schema inference, which can be very expensive.
// If the query schema is not compatible with the existing data, the behavior is undefined.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.UUID
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import javax.annotation.concurrent.GuardedBy

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
Expand All @@ -32,22 +31,21 @@ import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery, UnresolvedRelation, UnsupportedOperationChecker}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, LazyExpression, NameParameterizedQuery, UnsupportedOperationChecker}
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, TransactionalWrite => TransactionalWritePlan, Union, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.transactions.TransactionUtils
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, LookupCatalog, SupportsCatalogOptions, TableCatalog, TransactionalCatalogPlugin}
import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ROOT_ID_KEY
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TransactionalExec, V2TableRefreshUtil}
import org.apache.spark.sql.execution.datasources.v2.{TransactionalExec, V2TableRefreshUtil}
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
Expand All @@ -56,7 +54,6 @@ import org.apache.spark.sql.execution.streaming.runtime.{IncrementalExecution, W
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.scripting.SqlScriptingExecution
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LazyTry, Utils, UUIDv7Generator}
import org.apache.spark.util.ArrayImplicits._

Expand Down Expand Up @@ -119,16 +116,9 @@ class QueryExecution(
analyzerOpt.flatMap(_.catalogManager.transaction).orElse {
// Only begin a new transaction for outer QEs that lead to execution.
if (mode != CommandExecutionMode.SKIP) {
def resolve(w: TransactionalWritePlan): Option[TransactionalCatalogPlugin] =
pathBased(w) match {
case Some(c: TransactionalCatalogPlugin) => Some(c)
case Some(_) => None
// If the path is not data source based, fallback to catalog based resolution.
case None => TransactionalWrite.unapply(w)
}
val catalog = logical match {
case UnresolvedWith(w: TransactionalWritePlan, _, _) => resolve(w)
case w: TransactionalWritePlan => resolve(w)
case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c)
case TransactionalWrite(c) => Some(c)
case _ => None
}
catalog.map(TransactionUtils.beginTransaction)
Expand All @@ -139,34 +129,6 @@ class QueryExecution(
}
private def transactionOpt: Option[Transaction] = lazyTransactionOpt.get

// For path-based tables (e.g. `format.`/path/to/table``) the first identifier part is a
// connector name. SupportsCatalogOptions on the connector tells us which catalog actually
// owns the table. Returns Some(catalog) if parts.head is a recognized SupportsCatalogOptions
// data source (caller decides whether the catalog is transactional), or None to fall through
// to the catalog-based extractor.
private def pathBased(write: TransactionalWritePlan): Option[TableCatalog] =
EliminateSubqueryAliases(write.table) match {
case UnresolvedRelation(parts, _, _) if parts.length > 1 =>
try {
DataSource.lookupDataSourceV2(parts.head, sparkSession.sessionState.conf)
.collect { case sco: SupportsCatalogOptions => sco }
.map { sco =>
val sessionConfigs = DataSourceV2Utils.extractSessionConfigs(
sco, sparkSession.sessionState.conf)
// Pass the entire identifier as option. The connector can decide how to parse it
// if needed.
val options = sessionConfigs + ("identifier" -> parts.mkString("."))
CatalogV2Util.getTableProviderCatalog(
sco, catalogManager, new CaseInsensitiveStringMap(options.asJava))
}
} catch {
// The head of the multipart identifier is not a registered data source.
// Fallback to catalog-based detection.
case _: ClassNotFoundException => None
}
case _ => None
}

// Per-query analyzer: uses a transaction-aware CatalogManager when a transaction is active,
// so that all catalog lookups and rule applications during analysis see the correct state
// without relying on thread-local context. Any nested QueryExecution that is created during
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.TimeTravelSpec
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, DataSourceCatalogResolver, Identifier, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SQLExecution
Expand Down Expand Up @@ -99,6 +99,60 @@ private[sql] object DataSourceV2Utils extends Logging {
}
}

/**
 * Assembles the [[CaseInsensitiveStringMap]] handed to a v2 [[TableProvider]].
 *
 * Session configs extracted for the provider act as defaults: any key already present in
 * `optionsWithPath` is dropped from them before the two maps are combined, so the
 * caller-supplied value is the one that ends up in the result.
 *
 * @param provider the table provider the options are built for
 * @param conf the session configuration to extract provider-specific configs from
 * @param optionsWithPath caller-supplied options, already including any path entries
 * @return the merged options
 */
def buildDsOptions(
    provider: TableProvider,
    conf: SQLConf,
    optionsWithPath: CaseInsensitiveMap[String]): CaseInsensitiveStringMap = {
  val sessionDefaults = extractSessionConfigs(provider, conf)
    .filterNot { case (key, _) => optionsWithPath.contains(key) }
  val merged = sessionDefaults ++ optionsWithPath.originalMap
  new CaseInsensitiveStringMap(merged.asJava)
}

/**
 * Asks a [[SupportsCatalogOptions]] provider for the identifier and the catalog name encoded
 * in `dsOptions`.
 *
 * All SCO entry points (DataFrame reader, SQL multipart-name resolution) go through this
 * helper so that they share the same null handling: a null from `extractCatalog` is
 * replaced by the session catalog name, matching [[CatalogV2Util.getTableProviderCatalog]].
 *
 * @param provider the data source to query
 * @param dsOptions the options passed to the provider's extract methods
 * @return the catalog name (never null) paired with the connector-canonical identifier
 */
def extractCatalogAndIdentifier(
    provider: SupportsCatalogOptions,
    dsOptions: CaseInsensitiveStringMap): (String, Identifier) = {
  // The identifier is extracted before the catalog, as in the original call order.
  val ident = provider.extractIdentifier(dsOptions)
  val catalogName = Option(provider.extractCatalog(dsOptions))
    .fold(CatalogManager.SESSION_CATALOG_NAME)(identity)
  (catalogName, ident)
}

/**
* Resolver bound to a session [[SQLConf]] that maps a multipart SQL identifier
* (e.g. `pathformat.\`/path/to/t\``) to a `(catalogName, identifier)` pair when the head
* names a registered [[SupportsCatalogOptions]] data source. Returns `None` for non-SCO
* sources or unknown format heads, letting the caller fall back to standard catalog
* resolution.
*/
def supportsCatalogOptionsResolver(conf: SQLConf): DataSourceCatalogResolver =
(nameParts: Seq[String]) =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we attempt to do this only if we have 2 name parts?
We are talking about SQL on file scenarios, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care about spark.sql.runSQLonFiles? See ResolveSQLOnFile.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we attempt to do this only if we have 2 name parts?
We are talking about SQL on file scenarios, right?

Yes, these are SQL-on-file scenarios. The two-name-part restriction sounds like a limitation of v1? IIUC, in v2 DataFrames we do not have such a restriction. I guess the connector can decide how to handle it in extractCatalog/extractIdentifier?

Do we care about spark.sql.runSQLonFiles? See ResolveSQLOnFile

I added the sentinel.

try {
DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap {
case sco: SupportsCatalogOptions =>
val optionsWithPath = getOptionsWithPaths(
CaseInsensitiveMap(Map.empty), nameParts.tail: _*)
val dsOptions = buildDsOptions(sco, conf, optionsWithPath)
Some(extractCatalogAndIdentifier(sco, dsOptions))
case _ => None
}
} catch {
// The format name is not a registered data source. Fall through and let the caller
// treat it as a catalog/namespace name.
case _: ClassNotFoundException => None
}

def loadV2Source(
sparkSession: SparkSession,
provider: TableProvider,
Expand All @@ -108,23 +162,16 @@ private[sql] object DataSourceV2Utils extends Logging {
paths: String*): Option[LogicalPlan] = {
val catalogManager = sparkSession.sessionState.catalogManager
val conf = sparkSession.sessionState.conf
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(provider, conf)

val optionsWithPath = getOptionsWithPaths(extraOptions, paths: _*)

val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val dsOptions = buildDsOptions(provider, conf, optionsWithPath)
val (table, catalog, ident, timeTravelSpec) = provider match {
case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
throw new IllegalArgumentException(
s"$source does not support user specified schema. Please don't specify the schema.")
case hasCatalog: SupportsCatalogOptions =>
val ident = hasCatalog.extractIdentifier(dsOptions)
val catalog = CatalogV2Util.getTableProviderCatalog(
hasCatalog,
catalogManager,
dsOptions)
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val (catalogName, ident) = extractCatalogAndIdentifier(hasCatalog, dsOptions)
val catalog = catalogManager.catalog(catalogName).asTableCatalog

val version = hasCatalog.extractTimeTravelVersion(dsOptions)
val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions)
Expand Down
Loading