2017-12-01 1 views
0

여러 번 호출되는 메서드가 있습니다. 이 방법은 다음과 같습니다 :언제 내 RDD를 unpersist 할 수 있습니까?

def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = { 
    val newRDD = myRDD.map(......) //do stuff 
    newRDD.cache //newRDD has 2 actions performed on it 

    val badRDD = newRDD.filter(row => row.contains("bad")) 
    badRDD.count 

    val goodRDD = newRDD.filter(row => row.contains("good")) 
    goodRDD.count 

    newRDD.unpersist // I am unpersisting because this method gets called several times 

    goodRDD 
} 

내가 말했듯이, 나는 방법은 여러 번 호출되는 내가 다른 캐시 newRDDs의 4 개 복사본을 원하지 않기 때문에 newRDD을 unpersist 싶다. 다음 코드 예제입니다.

val firstRDD = separateGoodAndBad(originalRDD) 
val firstRDDTransformed = doStuffToFirstRDD(firstRDD) 

val secondRDD = separateGoodAndBad(firstRDDTransformed) 
val secondRDDTransformed = doStuffToSecondRDD(secondRDD) 

val thirdRDD = separateGoodAndBad(secondRDDTransformed) 
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD) 

그러나 secondRDDthirdRDD 내가 unpersist (separateGoodAndBad()에 위 참조를 추가 한 더 이상 지금 복용하고 그들이 newRDD을 다시 계산해야하는 것 같다

때 할 수 있습니다. 이 재 계산 얻을하는 결코 그래서는

+0

당신이 작업 (많은 시간이 소요됩니다) 병목 인 여부를 확인하기 위해 스파크 UI에 체크 한 :

def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = { val newRDD = myRDD.map(......) //do stuff newRDD.cache //newRDD has 2 actions performed on it val badRDD = newRDD.filter(row => row.contains("bad")) badRDD.count val goodRDD = newRDD.filter(row => row.contains("good")) goodRDD.cache // this will cache goodRDD to avoid recomputing in next call goodRDD.count newRDD.unpersist // I am unpersisting because this method gets called several times goodRDD } 

그럼 당신은 이런 식으로 함수 호출 뭔가 외부를 unpersist 수 있습니까? –

+0

@vatsalmevada 그 시점까지 전체 스파크 애플리케이션을 실행해야하기 때문에 시간이 오래 걸립니다. 그 카운트는 내 첫 스파크 "행동"입니다. 내가 혼란스러워하는 이유는 모든 것을 재 계산해야하는 이유입니다. –

답변

2

당신은 goodRDD.count을 할 때 한 번 계산으로도 goodRDD를 캐시 할 수 있습니다? newRDD unpersist 및 일부을 수행 할 때 그것은 다시 재 계산됩니다 그 RDD에 doStuffToFirstRDD 방법 안에 ction.

val firstRDD = separateGoodAndBad(originalRDD) 
val firstRDDTransformed = doStuffToFirstRDD(firstRDD) 

val secondRDD = separateGoodAndBad(firstRDDTransformed) 
firstRDD .unpersist //as your secondRDD will be cached by above `separateGoodAndBad` call 
val secondRDDTransformed = doStuffToSecondRDD(secondRDD) 

val thirdRDD = separateGoodAndBad(secondRDDTransformed) 
secondRDD.unpersist //as your thirdRDD will be cached by above `separateGoodAndBad` call 
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD) 
+0

우수 답변. 당신이 질문에 대답했을 때 실제로이 정확한 해결책을 테스트하고있었습니다! –

관련 문제