2016-09-05 2 views
0

클래스에 불변의 맵이 있습니다. 로컬 모드에서 코드를 실행할 때 문제가 없으며 맵의 모든 키에 접근 할 수 있습니다. 그러나 클러스터 모드에서 코드를 실행하면 노드가 맵에서 키를 찾지 못하는 것에 대한 오류가 발생합니다.클러스터 모드에서 맵을 사용하여 스파크를 실행하십시오.

지금까지 시도한 내용은 다음과 같습니다.

- 클러스터를 통해 불변 맵을 브로드 캐스트합니다.

broadcast = sc.broadcast(my_immutable_map) 

-Parallelize 쌍 RDD과지도

my_map_rdd = sc.parallelize(my_immutable_map.toSeq) 

내가 로그를 검사 할 때, 나는 키를 찾을 수없는 예외를 참조하십시오. 다음과 같이 내 오류 스택 트레이스는 : 일부 노드가이 맵의 일부 키를하시기 바랍니다 찾을 수없는 가능성이 얼마나

Driver stacktrace: 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 15.0 failed 4 times, most recent failure: Lost task 1.3 in stage 15.0 (TID 25, datanode1.big.com): java.util.NoSuchElementException: key not found: 905053199731 
    at scala.collection.MapLike$class.default(MapLike.scala:228) 
    at scala.collection.AbstractMap.default(Map.scala:58) 
    at scala.collection.MapLike$class.apply(MapLike.scala:141) 
    at scala.collection.AbstractMap.apply(Map.scala:58) 
    at havelsan.CDRGenerator$.generate_random_target(CDRGenerator.scala:95) 
    at havelsan.CDRGenerator$$anonfun$main$2$$anonfun$6.apply(CDRGenerator.scala:167) 
    at havelsan.CDRGenerator$$anonfun$main$2$$anonfun$6.apply(CDRGenerator.scala:165) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1197) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

당신이 불꽃지도를 배포하는 방법을 설명 할 수와? Btw 내 스파크 버전이 1.6.0

무엇이 누락 되었습니까?

UPDATE

이 부분은 드라이버에 대한지도를 초기화합니다.

... 
    var pd = sc.textFile("hdfs://...") 
    my_immutable_map = pd.map(line => line.split(":")).map{ line => (line(0), line(1).split(","))}.collectAsMap 
... 

    broadcast = sc.broadcast(my_immutable_map) 
    my_map_rdd = sc.parallelize(my_immutable_map.toSeq) 

그리고 이것은 내가 오류가있는 부분입니다.

def my_func(key:String):String={ 
... 
    my_value = broadcast.value(key) 
... 
} 

my_func는 다음과 같이 맵 내에서 호출됩니다.

my_another_rdd.map{ line => 
val key = line.split(",")(0) 
    my_func(key) 
} 
+0

스파크 버전 ?? – banjara

+0

내 스파크 버전은 1.6.0입니다. –

+0

더 많은 코드를 입력하십시오. 지도가 상당히 작 으면 첫 번째 방법이 올바른 것입니다. –

답변

0

내가 찾은 해결책은 매개 변수로 함수에 브로드 캐스트 값을 전달하는 것입니다. 아직도, 나는 방법을 병렬 처리하기위한 해결책을 찾지 못했습니다.

https://stackoverflow.com/a/34912887/4668959

관련 문제