Skip to main content

Command Palette

Search for a command to run...

Nodejs Stream 的基礎概念

Updated
6 min read
Nodejs Stream 的基礎概念

大綱

本文主要會說明 Nodejs 中,透過 stream api 可以用達成以下好處

  1. 可以比較有效率的處理大型資料,避免大型檔案遭成程序阻塞
  2. 可以透過有限度的資源處理相同量級資料

What is Stream?

Stream 代表一個資料流。

通常是指處理大型資料把切分為一堆堆小型區塊資量逐步處理的方式

Why use Stream?

  1. 節省資源(記憶體使用)

  2. 透過 Buffer 可以效率處理資料 I/O

Stream 類別

根據資料的流向以及處理方式可以分為以下四類 Data Stream

1. Writable Stream

Writable Stream 是一種用來處理寫把資料輸出的 Stream 物件

具有一個屬性叫作 writableHighWaterMark ,代表累積多少資料才做輸出

writableHighWaterMark 是 16384

內部具有一個預設的 Buffer 結構,預設大小是 writableHighWaterMark Bytes 約為 16MB,用來對寫入資料做暫存

可以透過對 Writable Stream 做 write 方法來把資料寫入 Internal Buffer

直到資料到達 writableHighWaterMark 才做寫入

但當資料超過 writableHighWaterMark 時,超過的部份就會直接被加入記憶體內

造成 Nodejs 記憶體使用量超過原本預期。直到 Nodejs 程序把 Internal Buffer的資料處理完。

比較好的處理方式是透過判斷寫入時判斷回傳值

stream.write 如果回傳值是 true 代表 Internal buffer 還有空間可以做寫入

stream.write 如果回傳值是 false 代表 Internal buffer 還沒有空間可以做寫入 需要花時間處理

當遇到 false 時,可以使用 stream.pause 來暫時停止寫入資料

直到 stream 發出 drain 事件代表,已經處理完 Buffer 內資料,則可以透過 stream.resume 繼續往下處理資料

範例如下:

import * as fs from 'fs/promises';
import * as path from 'path';

(async () => {
  console.time("writeMany");
  const fileHandle = await fs.open(path.join(__dirname, "../test.txt"), "w");
  const stream = fileHandle.createWriteStream();
  console.log(stream.writableHighWaterMark);
  let i = 0;
  const numberOfWrites = 10000000;
  const writeMany = () => {
    while ( i < numberOfWrites) {
      const buff = Buffer.from(` ${i} `, 'utf-8');
      // last write
      if (i === numberOfWrites -1) {
        return stream.end(buff);
      }
      // if stream.write return false stop the loop
      if (!stream.write(buff)) {
        break;
      }
      i++;
    }
  }
  writeMany();
  // resume our loop once our stream's internal buffer is empty
  stream.on('drain', () => {
    writeMany();
  });

  stream.on('finish', () =>{ 
    console.timeEnd("writeMany");
    fileHandle.close();
  })
})()

2. Readable Stream

Readable Stream 是一種用來處理寫把資料輸入的 Stream 物件

具有一個屬性叫作 highWaterMark ,代表累積多少資料才做輸入

highWaterMark 是 65536

內部具有一個預設的 Buffer 結構,預設大小是 highWaterMark Bytes 約為 64MB,用來對輸出資料做暫存

可以透過對 Readable Stream 做 push 或是 end 方法來把資料寫入 Internal Buffer

直到資料到達 highWaterMark 或是讀取到 EOF 可以從 stream.on('data') 事件的讀出輸入的部份資料

範例如下

import * as fs from 'node:fs/promises';
import * as path from 'path';
(async () => {
  console.time('readBig');
  const fileHandleRead = await fs.open(path.join(__dirname, "../src.txt"), "r");
  const fileHandleWrite = await fs.open(path.join(__dirname, "../dest.txt"), "w");
  const streamRead = fileHandleRead.createReadStream({ highWaterMark: 64 * 1024});
  const streamWrite = fileHandleWrite.createWriteStream();
  let split = '';
  streamRead.on('data', (chunk) => {
    const numbers = chunk.toString('utf-8').split('  ');
    if (Number(numbers[0]) !== Number(numbers[1]) -1) {
      if (split) numbers[0] = split + numbers[0].trim();
    }
    if (Number(numbers[numbers.length - 2]) + 1 !==  Number(numbers[numbers.length - 1]) ) {
      split = numbers.pop();
    }

    numbers.forEach((number) => {
      let n = Number(number);
      if (n%10 == 0) {
        if (!streamWrite.write(" "+n+" ")) {
          streamRead.pause();
        }
      }
    });
  });
  streamWrite.on("drain", () => {
    streamRead.resume();
  });
  streamRead.on('end', () => {
    console.log("Done Reading");
    console.timeEnd('readBig');
  });
})();

2.1 把 Writable Stream 與 Readable Stream 串接起來

  1. 透過 pipe 運算

透過 pipe 運算可以把 Readable Strream 串接到 Writable Stream

範例如下

