
I have a pair RDD of type org.apache.spark.rdd.RDD[(String, com.model.item.CountryInfo)] = MapPartitionsRDD[8] (referred to as pairMap below), and I am trying to build an RDD of Row from a DataFrame, but I keep hitting SPARK-5063 ("RDD transformations and actions can only be invoked by the driver"). This is what I am doing:

val itemList = df.filter(not($"newItemType" === "Unknown Type")).map { row =>
    val customerId = row.getAs[String](0)
    val itemId = row.getAs[String](1)
    val itemType = row.getAs[String](4)

    // pairMap.lookup is an RDD action, invoked here inside another RDD's map
    val priceType = if (StringUtils.isNotBlank(pairMap.lookup(itemType).head.getpriceType)) pairMap.lookup(itemType).head.getpriceType else "unknown"
    val kidsAdults = if (pairMap.lookup(itemType).head.getItems.size() > 0) "Kids" else "Adults"
    val tvMovie = if (pairMap.lookup(itemType).head.getbarCode != barCode) "TV" else "Movie"
    Row(customerId, itemId, itemType, priceType, kidsAdults, tvMovie)
}

Then I ran: itemList.first()

But I keep getting this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 34, ip-172-31-0-28.ec2.internal): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:928) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:89) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:83) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1314) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1314) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1314) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1288) 
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1328) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.first(RDD.scala:1327) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:86) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:91) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:93) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:95) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:99) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:101) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:103) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:105) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:107) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:109) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:111) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:113) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:115) 
    at $iwC$$iwC$$iwC.<init>(<console>:117) 
    at $iwC$$iwC.<init>(<console>:119) 
    at $iwC.<init>(<console>:121) 
    at <init>(<console>:123) 
    at .<init>(<console>:127) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664) 
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629) 
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622) 
    at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57) 
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) 
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276) 
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170) 
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:928) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:89) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:83) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1314) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1314) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    ... 3 more 

I also tried using UDFs:

val priceUDF = udf { (itemType: String) => if (StringUtils.isNotBlank(pairMap.lookup(itemType).head.getpriceType)) pairMap.lookup(itemType).head.getpriceType else "unknown" }

val kidsUDF = udf { (itemType: String) => if (pairMap.lookup(itemType).head.getItems.size() > 0) "Kids" else "Adults" }

val tvMovieUDF = udf { (itemType: String) => if (pairMap.lookup(itemType).head.getbarCode != barCode) "TV" else "Movie" }

val broDF = df.filter(not($"newItemType" === "Unknown Type"))
    .withColumn("priceType", priceUDF($"newItemType"))
    .withColumn("kids", kidsUDF($"newItemType"))
    .withColumn("tvMovie", tvMovieUDF($"newItemType"))

But I still get the same error. Can someone tell me how to fix this? In the end I want the data saved as gzipped files:

val json = itemList.toJSON
json.saveAsTextFile("s3://...", classOf[GzipCodec])

You can't nest RDD operations in Spark. If pairMap is small, you can call collect() on it and use a broadcast variable instead, then do the lookups through b.value. http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables – Leonard

Answer


Well, you cannot access one RDD from inside a transformation on another RDD; that is not allowed. I think what you are trying to achieve is to ship pairMap into the function so you can do the lookups there. If so, you can use a broadcast variable:

val b = sc.broadcast(pairMap.collectAsMap())

Then use b.value instead of pairMap.lookup. As written, you are invoking one RDD from inside another RDD, which is exactly what Spark forbids.
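For example, here is a minimal sketch of the question's map rewritten this way (assuming pairMap fits in driver memory, and reusing the getter names and the barCode value from the question's code):

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

// Collect the pair RDD to the driver once and broadcast it as a plain Map,
// so executors can read it without touching any RDD.
val b = sc.broadcast(pairMap.collectAsMap())

val itemList = df.filter(not($"newItemType" === "Unknown Type")).map { row =>
    val customerId = row.getAs[String](0)
    val itemId = row.getAs[String](1)
    val itemType = row.getAs[String](4)

    // b.value.get returns an Option, so a missing key falls through to the defaults
    val info = b.value.get(itemType)
    val priceType = info.map(_.getpriceType).filter(s => StringUtils.isNotBlank(s)).getOrElse("unknown")
    val kidsAdults = if (info.exists(_.getItems.size() > 0)) "Kids" else "Adults"
    val tvMovie = if (info.exists(_.getbarCode != barCode)) "TV" else "Movie"
    Row(customerId, itemId, itemType, priceType, kidsAdults, tvMovie)
}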


It should be b.value.get("key") instead of lookup –
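Put concretely, the UDF variant from the question can be rewritten the same way. A sketch, reusing the broadcast variable b from the answer and the question's getter names (only the price column is shown; the other two columns follow the same pattern):

import org.apache.spark.sql.functions.udf

// The UDF reads the broadcast map; no RDD is touched inside it.
val priceUDF = udf { (itemType: String) =>
    b.value.get(itemType).map(_.getpriceType).filter(s => StringUtils.isNotBlank(s)).getOrElse("unknown")
}

val broDF = df.filter(not($"newItemType" === "Unknown Type"))
    .withColumn("priceType", priceUDF($"newItemType"))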