剖析 OTel Collector Delta To Cumulative Processor

本文將深入探討 OpenTelemetry Collector Contrib 中的 deltatocumulative Processor。除了基本的配置與使用外,我們將從 源碼層級 (Source Code Level) 分析其內部運作機制、狀態管理策略,並詳細解釋生產環境中常見的異常現象。
1. 簡介
deltatocumulativeprocessor 的核心任務是將 Metrics 的 Temporality 從 Delta (增量) 轉換為 Cumulative (累積)。這是一個 Stateful (有狀態) 的組件,意味著它必須在記憶體中維護所有活躍 Time Series 的當前數值。
2. 核心架構與處理流程 (Architecture & Processing Flow)
此 Processor 的設計核心在於「高效的狀態管理」與「嚴格的時間序驗證」。為了在高併發下維持準確性,它採用了細粒度的鎖定策略與強型別的狀態存儲。
2.1 關鍵資料結構 (Key Data Structures)
核心邏輯位於 processor/deltatocumulativeprocessor,主要由以下結構支撐:
A. 唯一識別 identity.Stream
每個 Time Series (時間序列) 由 identity.Stream 唯一識別。這不僅僅是 Metric Name,還包含:
Metric Signature: Name, Unit, Type, Monotonicity, Temporality.
Attributes Hash: 所有 DataPoint 屬性 (Labels) 的雜湊值。 這確保了即使是同一個 Metric Name,不同的 Label 組合也會被視為獨立的 Stream。
B. 狀態存儲 state
Processor 內部使用 maps.Parallel (基於 xsync.MapOf) 來存儲狀態。為了效能與型別安全,它針對不同數據類型分開存儲:
nums: 存儲NumberDataPoint(Sum/Gauge)hist: 存儲HistogramDataPointexpo: 存儲ExponentialHistogramDataPoint
C. 併發控制 mutex[T]
這是效能關鍵。Processor 不使用全域鎖 (Global Lock) 來保護所有狀態。 相反,每個 Stream 都有自己獨立的 mutex。
- 意義: 不同 Stream 的更新是完全平行的,互不阻塞。只有當多個請求同時更新 同一個 Stream 時,才會發生鎖競爭。
2.2 處理流程 (ConsumeMetrics)
當 Metrics 進入 Processor 時,數據流經以下嚴格步驟:
過濾 (Filter):
- 檢查
AggregationTemporality。只有 Delta 類型的指標會被處理;Cumulative 指標直接透傳 (Pass-through)。
- 檢查
識別與查找 (Identify & Lookup):
計算 DataPoint 的
identity.Stream。嘗試從
state中檢索現有累積值。容量檢查: 如果是新 Stream 且總數已達
max_streams,則標記error="limit"並丟棄該數據點。
聚合運算 (
delta.Aggregate):在 Stream 級別的鎖保護下執行。
邏輯:
New_Cumulative = Old_Cumulative + New_Delta。
時間序驗證 (Validation): 在聚合前,必須通過兩項關鍵檢查 (位於
internal/delta/delta.go):亂序檢測 (
ErrOutOfOrder):條件:
New.Time <= Stored.Time結果: 丟棄數據。這通常發生在發送端重試或網路亂序時。
重啟檢測 (
ErrOlderStart):條件:
New.Start < Stored.Start結果: 丟棄數據。這代表來源進程可能已重啟,發送了屬於「上一代」的數據,或是時間戳生成有誤。
寫回與標記:
將計算出的 Cumulative 值寫回原始 DataPoint。
將 Temporality 修改為
Cumulative。更新
stalemap 中的最後活躍時間 (Last Seen)。
2.3 垃圾回收機制 (Garbage Collection)
為了釋放不再活躍的 Stream (例如短暫存在的 Pod Metrics),Processor 運行一個背景 Goroutine:
頻率: 每 1 分鐘執行一次。
邏輯: 遍歷
staleMap。如果now - last_seen > max_stale,則從記憶體中刪除該 Stream 的所有狀態。
3. 關鍵配置參數 (Configuration)
processors:
deltatocumulative:
# Stream 閒置多久後被視為過期並清除 (預設 5m)
# 影響: 設置過短會導致頻繁的狀態重置;設置過長會增加記憶體壓力。
max_stale: 5m
# 允許追蹤的最大 Stream 數量 (預設為 Max Int)
# 影響: 這是保護 Collector 不被 High Cardinality 數據撐爆的最後防線。
# 一旦觸發,超出的 Stream 會被無情丟棄。
max_streams: 9223372036854775807
4. 監控指標詳解 (Observability)
Processor 透過 internal/telemetry 暴露了自我監控指標 (Self-monitoring Metrics),這是排查數據丟失問題的首要依據。
4.1 核心指標列表
| 指標名稱 (Metric Name) | 類型 | 說明 | 關鍵標籤 (Labels) |
deltatocumulative_datapoints | Counter | 處理的數據點總數。請密切關注 error 標籤。 | error: |
- (missing): 處理成功 | |||
- limit: 觸發 max_streams 上限而丟棄 | |||
- delta.ErrOutOfOrder: 因時間戳亂序而丟棄 | |||
- delta.ErrOlderStart: 因起始時間異常而丟棄 | |||
deltatocumulative_streams_tracked | Gauge | 當前記憶體中活躍追蹤的 Stream 總數。 | 無 |
deltatocumulative_streams_limit | Gauge | 配置的 max_streams 值。 | 無 |
deltatocumulative_streams_max_stale | Gauge | 配置的 max_stale 值 (秒)。 | 無 |
運營環境常見議題剖析
議題一:狀態丟失與 streams_tracked 的鋸齒狀波動
現象: 監控圖表顯示 streams_tracked 呈現週期性的鋸齒狀下跌,或者劇烈震盪。同時下游看到的數值可能突然歸零或重置。
源碼級原因: 這通常與 GC 機制 (stale check) 有關。
間歇性流量: 如果某個指標每 6 分鐘才送一次,而
max_stale設為 5 分鐘。Processor 會在第 5 分鐘刪除狀態。第 6 分鐘數據進來時,被視為全新的 Stream,累積值從 0 (或當前 Delta) 開始計算,導致狀態丟失。GC 運作: 背景 Goroutine 每分鐘一次的清理動作,會導致
streams_tracked出現階梯式下降。
對策:
- 確保
max_stale顯著大於 metrics 的 scrape interval 或 push interval (建議至少 2-3 倍)。
議題二:Pipeline 順序導致的「數據消失之謎」
現象:batch Processor 報告發送了 2.6k 點,但 exporter 報告只發送了 2.0k 點。中間的 0.6k 憑空消失,且沒有任何 Error Log。
源碼級原因: 這是 deltatocumulative 的 max_streams 限制與 Pipeline 順序共同作用的結果。
// processor.go 片段
if maps.Exceeded(last, loaded) {
attrs.Set(telemetry.Error("limit"))
return drop // 直接返回 false,數據點從 Slice 中移除
}
如果 Pipeline 配置為 [batch, deltatocumulative]:
Batch: 收到數據,計數器
batch_send_size+2.6k。DeltaToCumulative: 發現 Stream 總數超標,靜默丟棄 0.6k 數據點 (僅增加
deltatocumulative_datapoints{error="limit"})。Exporter: 收到剩餘的 2.0k,計數器
sent_metric_points+2.0k。
對策:
調整順序: 改為
[deltatocumulative, batch]。讓過濾發生在打包之前。監控 Drop: 設置告警監控
sum(rate(otelcol_deltatocumulative_datapoints{error="limit"}[2m])) > 0。
進階說明
deltatocumulativeprocessor 不會拆分 metric,它只做:
- Delta → Cumulative 的 temporality 轉換
- 累加數值
拆分成 _total, _sum, _bucket 是 Prometheus Exporter 的工作。
流程圖
┌─────────────────────────────────────────────────────────────────────────┐
│ OTel Collector Pipeline │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Receiver Processor Exporter │
│ ──────── ───────── ──────── │
│ │
│ ┌──────────┐ ┌─────────────────────┐ ┌───────────────────┐ │
│ │ OTLP │ │ deltatocumulative │ │ prometheusremote │ │
│ │ Receiver │ ──▶ │ processor │ ──▶ │ write exporter │ │
│ └──────────┘ └─────────────────────┘ └───────────────────┘ │
│ │
│ OTel Histogram OTel Histogram Prometheus format: │
│ (Delta) (Cumulative) • metric_bucket{le=...} │
│ ↑ • metric_sum │
│ 只改 temporality • metric_count │
│ 不改結構 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
各 Metric 類型在 Prometheus Exporter 的輸出
┌──────────────────────┬──────────────────────────────────────────────────────────┐
│ OTel Metric Type │ Prometheus 輸出 │
├──────────────────────┼──────────────────────────────────────────────────────────┤
│ Sum (Counter) │ metric_total (1 個) │
├──────────────────────┼──────────────────────────────────────────────────────────┤
│ Gauge │ metric (1 個) │
├──────────────────────┼──────────────────────────────────────────────────────────┤
│ Histogram │ metric_bucket{le=...} (N 個) + metric_sum + metric_count │
├──────────────────────┼──────────────────────────────────────────────────────────┤
│ ExponentialHistogram │ 轉成普通 histogram 後同上 │
├──────────────────────┼──────────────────────────────────────────────────────────┤
│ Summary │ metric{quantile=...} (N 個) + metric_sum + metric_count │
└──────────────────────┴──────────────────────────────────────────────────────────┘
範例:一個 Histogram DataPoint
# OTel Histogram (經過 deltatocumulativeprocessor 後)
name: http_request_duration_seconds
type: Histogram
temporality: Cumulative # ← processor 改了這個
datapoint:
attributes: {method: "GET"}
sum: 150.5
count: 1000
bucket_counts: [100, 300, 400, 150, 50]
explicit_bounds: [0.005, 0.01, 0.025, 0.05]
↓ Prometheus Exporter ↓
# 變成多個 time series
http_request_duration_seconds_bucket{method="GET", le="0.005"} 100
http_request_duration_seconds_bucket{method="GET", le="0.01"} 400
http_request_duration_seconds_bucket{method="GET", le="0.025"} 800
http_request_duration_seconds_bucket{method="GET", le="0.05"} 950
http_request_duration_seconds_bucket{method="GET", le="+Inf"} 1000
http_request_duration_seconds_sum{method="GET"} 150.5
http_request_duration_seconds_count{method="GET"} 1000
總結
┌────────────────────────────┬──────────────────────────────────────────────────┐
│ 組件 │ 職責 │
├────────────────────────────┼──────────────────────────────────────────────────┤
│ deltatocumulativeprocessor │ 只改 temporality,不拆分 metric │
├────────────────────────────┼──────────────────────────────────────────────────┤
│ Prometheus Exporter │ 將 OTel 格式轉成 Prometheus 格式,拆分 histogram │
└────────────────────────────┴──────────────────────────────────────────────────┘
所以正常 sent_metric_points 應該要更多才是。
議題三:亂序數據 (Out of Order)
現象: 數據偶爾丟失,deltatocumulative_datapoints 出現 error="out_of_order"。
源碼級原因 (internal/delta/delta.go):
case dp.Timestamp() <= state.Timestamp():
return ErrOutOfOrder{...}
Processor 強制要求數據的時間戳嚴格遞增。如果發送端 (如 Prometheus Remote Write 或某些 SDK) 因重試邏輯發送了重複或舊的時間戳,Processor 會為了保護累積值的單調性而拒絕該數據。
對策: 檢查發送端的 Retry 策略或時鐘同步狀態。
6. PoC Lab 環境 - 本地重現驗證
為了驗證上述理論,我們建立了一個可在本地運行的 Docker Compose PoC 環境。
6.1 環境架構
┌─────────────────┐ ┌─────────────────────────────────────┐ ┌────────────┐
│ telemetrygen │────▶│ OTel Collector │────▶│ Prometheus │
│ (60 instances) │ │ ┌─────────────────────────────┐ │ │ :9090 │
│ app-01 ~ 60 │ │ │ Pipeline: │ │ └────────────┘
└─────────────────┘ │ │ receiver → cumulativetodelta│ │
│ │ → batch │ │
│ │ → deltatocumulative│ │
│ │ → exporter │ │
│ │ │ │
│ │ max_streams: 50 (< 60) │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
設計理念:
60 個 telemetrygen 實例,每個發送不同的
service.name(app-01 ~ app-60)max_streams設為 50,刻意製造 Stream 超限Pipeline 順序為
[cumulativetodelta, batch, deltatocumulative],重現「先 batch 後過濾」的問題
6.2 檔案結構
poc-lab/
├── docker-compose.yaml # Docker Compose 配置
├── otel-config.yaml # OTel Collector 配置
└── prometheus.yaml # Prometheus scrape 配置
6.3 配置檔案
otel-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
processors:
# 將 telemetrygen 產生的 Cumulative 轉成 Delta
cumulativetodelta:
batch:
send_batch_size: 100
timeout: 1s
deltatocumulative:
max_stale: 1m
# 【關鍵設定】設定極低的上限,強迫發生 Drop
max_streams: 50
exporters:
prometheus:
endpoint: "0.0.0.0:8889"
namespace: "poc_app"
debug:
verbosity: normal
service:
telemetry:
metrics:
readers:
- pull:
exporter:
prometheus:
host: "0.0.0.0"
port: 8888
pipelines:
metrics:
receivers: [otlp]
# 【關鍵錯誤順序】重現問題
processors: [cumulativetodelta, batch, deltatocumulative]
exporters: [prometheus, debug]
prometheus.yaml
global:
scrape_interval: 5s
evaluation_interval: 5s
scrape_configs:
# Job 1: 監控 Collector 本身
- job_name: 'otel-collector-internal'
static_configs:
- targets: ['otel-collector:8888']
# Job 2: 監控實際輸出的數據
- job_name: 'app-metrics'
static_configs:
- targets: ['otel-collector:8889']
docker-compose.yaml (精簡版)
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.142.0
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-config.yaml:/etc/otel-collector-config.yaml:ro
ports:
- "4317:4317"
- "8888:8888" # Collector Internal Metrics
- "8889:8889" # Prometheus Exporter
networks:
- otel-network
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml:ro
ports:
- "9090:9090"
depends_on:
- otel-collector
networks:
- otel-network
# 使用 YAML anchor 定義 60 個 telemetrygen 實例
telemetrygen-01: &telemetrygen-base
image: ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:latest
command: ["metrics", "--otlp-insecure", "--otlp-endpoint=otel-collector:4317",
"--rate=5", "--duration=1000h", "--metric-type=Sum", "--service=app-01"]
depends_on: [otel-collector]
networks: [otel-network]
telemetrygen-02: { <<: *telemetrygen-base, command: [..., "--service=app-02"] }
# ... (app-03 ~ app-60)
networks:
otel-network:
driver: bridge
6.4 運行與驗證
步驟 1: 啟動環境
cd poc-lab
docker compose up -d
步驟 2: 等待數據累積 (約 30-60 秒)
docker compose ps # 確認所有容器運行中
步驟 3: 驗證查詢
打開 Prometheus UI (http://localhost:9090) 或使用 curl:
查詢 1: Streams 追蹤數量 (應該達到上限 50)
otelcol_deltatocumulative_streams_tracked
查詢 2: 數據點處理結果 (按 error 分類)
otelcol_deltatocumulative_datapoints_total
查詢 3: 比較 Batch vs Exporter
# Batch 處理量
otelcol_processor_batch_batch_send_size_sum
# Exporter 發送量
otelcol_exporter_sent_metric_points_total
6.5 驗證結果 - 問題重現 (錯誤順序)
使用錯誤的 Pipeline 順序 [cumulativetodelta, batch, deltatocumulative] 運行後:
| 指標 | 數值 | 說明 |
streams_tracked | 50 | 達到 max_streams 上限 |
streams_limit | 50 | 配置值 |
receiver_accepted | 13,000 | Receiver 接收的總數據點 |
batch_send_size_sum | 13,000 | Batch 處理的數據點 |
exporter_sent | 11,000 | Exporter 實際發送的數據點 |
數據點處理詳情:
| error 標籤 | 數值 | 說明 |
error="none" | 16,000 | 成功處理 |
error="limit" | 3,000 | 因達到 Stream 上限而丟棄 |
關鍵發現:
發送端: 60 個 telemetrygen 實例
限制: max_streams = 50
被完全丟棄的實例: 10 個 (60 - 50)
Batch (13,000) ≠ Exporter (11,000):差異 2,000 個數據點「憑空消失」
丟棄僅標記
error="limit",無 Error Log,難以察覺
6.6 修正驗證 - 正確順序
修改 otel-config.yaml 中的 processors 順序:
# 修正前 (問題配置)
processors: [cumulativetodelta, batch, deltatocumulative]
# 修正後 (正確配置)
processors: [cumulativetodelta, deltatocumulative, batch]
修正後驗證結果:
| 指標 | 數值 | 說明 |
streams_tracked | 50 | 仍達到 max_streams 上限 |
receiver_accepted | 17,500 | Receiver 接收的總數據點 |
batch_send_size_sum | 14,950 | Batch 處理的數據點 |
exporter_sent | 14,950 | Exporter 實際發送的數據點 |
數據點處理詳情:
| error 標籤 | 數值 | 說明 |
error="none" | 14,950 | 成功處理 |
error="limit" | 2,490 | 因達到 Stream 上限而丟棄 |
6.7 修正前後對比
| 指標 | 修正前 | 修正後 | 結論 |
| Batch 處理量 | 13,000 | 14,950 | - |
| Exporter 發送量 | 11,000 | 14,950 | - |
| Batch = Exporter? | 否 (差 2,000) | 是 (完全一致) | 問題解決 |
| error="limit" | 3,000 | 2,490 | 仍有丟棄 (預期行為) |
關鍵結論:
修正前: Batch (13,000) ≠ Exporter (11,000) ← 數據「消失」,難以排查
修正後: Batch (14,950) = Exporter (14,950) ← 指標一致,問題可追溯
Pipeline 順序修正有效 - Batch 和 Exporter 的數值現在完全一致
丟棄仍然發生 (error="limit"),但這是預期行為,因為來源數 (60) > max_streams (50)
監控指標正確反映實際狀態 - 運維人員可直接從
error="limit"指標看到丟棄量
6.8 清理環境
docker compose down
7. PHP 與 Delta To Cumulative Processor 的關聯
7.1 為什麼是 PHP? (The "Share-Nothing" Architecture)
大多數的應用程式(如 Java, Go, Python, Node.js)通常是以常駐進程 (Long-running process) 的方式運行。它們在記憶體中有一個全域的計數器,可以一直累加數值:
00:01: 累計 10 次
00:02: 累計 15 次 (累加了 5 次)
00:03: 累計 20 次 (又累加了 5 次)
這就是 Cumulative (累積) 模式,也是 Prometheus 等後端系統最喜歡的格式。
但是,PHP 通常運行在 PHP-FPM 或 CGI 模式下。其生命週期是「一個請求一個進程」 (Per-request process):
- 收到請求 -> 啟動 (或重用) PHP worker。
- 執行腳本 -> 處理指標 (Metrics)。
請求結束 -> 記憶體釋放/重置。
因為 PHP 進程之間通常不共享記憶體 (Share-nothing),要維護一個「自從伺服器啟動以來的所有請求總數」是非常困難且昂貴的(需要依賴外部 Redis 或 Shared Memory)。
因此,PHP 最自然的做法是只回報「這一次請求發生了什麼」:
- 請求 A: 我處理了 1 個 DB 查詢 (Delta) -> 結束
請求 B: 我處理了 1 個 DB 查詢 (Delta) -> 結束
這就是
Delta (增量)模式。
7.2 Delta to Cumulative Processor 的角色
當您的後端資料庫(如 Prometheus)只接受 Cumulative 資料,但您的應用程式(如 PHP 或 Serverless Functions)只能提供 Delta 資料時,就會發生格式不相容。
這時候就需要 deltatocumulative processor 擔任「狀態管理者」 (Stateful Intermediary) 的角色:
- 接收 (Receive): 它接收來自 PHP 的無數個小 Delta (例如:+1, +1, +1)。
- 記憶 (Remember): 它在 Collector 的記憶體中維護一個對應的 Stream,並幫忙做加法運算 (State Management)。
- 轉換 (Convert): 它算出累積值 (例如:目前總共是 3),並將其轉換為 Cumulative 格式。
- 輸出 (Export): 發送給 Prometheus。
雖然 Serverless (如 AWS Lambda) 或 CLI 工具也有類似需求,但 PHP的廣泛使用以及其標準的運行模式,使其成為這個 Processor 最常見的使用案例。
8. 結論與最佳實踐
Pipeline 順序: 將
deltatocumulative放在batch之前,讓過濾發生在打包前監控告警: 設置對
error="limit",error="out_of_order"的告警容量規劃: 根據預期的 Stream 數量合理設置
max_streamsStale 設定:
max_stale應大於最大的 push/scrape interval (建議 2-3 倍)