import { pipeline } from 'node:stream';
import * as fs from 'node:fs/promises';
import * as path from 'path';
(async () => {
  console.time("copyStream");
  const srcFile = await fs.open(path.join(__dirname, '../dest.txt'), 'r');
  const destFile = await fs.open(path.join(__dirname, '../dest-copy-stream.txt'), 'w');
  const readStream = srcFile.createReadStream();
  const writeStream = destFile.createWriteStream();
  readStream.pipe(writeStream);
  readStream.on('end', () => {
    console.timeEnd("copyStream");
  });
})();

然而 pipe 運算並不是一種好的作法

主要原因是沒有良好的錯誤處理接口

需要額外針對兩個 Stream 各自做錯誤事件的監聽並且對錯誤的 Stream 做 close

取而代之可以使用 pipeline 來做串接

範例如下:

import { pipeline } from 'node:stream';
import * as fs from 'node:fs/promises';
import * as path from 'path';
(async () => {
  console.time("copyStream");
  const srcFile = await fs.open(path.join(__dirname, '../dest.txt'), 'r');
  const destFile = await fs.open(path.join(__dirname, '../dest-copy-stream.txt'), 'w');
  const readStream = srcFile.createReadStream();
  const writeStream = destFile.createWriteStream();
  pipeline(readStream, writeStream, (err) => {
    console.log(err);
    console.timeEnd("copyStream");
  })
  // readStream.pipe(writeStream);
  // readStream.on('end', () => {
  //   console.timeEnd("copyStream");
  // });
})();

3. Duplex Stream

Duplex 是一種同時具有 Writable Stream 與 Readable Stream 的 Stream 物件

同時具有 Writable 與 Readable Stream 的屬性

同時具有 Writable 與 Readable Stream 的 Buffer

可以分開對不同資料流做讀以及寫入

範例如下

import { Duplex } from 'node:stream';
import * as fs from 'node:fs';
import * as path from 'node:path';

class DuplexStream extends Duplex {
  private readFileName: string = '';
  private writeFileName: string = '';
  private readFd: number =  -1;
  private writeFd: number = -1;
  private chunks: Uint8Array[] = [];
  private chunksSize = 0;
  constructor({ writableHighWaterMark, readableHighWaterMark, readFileName, writeFileName }: 
    { writableHighWaterMark?: number, readableHighWaterMark?: number, readFileName: string, writeFileName: string }) {
    super({writableHighWaterMark, readableHighWaterMark })
    this.readFileName = readFileName;
    this.writeFileName = writeFileName;

  }
  _construct(callback: (error?: Error) => void): void {
    fs.open(this.readFileName, 'r', (err, readFd) => {
      if (err) return callback(err);

      this.readFd = readFd;
      fs.open(this.writeFileName, 'w', (err, writeFd) => {
        if (err) return callback(err);

        this.writeFd = writeFd;
        callback();
      });
    });
  }
  _write(chunk: Buffer, encoding: BufferEncoding, callback: (error?: Error) => void): void {
    this.chunks.push(chunk);
    this.chunksSize += chunk.length;
    // do our write operation...
    if (this.chunksSize > this.writableHighWaterMark) {
      fs.write(this.writeFd, Buffer.concat(this.chunks), (err) => {
        if (err) {
          return callback(err);
        }
        this.chunks = [];
        this.chunksSize = 0;
        callback();
      });
    } else {
      // when we're done, we should call the callback function
      callback();
    }
  }
  _read(size: number): void {
    const buff = Buffer.alloc(size);
    fs.read(this.readFd, buff, 0, size, null, (err, bytesRead) => {
      if (err) return this.destroy(err);
      // nul is to indicate the end of stream
      this.push(bytesRead > 0 ? buff.subarray(0, bytesRead): null);
    });
  }
  _final(callback: (error?: Error) => void): void {
    fs.write(this.writeFd, Buffer.concat(this.chunks), (err) => {
      if (err) return callback(err);

      this.chunks = [];
      callback();
    });
  }
  _destroy(error: Error, callback: (error: Error) => void): void {
    callback(error);
  }
}

const duplex = new DuplexStream({ 
  readFileName: path.join(__dirname, '../read.txt'), 
  writeFileName: path.join(__dirname, '../write.txt') });

duplex.write(Buffer.from("this is a string 0\n"));
duplex.write(Buffer.from("this is a string 1\n"));
duplex.write(Buffer.from("this is a string 2\n"));
duplex.write(Buffer.from("this is a string 3\n"));
duplex.end(Buffer.from("end of write"));
duplex.on('data', (chunk: Buffer) => {
  console.log(chunk.toString('utf-8'));
});

4. Transform Stream

Transform 是指把一個 Readable Stream 導入 Writable Stream 的 Stream 物件

同時具有 Writable 與 Readable Stream 的屬性

同時具有 Wriatable 與 Readable Stream 的 Buffer

特別針對需要做轉換的資料流

範例如下

encrypt.ts 把每個 byte 做 +1

