Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,21 @@

from __future__ import annotations

import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING

import pytest

from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient

if TYPE_CHECKING:
from collections.abc import Callable

# The Java extract task sleeps 6 s + coordinator startup; allow plenty of room.
_JAVA_TASK_TIMEOUT = 600
# Logs can lag slightly behind the task reaching a terminal state.
_LOG_FETCH_TIMEOUT = 60


class TestJavaSDKAnnotationExample:
Expand Down Expand Up @@ -166,3 +173,71 @@ def test_load_retried_then_succeeded(self):
f"Java 'load' task should have run twice (fail then retry); "
f"try_number={load_ti.get('try_number')!r}, ti: {load_ti}"
)

def _wait_for_transform_log_record(
    self, run_id: str, try_number: int, match: Callable[[dict], bool]
) -> tuple[dict | None, list[dict]]:
    """Repeatedly fetch the ``transform`` task logs until *match* accepts a record.

    The logs may trail the task's terminal state, and unrelated records can
    arrive before the one being looked for, so stopping at the first non-empty
    response would be racy. Polling continues every 3 seconds until a record
    matches or ``_LOG_FETCH_TIMEOUT`` seconds have elapsed.

    Returns a ``(record, records)`` pair: the first matching record, or ``None``
    if the deadline passed first, together with the most recently fetched batch
    of dict records (useful for failure diagnostics).
    """
    give_up_at = time.monotonic() + _LOG_FETCH_TIMEOUT
    while True:
        payload = self.airflow_client.get_task_logs(
            dag_id="java_annotation_example",
            run_id=run_id,
            task_id="transform",
            try_number=try_number,
        )
        # Only dict entries are structured log records; anything else is ignored.
        seen = [item for item in payload.get("content", []) if isinstance(item, dict)]
        hit = None
        for candidate in seen:
            if match(candidate):
                hit = candidate
                break
        if hit is not None:
            return hit, seen
        if time.monotonic() > give_up_at:
            return None, seen
        time.sleep(3)

def test_application_logs_preserve_their_level(self):
    """An SLF4J ``logger.info`` call in a Java task must show up as INFO, not ERROR.

    If the SDK's SLF4J binding is missing, application logs end up on stderr
    and the supervisor labels each of those lines ERROR. With the binding in
    place the records travel over the logs socket and keep their real level.
    """
    dag_id = "java_annotation_example"
    triggered = self.airflow_client.trigger_dag(
        dag_id,
        json={"logical_date": datetime.now(timezone.utc).isoformat()},
    )
    run_id = triggered["dag_run_id"]
    dag_state = self.airflow_client.wait_for_dag_run(
        dag_id=dag_id,
        run_id=run_id,
        timeout=_JAVA_TASK_TIMEOUT,
    )

    # The record under test only exists if transform really ran, so require a
    # successful transform first. Its try_number is read from the task instance
    # instead of being assumed to be 1, even though transform does not retry.
    listing = self.airflow_client.get_task_instances(dag_id=dag_id, run_id=run_id)
    ti_map = {ti["task_id"]: ti for ti in listing.get("task_instances", [])}
    transform_ti = ti_map.get("transform", {})
    assert transform_ti.get("state") == "success", (
        f"Java 'transform' task must succeed to emit the log under test.\n"
        f" task state : {transform_ti.get('state')!r}\n"
        f" dag state : {dag_state!r}\n"
        f" all tasks : { {k: v.get('state') for k, v in ti_map.items()} }"
    )

    def is_got_variable(entry: dict) -> bool:
        # transform logs `logger.info("Got variable {}", variable)` -> "Got variable 123".
        return str(entry.get("event", "")).startswith("Got variable")

    record, records = self._wait_for_transform_log_record(
        run_id,
        transform_ti.get("try_number", 1),
        is_got_variable,
    )
    assert record is not None, (
        f"transform should emit a 'Got variable' INFO record; "
        f"events seen: {[r.get('event') for r in records]}"
    )
    assert str(record.get("level", "")).lower() == "info", (
        f"application INFO log should keep its level, got {record.get('level')!r}; record: {record}"
    )
