# SingleFlight

# Go Singleflight 實作全攻略：優化 API 消耗、並發控制與監控實務

在開發高併發應用程式（如股票分析機器人）時，我們常面臨 **「驚群效應」(Thundering Herd)**：當快取失效或系統剛啟動時，大量請求同時湧入，導致昂貴的 API（如 Gemini）成本爆炸或資料庫崩潰。

`singleflight` 是 Go 官方擴充套件（[`golang.org/x/sync/singleflight`](https://pkg.go.dev/golang.org/x/sync/singleflight)）中的神兵利器，確保**當多個請求同時要求同一個結果時，實際的運算只會執行一次**。

---

## 1. 為什麼需要 Singleflight？

想像 1,000 個用戶同時查詢「台積電 (2330)」的分析報告：

* **無控制**：1,000 次 Gemini API 呼叫（帳單飆升）、1,000 次資料庫連線（系統卡死）。
* **有 Singleflight**：**1 次** API 呼叫，其餘 999 人在記憶體中等待結果，隨後共享同一份數據。

---

## 2. 深入核心：Singleflight 原始碼拆解

理解 `singleflight` 的底層源碼結構，能幫助我們更精準地掌握並發控制。

### 核心結構定義

1. **`Group`**: 代表一個命名空間，管理所有正在進行中的任務。
* `m map[string]*call`: 記錄目前有哪些 Key 正在「飛行中」(In-flight)。
* `mu sync.Mutex`: 保護 Map，防止高併發下的 Race Condition。


2. **`call`**: 代表一個正在執行的具體任務。
* `wg sync.WaitGroup`: **最關鍵的同步元件**。用來讓「後到」的請求等待「先到」的請求完成。
* `val / err`: 存放執行後的結果與錯誤。



### 運作流程圖解：Do(key, fn)

```mermaid
sequenceDiagram
    autonumber
    participant G as Group (櫃台)
    participant C as Call (任務)
    participant FN as 執行函數 (fn)

    Note over G: 1. 鎖定 Map 並檢查 Key
    alt 情境 A：已經有人在做了 (Duplicate Call)
        G->>C: 發現 Key 存在
        G->>C: c.dups++ (記錄重複)
        G->>G: Unlock Map
        G->>C: c.wg.Wait() (原地卡住等待)
        C-->>G: 喚醒並回傳結果
    else 情境 B：我是第一個做的 (Original Call)
        G->>C: 建立 new(call)
        G->>C: c.wg.Add(1) (任務開始標記)
        G->>G: 將 call 掛入 Map
        G->>G: Unlock Map
        G->>FN: g.doCall(c, key, fn)
        FN-->>C: 填入結果與錯誤
        Note over G: 2. 清理與廣播
        G->>G: 鎖定 Map 並 delete(key)
        G->>C: c.wg.Done() (大喊：好囉！)
    end

```

---

## 3. 核心實作範例：L1 快取 + Singleflight

我們在 `AnalysisService` 中構建兩層防護：**記憶體快取 (L1)** 與 **Singleflight**。

```go
package analyzer

import (
	"context"
	"sync"

	"golang.org/x/sync/singleflight" 
	"github.com/nathan/stock_bot/internal/storage"
)

type AnalysisService struct {
	genai      *GenAIClient
	d1Client   *storage.D1Client
	stockCache map[string]*StockAnalysisResult
	mu         sync.RWMutex
	sf         singleflight.Group
}

func (s *AnalysisService) analyzeStock(ctx context.Context, code, name string) (*StockAnalysisResult, error) {
	// 1. 第一層防護：檢查記憶體快取 (L1 Cache)
	s.mu.RLock()
	if result, ok := s.stockCache[code]; ok {
		s.mu.RUnlock()
		return result, nil
	}
	s.mu.RUnlock()

	// 2. 第二層防護：Singleflight (請求合併)
	key := "stock:" + code
	v, err, _ := s.sf.Do(key, func() (interface{}, error) {
		// 3. 執行昂貴的邏輯 (DB + Gemini API)
		result, err := s.doAnalyzeStock(ctx, code, name) 
		if err != nil {
			return nil, err
		}

		// 4. 寫入快取 (務必在 singleflight 內部完成，防止下一波瞬間擊穿)
		s.mu.Lock()
		s.stockCache[code] = result
		s.mu.Unlock()

		return result, nil
	})

	if err != nil {
		return nil, err
	}
	return v.(*StockAnalysisResult), nil
}

```

---

## 4. 進階實戰：處理「巢狀 Context」與非同步操作

在 `doAnalyzeStock` 內部，如果我們有其他的非同步操作（例如同時查多個 API），我們必須正確傳遞並檢查 `ctx.Err()`。這樣做是為了確保：**如果外部請求（領頭者）超時了，後續的耗時運算能立即停止，釋放資源。**

```go
func (s *AnalysisService) doAnalyzeStock(ctx context.Context, code, name string) (*StockAnalysisResult, error) {
    // 建立一個子 Context 用於內部的多個非同步任務
    g, ctx := errgroup.WithContext(ctx)
    
    var dbData string
    var aiResult string

    // 任務 1：查資料庫
    g.Go(func() error {
        // 隨時檢查 Context 是否已取消
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // 模擬資料庫查詢
            dbData = "Historical Data"
            return nil
        }
    })

    // 任務 2：呼叫 Gemini API
    g.Go(func() error {
        // 將 ctx 傳入 API 客戶端，讓它能跟隨整體的超時控制
        res, err := s.genai.Generate(ctx, "Analyze this: "+code)
        if err != nil {
			return err
		}
        aiResult = res
        return nil
    })

    // 等待所有任務完成或其中一個出錯
    if err := g.Wait(); err != nil {
        return nil, err
    }

    return &StockAnalysisResult{Data: dbData, Analysis: aiResult}, nil
}

```

---

## 5. 性能數據對比 (100 個併發請求)

| 指標 | 無 Singleflight | 有 Singleflight | 優化率 |
| --- | --- | --- | --- |
| **Gemini API 呼叫次數** | 100 次 | **1 次** | **99%** |
| **資料庫讀取壓力** | 高 (100 併發) | **極低 (1 併發)** | **99%** |
| **平均回應時間** | ~2,500ms | **~2,100ms** | **~16%** |

---

## 6. 監控與量化：加入 OpenTelemetry 追蹤

為了知道 Singleflight 幫我們省了多少錢，我們可以追蹤 `shared` 這個回傳值。`shared` 為 `true` 表示該請求是「搭便車」成功的。

```go
func (s *AnalysisService) analyzeStockWithMetrics(ctx context.Context, code string) (*StockAnalysisResult, error) {
	key := "stock:" + code
	v, err, shared := s.sf.Do(key, func() (interface{}, error) {
		return s.doAnalyzeStock(ctx, code, "Name")
	})

	// 紀錄監控指標：分辨是「原始呼叫」還是「共享結果」
	status := "original"
	if shared {
		status = "shared"
	}
	
	s.sfCounter.Add(ctx, 1, metric.WithAttributes(
		attribute.String("stock_code", code),
		attribute.String("type", status),
	))

	if err != nil {
		return nil, err
	}
	return v.(*StockAnalysisResult), nil
}
```

儀表板觀測指標 (Prometheus / Grafana)
透過這組指標，你可以在 Grafana 畫出以下圖表：

1. **Request Stacked Area Chart**:

  - type="original": 實際產生的 API 呼叫次數（你的成本預算）。

  - type="shared": 被合併的請求次數（你省下的錢）。

2. **Saving Ratio (節省率)**:

  - 公式：sum(rate(shared)) / sum(rate(total))。

---

## 7. 總結

`singleflight` 的機制就像是：

1. **進門先看櫃台 (Map)**：有沒有掛牌子 (Key)？
2. **有牌子**：代表有人處理了，我就站在旁邊等 (**Wait**)，等他處理好直接拿結果。
3. **沒牌子**：我掛上牌子 (**Add Map**)，開始處理 (**Do Fn**)。

**開發者筆記：**

* **快取寫入**：務必在 `sf.Do` 的 func 內部寫入快取。
* **Context 意識**：傳遞並檢查 `ctx.Err()` 是確保系統在高壓下不會資源洩漏的關鍵。
* **可觀測性**：沒有監控，優化就只是傳說。加入 OTel 讓數據說話。

> **「多人請求，一人做事，結果共享」** —— 這不僅是效能優化，更是對昂貴資源與系統穩定性的極致追求。


