Skip to content

Commit 69107cb

Browse files
committed
Handle both LLMObs and ASM enabled in standalone sampler
When APM tracing is disabled but both LLMObs and ASM are enabled, the previous code returned LlmObsStandaloneSampler which never sent the 1 APM trace/minute required by ASM for billing and service catalog. Introduce LlmObsAndAsmStandaloneSampler that keeps all LLMObs and ASM traces while rate-limiting plain APM traces to 1 per minute. Also clarify the log message for the ASM-only standalone case. Signed-off-by: matsumo-and <yh134.toisanda@gmail.com>
1 parent b73eb94 commit 69107cb

File tree

3 files changed

+178
-2
lines changed

3 files changed

+178
-2
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package datadog.trace.common.sampling;
2+
3+
import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_DROP;
4+
import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_KEEP;
5+
6+
import datadog.trace.api.ProductTraceSource;
7+
import datadog.trace.api.sampling.SamplingMechanism;
8+
import datadog.trace.core.CoreSpan;
9+
import datadog.trace.core.DDSpan;
10+
import java.time.Clock;
11+
import java.util.concurrent.atomic.AtomicLong;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
/**
16+
* This sampler is used when APM tracing is disabled but both LLM Observability and ASM are enabled.
17+
* It keeps all LLMObs and ASM traces, and allows 1 APM trace per minute for billing/service catalog
18+
* purposes.
19+
*/
20+
public class LlmObsAndAsmStandaloneSampler implements Sampler, PrioritySampler {
21+
22+
private static final Logger log = LoggerFactory.getLogger(LlmObsAndAsmStandaloneSampler.class);
23+
private static final int RATE_IN_MILLISECONDS = 60000; // 1 minute
24+
25+
private final AtomicLong lastSampleTime;
26+
private final Clock clock;
27+
28+
public LlmObsAndAsmStandaloneSampler(final Clock clock) {
29+
this.clock = clock;
30+
this.lastSampleTime = new AtomicLong(clock.millis() - RATE_IN_MILLISECONDS);
31+
}
32+
33+
@Override
34+
public <T extends CoreSpan<T>> boolean sample(final T span) {
35+
// Priority sampling sends all traces to the core agent, including traces marked dropped.
36+
// This allows the core agent to collect stats on all traces.
37+
return true;
38+
}
39+
40+
@Override
41+
public <T extends CoreSpan<T>> void setSamplingPriority(final T span) {
42+
T rootSpan = span.getLocalRootSpan();
43+
if (rootSpan instanceof DDSpan) {
44+
DDSpan ddRootSpan = (DDSpan) rootSpan;
45+
int traceSource = ddRootSpan.context().getPropagationTags().getTraceSource();
46+
if (ProductTraceSource.isProductMarked(traceSource, ProductTraceSource.LLMOBS)) {
47+
log.debug("Set SAMPLER_KEEP for LLMObs span {}", span.getSpanId());
48+
span.setSamplingPriority(SAMPLER_KEEP, SamplingMechanism.DEFAULT);
49+
return;
50+
}
51+
if (ProductTraceSource.isProductMarked(traceSource, ProductTraceSource.ASM)) {
52+
log.debug("Set SAMPLER_KEEP for ASM span {}", span.getSpanId());
53+
span.setSamplingPriority(SAMPLER_KEEP, SamplingMechanism.APPSEC);
54+
return;
55+
}
56+
}
57+
// For APM-only traces, allow 1 per minute for billing/catalog purposes
58+
if (shouldSample()) {
59+
log.debug("Set SAMPLER_KEEP for APM span {}", span.getSpanId());
60+
span.setSamplingPriority(SAMPLER_KEEP, SamplingMechanism.APPSEC);
61+
} else {
62+
log.debug("Set SAMPLER_DROP for APM span {}", span.getSpanId());
63+
span.setSamplingPriority(SAMPLER_DROP, SamplingMechanism.APPSEC);
64+
}
65+
}
66+
67+
private boolean shouldSample() {
68+
long now = clock.millis();
69+
return lastSampleTime.updateAndGet(
70+
lastTime -> now - lastTime >= RATE_IN_MILLISECONDS ? now : lastTime)
71+
== now;
72+
}
73+
}