import { Transform } from 'node:stream';
import { TransformCallback } from 'stream';
import * as fs from 'node:fs/promises';
import * as path from 'node:path';
class Encrypt extends Transform {
  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    // console.log(chunk.toString('utf-8'));
    for (let i = 0; i < chunk.length; ++i) {
      if (chunk[i] !== 255) {
        chunk[i] = chunk[i] + 1;
      }
    }
    // this.push(chunk);
    callback(null, chunk);
  }
}
(async () => {
  const readFileHandle = await fs.open(path.join(__dirname, '../test.txt'), 'r');
  const writeFileHandle = await fs.open(path.join(__dirname, '../write.txt'), 'w');

  const readStream = readFileHandle.createReadStream();
  con

decrypt.ts 把每個 byte 做 -1

import { Transform } from 'node:stream';
import { TransformCallback } from 'stream';
import * as fs from 'node:fs/promises';
import * as path from 'node:path';
class Decrypt extends Transform {
  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    // console.log(chunk.toString('utf-8'));
    for (let i = 0; i < chunk.length; ++i) {
      if (chunk[i] !== 255) {
        chunk[i] = chunk[i] - 1;
      }
    }
    callback(null, chunk);
  }
}
(async () => {
  const readFileHandle = await fs.open(path.join(__dirname, '../write.txt'), 'r');
  const writeFileHandle = await fs.open(path.join(__dirname, '../decrypted.txt'), 'w');

  const readStream = readFileHandle.createReadStream();
  const writeStream = writeFileHandle.createWriteStream();
  const decrypt = new Decrypt();

  readStream.pipe(decrypt).pipe(writeStream);
})()

使用到 Stream 的 node 套件

針對 node pg module ,對於較大的回傳資料提供了 pq-query-stream 這個套件

可以透過 stream 的方式慢慢把鉅量資料逐步傳遞 https://github.com/brianc/node-postgres/tree/master/packages/pg-query-stream

Web Stream API

透過 Web Stream API, 現在可以直接在 javascript 使用 stream 的作法來更有效率的處理巨型檔案 https://developer.mozilla.org/en-US/docs/Web/API/Streams_API

Reference Data

Nodejs stream 講解

Nodejs Stream api 文件

readable pipe 官方文件

More from this blog

Claude Code 監控秘錄:OpenTelemetry(OTel/OTLP)實戰指南

稟告主公:此乃司馬懿進呈之兵書,詳解如何以 OpenTelemetry 陣法,令臥龍神算之一舉一動盡在掌握,知糧草消耗、察兵器效能、辨戰報異常,使主公運籌帷幄於大帳之中。 為何需要斥候情報? 司馬懿稟告主公: 臥龍神算(Claude Code)乃當世利器,然若無斥候回報,主公便如蒙眼行軍——兵器耗損幾何、糧草消費幾許、哪路斥候出了差錯,一概不知。臣以為,此乃兵家大忌。 無情報之弊,有四: 軍

Feb 19, 202610 min read184
Claude Code 監控秘錄:OpenTelemetry(OTel/OTLP)實戰指南

工程師的 Claude Code 實戰指南:從零開始到高效開發

工程師的 Claude Code 實戰指南:從零開始到高效開發 本文整合 Anthropic 官方 Best Practices 與社群實戰 Tips,帶你由淺入深掌握 Claude Code。 什麼是 Claude Code?為什麼值得學? 如果你還在用「複製程式碼貼到 ChatGPT,再複製答案貼回去」的工作流程,Claude Code 會讓你大開眼界。 Claude Code 是 Anthropic 推出的命令列工具,它直接活在你的 terminal 裡,能夠讀懂你的整個 codeb...

Feb 18, 20265 min read81
工程師的 Claude Code 實戰指南:從零開始到高效開發

System Design Interview Ch 12 Digital Wallet

確立問題與設計範疇 角色對話內容 面試者我們應該只關注兩個數位錢包之間的餘額轉帳操作嗎?我們是否需要擔心其他功能? 面試官讓我們只關注餘額轉帳操作。 面試者該系統需要支援多少 TPS(每秒交易次數)? 面試官讓我們假設是 1,000,000 TPS (每秒 100 萬次交易)。 面試者數位錢包對正確性有嚴格的要求。我們可以假設事務保證 就足夠了嗎? 面試官聽起來不錯。 面試者我們需要證明正確性嗎? 面試官這是一個很好的問題。正確性(Correctness)通常只有在交...

Feb 2, 202610 min read230
System Design Interview Ch 12 Digital Wallet

Claude Code 利用 Event-Driven Hooks 打造自動化開發大腦

在現代 AI 輔助開發中,我們不僅需要 AI 寫程式,更需要它懂規則、記性好,並且能自動處理那些繁瑣的雜事。透過 Claude Code Hooks 機制,我們可以介入 AI 的思考與執行迴圈,實現真正的「人機協作自動化」。 一、 動機與痛點:為什麼你需要介入 AI 的生命週期? 在預設狀態下,Claude Code 雖然強大,但它是「被動」且「無狀態」的,這導致了開發者常遇到以下痛點: 記憶重置 (Session Amnesia): 痛點:每次重啟終端機,AI 就像失憶一樣。 解法:你...

Jan 24, 20266 min read551
Claude Code 利用 Event-Driven Hooks 打造自動化開發大腦
M

MicroFIRE

71 posts