Skip to content

Commit 097fe8b

Browse files
authored
Add OTLP/HTTP receiver support for traces, logs, and metrics (#13826)
Each existing OTLP gRPC handler now also registers an HTTP handler Supported endpoints: - POST /v1/traces (protobuf + JSON) - POST /v1/logs (protobuf + JSON) - POST /v1/metrics (protobuf + JSON)
1 parent 52af116 commit 097fe8b

17 files changed

Lines changed: 335 additions & 11 deletions

File tree

.github/workflows/skywalking.yaml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,7 @@ jobs:
769769
if: matrix.test.docker != null
770770
run: docker build -t ${{ matrix.test.docker.name }} -f ${{ matrix.test.docker.base }}/${{ matrix.test.docker.file }} ${{ matrix.test.docker.base }}
771771
- name: ${{ matrix.test.name }}
772-
uses: apache/skywalking-infra-e2e@530f9976e4bf12b65a6604ea0efbeafabeeea2a3
772+
uses: apache/skywalking-infra-e2e@0d91769411db83f6329633df810a36c6ea1a9b02
773773
with:
774774
e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
775775
- if: ${{ failure() }}
@@ -843,7 +843,7 @@ jobs:
843843
username: ${{ github.repository_owner }}
844844
password: ${{ secrets.GITHUB_TOKEN }}
845845
- name: ALS ${{ matrix.storage }}, ${{ matrix.analyzer }}, istio-${{ matrix.versions.istio }}, k8s-${{ matrix.versions.kubernetes }}
846-
uses: apache/skywalking-infra-e2e@530f9976e4bf12b65a6604ea0efbeafabeeea2a3
846+
uses: apache/skywalking-infra-e2e@0d91769411db83f6329633df810a36c6ea1a9b02
847847
env:
848848
ISTIO_VERSION: ${{ matrix.versions.istio }}
849849
KUBERNETES_VERSION: ${{ matrix.versions.kubernetes }}
@@ -915,7 +915,7 @@ jobs:
915915
username: ${{ github.repository_owner }}
916916
password: ${{ secrets.GITHUB_TOKEN }}
917917
- name: ${{ matrix.test.name }}
918-
uses: apache/skywalking-infra-e2e@530f9976e4bf12b65a6604ea0efbeafabeeea2a3
918+
uses: apache/skywalking-infra-e2e@0d91769411db83f6329633df810a36c6ea1a9b02
919919
env:
920920
ISTIO_VERSION: ${{ matrix.versions.istio }}
921921
KUBERNETES_VERSION: ${{ matrix.versions.kubernetes }}
@@ -979,7 +979,7 @@ jobs:
979979
shell: bash
980980
run: ./mvnw -B -q -f test/e2e-v2/java-test-service/pom.xml clean package
981981
- name: Java version ${{ matrix.java-version }}
982-
uses: apache/skywalking-infra-e2e@530f9976e4bf12b65a6604ea0efbeafabeeea2a3
982+
uses: apache/skywalking-infra-e2e@0d91769411db83f6329633df810a36c6ea1a9b02
983983
env:
984984
SW_AGENT_JDK_VERSION: ${{ matrix.java-version }}
985985
with:
@@ -1075,7 +1075,7 @@ jobs:
10751075
fi
10761076
docker compose -f ${BANYANDB_DATA_GENERATE_ROOT}/docker-compose.yml down -v
10771077
- name: ${{ matrix.test.name }}
1078-
uses: apache/skywalking-infra-e2e@530f9976e4bf12b65a6604ea0efbeafabeeea2a3
1078+
uses: apache/skywalking-infra-e2e@0d91769411db83f6329633df810a36c6ea1a9b02
10791079
with:
10801080
e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }}
10811081
- if: ${{ failure() }}

docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* LAL: add `sourceAttribute()` function for non-persistent OTLP resource attribute access in LAL scripts.
2929
* LAL: add `layer: auto` mode for dynamic layer assignment when `service.layer` is absent.
3030
* Add two-phase `SpanListener` SPI mechanism for extensible trace span processing. Refactor GenAI from hardcoded `SpanForward.processGenAILogic()` to `GenAISpanListener`.
31+
* Add OTLP/HTTP receiver support for traces, logs, and metrics (`/v1/traces`, `/v1/logs`, `/v1/metrics`). Supports both `application/x-protobuf` and `application/json` content types.
3132

3233
#### UI
3334

docs/en/setup/backend/otlp-trace.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
11
# OpenTelemetry Trace Format
22

3-
SkyWalking can receive traces from Traces in OTLP format and convert them to Zipkin Trace format eventually.
3+
SkyWalking can receive traces in OTLP format and convert them to Zipkin Trace format eventually.
44
For data analysis and queries related to Zipkin Trace, please [refer to the relevant documentation](./zipkin-trace.md#zipkin-query).
55

66
OTLP Trace handler references the [Zipkin Exporter in the OpenTelemetry Collector](https://opentelemetry.io/docs/specs/otel/trace/sdk_exporters/zipkin/#summary) to convert the data format.
77

8+
## Supported Protocols
9+
10+
Both **OTLP/gRPC** and **OTLP/HTTP** are supported for traces, logs, and metrics:
11+
12+
| Signal | OTLP/gRPC (port 11800) | OTLP/HTTP (port 12800) |
13+
|---------|------------------------------|-------------------------|
14+
| Traces | gRPC `TraceService/Export` | `POST /v1/traces` |
15+
| Logs | gRPC `LogsService/Export` | `POST /v1/logs` |
16+
| Metrics | gRPC `MetricsService/Export` | `POST /v1/metrics` |
17+
18+
OTLP/HTTP supports both `application/x-protobuf` and `application/json` content types.
19+
820
## Set up backend receiver
921

1022
1. Make sure to enable **otlp-traces** handler in OTLP receiver of `application.yml`.

oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.protobuf.InvalidProtocolBufferException;
2121
import com.linecorp.armeria.common.HttpResponse;
22+
import com.linecorp.armeria.server.annotation.Blocking;
2223
import com.linecorp.armeria.common.HttpStatus;
2324
import com.linecorp.armeria.server.annotation.ConsumesJson;
2425
import com.linecorp.armeria.server.annotation.Default;
@@ -35,6 +36,7 @@
3536

3637
@Slf4j
3738
@AllArgsConstructor
39+
@Blocking
3840
public class FirehoseHTTPHandler {
3941
private final OpenTelemetryMetricRequestProcessor openTelemetryMetricRequestProcessor;
4042
private final String firehoseAccessKey;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.skywalking.oap.server.receiver.otel.otlp;
19+
20+
import com.google.protobuf.InvalidProtocolBufferException;
21+
import com.google.protobuf.util.JsonFormat;
22+
import com.linecorp.armeria.common.AggregatedHttpRequest;
23+
import com.linecorp.armeria.common.HttpResponse;
24+
import com.linecorp.armeria.common.HttpStatus;
25+
import com.linecorp.armeria.common.MediaType;
26+
import com.linecorp.armeria.server.annotation.Blocking;
27+
import com.linecorp.armeria.server.annotation.Consumes;
28+
import com.linecorp.armeria.server.annotation.ConsumesJson;
29+
import com.linecorp.armeria.server.annotation.Post;
30+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
31+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
32+
import lombok.RequiredArgsConstructor;
33+
import lombok.extern.slf4j.Slf4j;
34+
35+
/**
36+
* OTLP/HTTP handler for log data. Supports both protobuf and JSON encoding.
37+
* Delegates processing to {@link OpenTelemetryLogHandler#processExport}.
38+
*/
39+
@Slf4j
40+
@RequiredArgsConstructor
41+
public class OpenTelemetryLogHTTPHandler {
42+
private static final byte[] EMPTY_RESPONSE =
43+
ExportLogsServiceResponse.getDefaultInstance().toByteArray();
44+
45+
private final OpenTelemetryLogHandler logHandler;
46+
47+
@Blocking
48+
@Post("/v1/logs")
49+
@Consumes("application/x-protobuf")
50+
public HttpResponse collectProtobuf(AggregatedHttpRequest request) {
51+
try {
52+
final ExportLogsServiceRequest exportRequest =
53+
ExportLogsServiceRequest.parseFrom(request.content().array());
54+
logHandler.processExport(exportRequest);
55+
return HttpResponse.of(HttpStatus.OK, MediaType.PROTOBUF, EMPTY_RESPONSE);
56+
} catch (InvalidProtocolBufferException e) {
57+
log.warn("Failed to parse OTLP/HTTP log request", e);
58+
return HttpResponse.of(HttpStatus.BAD_REQUEST);
59+
} catch (Exception e) {
60+
log.error("Failed to process OTLP/HTTP log request", e);
61+
return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
62+
}
63+
}
64+
65+
@Blocking
66+
@Post("/v1/logs")
67+
@ConsumesJson
68+
public HttpResponse collectJson(AggregatedHttpRequest request) {
69+
try {
70+
final ExportLogsServiceRequest.Builder builder =
71+
ExportLogsServiceRequest.newBuilder();
72+
JsonFormat.parser().ignoringUnknownFields().merge(
73+
request.contentUtf8(), builder);
74+
logHandler.processExport(builder.build());
75+
return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, "{}");
76+
} catch (InvalidProtocolBufferException e) {
77+
log.warn("Failed to parse OTLP/HTTP JSON log request", e);
78+
return HttpResponse.of(HttpStatus.BAD_REQUEST);
79+
} catch (Exception e) {
80+
log.error("Failed to process OTLP/HTTP JSON log request", e);
81+
return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
82+
}
83+
}
84+
}

oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryLogHandler.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636
import org.apache.skywalking.oap.server.core.source.LogMetadataUtils;
3737
import org.apache.skywalking.oap.log.analyzer.v2.module.LogAnalyzerModule;
3838
import org.apache.skywalking.oap.log.analyzer.v2.provider.log.ILogAnalyzerService;
39+
import com.linecorp.armeria.common.HttpMethod;
40+
import java.util.Collections;
3941
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
42+
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
4043
import org.apache.skywalking.oap.server.library.module.ModuleManager;
4144
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
4245
import org.apache.skywalking.oap.server.receiver.otel.Handler;
@@ -88,10 +91,26 @@ public void active() throws ModuleStartException {
8891
.provider()
8992
.getService(GRPCHandlerRegister.class);
9093
grpcHandlerRegister.addHandler(this);
94+
95+
HTTPHandlerRegister httpHandlerRegister = manager.find(SharingServerModule.NAME)
96+
.provider()
97+
.getService(HTTPHandlerRegister.class);
98+
httpHandlerRegister.addHandler(
99+
new OpenTelemetryLogHTTPHandler(this),
100+
Collections.singletonList(HttpMethod.POST));
91101
}
92102

93103
@Override
94104
public void export(ExportLogsServiceRequest request, StreamObserver<ExportLogsServiceResponse> responseObserver) {
105+
processExport(request);
106+
responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance());
107+
responseObserver.onCompleted();
108+
}
109+
110+
/**
111+
* Process an OTLP log export request. Shared by both gRPC and HTTP handlers.
112+
*/
113+
void processExport(ExportLogsServiceRequest request) {
95114
request.getResourceLogsList().forEach(resourceLogs -> {
96115
final var resource = resourceLogs.getResource();
97116
final var attributes = resource
@@ -122,8 +141,6 @@ public void export(ExportLogsServiceRequest request, StreamObserver<ExportLogsSe
122141
}
123142
});
124143
});
125-
responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance());
126-
responseObserver.onCompleted();
127144
}
128145

