2012-06-27 4 views
10

node.js를 사용하여 실시간으로 파일에 기록되는 데이터를 읽는 가장 좋은 방법을 찾아야합니다. 문제는, 노드는 문제를 해결하기위한 최선의 방법을 찾는 것을 어렵게 만드는 빠르게 움직이는 배입니다.Node.js를 사용하여 실시간으로 파일 읽기

난 내가 뭔가를 텍스트 파일로 수행이 일의 결과를 서면으로 무언가를하고있는 자바 프로세스를
를 수행 할 작업. 일반적으로 5 분에서 5 시간까지 실행되는 데 필요한 모든 시간이 소요되며 데이터는 전체 시간에 기록되며 상당한 처리량 속도 (약 1000 회/초)까지 올라갈 수 있습니다.

이 파일을 실시간으로 읽은 다음 node aggregate 데이터를 사용하여 클라이언트에서 그래프로 표시 할 수있는 소켓에 쓰고 싶습니다.

클라이언트, 그래프, 소켓 및 집계 논리는 모두 완료되었지만 파일을 읽는 최선의 방법에 대해 혼란 스럽습니다.

나는 시도 (또는 적어도 연주) 무슨
FIFO - 나는 이것이 우리가 현재 펄을 사용하여 implemted 얼마나 사실에,이 사용 노드는 FIFO에 쓰기 및 읽기 내 자바 프로세스를 알 수 있습니다 ,하지만 다른 모든 노드에서 실행되고 있기 때문에 코드를 포팅하는 것이 좋습니다.

Unix Sockets - 상기와 같습니다.

fs.watchFile - 우리가 필요로하는 방식으로 작동합니까?

fs.createReadStream - 이것이 watchFile보다 낫습니까?

fs & tail -f - 해킹처럼 보입니다.

은, 사실, 난 유닉스 소켓을 사용하는쪽으로 경향이 있어요 내 질문
무엇이 가장 빠른 옵션이 보인다. 그러나 노드는 실시간으로 fs에서 파일을 읽는 더 나은 내장 기능을 가지고 있습니까?

답변

6

당신이 당신의 네트워크에서 시스템 충돌 또는 멤버 중 하나의 경우 스트림의 손실을 방지하기 위해 데이터의 영구 저장소로 파일을 유지하려면 실행중인 프로세스가 종료되면 계속해서 파일에 쓰고 그 파일에서 읽을 수 있습니다.

이 파일을 Java 프로세스에서 생성 된 결과의 영구 저장소로 사용하지 않으려면 Unix 소켓을 사용하는 것이 용이함과 성능면에서 훨씬 좋습니다.

fs.watchFile()은 파일 시스템이 파일을보고 할 때 파일 통계에서 작동하고 이미 작성된 파일을 읽고 싶어하기 때문에 사용자가 원하는 것이 아닙니다.

SHORT UPDATE :은 내가 이전 단락에서 파일 통계를 사용하는 fs.watchFile()을 비난했다하더라도, 내가 아래에있는 내 예제 코드에서 똑같은 일이 자신을했던 것을 실현하기 위해 매우 유감스럽게 생각한다! 나는 이미 독자들에게 "주의를 기울여야한다"고 경고했지만. 테스트를 잘하지 않고도 몇 분 안에 작성했기 때문에. 여전히 기본 시스템에서 지원하는 경우 watchFile 또는 fstatSync 대신 fs.watch()을 사용하면 더 효율적으로 수행 할 수 있습니다. 파일에서 읽기/쓰기를 들어

은, 난 그냥 내 휴식 시간에 재미를 위해 아래에 쓴 :

테스트-FS-writer.js : 당신이 당신의 자바 파일을 작성하기 때문에이 필요하지 않습니다 과정]

var fs = require('fs'), 
    lineno=0; 

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'}); 

stream.on('open', function() { 
    console.log('Stream opened, will start writing in 2 secs'); 
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000); 
}); 

테스트-FS-reader.js : [주의, 이것은 단지 데모 ERR 개체를 확인한다]

var fs = require('fs'), 
    bite_size = 256, 
    readbytes = 0, 
    file; 

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); }); 

function readsome() { 
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense! 
    if(stats.size<readbytes+1) { 
     console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!'); 
     setTimeout(readsome, 3000); 
    } 
    else { 
     fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome); 
    } 
} 

function processsome(err, bytecount, buff) { 
    console.log('Read', bytecount, 'and will process it now.'); 

    // Here we will process our incoming data: 
     // Do whatever you need. Just be careful about not using beyond the bytecount in buff. 
     console.log(buff.toString('utf-8', 0, bytecount)); 

    // So we continue reading from where we left: 
    readbytes+=bytecount; 
    process.nextTick(readsome); 
} 
!

nextTick은 사용하지 말고 직접 readsome()으로 전화하십시오. 아직 여기서 동기화 작업을하고 있기 때문에 어떤 의미에서는 필요하지 않습니다. 나는 그것을 좋아한다. : P

EDIT Oliver Lloyd

상기 예 복용하지만 CSV 데이터를 읽어이를 확장하여 준다 :

