Node.js

Node.js Streams : traiter de gros volumes de données

Mark Toledo

Mark Toledo

1 avril 2026

Node.js Streams : traiter de gros volumes de données

Sur un projet client, j'ai passé deux jours à débugger un memory leak causé par un stream mal fermé. Le service Node.js traitait des exports CSV de plusieurs gigaoctets et crashait toutes les quatre heures avec une erreur "JavaScript heap out of memory". La cause : on chargeait le fichier entier en mémoire avec fs.readFile avant de le traiter. La solution : les streams.

Les streams sont l'une des fonctionnalités les plus puissantes de Node.js, et paradoxalement l'une des moins bien comprises. Ce guide va changer ça.

Le problème que les streams résolvent

Sans streams, traiter un fichier de 2 Go ressemble à ça :

// ❌ Charge tout en mémoire — plante sur les gros fichiers
const contenu = await fs.readFile('export-2go.csv', 'utf-8');
const lignes = contenu.split('\n');
lignes.forEach(traiterLigne);

Node.js doit charger les 2 Go dans la RAM avant de commencer le traitement. Sur un VPS avec 2 Go de RAM totale, c'est la catastrophe.

Avec les streams, les données arrivent par petits morceaux (chunks). Tu traites chaque morceau à mesure qu'il arrive, sans jamais avoir plus que quelques kilooctets en mémoire.

Les quatre types de streams

Node.js définit quatre types de streams :

  • Readable : source de données (lecture de fichier, requête HTTP entrante, stdin)
  • Writable : destination de données (écriture de fichier, réponse HTTP, stdout)
  • Duplex : lecture et écriture (socket TCP)
  • Transform : transforme les données au passage (compression, chiffrement, parsing)

Streams Readable : consommer des données

import { createReadStream } from 'fs';

const stream = createReadStream('gros-fichier.txt', {
  encoding: 'utf-8',
  highWaterMark: 64 * 1024 // Taille des chunks : 64 Ko
});

stream.on('data', (chunk) => {
  console.log(`Chunk reçu : ${chunk.length} caractères`);
  // Traiter le chunk
});

stream.on('end', () => {
  console.log('Fichier entièrement lu');
});

stream.on('error', (erreur) => {
  console.error('Erreur de lecture:', erreur);
});

Le paramètre highWaterMark contrôle la taille des chunks. Par défaut c'est 16 Ko pour les streams en mode objet. Adapter cette valeur selon ton cas d'usage peut améliorer significativement les performances.

Le pipe : connecter des streams

La méthode .pipe() connecte un stream Readable à un stream Writable. C'est le pattern le plus naturel pour transformer des données en flux.

import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// Compresser un fichier sans le charger en mémoire
const source = createReadStream('gros-fichier.txt');
const compression = createGzip();
const destination = createWriteStream('gros-fichier.txt.gz');

source
  .pipe(compression)
  .pipe(destination)
  .on('finish', () => console.log('Compression terminée'))
  .on('error', (err) => console.error('Erreur:', err));

En une ligne, on lit, compresse et écrit un fichier de n'importe quelle taille. La mémoire utilisée est proportionnelle à la taille du chunk, pas à la taille du fichier.

pipeline : la version moderne et robuste

.pipe() a un défaut : si un stream intermédiaire émet une erreur, les autres streams ne sont pas automatiquement détruits, ce qui cause des fuites de ressources. C'est exactement le bug que j'ai eu sur ce projet client.

La fonction pipeline du module stream gère ça correctement :

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

async function compresserFichier(source, destination) {
  await pipeline(
    createReadStream(source),
    createGzip(),
    createWriteStream(destination)
  );
  console.log(`${source} compressé vers ${destination}`);
}

// Si n'importe quel stream échoue, tous sont correctement fermés
await compresserFichier('rapport.txt', 'rapport.txt.gz');

pipeline avec les promesses (disponible depuis Node.js 15) est la façon recommandée de chaîner des streams aujourd'hui.

Créer un stream Transform

Les streams Transform sont particulièrement utiles pour parser ou transformer des données ligne par ligne.

import { Transform } from 'stream';

