Skip to content

Commit 10ae2ce

Browse files
authored
CAMEL-23312: Add SpanKind support to OpenTelemetry2 component (#22653)
1 parent faeb2c2 commit 10ae2ce

22 files changed

Lines changed: 533 additions & 8 deletions

File tree

components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ private MicrometerObservabilitySpanLifecycleManager() {
126126
}
127127

128128
@Override
129-
public Span create(String spanName, Span parent, SpanContextPropagationExtractor extractor) {
129+
public Span create(String spanName, String spanKind, Span parent, SpanContextPropagationExtractor extractor) {
130130
io.micrometer.tracing.Span span;
131131
if (parent != null) {
132132
MicrometerObservabilitySpanAdapter microObsParentSpan = (MicrometerObservabilitySpanAdapter) parent;

components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryTracer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.opentelemetry.api.GlobalOpenTelemetry;
2323
import io.opentelemetry.api.baggage.Baggage;
2424
import io.opentelemetry.api.trace.SpanBuilder;
25+
import io.opentelemetry.api.trace.SpanKind;
2526
import io.opentelemetry.api.trace.Tracer;
2627
import io.opentelemetry.context.Context;
2728
import io.opentelemetry.context.propagation.ContextPropagators;
@@ -101,7 +102,7 @@ private OpentelemetrySpanLifecycleManager(Tracer tracer, ContextPropagators cont
101102
}
102103

103104
@Override
104-
public Span create(String spanName, Span parent, SpanContextPropagationExtractor extractor) {
105+
public Span create(String spanName, String spanKind, Span parent, SpanContextPropagationExtractor extractor) {
105106
SpanBuilder builder = tracer.spanBuilder(spanName);
106107
Baggage baggage = Baggage.current();
107108

@@ -143,6 +144,10 @@ public String get(SpanContextPropagationExtractor carrier, String key) {
143144
}
144145
baggage = getBaggageFromHeaders(baggage, extractor);
145146

147+
if (spanKind != null) {
148+
builder.setSpanKind(SpanKind.valueOf(spanKind));
149+
}
150+
146151
return new OpenTelemetrySpanAdapter(builder.startSpan(), baggage);
147152
}
148153

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry2;
18+
19+
import java.util.List;
20+
import java.util.Map;
21+
22+
import io.opentelemetry.api.common.AttributeKey;
23+
import io.opentelemetry.api.trace.SpanKind;
24+
import io.opentelemetry.sdk.trace.data.SpanData;
25+
import org.apache.camel.CamelContext;
26+
import org.apache.camel.CamelContextAware;
27+
import org.apache.camel.RoutesBuilder;
28+
import org.apache.camel.builder.RouteBuilder;
29+
import org.apache.camel.component.mock.MockEndpoint;
30+
import org.apache.camel.opentelemetry2.CamelOpenTelemetryExtension.OtelTrace;
31+
import org.apache.camel.opentelemetry2.mock.MockHttpComponent;
32+
import org.apache.camel.opentelemetry2.mock.MockKafkaComponent;
33+
import org.apache.camel.telemetry.Op;
34+
import org.junit.jupiter.api.Test;
35+
36+
import static org.junit.jupiter.api.Assertions.assertEquals;
37+
38+
/**
39+
* Test that verifies SpanKind is set correctly for different component types.
40+
*/
41+
public class SpanKindTest extends OpenTelemetryTracerTestSupport {
42+
43+
@Override
44+
protected CamelContext createCamelContext() throws Exception {
45+
OpenTelemetryTracer tst = new OpenTelemetryTracer();
46+
tst.setTracer(otelExtension.getOpenTelemetry().getTracer("spanKindTest"));
47+
tst.setContextPropagators(otelExtension.getOpenTelemetry().getPropagators());
48+
CamelContext context = super.createCamelContext();
49+
50+
// Register mock HTTP component for testing
51+
context.addComponent("mock-http", new MockHttpComponent());
52+
// Register mock Kafka component for testing
53+
context.addComponent("mock-kafka", new MockKafkaComponent());
54+
55+
CamelContextAware.trySetCamelContext(tst, context);
56+
tst.init(context);
57+
return context;
58+
}
59+
60+
@Test
61+
void testDirectComponentHasInternalSpanKind() {
62+
template.sendBody("direct:start", "test");
63+
64+
List<OtelTrace> traces = List.copyOf(otelExtension.getTraces().values());
65+
assertEquals(1, traces.size());
66+
67+
List<SpanData> spans = traces.get(0).getSpans();
68+
69+
// Find the direct:start EVENT_SENT span
70+
SpanData directSentSpan = getSpan(spans, "direct://start", Op.EVENT_SENT);
71+
assertEquals(SpanKind.INTERNAL, directSentSpan.getKind(),
72+
"direct:start EVENT_SENT should have INTERNAL SpanKind");
73+
74+
// Find the direct:start EVENT_RECEIVED span
75+
SpanData directReceivedSpan = getSpan(spans, "direct://start", Op.EVENT_RECEIVED);
76+
assertEquals(SpanKind.INTERNAL, directReceivedSpan.getKind(),
77+
"direct:start EVENT_RECEIVED should have INTERNAL SpanKind");
78+
}
79+
80+
@Test
81+
void testHttpComponentHasClientServerSpanKind() throws Exception {
82+
MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
83+
mockEndpoint.expectedMessageCount(1);
84+
85+
template.sendBody("direct:httpClient", "test message");
86+
87+
mockEndpoint.assertIsSatisfied();
88+
89+
List<OtelTrace> traces = List.copyOf(otelExtension.getTraces().values());
90+
assertEquals(1, traces.size());
91+
92+
List<SpanData> spans = traces.get(0).getSpans();
93+
94+
// Find the mock-http EVENT_SENT span (client side)
95+
SpanData httpClientSpan = getSpan(spans, "mock-http://testEndpoint", Op.EVENT_SENT);
96+
assertEquals(SpanKind.CLIENT, httpClientSpan.getKind(),
97+
"HTTP EVENT_SENT should have CLIENT SpanKind");
98+
}
99+
100+
@Test
101+
void testKafkaComponentHasProducerSpanKindAndInheritedProperties() throws Exception {
102+
MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
103+
mockEndpoint.expectedMessageCount(1);
104+
105+
// Send with Kafka headers that would normally be set before/during sending
106+
template.sendBodyAndHeaders("direct:kafkaProducer", "test message",
107+
Map.of("kafka.KEY", "test-key",
108+
"kafka.PARTITION", 0,
109+
"kafka.OFFSET", "12345"));
110+
111+
mockEndpoint.assertIsSatisfied();
112+
113+
List<OtelTrace> traces = List.copyOf(otelExtension.getTraces().values());
114+
assertEquals(1, traces.size());
115+
116+
List<SpanData> spans = traces.get(0).getSpans();
117+
118+
// Find the mock-kafka EVENT_SENT span
119+
SpanData kafkaSpan = getSpan(spans, "mock-kafka://testTopic", Op.EVENT_SENT);
120+
121+
// Verify OpenTelemetry-specific SpanKind
122+
assertEquals(SpanKind.PRODUCER, kafkaSpan.getKind(),
123+
"Kafka EVENT_SENT should have PRODUCER SpanKind");
124+
125+
// Verify inherited properties from camel-telemetry KafkaSpanDecorator
126+
assertEquals("0", kafkaSpan.getAttributes().get(AttributeKey.stringKey("kafka.partition")),
127+
"Should have kafka.partition tag from inherited decorator");
128+
assertEquals("12345", kafkaSpan.getAttributes().get(AttributeKey.stringKey("kafka.offset")),
129+
"Should have kafka.offset tag from inherited decorator");
130+
assertEquals("test-key", kafkaSpan.getAttributes().get(AttributeKey.stringKey("kafka.key")),
131+
"Should have kafka.key tag from inherited decorator");
132+
}
133+
134+
@Override
135+
protected RoutesBuilder createRouteBuilder() {
136+
return new RouteBuilder() {
137+
@Override
138+
public void configure() {
139+
from("direct:start")
140+
.log("Processing message");
141+
142+
// Mock HTTP client route - tests CLIENT SpanKind
143+
from("direct:httpClient")
144+
.to("mock-http://testEndpoint")
145+
.to("mock:result");
146+
147+
// Mock Kafka producer route - tests PRODUCER SpanKind and inherited properties
148+
from("direct:kafkaProducer")
149+
.to("mock-kafka://testTopic")
150+
.to("mock:result");
151+
}
152+
};
153+
}
154+
155+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry2.mock;
18+
19+
import java.util.Map;
20+
21+
/**
22+
* Mock HTTP component for testing SpanKind. This component is recognized by HttpSpanDecorator based on its class name.
23+
*/
24+
public class MockHttpComponent extends org.apache.camel.support.DefaultComponent {
25+
26+
@Override
27+
protected org.apache.camel.Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
28+
throws Exception {
29+
MockHttpEndpoint endpoint = new MockHttpEndpoint(uri, this);
30+
setProperties(endpoint, parameters);
31+
return endpoint;
32+
}
33+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry2.mock;
18+
19+
/**
20+
* Mock HTTP endpoint for testing SpanKind.
21+
*/
22+
class MockHttpEndpoint extends org.apache.camel.support.DefaultEndpoint {
23+
24+
public MockHttpEndpoint(String endpointUri, org.apache.camel.Component component) {
25+
super(endpointUri, component);
26+
}
27+
28+
@Override
29+
public org.apache.camel.Producer createProducer() throws Exception {
30+
return new MockHttpProducer(this);
31+
}
32+
33+
@Override
34+
public org.apache.camel.Consumer createConsumer(org.apache.camel.Processor processor) throws Exception {
35+
throw new IllegalArgumentException("Not used in MockHttpEndpoint");
36+
}
37+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry2.mock;
18+
19+
/**
20+
* Mock HTTP producer that just echoes the input.
21+
*/
22+
class MockHttpProducer extends org.apache.camel.support.DefaultProducer {
23+
24+
public MockHttpProducer(org.apache.camel.Endpoint endpoint) {
25+
super(endpoint);
26+
}
27+
28+
@Override
29+
public void process(org.apache.camel.Exchange exchange) throws Exception {
30+
// Simple echo - set response body to request body
31+
exchange.getMessage().setBody("HTTP Response: " + exchange.getIn().getBody());
32+
}
33+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry2.mock;
18+
19+
/**
20+
* Span decorator for mock HTTP component used in tests.
21+
*/
22+
public class MockHttpSpanDecorator extends org.apache.camel.telemetry.decorators.AbstractHttpSpanDecorator {
23+
24+
@Override
25+
public String getComponent() {
26+
return "mock-http";
27+
}
28+
29+
@Override
30+
public String getComponentClassName() {
31+
return MockHttpComponent.class.getName();
32+
}
33+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry2.mock;
18+
19+
import java.util.Map;
20+
21+
/**
22+
* Mock Kafka component for testing SpanKind and inherited properties.
23+
*/
24+
public class MockKafkaComponent extends org.apache.camel.support.DefaultComponent {
25+
26+
@Override
27+
protected org.apache.camel.Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
28+
throws Exception {
29+
MockKafkaEndpoint endpoint = new MockKafkaEndpoint(uri, this);
30+
setProperties(endpoint, parameters);
31+
return endpoint;
32+
}
33+
}

0 commit comments

Comments
 (0)