var lastLineFeed, 
    lineArray; 
function processsome(err, bytecount, buff) { 
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n'); 

    if(lastLineFeed > -1){ 

     // Split the buffer by line 
     lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n'); 

     // Then split each line by comma 
     for(i=0;i<lineArray.length;i++){ 
      // Add read rows to an array for use elsewhere 
      valueArray.push(lineArray[i].split(',')); 
     } 

     // Set a new position to read from 
     readbytes+=lastLineFeed+1; 
    } else { 
     // No complete lines were read 
     readbytes+=bytecount; 
    } 
    process.nextTick(readFile); 
} 
+0

이것은 제 질문에 직접적으로 부합하는 좋은 예입니다. 한 번에 한 줄만 처리하는 것이 좋지만, 틀림없이 이것은 좋은 일입니다. 노드의 기존 fs 인터페이스가 없다는 것은 완전히 사용자 정의가 가능하므로 추가 코드를 작성해야 할 경우에도 필요한 부분을 정확하게 얻을 수 있다는 것을 의미합니다. –

+0

위 예제를 확장하여 CSV 파일로 작업했습니다. –

+0

이것은 노드로 실행될 때 절대적으로 작동하지만이 코드를 app.js에 넣고 결과를 HTML 페이지로 가져올 수 있습니까? – sand

4

tail -f이 해킹이라고 생각하십니까?

나는 좋은 예를 발견하면서 비슷한 것을 할 것입니다. Node.js를 및 웹 소켓과 실시간 온라인 활동 모니터 예 :
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

그냥 내가 당신에게 0.8.0에서 실행됩니다 예제 코드를 작성,이 답변이 완료 될 수 있도록 - (HTTP 서버가 해킹 어쩌면이다).

는 자식 프로세스는 꼬리 실행 양산하고, 자식 프로세스는 세 개의 스트림과 EventEmitter는 (우리는 우리의 경우 표준 출력 사용)이기 때문에 당신은 on

파일 이름으로 리스너를 추가 할 수 있습니다 tailServer.JS

사용 : node tailServer /var/log/filename.log

var http = require("http"); 
var filename = process.argv[2]; 


if (!filename) 
    return console.log("Usage: node tailServer filename"); 

var spawn = require('child_process').spawn; 
var tail = spawn('tail', ['-f', filename]); 

http.createServer(function (request, response) { 
    console.log('request starting...'); 

    response.writeHead(200, {'Content-Type': 'text/plain' }); 

    tail.stdout.on('data', function (data) { 
     response.write('' + data);     
    }); 
}).listen(8088); 

console.log('Server running at http://127.0.0.1:8088/'); 
+0

꼬리 -f 나의 관심은이 파일이 기록되기 전에 데이터가없는 경우 읽기 프로세스가 활성화 될 것을 요구한다는 것입니다 잃어버린. 필자의 유스 케이스는 데이터가 쓰여진 후 오랫동안 읽을 수있는 경우이다. +1을 0.8로 업데이트하면 쓰기와 읽기가 같은 소스에서 제어되는 경우 좋은 해결책입니다. –

+0

watchFile도 이벤트에 의해 구동되지만 문서에 따르면 안정적입니다. 위의 예는 상위 레벨 코드에서 폴링을 통해 파일 변경 사항을 처리합니다. 나를 위해 그것은 마치 해킹처럼 보인다. 그러나 그것은 당신을 위해 일하는만큼 좋은 일입니다. 그렇지 않으면 파일이 존재하지 않고 어떤 데이터도 잃지 않고'wc -l message.text | awk '{print $ 1}'을'tail -f -n'에 건네십시오. – vik

1

이 모듈은 원칙적 @hasanyasin의 구현 제안이다

https://github.com/felixge/node-growing-file

+0

감사합니다. 여기서는 제대로 작동 할 것 같습니다. felixge의 다른 프로젝트는 견고하기 때문에이 모듈을 사용해 보니 기쁩니다. –

0

나는 @hasanyasin의 답을 가져 와서 포장했습니다. 모듈 방식으로 약속드립니다. 기본 개념은 파일에서 문자열로 변환 된 버퍼를 사용하여 파일과 핸들러 함수를 전달하는 것입니다. 핸들러 함수가 true를 반환하면 파일 읽기가 중지됩니다. 핸들러가 충분히 빠르게 true를 리턴하지 않으면 읽기를 중지시키는 시간 종료를 설정할 수도 있습니다.

timeout으로 인해 resolve()가 호출되면 promiser는 true를 반환하고 그렇지 않으면 false를 반환합니다.

사용 예제는 아래를 참조하십시오.

// https://stackoverflow.com/a/11233045 

var fs = require('fs'); 
var Promise = require('promise'); 

class liveReaderPromiseMe { 
    constructor(file, buffStringHandler, opts) { 
     /* 
      var opts = { 
       starting_position: 0, 
       byte_size: 256, 
       check_for_bytes_every_ms: 3000, 
       no_handler_resolution_timeout_ms: null 
      }; 
     */ 

     if (file == null) { 
      throw new Error("file arg must be present"); 
     } else { 
      this.file = file; 
     } 

     if (buffStringHandler == null) { 
      throw new Error("buffStringHandler arg must be present"); 
     } else { 
      this.buffStringHandler = buffStringHandler; 
     } 

     if (opts == null) { 
      opts = {}; 
     } 

     if (opts.starting_position == null) { 
      this.current_position = 0; 
     } else { 
      this.current_position = opts.starting_position; 
     } 

     if (opts.byte_size == null) { 
      this.byte_size = 256; 
     } else { 
      this.byte_size = opts.byte_size; 
     } 

     if (opts.check_for_bytes_every_ms == null) { 
      this.check_for_bytes_every_ms = 3000; 
     } else { 
      this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms; 
     } 

     if (opts.no_handler_resolution_timeout_ms == null) { 
      this.no_handler_resolution_timeout_ms = null; 
     } else { 
      this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms; 
     } 
    } 


    startHandlerTimeout() { 
     if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) { 
      var that = this; 
      this._handlerTimer = setTimeout(
       function() { 
        that._is_handler_timed_out = true; 
       }, 
       this.no_handler_resolution_timeout_ms 
      ); 
     } 
    } 

    clearHandlerTimeout() { 
     if (this._handlerTimer != null) { 
      clearTimeout(this._handlerTimer); 
      this._handlerTimer = null; 
     } 
     this._is_handler_timed_out = false; 
    } 

    isHandlerTimedOut() { 
     return !!this._is_handler_timed_out; 
    } 


    fsReadCallback(err, bytecount, buff) { 
     try { 
      if (err) { 
       throw err; 
      } else { 
       this.current_position += bytecount; 
       var buff_str = buff.toString('utf-8', 0, bytecount); 

       var that = this; 

       Promise.resolve().then(function() { 
        return that.buffStringHandler(buff_str); 
       }).then(function(is_handler_resolved) { 
        if (is_handler_resolved) { 
         that.resolve(false); 
        } else { 
         process.nextTick(that.doReading.bind(that)); 
        } 
       }).catch(function(err) { 
        that.reject(err); 
       }); 
      } 
     } catch(err) { 
      this.reject(err); 
     } 
    } 

    fsRead(bytecount) { 
     fs.read(
      this.file, 
      new Buffer(bytecount), 
      0, 
      bytecount, 
      this.current_position, 
      this.fsReadCallback.bind(this) 
     ); 
    } 

    doReading() { 
     if (this.isHandlerTimedOut()) { 
      return this.resolve(true); 
     } 

     var max_next_bytes = fs.fstatSync(this.file).size - this.current_position; 
     if (max_next_bytes) { 
      this.fsRead((this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size); 
     } else { 
      setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms); 
     } 
    } 


    promiser() { 
     var that = this; 
     return new Promise(function(resolve, reject) { 
      that.resolve = resolve; 
      that.reject = reject; 
      that.doReading(); 
      that.startHandlerTimeout(); 
     }).then(function(was_resolved_by_timeout) { 
      that.clearHandlerTimeout(); 
      return was_resolved_by_timeout; 
     }); 
    } 
} 


module.exports = function(file, buffStringHandler, opts) { 
    try { 
     var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts); 
     return live_reader.promiser(); 
    } catch(err) { 
     return Promise.reject(err); 
    } 
}; 

는 다음과 같이 위의 코드를 사용

var fs = require('fs'); 
var path = require('path'); 
var Promise = require('promise'); 
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser'); 

var ending_str = '_THIS_IS_THE_END_'; 
var test_path = path.join('E:/tmp/test.txt'); 

var s_list = []; 
var buffStringHandler = function(s) { 
    s_list.push(s); 
    var tmp = s_list.join(''); 
    if (-1 !== tmp.indexOf(ending_str)) { 
     // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms 
     // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true 
     return true; 
     // you can also return a promise: 
     // return Promise.resolve().then(function() { return true; }); 
    } 
}; 

var appender = fs.openSync(test_path, 'a'); 
try { 
    var reader = fs.openSync(test_path, 'r'); 
    try { 
     var options = { 
      starting_position: 0, 
      byte_size: 256, 
      check_for_bytes_every_ms: 3000, 
      no_handler_resolution_timeout_ms: 10000, 
     }; 

     liveReadAppendingFilePromiser(reader, buffStringHandler, options) 
     .then(function(did_reader_time_out) { 
      console.log('reader timed out: ', did_reader_time_out); 
      console.log(s_list.join('')); 
     }).catch(function(err) { 
      console.error('bad stuff: ', err); 
     }).then(function() { 
      fs.closeSync(appender); 
      fs.closeSync(reader); 
     }); 

     fs.write(appender, '\ncheck it out, I am a string'); 
     fs.write(appender, '\nwho killed kenny'); 
     //fs.write(appender, ending_str); 
    } catch(err) { 
     fs.closeSync(reader); 
     console.log('err1'); 
     throw err; 
    } 
} catch(err) { 
    fs.closeSync(appender); 
     console.log('err2'); 
    throw err; 
} 
관련 문제