2016-06-29 1 views
0

을 처리 할 수없는 코드입니다 :GROUPBY 여기에 큰 RDDs

val words = sc.textFile("/Users/kaiyin/IdeaProjects/learnSpark/src/main/resources/eng_words.txt") 
words.take(1000000).foreach(println _) 
words.take(150000).groupBy((x: String) => x.head).map { 
    case (c, iter) => (c, iter.toList.size) 
}.foreach { 
    println _ 
} 

eng_words.txt 약 1 백만 영어 단어, 한 줄에 하나씩 포함 된 텍스트 파일입니다. RDD는 150,000 이상 진행되면, groupBy이 오류와 충돌합니다 :

java.util.NoSuchElementException: next on empty iterator 
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) 
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37) 
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63) 
    at scala.collection.IterableLike$class.head(IterableLike.scala:107) 
    at scala.collection.immutable.StringOps.scala$collection$IndexedSeqOptimized$$super$head(StringOps.scala:30) 
    at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126) 
    at scala.collection.immutable.StringOps.head(StringOps.scala:30) 
    at $anon$1$$anonfun$run$1.apply(<console>:23) 
    at $anon$1$$anonfun$run$1.apply(<console>:23) 
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:332) 
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:331) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:331) 
    at scala.collection.mutable.ArrayOps$ofRef.groupBy(ArrayOps.scala:186) 
    at $anon$1.run(<console>:23) 
    at Helper.HasRun$class.newRun(HasRun.scala:21) 
    at $anon$1.newRun(<console>:19) 
    ... 55 elided 

가 무엇이 잘못되었는지?

답변

3

이 특별한 경우에는 빈 문자열을 처리하지 못할 가능성이 큽니다. 그럼에도 불구하고 groupBy을 호출하지 말고 toList을 호출하지 말고 입력 내용이 올바른지 맹목적으로 신뢰하지 마십시오.

  • head는 각 키는 실행 프로그램 메모리에 맞게에 대한 모든 기록을 필요로

  • groupBy 같은 groupByKey으로 볼 수있는 오류에 빈 줄에 실패합니다.

또 다른 단어 개수 당신이 여기에있는 것입니다 :

words 
    // Make sure that it won't fail on empty string with 
    // java.util.NoSuchElementException: next on empty iterator 
    .flatMap(_.headOption) 
    // Map to pairs and reduce to avoid excessive shuffling and limit memory usage 
    .map((_, 1)) 
    .reduceByKey(_ + _) 
+1

기다립니다 맞나요? 집행자는 충분한 메모리에 액세스 할 수없는 경우 디스크에 필요한 항목을 원활하게 유출 할 수 있다고 생각했습니다. 그것이 큰 경기 침체와 함께 제공되지만 그것은 여전히 ​​작동해야합니다. 어쨌든,'reduceByKey'는 당신이 제안한 것처럼 더 낫다. 왜냐하면 Executor가 나머지를하기 전에 집계를 수행 할 수 있기 때문이다. – Jeff

+1

@ JeffL. 여기서 부분적으로 정확합니다. 꼭 필요한 경우 데이터가 유출 될 수 있지만 단일 키가 아닐 수 있습니다. 값은 ArrayBuffer에 대한 변형 일 뿐이므로 메모리에 저장해야합니다. – zero323

+1

재미 있습니다. 감사합니다. Spark가 가끔씩 두포에서하는 일에 대한 좋은 정보를 찾기가 어렵습니다. 또한이 중 일부는 클러스터의 집행자에게 할당 한 리소스로 완화 될 수 있으므로 생각해야합니다. 이것은 틀린 것이 아니라면'reduceByKey'가 데이터 프레임과 동등하지 않기 때문에 우선적으로 중요합니다. – Jeff