나는 대략 60.000.000 라인을 포함하는 많은 파일을 가지고 있습니다. 모든 파일 형식이 다음과 같은 형식으로되어 있습니다. {timestamp}#{producer}#{messageId}#{data_bytes}\n
Spark의 Map Task에서 거대한 메모리 사용
나는 하나씩 파일을보고 또한 입력 파일 하나당 하나의 출력 파일을 만들고 싶습니다. 일부 선은 이전 선에 의존하기 때문에 프로듀서별로 그룹화했습니다. 한 라인이 하나 이상의 이전 라인에 의존 할 때마다 그 라인의 생성자는 항상 동일합니다. 모든 행을 그룹화 한 후이를 Java 파서에 제공합니다. 파서는 구문 분석 된 모든 데이터 객체를 메모리에 포함하고 나중에 JSON으로 출력합니다.
내 작업이 처리되는 방법을 시각화하기 위해 다음 플로우 그래프를 작성했습니다. 나는 groupByKey
-Shuffeling-Process를 시각화하지 않았다.
내 문제 :
- 나는 별도의 작업과 파일, 프로세스 분할을 분할하고 "부분"- 파일에 각 작업의 출력을 저장하기 위해 불꽃을 기대했다.
- 그러나 작업이 끝나기 전에 메모리가 부족하여 YARN에 의해 종료됩니다.
Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB physical memory used
- 내 파서가 구문 분석 된 모든 데이터 객체를 메모리에 던지고 있습니다. 나는 파서의 코드를 바꿀 수 없다. 나는 그 불꽃 확인 할 수있는 방법
- :
- 내 코드는
내 질문에 (내 작업에 입력으로 600.000 라인을 각각 예를 들어 두 개의 파일을) 작은 파일 작동 있습니다 맵 작업에서 모든 파일 분할에 대한 결과를 생성합니까? (내 작업이 성공하더라도 어쩌면 출력 될 것입니다.)
지도 변환 val lineMap = lines.map ...
(아래 스칼라 코드 참조)이 분할 된 rdd를 생성한다고 생각했습니다. 따라서 나는 두 번째 맵 작업을 호출하기 전에 rdd의 값을 어떤 식 으로든 분할 할 것을 기대합니다.
또한이 rdd lineMap
에 saveAsTextFile을 호출하면 각 맵 작업이 완료된 후에 실행되는 출력 작업이 생성 될 것이라고 생각했습니다. 내 가정이 정확하다면, 왜 집행자는 여전히 추억을 남기고 있습니까? Spark은 여러 개의 큰 파일 분할을 처리하고 동시에 처리하므로 파서가 메모리를 가득 채우고 있습니까?
lineMap
rdd를 다시 분할하는 것이 좋습니다.스칼라 코드 (내가 unrelevant 코드 부분을 왼쪽) :
def main(args: Array[String]) {
val inputFilePath = args(0)
val outputFilePath = args(1)
val inputFiles = fs.listStatus(new Path(inputFilePath))
inputFiles.foreach(filename => {
processData(filename.getPath, ...)
})
}
def processData(filePath: Path, ...) {
val lines = sc.textFile(filePath.toString())
val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey()
val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) }
//each output should be saved separately
parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)
}
def parseLinesByKey(key: String, values: Iterable[String], config : Config) = {
val importer = new LogFileImporter(...)
importer.parseData(values.toIterator.asJava, ...)
//importer from now contains all parsed data objects in memory that could be parsed
//from the given values.
val jsonMapper = getJsonMapper(...)
val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)
(key, jsonStringData)
}
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html – zero323
오류 로그를 추적하여 메모리 오버 플로우를 일으키는 행을 알 수 있습니까? – GameOfThrows
오류 로그에 다음이 표시됩니다. [link] (http://i.imgur.com/cY6Hk5S.png). – j9dy