-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathtrace_flusher.rs
More file actions
225 lines (202 loc) · 8.86 KB
/
trace_flusher.rs
File metadata and controls
225 lines (202 loc) · 8.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
use dogstatsd::api_key::ApiKeyFactory;
use libdd_common::Endpoint;
use libdd_trace_utils::{
config_utils::trace_intake_url_prefixed,
send_data::SendData,
send_with_retry::{RetryBackoffType, RetryStrategy},
trace_utils::{self},
tracer_payload::TracerPayloadCollection,
};
use std::str::FromStr;
use std::sync::Arc;
use tokio::task::JoinSet;
use tracing::{debug, error};
use crate::FLUSH_RETRY_COUNT;
use crate::config::Config;
use crate::lifecycle::invocation::processor::S_TO_MS;
use crate::traces::http_client::HttpClient;
use crate::traces::trace_aggregator_service::AggregatorHandle;
/// Retry strategy for trace flushing using the shared `FLUSH_RETRY_COUNT`
/// with no delay between attempts. In Lambda, every millisecond of wall-clock
/// time matters, and the per-attempt request timeout already bounds how long
/// each retry can take.
fn trace_retry_strategy() -> RetryStrategy {
RetryStrategy::new(
u32::try_from(FLUSH_RETRY_COUNT).unwrap_or(3),
0,
RetryBackoffType::Constant,
None,
)
}
pub struct TraceFlusher {
pub aggregator_handle: AggregatorHandle,
pub config: Arc<Config>,
pub api_key_factory: Arc<ApiKeyFactory>,
/// Additional endpoints for dual-shipping traces to multiple Datadog sites.
/// Configured via `DD_APM_ADDITIONAL_ENDPOINTS` (e.g., sending to both US and EU).
/// Each trace batch is sent to the primary endpoint AND all additional endpoints.
pub additional_endpoints: Vec<Endpoint>,
http_client: HttpClient,
}
impl TraceFlusher {
#[must_use]
pub fn new(
aggregator_handle: AggregatorHandle,
config: Arc<Config>,
api_key_factory: Arc<ApiKeyFactory>,
http_client: HttpClient,
) -> Self {
// Parse additional endpoints for dual-shipping from config.
// Format: { "https://trace.agent.datadoghq.eu": ["api-key-1", "api-key-2"], ... }
// Each URL + API key combination becomes a separate endpoint.
let mut additional_endpoints: Vec<Endpoint> = Vec::new();
for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() {
for api_key in api_keys {
let trace_intake_url = trace_intake_url_prefixed(&endpoint_url);
let endpoint = Endpoint {
url: hyper::Uri::from_str(&trace_intake_url)
.expect("can't parse additional trace intake URL, exiting"),
api_key: Some(api_key.clone().into()),
timeout_ms: config.flush_timeout * S_TO_MS,
test_token: None,
use_system_resolver: false,
};
additional_endpoints.push(endpoint);
}
}
TraceFlusher {
aggregator_handle,
config,
api_key_factory,
additional_endpoints,
http_client,
}
}
/// Flushes traces by getting every available batch on the aggregator.
/// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
/// Returns any traces that failed to send and should be retried.
pub async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
let Some(api_key) = self.api_key_factory.get_api_key().await else {
error!(
"TRACES | Failed to resolve API key, dropping aggregated data and skipping flushing."
);
if let Err(e) = self.aggregator_handle.clear() {
error!("TRACES | Failed to clear aggregator data: {e}");
}
return None;
};
debug!("TRACES | Flushing traces with api_key={api_key}");
let http_client = &self.http_client;
let mut failed_batch: Vec<SendData> = Vec::new();
if let Some(traces) = failed_traces {
// If we have traces from a previous failed attempt, try to send those first.
if !traces.is_empty() {
debug!(
"TRACES | Retrying to send {} previously failed batches",
traces.len()
);
let retry_result = Self::send_traces(traces, http_client.clone()).await;
if retry_result.is_some() {
// Still failed, return to retry later
return retry_result;
}
}
}
let all_batches = match self.aggregator_handle.get_batches().await {
Ok(v) => v,
Err(e) => {
error!("TRACES | Failed to fetch batches from aggregator service: {e}");
return None;
}
};
let mut batch_tasks = JoinSet::new();
for trace_builders in all_batches {
let traces_with_tags: Vec<_> = trace_builders
.into_iter()
.map(|info| {
let trace = info
.builder
.with_api_key(api_key.as_str())
.with_retry_strategy(trace_retry_strategy())
.build();
debug!("TRACES | Built trace: {:?}", trace.get_payloads());
(trace, info.header_tags)
})
.collect();
// Send to ADDITIONAL endpoints for dual-shipping.
// Construct separate SendData objects per endpoint by cloning the inner
// V07 payload data (TracerPayload is Clone, but SendData is not).
for endpoint in self.additional_endpoints.clone() {
let additional_traces: Vec<_> = traces_with_tags
.iter()
.filter_map(|(trace, tags)| match trace.get_payloads() {
TracerPayloadCollection::V07(payloads) => {
let mut send_data = SendData::new(
trace.len(),
TracerPayloadCollection::V07(payloads.clone()),
tags.to_tracer_header_tags(),
&endpoint,
);
send_data.set_retry_strategy(trace_retry_strategy());
Some(send_data)
}
// All payloads in the extension are V07 (produced by
// collect_pb_trace_chunks), so this branch is unreachable.
_ => None,
})
.collect();
let client_clone = http_client.clone();
batch_tasks
.spawn(async move { Self::send_traces(additional_traces, client_clone).await });
}
// Send to PRIMARY endpoint (moves traces into the task).
let traces: Vec<_> = traces_with_tags.into_iter().map(|(t, _)| t).collect();
let client_clone = http_client.clone();
batch_tasks.spawn(async move { Self::send_traces(traces, client_clone).await });
}
// Collect failed traces from all endpoints (primary + additional).
while let Some(result) = batch_tasks.join_next().await {
if let Ok(Some(mut failed)) = result {
failed_batch.append(&mut failed);
}
}
if !failed_batch.is_empty() {
return Some(failed_batch);
}
None
}
/// Sends traces to the Datadog intake endpoint using the provided HTTP client.
///
/// Each `SendData` is sent to its own configured target endpoint.
/// Returns the traces back (by value) if there was an error sending them (for retry).
async fn send_traces(traces: Vec<SendData>, http_client: HttpClient) -> Option<Vec<SendData>> {
if traces.is_empty() {
return None;
}
let start = tokio::time::Instant::now();
let coalesced_traces = trace_utils::coalesce_send_data(traces);
tokio::task::yield_now().await;
debug!("TRACES | Flushing {} traces", coalesced_traces.len());
for trace in &coalesced_traces {
let result = trace.send(&http_client).await;
if let Err(e) = &result.last_result {
error!(
"TRACES | Request failed after {} attempts ({} timeouts, {} network errors, {} status code errors): {e:?}",
result.requests_count,
result.errors_timeout,
result.errors_network,
result.errors_status_code,
);
return Some(coalesced_traces);
}
debug!(
"TRACES | Successfully sent trace ({} attempts, {} bytes)",
result.requests_count, result.bytes_sent,
);
}
debug!("TRACES | Flushing took {} ms", start.elapsed().as_millis());
None
}
}