From 12acb1c7f50cd1a61aac445223439bf76fdfb3b0 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 18 Jun 2026 13:21:32 +0900 Subject: [PATCH 1/2] Fix Java-SDK logging level --- .../java_sdk_tests/test_java_sdk_dag.py | 75 ++++++++ java-sdk/example/build.gradle | 1 - java-sdk/sdk/build.gradle.kts | 4 + .../apache/airflow/sdk/execution/Logger.kt | 72 ++++++-- .../org/apache/airflow/sdk/execution/Slf4j.kt | 121 +++++++++++++ .../org.slf4j.spi.SLF4JServiceProvider | 18 ++ .../apache/airflow/sdk/execution/Slf4jTest.kt | 171 ++++++++++++++++++ 7 files changed, 449 insertions(+), 13 deletions(-) create mode 100644 java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Slf4j.kt create mode 100644 java-sdk/sdk/src/main/resources/META-INF/services/org.slf4j.spi.SLF4JServiceProvider create mode 100644 java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/Slf4jTest.kt diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py index b1e04fb2dde5f..709987cb2c744 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py @@ -54,14 +54,21 @@ from __future__ import annotations +import time from datetime import datetime, timezone +from typing import TYPE_CHECKING import pytest from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient +if TYPE_CHECKING: + from collections.abc import Callable + # The Java extract task sleeps 6 s + coordinator startup; allow plenty of room. _JAVA_TASK_TIMEOUT = 600 +# Logs can lag slightly behind the task reaching a terminal state. +_LOG_FETCH_TIMEOUT = 60 class TestJavaSDKAnnotationExample: @@ -166,3 +173,71 @@ def test_load_retried_then_succeeded(self): f"Java 'load' task should have run twice (fail then retry); " f"try_number={load_ti.get('try_number')!r}, ti: {load_ti}" ) + + def _wait_for_transform_log_record( + self, run_id: str, try_number: int, match: Callable[[dict], bool] + ) -> tuple[dict | None, list[dict]]: + """Poll the ``transform`` task logs until a record matching *match* appears. + + Logs can lag behind the terminal task state, and earlier records (e.g. the + first transform line) arrive before the one under test, so returning on any + record would race. Keep polling until the target record shows up or the + deadline passes. Returns the matching record (or ``None``) and the last + batch of records seen for diagnostics. + """ + deadline = time.monotonic() + _LOG_FETCH_TIMEOUT + records: list[dict] = [] + while True: + resp = self.airflow_client.get_task_logs( + dag_id="java_annotation_example", run_id=run_id, task_id="transform", try_number=try_number + ) + records = [entry for entry in resp.get("content", []) if isinstance(entry, dict)] + record = next((r for r in records if match(r)), None) + if record is not None or time.monotonic() > deadline: + return record, records + time.sleep(3) + + def test_application_logs_preserve_their_level(self): + """A Java task's SLF4J ``logger.info`` must reach the UI as INFO, not ERROR. + + Without the SDK's SLF4J binding the application's logs fall through to + stderr and the supervisor tags every line ERROR. The binding routes them + over the logs socket carrying the real level instead. + """ + resp = self.airflow_client.trigger_dag( + "java_annotation_example", + json={"logical_date": datetime.now(timezone.utc).isoformat()}, + ) + run_id = resp["dag_run_id"] + dag_state = self.airflow_client.wait_for_dag_run( + dag_id="java_annotation_example", + run_id=run_id, + timeout=_JAVA_TASK_TIMEOUT, + ) + + # The log under test is emitted only if transform actually ran; assert it + # succeeded and fetch the attempt that produced the logs (transform does + # not retry, but read try_number rather than assuming attempt 1). + ti_resp = self.airflow_client.get_task_instances(dag_id="java_annotation_example", run_id=run_id) + ti_map = {ti["task_id"]: ti for ti in ti_resp.get("task_instances", [])} + transform_ti = ti_map.get("transform", {}) + assert transform_ti.get("state") == "success", ( + f"Java 'transform' task must succeed to emit the log under test.\n" + f" task state : {transform_ti.get('state')!r}\n" + f" dag state : {dag_state!r}\n" + f" all tasks : { {k: v.get('state') for k, v in ti_map.items()} }" + ) + + # transform logs `logger.info("Got variable {}", variable)` -> "Got variable 123". + record, records = self._wait_for_transform_log_record( + run_id, + transform_ti.get("try_number", 1), + lambda r: str(r.get("event", "")).startswith("Got variable"), + ) + assert record is not None, ( + f"transform should emit a 'Got variable' INFO record; " + f"events seen: {[r.get('event') for r in records]}" + ) + assert str(record.get("level", "")).lower() == "info", ( + f"application INFO log should keep its level, got {record.get('level')!r}; record: {record}" + ) diff --git a/java-sdk/example/build.gradle b/java-sdk/example/build.gradle index 497ea8f5103a6..a2d1845264a9f 100644 --- a/java-sdk/example/build.gradle +++ b/java-sdk/example/build.gradle @@ -29,7 +29,6 @@ repositories { dependencies { annotationProcessor("org.apache.airflow:airflow-sdk-processor:${projectVersion}") implementation("org.apache.airflow:airflow-sdk:${projectVersion}") - implementation("org.slf4j:slf4j-simple:2.0.17") } java { diff --git a/java-sdk/sdk/build.gradle.kts b/java-sdk/sdk/build.gradle.kts index 216ef76513123..e5bc104589e0d 100644 --- a/java-sdk/sdk/build.gradle.kts +++ b/java-sdk/sdk/build.gradle.kts @@ -52,6 +52,10 @@ dependencies { implementation("org.msgpack:msgpack-core:0.9.11") implementation("org.msgpack:jackson-dataformat-msgpack:0.9.11") + // Exposed as api: the SDK ships the SLF4J binding, so consumers should not + // pull in (and conflict with) a different slf4j-api version. + api("org.slf4j:slf4j-api:2.0.17") + testImplementation(kotlin("test")) testImplementation("com.squareup.okhttp3:mockwebserver:4.12.0") } diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt index 27005f92082cb..faf25f6816780 100644 --- a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt +++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt @@ -34,12 +34,58 @@ import java.util.concurrent.ConcurrentLinkedDeque import kotlin.reflect.KClass import kotlin.time.Clock -enum class Level { ERROR, DEBUG, } +// wireName is the level string the Airflow supervisor understands (structlog's +// NAME_TO_LEVEL). It has no TRACE, so TRACE maps to "debug"; a level the +// supervisor does not recognise is dropped silently on the Python side. +// severity mirrors Python's logging numeric values (DEBUG=10 ... ERROR=40) so a +// level can be compared against the configured threshold; TRACE sits below DEBUG. +enum class Level( + val wireName: String, + val severity: Int, +) { + TRACE("debug", 5), + DEBUG("debug", 10), + INFO("info", 20), + WARN("warning", 30), + ERROR("error", 40), +} + +/** + * Resolves the effective task log level the JVM should emit. + * + * The supervisor reconfigures the task logger with `level_override=NOTSET`, + * delegating threshold filtering to the subprocess, so the Java SDK must drop + * sub-threshold events itself. The threshold comes from the `airflow.logging.level` + * system property (an explicit JVM flag wins) and otherwise the + * `AIRFLOW__LOGGING__LOGGING_LEVEL` env var the subprocess inherits from the + * supervisor; unset or unrecognised values fall back to INFO. + */ +internal object LogLevel { + const val LEVEL_PROPERTY = "airflow.logging.level" + const val LEVEL_ENV = "AIRFLOW__LOGGING__LOGGING_LEVEL" + + fun threshold(): Level = parse(configuredLevel()) ?: Level.INFO + + fun isEnabled(level: Level): Boolean = level.severity >= threshold().severity + + fun parse(name: String?): Level? = + when (name?.trim()?.uppercase()) { + "TRACE" -> Level.TRACE + "DEBUG" -> Level.DEBUG + "INFO" -> Level.INFO + "WARN", "WARNING" -> Level.WARN + // The Java SDK has no CRITICAL/FATAL, so the strictest threshold it can honour is ERROR. + "ERROR", "CRITICAL", "FATAL" -> Level.ERROR + else -> null + } + + private fun configuredLevel(): String? = System.getProperty(LEVEL_PROPERTY) ?: System.getenv(LEVEL_ENV) +} internal data class LogMessage( val event: String, val arguments: Map, - val logger: Logger, + val loggerName: String, val level: Level, val timestamp: LocalDateTime = Clock.System.now().toLocalDateTime(TimeZone.currentSystemDefault()), ) @@ -49,9 +95,7 @@ internal class Logger( ) { val name: String? = cls.java.typeName - // TODO: Actually implement level filtering. - @Suppress("UNUSED_PARAMETER") - fun isEnabledForLevel(level: Level): Boolean = true + fun isEnabledForLevel(level: Level): Boolean = LogLevel.isEnabled(level) fun debug( message: String, @@ -73,7 +117,7 @@ internal class Logger( arguments: Map, ) { if (!isEnabledForLevel(level)) return - LogSender.send(LogMessage(event, arguments, this, level)) + LogSender.send(LogMessage(event, arguments, name ?: "(java)", level)) } } @@ -99,17 +143,21 @@ internal object LogSender { } } + internal fun encode(message: LogMessage): String { + val map = message.arguments.toMutableMap() + map["event"] = message.event + map["level"] = message.level.wireName + map["logger"] = message.loggerName + map["timestamp"] = message.timestamp + return "${map.toJsonElement()}\n" + } + private fun sendTo( writer: ByteWriteChannel, message: LogMessage, ) { - val map = message.arguments.toMutableMap() - map["event"] = message.event - map["level"] = message.level.name.lowercase() - map["logger"] = message.logger.name ?: "(java)" - map["timestamp"] = message.timestamp // TODO: Can this be done asynchronously instead? - runBlocking { writer.writeString("${map.toJsonElement()}\n") } + runBlocking { writer.writeString(encode(message)) } } } diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Slf4j.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Slf4j.kt new file mode 100644 index 0000000000000..2bd35d8855791 --- /dev/null +++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Slf4j.kt @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.execution + +import org.slf4j.ILoggerFactory +import org.slf4j.IMarkerFactory +import org.slf4j.Marker +import org.slf4j.helpers.AbstractLogger +import org.slf4j.helpers.BasicMDCAdapter +import org.slf4j.helpers.BasicMarkerFactory +import org.slf4j.helpers.MessageFormatter +import org.slf4j.spi.MDCAdapter +import org.slf4j.spi.SLF4JServiceProvider +import java.util.concurrent.ConcurrentHashMap +import org.slf4j.Logger as Slf4jLogger +import org.slf4j.event.Level as Slf4jLevel + +/** + * SLF4J binding that streams task logs through the Airflow logs socket. + * + * Without a binding, an application's SLF4J calls fall back to whatever + * provider is on the classpath (e.g. slf4j-simple writing to stderr), and the + * supervisor then tags every stderr line as ERROR regardless of the original + * level. Routing logs through [LogSender] instead carries each message's real + * level on the wire, so the UI shows INFO/WARN/etc. as logged. + */ +class AirflowSlf4jServiceProvider : SLF4JServiceProvider { + private val loggerFactory = AirflowLoggerFactory() + private val markerFactory: IMarkerFactory = BasicMarkerFactory() + private val mdcAdapter: MDCAdapter = BasicMDCAdapter() + + override fun getLoggerFactory(): ILoggerFactory = loggerFactory + + override fun getMarkerFactory(): IMarkerFactory = markerFactory + + override fun getMDCAdapter(): MDCAdapter = mdcAdapter + + override fun getRequestedApiVersion(): String = "2.0.99" + + override fun initialize() {} +} + +internal class AirflowLoggerFactory : ILoggerFactory { + private val loggers = ConcurrentHashMap() + + override fun getLogger(name: String): Slf4jLogger = loggers.computeIfAbsent(name, ::AirflowSlf4jLogger) +} + +internal class AirflowSlf4jLogger( + private val loggerName: String, +) : AbstractLogger() { + init { + name = loggerName + } + + // The supervisor delegates threshold filtering to the subprocess (it sets + // level_override=NOTSET), so drop sub-threshold events here rather than forward + // everything. See [LogLevel]. + override fun isTraceEnabled(): Boolean = LogLevel.isEnabled(Level.TRACE) + + override fun isTraceEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.TRACE) + + override fun isDebugEnabled(): Boolean = LogLevel.isEnabled(Level.DEBUG) + + override fun isDebugEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.DEBUG) + + override fun isInfoEnabled(): Boolean = LogLevel.isEnabled(Level.INFO) + + override fun isInfoEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.INFO) + + override fun isWarnEnabled(): Boolean = LogLevel.isEnabled(Level.WARN) + + override fun isWarnEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.WARN) + + override fun isErrorEnabled(): Boolean = LogLevel.isEnabled(Level.ERROR) + + override fun isErrorEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.ERROR) + + override fun getFullyQualifiedCallerName(): String? = null + + override fun handleNormalizedLoggingCall( + level: Slf4jLevel, + marker: Marker?, + messagePattern: String?, + arguments: Array?, + throwable: Throwable?, + ) { + val airflowLevel = level.toAirflowLevel() + if (!LogLevel.isEnabled(airflowLevel)) return + @Suppress("UNCHECKED_CAST") + val message = MessageFormatter.basicArrayFormat(messagePattern, arguments as Array?) + val extra = throwable?.let { mapOf("error_detail" to it.stackTraceToString()) } ?: emptyMap() + LogSender.send(LogMessage(message ?: "", extra, loggerName, airflowLevel)) + } +} + +internal fun Slf4jLevel.toAirflowLevel(): Level = + when (this) { + Slf4jLevel.TRACE -> Level.TRACE + Slf4jLevel.DEBUG -> Level.DEBUG + Slf4jLevel.INFO -> Level.INFO + Slf4jLevel.WARN -> Level.WARN + Slf4jLevel.ERROR -> Level.ERROR + } diff --git a/java-sdk/sdk/src/main/resources/META-INF/services/org.slf4j.spi.SLF4JServiceProvider b/java-sdk/sdk/src/main/resources/META-INF/services/org.slf4j.spi.SLF4JServiceProvider new file mode 100644 index 0000000000000..da5739ff23754 --- /dev/null +++ b/java-sdk/sdk/src/main/resources/META-INF/services/org.slf4j.spi.SLF4JServiceProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +org.apache.airflow.sdk.execution.AirflowSlf4jServiceProvider diff --git a/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/Slf4jTest.kt b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/Slf4jTest.kt new file mode 100644 index 0000000000000..2aeef39d47d04 --- /dev/null +++ b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/Slf4jTest.kt @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.execution + +import io.ktor.utils.io.ByteChannel +import io.ktor.utils.io.readUTF8Line +import kotlinx.coroutines.runBlocking +import kotlinx.datetime.LocalDateTime +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Assumptions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.slf4j.Logger as Slf4jLogger +import org.slf4j.event.Level as Slf4jLevel + +class Slf4jTest { + // Default threshold is INFO, which would suppress the trace/debug cases the + // mapping and formatting tests exercise; pin TRACE so every level is emitted. + // Suppression tests override this property explicitly. + @BeforeEach + fun emitAllLevels() { + System.setProperty(LogLevel.LEVEL_PROPERTY, "TRACE") + } + + @AfterEach + fun clearLevelProperty() { + System.clearProperty(LogLevel.LEVEL_PROPERTY) + } + + private fun captureOrNull(block: (Slf4jLogger) -> Unit): String? { + val channel = ByteChannel(autoFlush = true) + LogSender.configure(channel) + block(AirflowLoggerFactory().getLogger("my.logger")) + return runBlocking { + channel.flushAndClose() + channel.readUTF8Line() + } + } + + private fun logAndCapture(block: (Slf4jLogger) -> Unit): JsonObject { + val line = captureOrNull(block) + Assertions.assertNotNull(line) + return Json.parseToJsonElement(line!!).jsonObject + } + + @Test + @DisplayName("Should map every SLF4J level to its Airflow wire name") + fun shouldMapSlf4jLevelsToWireNames() { + val cases = + listOf Unit, String>>( + { l: Slf4jLogger -> l.trace("m") } to "debug", + { l: Slf4jLogger -> l.debug("m") } to "debug", + { l: Slf4jLogger -> l.info("m") } to "info", + { l: Slf4jLogger -> l.warn("m") } to "warning", + { l: Slf4jLogger -> l.error("m") } to "error", + ) + for ((act, expected) in cases) { + Assertions.assertEquals(expected, logAndCapture(act)["level"]!!.jsonPrimitive.content) + } + } + + @Test + @DisplayName("Should format placeholders and carry the logger name") + fun shouldFormatPlaceholdersAndCarryLoggerName() { + val obj = logAndCapture { it.info("Got XCom from '{}' {}", "extract", 42) } + Assertions.assertEquals("info", obj["level"]!!.jsonPrimitive.content) + Assertions.assertEquals("my.logger", obj["logger"]!!.jsonPrimitive.content) + Assertions.assertEquals("Got XCom from 'extract' 42", obj["event"]!!.jsonPrimitive.content) + } + + @Test + @DisplayName("Should attach the stack trace of a logged throwable") + fun shouldAttachThrowableStackTrace() { + val obj = logAndCapture { it.error("boom", RuntimeException("kaboom")) } + Assertions.assertEquals("error", obj["level"]!!.jsonPrimitive.content) + Assertions.assertTrue(obj["error_detail"]!!.jsonPrimitive.content.contains("kaboom")) + } + + @Test + @DisplayName("Should encode a log message into the supervisor's JSON shape") + fun shouldEncodeLogMessage() { + val message = + LogMessage( + event = "hello", + arguments = mapOf("k" to "v"), + loggerName = "the.logger", + level = Level.WARN, + timestamp = LocalDateTime(2024, 1, 1, 0, 0, 0), + ) + val obj = Json.parseToJsonElement(LogSender.encode(message)).jsonObject + Assertions.assertEquals("warning", obj["level"]!!.jsonPrimitive.content) + Assertions.assertEquals("the.logger", obj["logger"]!!.jsonPrimitive.content) + Assertions.assertEquals("hello", obj["event"]!!.jsonPrimitive.content) + Assertions.assertEquals("v", obj["k"]!!.jsonPrimitive.content) + } + + @Test + @DisplayName("Should translate SLF4J levels to Airflow levels") + fun shouldTranslateSlf4jLevelEnum() { + Assertions.assertEquals(Level.TRACE, Slf4jLevel.TRACE.toAirflowLevel()) + Assertions.assertEquals(Level.DEBUG, Slf4jLevel.DEBUG.toAirflowLevel()) + Assertions.assertEquals(Level.INFO, Slf4jLevel.INFO.toAirflowLevel()) + Assertions.assertEquals(Level.WARN, Slf4jLevel.WARN.toAirflowLevel()) + Assertions.assertEquals(Level.ERROR, Slf4jLevel.ERROR.toAirflowLevel()) + } + + @Test + @DisplayName("Should suppress events below the configured threshold") + fun shouldSuppressBelowThreshold() { + System.setProperty(LogLevel.LEVEL_PROPERTY, "WARN") + Assertions.assertNull(captureOrNull { it.trace("nope") }) + Assertions.assertNull(captureOrNull { it.debug("nope") }) + Assertions.assertNull(captureOrNull { it.info("nope") }) + Assertions.assertNotNull(captureOrNull { it.warn("yes") }) + Assertions.assertNotNull(captureOrNull { it.error("yes") }) + } + + @Test + @DisplayName("Should report enabled levels matching the threshold") + fun shouldReportEnabledLevelsForThreshold() { + System.setProperty(LogLevel.LEVEL_PROPERTY, "INFO") + val logger = AirflowLoggerFactory().getLogger("my.logger") + Assertions.assertFalse(logger.isTraceEnabled) + Assertions.assertFalse(logger.isDebugEnabled) + Assertions.assertTrue(logger.isInfoEnabled) + Assertions.assertTrue(logger.isWarnEnabled) + Assertions.assertTrue(logger.isErrorEnabled) + } + + @Test + @DisplayName("Should parse level names, aliases, and reject unknown values") + fun shouldParseLevelNames() { + Assertions.assertEquals(Level.DEBUG, LogLevel.parse("debug")) + Assertions.assertEquals(Level.WARN, LogLevel.parse("WARNING")) + Assertions.assertEquals(Level.WARN, LogLevel.parse(" warn ")) + Assertions.assertEquals(Level.ERROR, LogLevel.parse("CRITICAL")) + Assertions.assertNull(LogLevel.parse("bogus")) + Assertions.assertNull(LogLevel.parse(null)) + } + + @Test + @DisplayName("Should default to INFO when no level is configured") + fun shouldDefaultToInfoWhenUnconfigured() { + Assumptions.assumeTrue(System.getenv(LogLevel.LEVEL_ENV) == null) + System.clearProperty(LogLevel.LEVEL_PROPERTY) + Assertions.assertEquals(Level.INFO, LogLevel.threshold()) + } +} From c4e9487d5f8006daafd1d24fa7bf5adc10e478d3 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 18 Jun 2026 15:31:05 +0900 Subject: [PATCH 2/2] Fix Java SDK dropping task logs emitted during execution The logs socket was wrapped in use{}, which closed it as soon as LogSender.configure() returned. Logs produced later while the task ran then hit a closed channel, were buffered, and were lost at process exit, so the UI showed no application logs for Java tasks. Keep the socket open until the comm job completes so they are flushed. --- .../kotlin/org/apache/airflow/sdk/Server.kt | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt index 2bab74bf23d2b..23ba0fb450c3d 100644 --- a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt +++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt @@ -139,20 +139,25 @@ class Server( */ suspend fun serveAsync(bundle: Bundle) = coroutineScope { - launch { - aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(comm).use { socket -> - logger.debug("Connected comm", mapOf("addr" to comm)) - CoordinatorComm( - bundle, - socket.openReadChannel(), - socket.openWriteChannel(autoFlush = true), - ).startProcessing() + val commJob = + launch { + aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(comm).use { socket -> + logger.debug("Connected comm", mapOf("addr" to comm)) + CoordinatorComm( + bundle, + socket.openReadChannel(), + socket.openWriteChannel(autoFlush = true), + ).startProcessing() + } } - } launch { aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(logs).use { socket -> logger.debug("Connected logs", mapOf("addr" to logs)) LogSender.configure(socket.openWriteChannel(autoFlush = true)) + // Keep the logs socket open for the whole task run; otherwise `use` + // closes it the moment configure() returns and every task log is + // buffered and dropped when the process exits. + commJob.join() } } }