129146
private void doAnalysisQuietly(String service, String layer, String serviceInstance,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.skywalking.oap.server.receiver.otel.otlp;
19+
20+
import com.google.protobuf.InvalidProtocolBufferException;
21+
import com.google.protobuf.util.JsonFormat;
22+
import com.linecorp.armeria.common.AggregatedHttpRequest;
23+
import com.linecorp.armeria.common.HttpResponse;
24+
import com.linecorp.armeria.common.HttpStatus;
25+
import com.linecorp.armeria.common.MediaType;
26+
import com.linecorp.armeria.server.annotation.Blocking;
27+
import com.linecorp.armeria.server.annotation.Consumes;
28+
import com.linecorp.armeria.server.annotation.ConsumesJson;
29+
import com.linecorp.armeria.server.annotation.Post;
30+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
31+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
32+
import lombok.RequiredArgsConstructor;
33+
import lombok.extern.slf4j.Slf4j;
34+
35+
/**
36+
* OTLP/HTTP handler for metric data. Supports both protobuf and JSON encoding.
37+
* Delegates processing to {@link OpenTelemetryMetricRequestProcessor#processMetricsRequest}.
38+
*/
39+
@Slf4j
40+
@RequiredArgsConstructor
41+
public class OpenTelemetryMetricHTTPHandler {
42+
private static final byte[] EMPTY_RESPONSE =
43+
ExportMetricsServiceResponse.getDefaultInstance().toByteArray();
44+
45+
private final OpenTelemetryMetricRequestProcessor metricProcessor;
46+
47+
@Blocking
48+
@Post("/v1/metrics")
49+
@Consumes("application/x-protobuf")
50+
public HttpResponse collectProtobuf(AggregatedHttpRequest request) {
51+
try {
52+
final ExportMetricsServiceRequest exportRequest =
53+
ExportMetricsServiceRequest.parseFrom(request.content().array());
54+
metricProcessor.processMetricsRequest(exportRequest);
55+
return HttpResponse.of(HttpStatus.OK, MediaType.PROTOBUF, EMPTY_RESPONSE);
56+
} catch (InvalidProtocolBufferException e) {
57+
log.warn("Failed to parse OTLP/HTTP metric request", e);
58+
return HttpResponse.of(HttpStatus.BAD_REQUEST);
59+
} catch (Exception e) {
60+
log.error("Failed to process OTLP/HTTP metric request", e);
61+
return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
62+
}
63+
}
64+
65+
@Blocking
66+
@Post("/v1/metrics")
67+
@ConsumesJson
68+
public HttpResponse collectJson(AggregatedHttpRequest request) {
69+
try {
70+
final ExportMetricsServiceRequest.Builder builder =
71+
ExportMetricsServiceRequest.newBuilder();
72+
JsonFormat.parser().ignoringUnknownFields().merge(
73+
request.contentUtf8(), builder);
74+
metricProcessor.processMetricsRequest(builder.build());
75+
return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, "{}");
76+
} catch (InvalidProtocolBufferException e) {
77+
log.warn("Failed to parse OTLP/HTTP JSON metric request", e);
78+
return HttpResponse.of(HttpStatus.BAD_REQUEST);
79+
} catch (Exception e) {
80+
log.error("Failed to process OTLP/HTTP JSON metric request", e);
81+
return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
82+
}
83+
}
84+
}

oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
2424
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
2525
import lombok.extern.slf4j.Slf4j;
26+
import com.linecorp.armeria.common.HttpMethod;
27+
import java.util.Collections;
2628
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
29+
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
2730
import org.apache.skywalking.oap.server.library.module.ModuleManager;
2831
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
2932
import org.apache.skywalking.oap.server.receiver.otel.Handler;
@@ -58,6 +61,13 @@ public void active() throws ModuleStartException {
5861
.provider()
5962
.getService(GRPCHandlerRegister.class);
6063
grpcHandlerRegister.addHandler(this);
64+
65+
HTTPHandlerRegister httpHandlerRegister = manager.find(SharingServerModule.NAME)
66+
.provider()
67+
.getService(HTTPHandlerRegister.class);
68+
httpHandlerRegister.addHandler(
69+
new OpenTelemetryMetricHTTPHandler(metricRequestProcessor),
70+
Collections.singletonList(HttpMethod.POST));
6171
}
6272

6373
@Override

0 commit comments

Comments
 (0)