-
Notifications
You must be signed in to change notification settings - Fork 17.3k
Fix Java-SDK logging level #68696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Java-SDK logging level #68696
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,12 +34,58 @@ import java.util.concurrent.ConcurrentLinkedDeque | |
| import kotlin.reflect.KClass | ||
| import kotlin.time.Clock | ||
|
|
||
| enum class Level { ERROR, DEBUG, } | ||
| // wireName is the level string the Airflow supervisor understands (structlog's | ||
| // NAME_TO_LEVEL). It has no TRACE, so TRACE maps to "debug"; a level the | ||
| // supervisor does not recognise is dropped silently on the Python side. | ||
| // severity mirrors Python's logging numeric values (DEBUG=10 ... ERROR=40) so a | ||
| // level can be compared against the configured threshold; TRACE sits below DEBUG. | ||
| enum class Level( | ||
| val wireName: String, | ||
| val severity: Int, | ||
| ) { | ||
| TRACE("debug", 5), | ||
| DEBUG("debug", 10), | ||
| INFO("info", 20), | ||
| WARN("warning", 30), | ||
| ERROR("error", 40), | ||
| } | ||
|
|
||
| /** | ||
| * Resolves the effective task log level the JVM should emit. | ||
| * | ||
| * The supervisor reconfigures the task logger with `level_override=NOTSET`, | ||
| * delegating threshold filtering to the subprocess, so the Java SDK must drop | ||
| * sub-threshold events itself. The threshold comes from the `airflow.logging.level` | ||
| * system property (an explicit JVM flag wins) and otherwise the | ||
| * `AIRFLOW__LOGGING__LOGGING_LEVEL` env var the subprocess inherits from the | ||
| * supervisor; unset or unrecognised values fall back to INFO. | ||
| */ | ||
| internal object LogLevel { | ||
| const val LEVEL_PROPERTY = "airflow.logging.level" | ||
| const val LEVEL_ENV = "AIRFLOW__LOGGING__LOGGING_LEVEL" | ||
|
Comment on lines
+64
to
+65
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I felt we should provide the I prefer to respect the Airflow side logging level, which means user might setup custom secret backend for the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, let’s do this. The Python implementation should probably also be changed to respect this Since this requires Python side changes, I would say for 3.3 let’s just default to INFO in SDKs and add support for Airflow configuration later.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, then I will go with this direction. This should solve Phani's review comment as well. |
||
|
|
||
| fun threshold(): Level = parse(configuredLevel()) ?: Level.INFO | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you test this scenario with this PR, I think it breaks. In airflow.cfg Since it wont be in os.environ: no AIRFLOW__LOGGING__LOGGING_LEVEL set . Hence Java resolves to INFO. Overall Result: Java WARNING logs will be silently dropped. A Python task in the same deployment with the same cfg would show WARNING logs. I think it is identical config, divergent behavior by task language . |
||
|
|
||
| fun isEnabled(level: Level): Boolean = level.severity >= threshold().severity | ||
|
|
||
| fun parse(name: String?): Level? = | ||
| when (name?.trim()?.uppercase()) { | ||
| "TRACE" -> Level.TRACE | ||
| "DEBUG" -> Level.DEBUG | ||
| "INFO" -> Level.INFO | ||
| "WARN", "WARNING" -> Level.WARN | ||
| // The Java SDK has no CRITICAL/FATAL, so the strictest threshold it can honour is ERROR. | ||
| "ERROR", "CRITICAL", "FATAL" -> Level.ERROR | ||
| else -> null | ||
| } | ||
|
|
||
| private fun configuredLevel(): String? = System.getProperty(LEVEL_PROPERTY) ?: System.getenv(LEVEL_ENV) | ||
| } | ||
|
|
||
| internal data class LogMessage( | ||
| val event: String, | ||
| val arguments: Map<String, Any>, | ||
| val logger: Logger, | ||
| val loggerName: String, | ||
| val level: Level, | ||
| val timestamp: LocalDateTime = Clock.System.now().toLocalDateTime(TimeZone.currentSystemDefault()), | ||
| ) | ||
|
|
@@ -49,9 +95,7 @@ internal class Logger( | |
| ) { | ||
| val name: String? = cls.java.typeName | ||
|
|
||
| // TODO: Actually implement level filtering. | ||
| @Suppress("UNUSED_PARAMETER") | ||
| fun isEnabledForLevel(level: Level): Boolean = true | ||
| fun isEnabledForLevel(level: Level): Boolean = LogLevel.isEnabled(level) | ||
|
|
||
| fun debug( | ||
| message: String, | ||
|
|
@@ -73,7 +117,7 @@ internal class Logger( | |
| arguments: Map<String, Any>, | ||
| ) { | ||
| if (!isEnabledForLevel(level)) return | ||
| LogSender.send(LogMessage(event, arguments, this, level)) | ||
| LogSender.send(LogMessage(event, arguments, name ?: "(java)", level)) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -99,17 +143,21 @@ internal object LogSender { | |
| } | ||
| } | ||
|
|
||
| internal fun encode(message: LogMessage): String { | ||
| val map = message.arguments.toMutableMap() | ||
| map["event"] = message.event | ||
| map["level"] = message.level.wireName | ||
| map["logger"] = message.loggerName | ||
| map["timestamp"] = message.timestamp | ||
| return "${map.toJsonElement()}\n" | ||
| } | ||
|
|
||
| private fun sendTo( | ||
| writer: ByteWriteChannel, | ||
| message: LogMessage, | ||
| ) { | ||
| val map = message.arguments.toMutableMap() | ||
| map["event"] = message.event | ||
| map["level"] = message.level.name.lowercase() | ||
| map["logger"] = message.logger.name ?: "(java)" | ||
| map["timestamp"] = message.timestamp | ||
| // TODO: Can this be done asynchronously instead? | ||
| runBlocking { writer.writeString("${map.toJsonElement()}\n") } | ||
| runBlocking { writer.writeString(encode(message)) } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.airflow.sdk.execution | ||
|
|
||
| import org.slf4j.ILoggerFactory | ||
| import org.slf4j.IMarkerFactory | ||
| import org.slf4j.Marker | ||
| import org.slf4j.helpers.AbstractLogger | ||
| import org.slf4j.helpers.BasicMDCAdapter | ||
| import org.slf4j.helpers.BasicMarkerFactory | ||
| import org.slf4j.helpers.MessageFormatter | ||
| import org.slf4j.spi.MDCAdapter | ||
| import org.slf4j.spi.SLF4JServiceProvider | ||
| import java.util.concurrent.ConcurrentHashMap | ||
| import org.slf4j.Logger as Slf4jLogger | ||
| import org.slf4j.event.Level as Slf4jLevel | ||
|
|
||
| /** | ||
| * SLF4J binding that streams task logs through the Airflow logs socket. | ||
| * | ||
| * Without a binding, an application's SLF4J calls fall back to whatever | ||
| * provider is on the classpath (e.g. slf4j-simple writing to stderr), and the | ||
| * supervisor then tags every stderr line as ERROR regardless of the original | ||
| * level. Routing logs through [LogSender] instead carries each message's real | ||
| * level on the wire, so the UI shows INFO/WARN/etc. as logged. | ||
| */ | ||
| class AirflowSlf4jServiceProvider : SLF4JServiceProvider { | ||
| private val loggerFactory = AirflowLoggerFactory() | ||
| private val markerFactory: IMarkerFactory = BasicMarkerFactory() | ||
| private val mdcAdapter: MDCAdapter = BasicMDCAdapter() | ||
|
|
||
| override fun getLoggerFactory(): ILoggerFactory = loggerFactory | ||
|
|
||
| override fun getMarkerFactory(): IMarkerFactory = markerFactory | ||
|
|
||
| override fun getMDCAdapter(): MDCAdapter = mdcAdapter | ||
|
|
||
| override fun getRequestedApiVersion(): String = "2.0.99" | ||
|
|
||
| override fun initialize() {} | ||
| } | ||
|
|
||
| internal class AirflowLoggerFactory : ILoggerFactory { | ||
| private val loggers = ConcurrentHashMap<String, Slf4jLogger>() | ||
|
|
||
| override fun getLogger(name: String): Slf4jLogger = loggers.computeIfAbsent(name, ::AirflowSlf4jLogger) | ||
| } | ||
|
|
||
| internal class AirflowSlf4jLogger( | ||
| private val loggerName: String, | ||
| ) : AbstractLogger() { | ||
| init { | ||
| name = loggerName | ||
| } | ||
|
|
||
| // The supervisor delegates threshold filtering to the subprocess (it sets | ||
| // level_override=NOTSET), so drop sub-threshold events here rather than forward | ||
| // everything. See [LogLevel]. | ||
| override fun isTraceEnabled(): Boolean = LogLevel.isEnabled(Level.TRACE) | ||
|
|
||
| override fun isTraceEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.TRACE) | ||
|
|
||
| override fun isDebugEnabled(): Boolean = LogLevel.isEnabled(Level.DEBUG) | ||
|
|
||
| override fun isDebugEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.DEBUG) | ||
|
|
||
| override fun isInfoEnabled(): Boolean = LogLevel.isEnabled(Level.INFO) | ||
|
|
||
| override fun isInfoEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.INFO) | ||
|
|
||
| override fun isWarnEnabled(): Boolean = LogLevel.isEnabled(Level.WARN) | ||
|
|
||
| override fun isWarnEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.WARN) | ||
|
|
||
| override fun isErrorEnabled(): Boolean = LogLevel.isEnabled(Level.ERROR) | ||
|
|
||
| override fun isErrorEnabled(marker: Marker?): Boolean = LogLevel.isEnabled(Level.ERROR) | ||
|
|
||
| override fun getFullyQualifiedCallerName(): String? = null | ||
|
|
||
| override fun handleNormalizedLoggingCall( | ||
| level: Slf4jLevel, | ||
| marker: Marker?, | ||
| messagePattern: String?, | ||
| arguments: Array<out Any?>?, | ||
| throwable: Throwable?, | ||
| ) { | ||
| val airflowLevel = level.toAirflowLevel() | ||
| if (!LogLevel.isEnabled(airflowLevel)) return | ||
| @Suppress("UNCHECKED_CAST") | ||
| val message = MessageFormatter.basicArrayFormat(messagePattern, arguments as Array<Any?>?) | ||
| val extra = throwable?.let { mapOf<String, Any>("error_detail" to it.stackTraceToString()) } ?: emptyMap() | ||
| LogSender.send(LogMessage(message ?: "", extra, loggerName, airflowLevel)) | ||
| } | ||
| } | ||
|
|
||
| internal fun Slf4jLevel.toAirflowLevel(): Level = | ||
| when (this) { | ||
| Slf4jLevel.TRACE -> Level.TRACE | ||
| Slf4jLevel.DEBUG -> Level.DEBUG | ||
| Slf4jLevel.INFO -> Level.INFO | ||
| Slf4jLevel.WARN -> Level.WARN | ||
| Slf4jLevel.ERROR -> Level.ERROR | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| org.apache.airflow.sdk.execution.AirflowSlf4jServiceProvider |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think TRACE should be mapped to NOTSET (0).
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, why do we need to reinvent level? This enum can simply map 1:1 to Python levels, and we just resolve SLF4J levels to Python levels directly.