2017-03-14 2 views
1

R/spark를 사용하여 여러 csv 데이터 파일을 반복합니다. 각 파일의 약 1 %는 유지되어야하고 (특정 기준에 따라 필터링되어야 함) 다음 데이터 파일과 병합되어야합니다 (나는 union/rbind을 사용했습니다). 그러나 루프가 실행되면 spark에서 모든 이전 데이터 집합 및 filter() -s를 기억하므로 데이터의 계보가 길어지고 길어집니다.SparkR의 체크 포인트 데이터 프레임

spark R API에서 검사 점을 수행하는 방법이 있습니까? Spark 2.1에서 DataFrames에 대한 검사 점이 있다는 것을 알았지 만 R에서 얻을 수있는 것 같지 않습니다.

답변

1

꽤 큰 그래프 (수십억 데이터)에서 Scala/GraphX와 동일한 문제가 발생했으며 연결된 검색 구성 요소.

특정 버전에서 R에서 사용할 수있는 것이 확실하지 않지만 일반적인 해결 방법은 데이터를 "저장하여"다시로드하여 계보를 깨는 것입니다.

def refreshGraph[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED], checkpointDir: String, iterationCount: Int, numPartitions: Int): Graph[VD, ED] = { 
    val path = checkpointDir + "/iter-" + iterationCount 
    saveGraph(g, path) 
    g.unpersist() 
    loadGraph(path, numPartitions) 
} 
+0

감사합니다. 각 루프마다 다른 파일을 사용합니다. 같은 파일로 이전에 시도 했으므로 작동하지 않았습니다. 그것이 나를 위해 작동한다면 나는 당신의 대답을 시험하고 받아 들일 것입니다. –

+0

지금은 작동하고있는 것 같습니다 :-) 다른 병목 현상이 발생하더라도 :- ( –

+0

새 질문에 대한 첫 번째 질문이나 링크를 편집 할 수 있습니다 – glefait

0

불완전한 솔루션/해결 방법은 R 객체로 collect() 당신의 dataframe에, 그리고 나중에 createDataFrame()에 의해 다시 병렬 : 우리의 경우, 우리는 혈통마다 15 반복을 휴식. 이것은 작은 데이터에서는 잘 작동하지만 큰 데이터 세트에서는 너무 느려서 너무 큰 작업에 대해 불평합니다.