class ParserCSV extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.entete = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lignes = this.buffer.split('\n');
    
    // Garder la dernière ligne incomplète dans le buffer
    this.buffer = lignes.pop();

    for (const ligne of lignes) {
      if (!ligne.trim()) continue;
      
      const valeurs = ligne.split(',');
      
      if (!this.entete) {
        this.entete = valeurs;
        continue;
      }
      
      const objet = Object.fromEntries(
        this.entete.map((col, i) => [col.trim(), valeurs[i]?.trim()])
      );
      
      this.push(objet); // Pousser l'objet parsé vers le stream suivant
    }
    
    callback();
  }

  _flush(callback) {
    // Traiter la dernière ligne restante dans le buffer
    if (this.buffer.trim() && this.entete) {
      const valeurs = this.buffer.split(',');
      const objet = Object.fromEntries(
        this.entete.map((col, i) => [col.trim(), valeurs[i]?.trim()])
      );
      this.push(objet);
    }
    callback();
  }
}

Utilisation avec pipeline :

let totalLignes = 0;

await pipeline(
  createReadStream('donnees.csv'),
  new ParserCSV(),
  new Writable({
    objectMode: true,
    write(ligne, encoding, callback) {
      totalLignes++;
      // Insérer en DB, transformer, filtrer...
      traiterLigne(ligne);
      callback();
    }
  })
);

console.log(`${totalLignes} lignes traitées`);

Streams et backpressure

La backpressure est un mécanisme qui empêche un stream rapide de saturer un stream lent. Si tu lis un fichier depuis un SSD ultra-rapide et que tu l'envoies sur une connexion réseau lente, Node.js va automatiquement mettre en pause la lecture pour éviter d'accumuler des chunks en mémoire.

Quand tu crées un stream Writable personnalisé, respecter la backpressure est fondamental :

class WritableDB extends Writable {
  constructor(db) {
    super({ objectMode: true, highWaterMark: 100 }); // Buffer de 100 objets
    this.db = db;
  }

  async _write(objet, encoding, callback) {
    try {
      await this.db.inserer(objet);
      callback(); // Signale que ce chunk est traité — autorise la suite
    } catch (erreur) {
      callback(erreur); // Propage l'erreur
    }
  }

  // Pour les insertions en batch, utiliser _writev
  async _writev(chunks, callback) {
    try {
      await this.db.insererBatch(chunks.map(c => c.chunk));
      callback();
    } catch (erreur) {
      callback(erreur);
    }
  }
}

Streams async iterable

Depuis Node.js 12, les streams Readable implémentent le protocole async iterable. C'est souvent plus lisible que d'utiliser des événements.

import { createReadStream } from 'fs';
import readline from 'readline';

async function traiterFichierLigneParLigne(chemin) {
  const stream = createReadStream(chemin);
  const rl = readline.createInterface({ input: stream });
  
  let numeroLigne = 0;
  
  for await (const ligne of rl) {
    numeroLigne++;
    
    if (ligne.startsWith('#')) continue; // Ignorer les commentaires
    
    await traiterLigne(ligne, numeroLigne);
  }
  
  return numeroLigne;
}

Cette syntaxe for await...of est nettement plus lisible que la gestion d'événements et gère automatiquement la fin du stream.

Cas pratique : traitement d'un export volumineux

Voici un exemple réaliste : transformer un export JSON lines (un objet JSON par ligne) et l'insérer en base de données.

import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { Transform } from 'stream';

async function importerDonnees(fichier, db) {
  let inseres = 0;
  let erreurs = 0;

  const parseur = new Transform({
    objectMode: true,
    transform(chunk, enc, cb) {
      const lignes = chunk.toString().split('\n').filter(Boolean);
      lignes.forEach(ligne => {
        try {
          this.push(JSON.parse(ligne));
        } catch {
          erreurs++;
        }
      });
      cb();
    }
  });

  const inserteur = new Transform({
    objectMode: true,
    async transform(objet, enc, cb) {
      try {
        await db.inserer('produits', objet);
        inseres++;
        if (inseres % 1000 === 0) {
          console.log(`${inseres} insertions...`);
        }
        cb();
      } catch (err) {
        cb(err);
      }
    }
  });

  await pipeline(
    createReadStream(fichier),
    parseur,
    inserteur
  );

  return { inseres, erreurs };
}

Ce qu'on retient

Les streams Node.js ne sont pas une option avancée pour les cas d'usage extrêmes — ils devraient être le réflexe par défaut dès qu'on manipule des fichiers volumineux ou des flux de données.

La règle simple : si la taille du fichier peut dépasser la RAM disponible, utilise un stream. Si les données arrivent en continu (logs, événements, données temps réel), utilise un stream. Si tu as des doutes, utilise un stream.

Et souviens-toi : préfère pipeline à .pipe() pour la gestion automatique des erreurs et des fuites de ressources. Cette seule habitude aurait épargné deux jours de debugging sur ce projet.