dd-trace-core/src/main/java/datadog/trace/common/sampling/Sampler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,16 @@ public static Sampler forConfig(final Config config, final TraceConfig traceConf
3737
Sampler sampler;
3838
if (config != null) {
3939
if (!config.isApmTracingEnabled()) {
40-
if (config.isLlmObsEnabled()) {
40+
if (config.isLlmObsEnabled() && isAsmEnabled(config)) {
41+
log.debug(
42+
"APM is disabled, but both LLMObs and ASM are enabled. All LLMObs and ASM traces will be kept, only 1 APM trace per minute will be sent.");
43+
return new LlmObsAndAsmStandaloneSampler(Clock.systemUTC());
44+
} else if (config.isLlmObsEnabled()) {
4145
log.debug("APM is disabled, but LLMObs is enabled. All LLMObs traces will be kept.");
4246
return new LlmObsStandaloneSampler();
4347
} else if (isAsmEnabled(config)) {
44-
log.debug("APM is disabled, but ASM is enabled. Only 1 trace per minute will be sent.");
48+
log.debug(
49+
"APM is disabled, but ASM is enabled. Only 1 APM trace per minute will be sent, all ASM traces will be kept.");
4550
return new AsmStandaloneSampler(Clock.systemUTC());
4651
}
4752
// APM disabled and no other products enabled - drop all APM traces
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package datadog.trace.common.sampling
2+
3+
import datadog.trace.api.ProductTraceSource
4+
import datadog.trace.api.sampling.PrioritySampling
5+
import datadog.trace.bootstrap.instrumentation.api.Tags
6+
import datadog.trace.common.writer.ListWriter
7+
import datadog.trace.core.test.DDCoreSpecification
8+
9+
import java.time.Clock
10+
import java.util.concurrent.atomic.AtomicLong
11+
12+
class LlmObsAndAsmStandaloneSamplerTest extends DDCoreSpecification {
13+
14+
def writer = new ListWriter()
15+
16+
void "test LLMObs spans are kept"() {
17+
setup:
18+
def sampler = new LlmObsAndAsmStandaloneSampler(Clock.systemUTC())
19+
def tracer = tracerBuilder().writer(writer).sampler(sampler).build()
20+
21+
when:
22+
def span = tracer.buildSpan("testInstrumentation", "llm-call").start()
23+
def scope = tracer.activateSpan(span)
24+
tracer.getTraceSegment().setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.LLMOBS)
25+
sampler.setSamplingPriority(span)
26+
scope.close()
27+
28+
then:
29+
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
30+
31+
cleanup:
32+
tracer.close()
33+
}
34+
35+
void "test ASM spans are kept"() {
36+
setup:
37+
def sampler = new LlmObsAndAsmStandaloneSampler(Clock.systemUTC())
38+
def tracer = tracerBuilder().writer(writer).sampler(sampler).build()
39+
40+
when:
41+
def span = tracer.buildSpan("testInstrumentation", "http-request").start()
42+
def scope = tracer.activateSpan(span)
43+
tracer.getTraceSegment().setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM)
44+
sampler.setSamplingPriority(span)
45+
scope.close()
46+
47+
then:
48+
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
49+
50+
cleanup:
51+
tracer.close()
52+
}
53+
54+
void "test APM-only spans are rate-limited to 1 per minute"() {
55+
setup:
56+
def current = new AtomicLong(System.currentTimeMillis())
57+
final Clock clock = Mock(Clock) {
58+
millis() >> {
59+
current.get()
60+
}
61+
}
62+
def sampler = new LlmObsAndAsmStandaloneSampler(clock)
63+
def tracer = tracerBuilder().writer(writer).sampler(sampler).build()
64+
65+
when: "first APM span"
66+
def span1 = tracer.buildSpan("testInstrumentation", "apm-request").start()
67+
sampler.setSamplingPriority(span1)
68+
69+
then:
70+
1 * clock.millis() >> {
71+
current.updateAndGet(v -> v + 1000)
72+
}
73+
span1.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
74+
75+
when: "second APM span within the same minute"
76+
def span2 = tracer.buildSpan("testInstrumentation", "apm-request2").start()
77+
sampler.setSamplingPriority(span2)
78+
79+
then:
80+
1 * clock.millis() >> {
81+
current.updateAndGet(v -> v + 1000)
82+
}
83+
span2.getSamplingPriority() == PrioritySampling.SAMPLER_DROP
84+
85+
when: "third APM span after 1 minute"
86+
def span3 = tracer.buildSpan("testInstrumentation", "apm-request3").start()
87+
sampler.setSamplingPriority(span3)
88+
89+
then:
90+
clock.millis() >> {
91+
current.updateAndGet(v -> v + 60000)
92+
}
93+
span3.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
94+
95+
cleanup:
96+
tracer.close()
97+
}
98+
}

0 commit comments

Comments
 (0)