SingleFlight

Go Singleflight 實作全攻略:優化 API 消耗、並發控制與監控實務
在開發高併發應用程式(如股票分析機器人)時,我們常面臨 「驚群效應」(Thundering Herd):當快取失效或系統剛啟動時,大量請求同時湧入,導致昂貴的 API(如 Gemini)成本爆炸或資料庫崩潰。
singleflight 是 Go 官方擴充套件(golang.org/x/sync/singleflight)中的神兵利器,確保當多個請求同時要求同一個結果時,實際的運算只會執行一次。
1. 為什麼需要 Singleflight?
想像 1,000 個用戶同時查詢「台積電 (2330)」的分析報告:
- 無控制:1,000 次 Gemini API 呼叫(帳單飆升)、1,000 次資料庫連線(系統卡死)。
- 有 Singleflight:1 次 API 呼叫,其餘 999 人在記憶體中等待結果,隨後共享同一份數據。
2. 深入核心:Singleflight 原始碼拆解
理解 singleflight 的底層源碼結構,能幫助我們更精準地掌握並發控制。
核心結構定義
Group: 代表一個命名空間,管理所有正在進行中的任務。m map[string]*call: 記錄目前有哪些 Key 正在「飛行中」(In-flight)。mu sync.Mutex: 保護 Map,防止高併發下的 Race Condition。
call: 代表一個正在執行的具體任務。wg sync.WaitGroup: 最關鍵的同步元件。用來讓「後到」的請求等待「先到」的請求完成。val / err: 存放執行後的結果與錯誤。
運作流程圖解:Do(key, fn)
3. 核心實作範例:L1 快取 + Singleflight
我們在 AnalysisService 中構建兩層防護:記憶體快取 (L1) 與 Singleflight。
package analyzer
import (
"context"
"sync"
"golang.org/x/sync/singleflight"
"github.com/nathan/stock_bot/internal/storage"
)
type AnalysisService struct {
genai *GenAIClient
d1Client *storage.D1Client
stockCache map[string]*StockAnalysisResult
mu sync.RWMutex
sf singleflight.Group
}
func (s *AnalysisService) analyzeStock(ctx context.Context, code, name string) (*StockAnalysisResult, error) {
// 1. 第一層防護:檢查記憶體快取 (L1 Cache)
s.mu.RLock()
if result, ok := s.stockCache[code]; ok {
s.mu.RUnlock()
return result, nil
}
s.mu.RUnlock()
// 2. 第二層防護:Singleflight (請求合併)
key := "stock:" + code
v, err, _ := s.sf.Do(key, func() (interface{}, error) {
// 3. 執行昂貴的邏輯 (DB + Gemini API)
result, err := s.doAnalyzeStock(ctx, code, name)
if err != nil {
return nil, err
}
// 4. 寫入快取 (務必在 singleflight 內部完成,防止下一波瞬間擊穿)
s.mu.Lock()
s.stockCache[code] = result
s.mu.Unlock()
return result, nil
})
if err != nil {
return nil, err
}
return v.(*StockAnalysisResult), nil
}
4. 進階實戰:處理「巢狀 Context」與非同步操作
在 doAnalyzeStock 內部,如果我們有其他的非同步操作(例如同時查多個 API),我們必須正確傳遞並檢查 ctx.Err()。這樣做是為了確保:如果外部請求(領頭者)超時了,後續的耗時運算能立即停止,釋放資源。
func (s *AnalysisService) doAnalyzeStock(ctx context.Context, code, name string) (*StockAnalysisResult, error) {
// 建立一個子 Context 用於內部的多個非同步任務
g, ctx := errgroup.WithContext(ctx)
var dbData string
var aiResult string
// 任務 1:查資料庫
g.Go(func() error {
// 隨時檢查 Context 是否已取消
select {
case <-ctx.Done():
return ctx.Err()
default:
// 模擬資料庫查詢
dbData = "Historical Data"
return nil
}
})
// 任務 2:呼叫 Gemini API
g.Go(func() error {
// 將 ctx 傳入 API 客戶端,讓它能跟隨整體的超時控制
res, err := s.genai.Generate(ctx, "Analyze this: "+code)
if err != nil {
return err
}
aiResult = res
return nil
})
// 等待所有任務完成或其中一個出錯
if err := g.Wait(); err != nil {
return nil, err
}
return &StockAnalysisResult{Data: dbData, Analysis: aiResult}, nil
}
5. 性能數據對比 (100 個併發請求)
| 指標 | 無 Singleflight | 有 Singleflight | 優化率 |
| Gemini API 呼叫次數 | 100 次 | 1 次 | 99% |
| 資料庫讀取壓力 | 高 (100 併發) | 極低 (1 併發) | 99% |
| 平均回應時間 | ~2,500ms | ~2,100ms | ~16% |
6. 監控與量化:加入 OpenTelemetry 追蹤
為了知道 Singleflight 幫我們省了多少錢,我們可以追蹤 shared 這個回傳值。shared 為 true 表示該請求是「搭便車」成功的。
func (s *AnalysisService) analyzeStockWithMetrics(ctx context.Context, code string) (*StockAnalysisResult, error) {
key := "stock:" + code
v, err, shared := s.sf.Do(key, func() (interface{}, error) {
return s.doAnalyzeStock(ctx, code, "Name")
})
// 紀錄監控指標:分辨是「原始呼叫」還是「共享結果」
status := "original"
if shared {
status = "shared"
}
s.sfCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("stock_code", code),
attribute.String("type", status),
))
if err != nil {
return nil, err
}
return v.(*StockAnalysisResult), nil
}
儀表板觀測指標 (Prometheus / Grafana) 透過這組指標,你可以在 Grafana 畫出以下圖表:
Request Stacked Area Chart:
type="original": 實際產生的 API 呼叫次數(你的成本預算)。
type="shared": 被合併的請求次數(你省下的錢)。
Saving Ratio (節省率):
- 公式:sum(rate(shared)) / sum(rate(total))。
7. 總結
singleflight 的機制就像是:
- 進門先看櫃台 (Map):有沒有掛牌子 (Key)?
- 有牌子:代表有人處理了,我就站在旁邊等 (Wait),等他處理好直接拿結果。
- 沒牌子:我掛上牌子 (Add Map),開始處理 (Do Fn)。
開發者筆記:
- 快取寫入:務必在
sf.Do的 func 內部寫入快取。 - Context 意識:傳遞並檢查
ctx.Err()是確保系統在高壓下不會資源洩漏的關鍵。 - 可觀測性:沒有監控,優化就只是傳說。加入 OTel 讓數據說話。
「多人請求,一人做事,結果共享」 —— 這不僅是效能優化,更是對昂貴資源與系統穩定性的極致追求。






