Skip to content

[DATAFLINT-4359] Add executor metadata collection plugin#62

Open
minskya wants to merge 8 commits intomainfrom
executor_plugin
Open

[DATAFLINT-4359] Add executor metadata collection plugin#62
minskya wants to merge 8 commits intomainfrom
executor_plugin

Conversation

@minskya
Copy link
Copy Markdown
Contributor

@minskya minskya commented Apr 19, 2026

Summary

Adds executor metadata collection to the DataFlint Spark plugin, enabling automatic detection of cloud instance type, spot/on-demand lifecycle, and system basics from each executor.

  • ExecutorPlugin runs on each executor at startup, collects metadata, and sends it to the driver via Spark's plugin RPC (PluginContext.send() / DriverPlugin.receive())
  • Cloud provider detection reads /sys/class/dmi/id/sys_vendor locally to identify AWS/GCP/Azure — no blind HTTP probing
  • Cloud metadata fetched via bash one-liners (curl to cloud metadata APIs) only for the detected provider
  • System basics always collected via JVM APIs: OS name/arch, JVM version, CPU cores, total memory
  • Event pipeline follows existing DataFlint pattern: event → listener → KVStore → export
  • Opt-in via spark.dataflint.executor.metadata.enabled=true (default: false)

Changed files

File Change
executor/CloudMetadataDetector.scala New — Detects cloud provider from /sys/class/dmi/id/sys_vendor, then runs provider-specific bash to get instance type + lifecycle
executor/DataflintExecutorPlugin.scala New — ExecutorPlugin that collects system basics + cloud metadata, sends to driver
executor/ExecutorMetadataMessage.scala New — Serializable RPC message for executor→driver communication
executor/DriverMetadataHelper.scala New — Helper to access private[spark] APIs (config, listener bus) from io.dataflint.spark package
listener/model.scala Added DataflintExecutorMetadataInfo, DataflintExecutorMetadataEvent, DataflintExecutorMetadataWrapper
listener/DataflintListener.scala Added case for DataflintExecutorMetadataEvent in onOtherEvent()
listener/DataflintStore.scala Added executorMetadata() query method
saas/SparkRunStore.scala Added dataflintExecutorMetadata field to export data model
saas/StoreDataExtractor.scala Added readAll[DataflintExecutorMetadataWrapper] to extraction
SparkDataflintPlugin.scala (spark3 + spark4) Returns DataflintExecutorPlugin, passes config via extraConf, adds receive() handler

Architecture

sequenceDiagram
    participant Driver as Driver (SparkDataflintDriverPlugin)
    participant Executor as Executor (DataflintExecutorPlugin)
    participant Cloud as Cloud Metadata API

    Driver->>Executor: init(extraConf: {executor.metadata.enabled: true})
    Executor->>Executor: Collect system basics (OS, JVM, cores, memory)
    Executor->>Executor: Read /sys/class/dmi/id/sys_vendor
    alt AWS detected
        Executor->>Cloud: curl EC2 IMDS (instance-type, instance-life-cycle)
    else GCP detected
        Executor->>Cloud: curl GCP metadata (machine-type, preemptible)
    else Azure detected
        Executor->>Cloud: curl Azure IMDS (vmSize, priority)
    else No cloud
        Executor->>Executor: Skip cloud metadata
    end
    Executor->>Driver: PluginContext.send(ExecutorMetadataMessage)
    Driver->>Driver: receive() → post DataflintExecutorMetadataEvent
    Driver->>Driver: DataflintListener → KVStore
    Driver->>Driver: StoreDataExtractor → SparkRunStore → S3 export
Loading

Cloud Detection Flow

flowchart TD
    A[Read /sys/class/dmi/id/sys_vendor] --> B{Vendor?}
    B -->|Contains 'Amazon'| C[Run AWS metadata command]
    B -->|Contains 'Google'| D[Run GCP metadata command]
    B -->|Contains 'Microsoft'| E[Run Azure metadata command]
    B -->|File missing / unknown| F[Return None — system basics only]
    C --> G[instanceType + lifecycleType]
    D --> G
    E --> G
Loading

Collected Metadata

Field Source Always collected?
executorId Spark PluginContext Yes
executorHost InetAddress.getLocalHost Yes
osName System.getProperty("os.name") Yes
osArch System.getProperty("os.arch") Yes
jvmVersion System.getProperty("java.version") Yes
availableProcessors Runtime.availableProcessors() Yes
totalMemoryBytes Runtime.maxMemory() Yes
cloudProvider /sys/class/dmi/id/sys_vendor Only on cloud VMs
instanceType Cloud metadata API Only on cloud VMs
lifecycleType Cloud metadata API Only on cloud VMs
collectionError Error message if detection fails Only on failure

Test plan

  • sbt pluginspark3/compile passes
  • sbt pluginspark4/compile passes
  • sbt pluginspark3/test — all 43 existing tests pass
  • Manual test on AWS EC2 cluster with spark.dataflint.executor.metadata.enabled=true
  • Verify executor metadata appears in exported JSON (dataflintExecutorMetadata field)
  • Verify no impact when feature is disabled (default)
  • Verify graceful degradation on non-cloud machines (system basics still collected)

🤖 Generated with Claude Code

minskya and others added 2 commits April 19, 2026 14:09
Implement an ExecutorPlugin that collects machine metadata from each executor
and reports it to the driver. When enabled via spark.dataflint.executor.metadata.enabled,
each executor detects its cloud provider by reading /sys/class/dmi/id/sys_vendor,
then fetches instance type and spot/on-demand status from the cloud metadata API.
System basics (OS, JVM, CPU cores, memory) are always collected.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@minskya minskya changed the title Add executor metadata collection plugin [DATAFLINT-4414] Add executor metadata collection plugin Apr 20, 2026
@notion-workspace
Copy link
Copy Markdown

@notion-workspace
Copy link
Copy Markdown

@minskya minskya changed the title [DATAFLINT-4414] Add executor metadata collection plugin [DATAFLINT-4359] Add executor metadata collection plugin Apr 20, 2026
minskya and others added 6 commits April 20, 2026 16:02
Return null from DriverPlugin.receive() since PluginContext.send() is
fire-and-forget — returning a string caused a spurious warning.

Relax broadcast join test assertions to duration >= 0 because the
codegen sleep is only in doProduce path, not doExecute which broadcast
joins use.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
`spark.dataflint.experimental.executor.metadata.enabled`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant