2016-06-01 2 views
3

나는 대략 60.000.000 라인을 포함하는 많은 파일을 가지고 있습니다. 모든 파일 형식이 다음과 같은 형식으로되어 있습니다. {timestamp}#{producer}#{messageId}#{data_bytes}\nSpark의 Map Task에서 거대한 메모리 사용

나는 하나씩 파일을보고 또한 입력 파일 하나당 하나의 출력 파일을 만들고 싶습니다. 일부 선은 이전 선에 의존하기 때문에 프로듀서별로 그룹화했습니다. 한 라인이 하나 이상의 이전 라인에 의존 할 때마다 그 라인의 생성자는 항상 동일합니다. 모든 행을 그룹화 한 후이를 Java 파서에 제공합니다. 파서는 구문 분석 된 모든 데이터 객체를 메모리에 포함하고 나중에 JSON으로 출력합니다.

내 작업이 처리되는 방법을 시각화하기 위해 다음 플로우 그래프를 작성했습니다. 나는 groupByKey-Shuffeling-Process를 시각화하지 않았다.
flow graph

내 문제 :

  • 나는 별도의 작업과 파일, 프로세스 분할을 분할하고 "부분"- 파일에 각 작업의 출력을 저장하기 위해 불꽃을 기대했다.
  • 그러나 작업이 끝나기 전에 메모리가 부족하여 YARN에 의해 ​​종료됩니다. Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB physical memory used
  • 내 파서가 구문 분석 된 모든 데이터 객체를 메모리에 던지고 있습니다. 나는 파서의 코드를 바꿀 수 없다. 나는 그 불꽃 확인 할 수있는 방법

    1. :
    2. 내 코드는

내 질문에 (내 작업에 입력으로 600.000 라인을 각각 예를 들어 두 개의 파일을) 작은 파일 작동 있습니다 맵 작업에서 모든 파일 분할에 대한 결과를 생성합니까? (내 작업이 성공하더라도 어쩌면 출력 될 것입니다.)

  • 지도 변환 val lineMap = lines.map ... (아래 스칼라 코드 참조)이 분할 된 rdd를 생성한다고 생각했습니다. 따라서 나는 두 번째 맵 작업을 호출하기 전에 rdd의 값을 어떤 식 으로든 분할 할 것을 기대합니다.

    또한이 rdd lineMap에 saveAsTextFile을 호출하면 각 맵 작업이 완료된 후에 실행되는 출력 작업이 생성 될 것이라고 생각했습니다. 내 가정이 정확하다면, 왜 집행자는 여전히 추억을 남기고 있습니까? Spark은 여러 개의 큰 파일 분할을 처리하고 동시에 처리하므로 파서가 메모리를 가득 채우고 있습니까?

  • 내 파서에 대한 더 많은 (더 작은) 입력을 얻기 위해 lineMap rdd를 다시 분할하는 것이 좋습니다.
  • 내가 알지 못하는 추가 감속기 단계가 있습니까? 파일에 기록되기 전에 결과가 집계되는 것과 비슷합니까?

  • 스칼라 코드 (내가 unrelevant 코드 부분을 왼쪽) :

    def main(args: Array[String]) { 
        val inputFilePath = args(0) 
        val outputFilePath = args(1) 
    
        val inputFiles = fs.listStatus(new Path(inputFilePath)) 
        inputFiles.foreach(filename => { 
         processData(filename.getPath, ...) 
        }) 
    } 
    
    
    def processData(filePath: Path, ...) { 
        val lines = sc.textFile(filePath.toString()) 
        val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey() 
    
        val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) } 
        //each output should be saved separately 
        parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)  
    } 
    
    
    def parseLinesByKey(key: String, values: Iterable[String], config : Config) = { 
        val importer = new LogFileImporter(...) 
        importer.parseData(values.toIterator.asJava, ...) 
    
        //importer from now contains all parsed data objects in memory that could be parsed 
        //from the given values. 
    
        val jsonMapper = getJsonMapper(...) 
        val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject) 
    
        (key, jsonStringData) 
    } 
    
    +2

    https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html – zero323

    +0

    오류 로그를 추적하여 메모리 오버 플로우를 일으키는 행을 알 수 있습니까? – GameOfThrows

    +0

    오류 로그에 다음이 표시됩니다. [link] (http://i.imgur.com/cY6Hk5S.png). – j9dy

    답변

    0

    나는 나의 한계를 제거 GroupByKey에서 호출을 제거하고 새로운 FileInputFormat뿐만 아니라 RecordReader을 구현하여이 문제를 해결하는 라인은 다른 라인에 의존합니다. 지금은 각 분할에 이전 분할의 50.000 바이트 오버 헤드가 포함될 수 있도록 구현했습니다. 이렇게하면 이전 행에 의존하는 모든 행을 정확하게 파싱 할 수 있습니다.

    이제 이전 분할의 마지막 50.000 바이트를 살펴보고 현재 분할의 구문 분석에 실제로 영향을주는 행만 복사합니다. 따라서 오버 헤드를 최소화하고 여전히 병렬 처리가 가능한 작업을 얻습니다.

    다음 링크는 나를 올바른 방향으로 끌었습니다.

    관련 코드 부분. 작성자 (@Gurdt)는이 메시지를 사용하여 채팅 메시지에 회선이 "\"로 끝나는 지 여부를 검색하고 이스케이프 처리 된 회선을 이스케이프 처리되지 않은 \ n이 발견 될 때까지 첨부합니다. 이렇게하면 둘 이상의 행에 걸쳐있는 메시지를 검색 할 수 있습니다. 스칼라로 작성된 코드 :

    사용

    val conf = new Configuration(sparkContext.hadoopConfiguration) 
    val rdd = sparkContext.newAPIHadoopFile("data.txt", classOf[MyFileInputFormat], 
    classOf[LongWritable], classOf[Text], conf) 
    

    FileInputFormat

    class MyFileInputFormat extends FileInputFormat[LongWritable, Text] { 
        override def createRecordReader(split: InputSplit, context: TaskAttemptContext): 
        RecordReader[LongWritable, Text] = new MyRecordReader() 
    } 
    

    RecordReader

    class MyRecordReader() extends RecordReader[LongWritable, Text] { 
        var start, end, position = 0L 
        var reader: LineReader = null 
        var key = new LongWritable 
        var value = new Text 
    
        override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = { 
         // split position in data (start one byte earlier to detect if 
         // the split starts in the middle of a previous record) 
         val split = inputSplit.asInstanceOf[FileSplit] 
         start = 0.max(split.getStart - 1) 
         end = start + split.getLength 
    
         // open a stream to the data, pointing to the start of the split 
         val stream = split.getPath.getFileSystem(context.getConfiguration) 
         .open(split.getPath) 
         stream.seek(start) 
         reader = new LineReader(stream, context.getConfiguration) 
    
         // if the split starts at a newline, we want to start yet another byte 
         // earlier to check if the newline was escaped or not 
         val firstByte = stream.readByte().toInt 
         if(firstByte == '\n') 
          start = 0.max(start - 1) 
         stream.seek(start) 
    
         if(start != 0) 
          skipRemainderFromPreviousSplit(reader) 
        } 
    
        def skipRemainderFromPreviousSplit(reader: LineReader): Unit = { 
         var readAnotherLine = true 
         while(readAnotherLine) { 
          // read next line 
          val buffer = new Text() 
          start += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE) 
          pos = start 
    
          // detect if delimiter was escaped 
          readAnotherLine = buffer.getLength >= 1 && // something was read 
          buffer.charAt(buffer.getLength - 1) == '\\' && // newline was escaped 
          pos <= end // seek head hasn't passed the split 
         } 
        } 
    
        override def nextKeyValue(): Boolean = { 
         key.set(pos) 
    
         // read newlines until an unescaped newline is read 
         var lastNewlineWasEscaped = false 
         while (pos < end || lastNewlineWasEscaped) { 
          // read next line 
          val buffer = new Text 
          pos += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE) 
    
          // append newly read data to previous data if necessary 
          value = if(lastNewlineWasEscaped) new Text(value + "\n" + buffer) else buffer 
    
          // detect if delimiter was escaped 
          lastNewlineWasEscaped = buffer.charAt(buffer.getLength - 1) == '\\' 
    
          // let Spark know that a key-value pair is ready! 
          if(!lastNewlineWasEscaped) 
           return true 
         } 
    
         // end of split reached? 
         return false 
        } 
    } 
    

    참고 : 당신은 가까운 및 getProgress getCurrentKey,의 getCurrentValue을 구현해야 할 수도 있습니다 귀하의 RecordReader도 있습니다.

    관련 문제