1 change: 0 additions & 1 deletion java-sdk/example/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ repositories {
dependencies {
annotationProcessor("org.apache.airflow:airflow-sdk-processor:${projectVersion}")
implementation("org.apache.airflow:airflow-sdk:${projectVersion}")
implementation("org.slf4j:slf4j-simple:2.0.17")
}

java {
Expand Down
4 changes: 4 additions & 0 deletions java-sdk/sdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ dependencies {
implementation("org.msgpack:msgpack-core:0.9.11")
implementation("org.msgpack:jackson-dataformat-msgpack:0.9.11")

// Exposed as api: the SDK ships the SLF4J binding, so consumers should not
// pull in (and conflict with) a different slf4j-api version.
api("org.slf4j:slf4j-api:2.0.17")

testImplementation(kotlin("test"))
testImplementation("com.squareup.okhttp3:mockwebserver:4.12.0")
}
Expand Down
23 changes: 14 additions & 9 deletions java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,25 @@ class Server(
*/
suspend fun serveAsync(bundle: Bundle) =
coroutineScope {
launch {
aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(comm).use { socket ->
logger.debug("Connected comm", mapOf("addr" to comm))
CoordinatorComm(
bundle,
socket.openReadChannel(),
socket.openWriteChannel(autoFlush = true),
).startProcessing()
val commJob =
launch {
aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(comm).use { socket ->
logger.debug("Connected comm", mapOf("addr" to comm))
CoordinatorComm(
bundle,
socket.openReadChannel(),
socket.openWriteChannel(autoFlush = true),
).startProcessing()
}
}
}
launch {
aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(logs).use { socket ->
logger.debug("Connected logs", mapOf("addr" to logs))
LogSender.configure(socket.openWriteChannel(autoFlush = true))
// Keep the logs socket open for the whole task run; otherwise `use`
// closes it the moment configure() returns and every task log is
// buffered and dropped when the process exits.
commJob.join()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,58 @@ import java.util.concurrent.ConcurrentLinkedDeque
import kotlin.reflect.KClass
import kotlin.time.Clock

enum class Level { ERROR, DEBUG, }
// wireName is the level string the Airflow supervisor understands (structlog's
// NAME_TO_LEVEL). It has no TRACE, so TRACE maps to "debug"; a level the

@uranusjr uranusjr Jun 18, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think TRACE should be mapped to NOTSET (0).

@uranusjr uranusjr Jun 18, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, why do we need to reinvent level? This enum can simply map 1:1 to Python levels, and we just resolve SLF4J levels to Python levels directly.

// supervisor does not recognise is dropped silently on the Python side.
// severity mirrors Python's logging numeric values (DEBUG=10 ... ERROR=40) so a
// level can be compared against the configured threshold; TRACE sits below DEBUG.
/**
 * Log levels the SDK can attach to a log record.
 *
 * @property wireName string written to the `level` field of an encoded log record.
 * @property severity numeric rank used when comparing a level against the configured
 * threshold; a higher value means a more severe level.
 */
enum class Level(
val wireName: String,
val severity: Int,
) {
// TRACE shares DEBUG's wire name but ranks below it for threshold comparisons.
TRACE("debug", 5),
DEBUG("debug", 10),
INFO("info", 20),
WARN("warning", 30),
ERROR("error", 40),
}

/**
* Resolves the effective task log level the JVM should emit.
*
* The supervisor reconfigures the task logger with `level_override=NOTSET`,
* delegating threshold filtering to the subprocess, so the Java SDK must drop
* sub-threshold events itself. The threshold comes from the `airflow.logging.level`
* system property (an explicit JVM flag wins) and otherwise the
* `AIRFLOW__LOGGING__LOGGING_LEVEL` env var the subprocess inherits from the
* supervisor; unset or unrecognised values fall back to INFO.
*/
internal object LogLevel {
const val LEVEL_PROPERTY = "airflow.logging.level"
const val LEVEL_ENV = "AIRFLOW__LOGGING__LOGGING_LEVEL"
Comment on lines +64 to +65

@jason810496 jason810496 Jun 18, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I felt we should provide the log_level in StartupDetails.

I would prefer to respect the Airflow-side logging level. Users might set up a custom secrets backend for the conf, so we need to send the [logging/log_level] value from the supervisor.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, let’s do this. The Python implementation should probably also be changed to respect this log_level value (and fall back to various configurations).

Since this requires Python side changes, I would say for 3.3 let’s just default to INFO in SDKs and add support for Airflow configuration later.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, then I will go with this direction. This should solve Phani's review comment as well.


/**
 * Returns the level parsed from the configured level name, or [Level.INFO] when
 * nothing is configured or the configured name is not recognised.
 */
fun threshold(): Level = parse(configuredLevel()) ?: Level.INFO

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you test this scenario with this PR, I think it breaks.

In airflow.cfg
[logging]
logging_level = WARNING

Since it won't be in os.environ, AIRFLOW__LOGGING__LOGGING_LEVEL is not set. Hence Java resolves to INFO.

Overall Result: Java WARNING logs will be silently dropped. A Python task in the same deployment with the same cfg would show WARNING logs.

I think this means identical config but divergent behavior depending on the task language.


/**
 * Returns `true` when [level] is at least as severe as the current [threshold].
 * The threshold is resolved again on every call.
 */
fun isEnabled(level: Level): Boolean = level.severity >= threshold().severity

/**
 * Translates a level name into a [Level], ignoring case and surrounding whitespace.
 *
 * @param name level name such as `"info"` or `"WARNING"`; may be `null`.
 * @return the matching [Level], or `null` when [name] is `null` or not a known level name.
 */
fun parse(name: String?): Level? {
    val normalized = name?.trim()?.uppercase() ?: return null
    return when (normalized) {
        "TRACE" -> Level.TRACE
        "DEBUG" -> Level.DEBUG
        "INFO" -> Level.INFO
        "WARN", "WARNING" -> Level.WARN
        // The Java SDK has no CRITICAL/FATAL, so the strictest threshold it can honour is ERROR.
        "ERROR", "CRITICAL", "FATAL" -> Level.ERROR
        else -> null
    }
}

/**
 * Raw configured level name: the [LEVEL_PROPERTY] system property when it is set,
 * otherwise the [LEVEL_ENV] environment variable; `null` when neither is set.
 */
private fun configuredLevel(): String? = System.getProperty(LEVEL_PROPERTY) ?: System.getenv(LEVEL_ENV)
}

internal data class LogMessage(
val event: String,
val arguments: Map<String, Any>,
val logger: Logger,
val loggerName: String,
val level: Level,
val timestamp: LocalDateTime = Clock.System.now().toLocalDateTime(TimeZone.currentSystemDefault()),
)
Expand All @@ -49,9 +95,7 @@ internal class Logger(
) {
val name: String? = cls.java.typeName

// TODO: Actually implement level filtering.
@Suppress("UNUSED_PARAMETER")
fun isEnabledForLevel(level: Level): Boolean = true
fun isEnabledForLevel(level: Level): Boolean = LogLevel.isEnabled(level)

fun debug(
message: String,
Expand All @@ -73,7 +117,7 @@ internal class Logger(
arguments: Map<String, Any>,
) {
if (!isEnabledForLevel(level)) return
LogSender.send(LogMessage(event, arguments, this, level))
LogSender.send(LogMessage(event, arguments, name ?: "(java)", level))
}
}

Expand All @@ -99,17 +143,21 @@ internal object LogSender {
}
}

/**
 * Renders [message] as a single JSON line terminated by a newline.
 *
 * The message's arguments form the base of the payload; the `event`, `level`,
 * `logger` and `timestamp` keys are written afterwards, so they replace any
 * argument that uses the same key.
 */
internal fun encode(message: LogMessage): String {
    val payload =
        message.arguments.toMutableMap().apply {
            put("event", message.event)
            put("level", message.level.wireName)
            put("logger", message.loggerName)
            put("timestamp", message.timestamp)
        }
    return "${payload.toJsonElement()}\n"
}

private fun sendTo(
writer: ByteWriteChannel,
message: LogMessage,
) {
val map = message.arguments.toMutableMap()
map["event"] = message.event
map["level"] = message.level.name.lowercase()
map["logger"] = message.logger.name ?: "(java)"
map["timestamp"] = message.timestamp
// TODO: Can this be done asynchronously instead?
runBlocking { writer.writeString("${map.toJsonElement()}\n") }
runBlocking { writer.writeString(encode(message)) }
}
}

Expand Down
121 changes: 121 additions & 0 deletions java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Slf4j.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.airflow.sdk.execution

import org.slf4j.ILoggerFactory
import org.slf4j.IMarkerFactory
import org.slf4j.Marker
import org.slf4j.helpers.AbstractLogger
import org.slf4j.helpers.BasicMDCAdapter
import org.slf4j.helpers.BasicMarkerFactory
import org.slf4j.helpers.MessageFormatter
import org.slf4j.spi.MDCAdapter
import org.slf4j.spi.SLF4JServiceProvider
import java.util.concurrent.ConcurrentHashMap
import org.slf4j.Logger as Slf4jLogger
import org.slf4j.event.Level as Slf4jLevel

/**
 * [SLF4JServiceProvider] that sends an application's SLF4J output to the Airflow logs socket.
 *
 * When no such binding is present, SLF4J calls go to whichever provider happens
 * to be on the classpath (for example slf4j-simple, which writes to stderr), and
 * the supervisor labels every stderr line ERROR whatever its original level was.
 * Sending records through [LogSender] keeps each message's own level on the wire,
 * so the UI displays INFO/WARN/etc. as they were logged.
 */
class AirflowSlf4jServiceProvider : SLF4JServiceProvider {
    // All three collaborators are created eagerly, so initialize() has nothing left to do.
    private val airflowLoggers: ILoggerFactory = AirflowLoggerFactory()
    private val markers: IMarkerFactory = BasicMarkerFactory()
    private val mdc: MDCAdapter = BasicMDCAdapter()

    override fun getLoggerFactory(): ILoggerFactory = airflowLoggers

    override fun getMarkerFactory(): IMarkerFactory = markers

    override fun getMDCAdapter(): MDCAdapter = mdc

    override fun getRequestedApiVersion(): String = "2.0.99"

    override fun initialize() {
        // Intentionally empty.
    }
}

/** Hands out one [AirflowSlf4jLogger] per logger name, creating it the first time the name is requested. */
internal class AirflowLoggerFactory : ILoggerFactory {
    private val byName = ConcurrentHashMap<String, Slf4jLogger>()

    override fun getLogger(name: String): Slf4jLogger =
        byName.computeIfAbsent(name) { requested -> AirflowSlf4jLogger(requested) }
}

/**
 * SLF4J logger that converts each enabled logging call into a [LogMessage] and
 * passes it to [LogSender].
 *
 * Markers are accepted but do not influence whether a level is enabled.
 */
internal class AirflowSlf4jLogger(
    private val loggerName: String,
) : AbstractLogger() {
    init {
        name = loggerName
    }

    // The supervisor sets level_override=NOTSET and leaves threshold filtering to
    // the subprocess, so events below the threshold are discarded here instead of
    // being forwarded. See [LogLevel].
    override fun isTraceEnabled(): Boolean = LogLevel.isEnabled(Level.TRACE)

    override fun isTraceEnabled(marker: Marker?): Boolean = isTraceEnabled()

    override fun isDebugEnabled(): Boolean = LogLevel.isEnabled(Level.DEBUG)

    override fun isDebugEnabled(marker: Marker?): Boolean = isDebugEnabled()

    override fun isInfoEnabled(): Boolean = LogLevel.isEnabled(Level.INFO)

    override fun isInfoEnabled(marker: Marker?): Boolean = isInfoEnabled()

    override fun isWarnEnabled(): Boolean = LogLevel.isEnabled(Level.WARN)

    override fun isWarnEnabled(marker: Marker?): Boolean = isWarnEnabled()

    override fun isErrorEnabled(): Boolean = LogLevel.isEnabled(Level.ERROR)

    override fun isErrorEnabled(marker: Marker?): Boolean = isErrorEnabled()

    override fun getFullyQualifiedCallerName(): String? = null

    /**
     * Formats the message pattern with its arguments and sends the result at the
     * mapped Airflow level. A supplied [throwable] is attached as its stack-trace
     * text under the `error_detail` key. Calls below the threshold are ignored.
     */
    override fun handleNormalizedLoggingCall(
        level: Slf4jLevel,
        marker: Marker?,
        messagePattern: String?,
        arguments: Array<out Any?>?,
        throwable: Throwable?,
    ) {
        val target = level.toAirflowLevel()
        if (LogLevel.isEnabled(target)) {
            @Suppress("UNCHECKED_CAST")
            val rendered = MessageFormatter.basicArrayFormat(messagePattern, arguments as Array<Any?>?)
            val fields: Map<String, Any> =
                if (throwable == null) {
                    emptyMap()
                } else {
                    mapOf("error_detail" to throwable.stackTraceToString())
                }
            LogSender.send(LogMessage(rendered ?: "", fields, loggerName, target))
        }
    }
}

/** Converts an SLF4J level into the SDK [Level] carrying the same name. */
internal fun Slf4jLevel.toAirflowLevel(): Level {
    // Exhaustive on purpose (no `else`): a new SLF4J level must fail compilation here.
    return when (this) {
        Slf4jLevel.ERROR -> Level.ERROR
        Slf4jLevel.WARN -> Level.WARN
        Slf4jLevel.INFO -> Level.INFO
        Slf4jLevel.DEBUG -> Level.DEBUG
        Slf4jLevel.TRACE -> Level.TRACE
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
org.apache.airflow.sdk.execution.AirflowSlf4jServiceProvider
Loading
Loading