2016-10-21 4 views
0

내가이 말 다음의 예와 같은 RDD [(문자열, INT)] : I 효율적으로 0, 1을 포함하는 레코드의 총 양을 인쇄 할스파크 필터와 카운트 큰 RDD 여러 번

(A, 0) 
(B, 0) 
(C, 1) 
(D, 0) 
(E, 2) 
(F, 1) 
(G, 1) 
(H, 3) 
(I, 2) 
(J, 0) 
(K, 3) 

, 2 등 RDD에는 수백만 항목이 포함되어 있으므로 가능한 한 효율적으로 처리하려고합니다.

같은 반환이 예제의 출력 :

Number of records containing 0 = 4 
Number of records containing 1 = 3 
Number of records containing 2 = 2 
Number of records containing 3 = 2 

은 현재 내가 따로 ..., 0, 1, 2 다음 count() 큰 RDD에 필터를 수행하여이를 구현하고 시도합니다. 나는 스칼라를 사용하고있다.

더 효율적인 방법이 있습니까? 이미 RDD를 캐시했지만, 여전히 프로그램에 메모리가 부족합니다 (드라이버 메모리를 5G로 설정했습니다).

편집 : 두번째 요소 중 하나 "m"또는 "F는 튜플 (에 문자열 값을 변경하여

rdd.map(_.swap).countByKey() 

내가이 구체화 할 수 있습니다 : Tzach에 의해 제안 지금 countByKey 사용 "),이 튜플의 두 번째 요소의 고유 값 당 키당 카운트를 얻으시겠습니까? 예를 들어

:

(A,m), 0) 
(B,f), 0) 
(C,m), 1) 
(D,m), 0) 
(E,f), 2) 
(F,f), 1) 
(G,m), 1) 
(H,m), 3) 
(I,f), 2) 
(J,f), 0) 
(K,m), 3) 

사전에

((0,m), 2) 
((0,f), 2) 
((1,m), 2) 
((1,f), 1) 
((2,m), 0) 
((2,f), 2) 
((3,m), 2) 
((3,f), 0) 

감사 결과겠습니까!

답변

2

당신은 단지 그것을 위해 편리한 countByKey을 사용할 수 있습니다 - 바로 입력에서 자리를 바꾸고 미리 키 숫자 값을 만들기 위해 :

val rdd = sc.parallelize(Seq(
    ("A", 0), ("B", 0), ("C", 1), ("D", 0), ("E", 2), 
    ("F", 1), ("G", 1), ("H", 3), ("I", 2), ("J", 0), ("K", 3) 
)) 

rdd.map(_.swap).countByKey().foreach(println) 
// (0,4) 
// (1,3) 
// (3,2) 
// (2,2) 

편집 : 그것은 같은 소리 무엇 countByKey 정확히 수행을 - 그래서 당신이 사용하기 원하는 키를 바로 가지고 당신의 RDD 변환에 들어있는 튜플의 왼쪽 부분, 같은 :

rdd.map { case ((a, b), i) => ((i, b), a) }.countByKey() 

나 :

rdd.keyBy { case ((_, b), i) => (i, b) }.countByKey() 
+0

감사합니다. 나는 문제의 조금 더 세련된 것에 대한 나의 대답을 편집했다. – Laurens

+1

그에 따라 수정 된 답변 (후속 질문이 실제로 SO 에티켓과 인라인되지는 않지만 향후 독자가 따라야하거나 응답자에게 불공정 할 수 있음) –

관련 문제