728x90
반응형
#
1. 파일 스트림이란
파일 스트림은 대용량 파일을 메모리에 한 번에 로드하지 않고 작은 청크(chunk) 단위로 읽거나 쓰는 방식입니다. 메모리 효율적이며, 파일 크기에 관계없이 일정한 메모리만 사용합니다.
const fs = require('fs');
// 일반 방식: 전체 파일을 메모리에 로드
const data = fs.readFileSync('large-file.txt'); // 메모리 부족 가능
// 스트림 방식: 청크 단위로 처리
const stream = fs.createReadStream('large-file.txt'); // 메모리 효율적
2. 읽기 스트림 (Readable Stream)
2.1 createReadStream 기본 사용
const fs = require('fs');
const readStream = fs.createReadStream('large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 청크 크기 (기본 64KB)
});
readStream.on('data', (chunk) => {
console.log('청크 크기:', chunk.length);
console.log('내용:', chunk);
});
readStream.on('end', () => {
console.log('읽기 완료');
});
readStream.on('error', (err) => {
console.error('오류:', err);
});
2.2 옵션 설정
const fs = require('fs');
const readStream = fs.createReadStream('file.txt', {
encoding: 'utf8', // 문자 인코딩
highWaterMark: 16384, // 청크 크기 (바이트)
start: 0, // 시작 위치
end: 999, // 끝 위치 (포함)
autoClose: true, // 완료 시 자동 닫기
emitClose: true // close 이벤트 발생
});
2.3 일시정지와 재개
const fs = require('fs');
const readStream = fs.createReadStream('large-file.txt');
readStream.on('data', (chunk) => {
console.log('청크 수신');
// 처리가 느린 경우 일시정지
readStream.pause();
// 비동기 작업 후 재개
processChunk(chunk).then(() => {
readStream.resume();
});
});
async function processChunk(chunk) {
// 청크 처리 로직
await new Promise(resolve => setTimeout(resolve, 100));
}
2.4 비동기 이터레이터 사용
const fs = require('fs');
async function readFileByChunks(filepath) {
const readStream = fs.createReadStream(filepath, {
encoding: 'utf8',
highWaterMark: 1024
});
for await (const chunk of readStream) {
console.log('청크:', chunk.length, '바이트');
// 청크 처리
}
console.log('완료');
}
readFileByChunks('large-file.txt');
3. 쓰기 스트림 (Writable Stream)
3.1 createWriteStream 기본 사용
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt', {
encoding: 'utf8'
});
writeStream.write('첫 번째 줄\n');
writeStream.write('두 번째 줄\n');
writeStream.write('세 번째 줄\n');
// 스트림 종료
writeStream.end('마지막 줄');
writeStream.on('finish', () => {
console.log('쓰기 완료');
});
writeStream.on('error', (err) => {
console.error('오류:', err);
});
3.2 옵션 설정
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt', {
encoding: 'utf8',
flags: 'w', // 쓰기 모드 (w: 덮어쓰기, a: 추가)
mode: 0o644, // 파일 권한
highWaterMark: 16384, // 버퍼 크기
autoClose: true, // 자동 닫기
start: 0 // 시작 위치 (flags가 r+일 때)
});
3.3 백프레셔 처리
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');
function writeData(data) {
// write()가 false를 반환하면 버퍼가 찼음
const canContinue = writeStream.write(data);
if (!canContinue) {
console.log('버퍼 가득 참, 대기 중...');
// drain 이벤트를 기다림
}
return canContinue;
}
writeStream.on('drain', () => {
console.log('버퍼 비워짐, 계속 쓰기 가능');
});
// 대용량 데이터 쓰기
for (let i = 0; i < 1000000; i++) {
const canContinue = writeData(`Line ${i}\n`);
if (!canContinue) {
// 실제로는 drain을 기다려야 함
}
}
3.4 Promise로 백프레셔 처리
const fs = require('fs');
function writeWithBackpressure(stream, data) {
return new Promise((resolve, reject) => {
const canContinue = stream.write(data);
if (canContinue) {
resolve();
} else {
stream.once('drain', resolve);
stream.once('error', reject);
}
});
}
async function writeLargeData() {
const writeStream = fs.createWriteStream('large-output.txt');
for (let i = 0; i < 1000000; i++) {
await writeWithBackpressure(writeStream, `Line ${i}\n`);
}
writeStream.end();
}
writeLargeData();
반응형
4. 파이프 (Pipe)
4.1 기본 파이프
const fs = require('fs');
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('dest.txt');
// 읽기 스트림을 쓰기 스트림에 연결
readStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('복사 완료');
});
4.2 여러 스트림 연결
const fs = require('fs');
const zlib = require('zlib');
// 파일 읽기 → 압축 → 파일 쓰기
fs.createReadStream('file.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('file.txt.gz'))
.on('finish', () => {
console.log('압축 완료');
});
// 압축 해제
fs.createReadStream('file.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('file-unzipped.txt'));
4.3 pipeline 사용 (권장)
const fs = require('fs');
const { pipeline } = require('stream');
const zlib = require('zlib');
// 콜백 방식
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('파이프라인 오류:', err);
} else {
console.log('완료');
}
}
);
// Promise 방식
const { pipeline: pipelineAsync } = require('stream/promises');
async function compressFile() {
await pipelineAsync(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz')
);
console.log('압축 완료');
}
5. Transform 스트림
5.1 내장 Transform 스트림
const fs = require('fs');
const { Transform } = require('stream');
// 대문자 변환 스트림
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
fs.createReadStream('input.txt')
.pipe(upperCaseTransform)
.pipe(fs.createWriteStream('output.txt'));
5.2 커스텀 Transform 클래스
const { Transform } = require('stream');
class LineNumberTransform extends Transform {
constructor(options) {
super(options);
this.lineNumber = 1;
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
// 마지막 줄은 불완전할 수 있으므로 버퍼에 유지
this.buffer = lines.pop();
for (const line of lines) {
this.push(`${this.lineNumber++}: ${line}\n`);
}
callback();
}
_flush(callback) {
// 남은 버퍼 처리
if (this.buffer) {
this.push(`${this.lineNumber}: ${this.buffer}\n`);
}
callback();
}
}
const fs = require('fs');
const { pipeline } = require('stream/promises');
async function addLineNumbers() {
await pipeline(
fs.createReadStream('input.txt'),
new LineNumberTransform(),
fs.createWriteStream('numbered.txt')
);
}
6. 실전 예시
6.1 대용량 파일 복사
const fs = require('fs');
const { pipeline } = require('stream/promises');
async function copyFile(source, dest, onProgress) {
const sourceStats = await fs.promises.stat(source);
const totalBytes = sourceStats.size;
let copiedBytes = 0;
const progressStream = new (require('stream').Transform)({
transform(chunk, encoding, callback) {
copiedBytes += chunk.length;
if (onProgress) {
onProgress(copiedBytes, totalBytes);
}
callback(null, chunk);
}
});
await pipeline(
fs.createReadStream(source),
progressStream,
fs.createWriteStream(dest)
);
}
await copyFile('large-file.bin', 'copy.bin', (copied, total) => {
const percent = ((copied / total) * 100).toFixed(2);
console.log(`진행률: ${percent}%`);
});
6.2 CSV 파일 처리
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
class CSVParser extends Transform {
constructor(options = {}) {
super({ objectMode: true });
this.headers = null;
this.buffer = '';
this.delimiter = options.delimiter || ',';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop();
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(this.delimiter);
if (!this.headers) {
this.headers = values;
} else {
const obj = {};
this.headers.forEach((header, i) => {
obj[header.trim()] = values[i]?.trim();
});
this.push(obj);
}
}
callback();
}
}
async function processCSV(filepath) {
const readStream = fs.createReadStream(filepath);
const parser = new CSVParser();
for await (const row of readStream.pipe(parser)) {
console.log(row);
// { name: '홍길동', age: '30', city: '서울' }
}
}
6.3 로그 파일 테일
const fs = require('fs');
function tailFile(filepath, onLine) {
let position = 0;
let buffer = '';
// 파일 변경 감시
fs.watchFile(filepath, { interval: 100 }, async (curr, prev) => {
if (curr.size > prev.size) {
const stream = fs.createReadStream(filepath, {
start: position,
encoding: 'utf8'
});
stream.on('data', (chunk) => {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop();
for (const line of lines) {
if (line.trim()) {
onLine(line);
}
}
});
stream.on('end', () => {
position = curr.size;
});
}
});
return () => fs.unwatchFile(filepath);
}
// 사용
const stopTail = tailFile('./app.log', (line) => {
console.log('새 로그:', line);
});
// 종료 시
// stopTail();
6.4 파일 암호화/복호화
const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream/promises');
async function encryptFile(source, dest, password) {
const key = crypto.scryptSync(password, 'salt', 32);
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
// IV를 파일 시작 부분에 저장
const destStream = fs.createWriteStream(dest);
destStream.write(iv);
await pipeline(
fs.createReadStream(source),
cipher,
destStream
);
}
async function decryptFile(source, dest, password) {
const key = crypto.scryptSync(password, 'salt', 32);
// IV 읽기
const fd = await fs.promises.open(source, 'r');
const ivBuffer = Buffer.alloc(16);
await fd.read(ivBuffer, 0, 16, 0);
await fd.close();
const decipher = crypto.createDecipheriv('aes-256-cbc', key, ivBuffer);
await pipeline(
fs.createReadStream(source, { start: 16 }),
decipher,
fs.createWriteStream(dest)
);
}
await encryptFile('secret.txt', 'secret.enc', 'mypassword');
await decryptFile('secret.enc', 'decrypted.txt', 'mypassword');
7. 스트림 유틸리티
7.1 finished - 스트림 완료 감지
const { finished } = require('stream/promises');
const fs = require('fs');
async function waitForStream() {
const writeStream = fs.createWriteStream('output.txt');
writeStream.write('데이터 1\n');
writeStream.write('데이터 2\n');
writeStream.end();
await finished(writeStream);
console.log('스트림 완료');
}
7.2 Readable.from - 이터러블을 스트림으로
const { Readable } = require('stream');
const fs = require('fs');
// 배열을 스트림으로
const arrayStream = Readable.from(['Hello', ' ', 'World']);
// 제너레이터를 스트림으로
async function* generateData() {
for (let i = 0; i < 100; i++) {
yield `Line ${i}\n`;
}
}
const generatorStream = Readable.from(generateData());
generatorStream.pipe(fs.createWriteStream('generated.txt'));
결론
Node.js의 파일 스트림은 대용량 파일을 메모리 효율적으로 처리하는 핵심 기능입니다. createReadStream과 createWriteStream으로 스트림을 생성하고, pipe나 pipeline으로 연결합니다. Transform 스트림을 사용하면 데이터를 변환하면서 처리할 수 있습니다. 백프레셔를 적절히 처리하고, pipeline을 사용하면 에러 처리도 자동으로 됩니다.
728x90
반응형
'Node.js' 카테고리의 다른 글
| Node.js의 디렉토리 생성 및 삭제 (0) | 2026.03.07 |
|---|---|
| Node.js의 파일 삭제 및 이동 (0) | 2026.03.02 |
| Node.js의 파일 읽기와 쓰기 (0) | 2026.02.28 |
| Node.js의 이벤트 에미터(EventEmitter) 사용법 (0) | 2026.02.25 |
| Node.js의 타이머 함수(Timer Functions) (0) | 2026.02.25 |