2016-08-17 4 views
3

스파크 작업 (2.0, 하프 2.7.2)을 제출하려고하지만 어떤 이유로 인해 EMR에서 약간의 NPE가 발생합니다. 스칼라 프로그램으로 모든 것이 잘 실행되므로 문제의 원인을 정확히 알 수 없습니다. org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.agg_doAggregateWithKeys $에서 작업 에게 java.lang.NullPointerException이 을 중단 (- 91 :스파크 작업으로 제출할 때 Spark RDD 맵에서 NullPointerException이 발생했습니다.

18 : 02 : 55,271 ERROR의 Utils 여기에 스택 추적입니다 알 수없는 소스) at org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext (알 수없는 소스) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext (BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext (WholeStageCodegenExec.scala : 370) at scala.collection.Iterator $$ anon $ 12.hasNext (Iterator.scala : 438)org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply $ mcV $ sp (WriterContainer.scala : 253)에서 at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $ $ anonfun $ writeRows $ 1.apply (WriterContainer.scala : 252) org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply (WriterContainer.scala : 252) at org.apache. spark.util.Utils $ .tryWithSafeFinallyAndFailureCallbacks (Utils.scala : 1325) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows (WriterContainer.scala : 258) at org.apache.spark.sql.execution .datasources.InsertIntoHadopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (InsertIntoHadopFsRelationCommand.scala : 143) at org.apache.spark.sql.execution.datasource s.InsertIntoHadoopFsRelationCommand $$ anonfun $ 실행 anonfun $를 적용 $ (1) $$ $ MCV $ SP $ 1.apply (InsertIntoHadoopFsRelationCommand.scala : 143) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala : 70)에서 에서 org.apache.spark.scheduler.Task.run (Task.scala : 85) at org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala : 274) at java.util.concurrent.ThreadPoolExecutor. java.lang.Thread.run에서 java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) 에서 runWorker (ThreadPoolExecutor.java:1142) (Thread.java:745)

우리가 알 수있는 한 다음과 같은 방법으로 발생합니다 :

def process(dataFrame: DataFrame, S3bucket: String) = { 
    dataFrame.map(row => 
     "text|label" 
).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket) 
} 

우리는이 같은지도 기능으로 좁혀 한 스파크 작업으로 제출하면 작동합니다

def process(dataFrame: DataFrame, S3bucket: String) = { 
    dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket) 
} 

사람이 문제의 원인이 될 수있는 어떤 생각을 가지고 있습니까? 또한 어떻게 해결할 수 있습니까? 우리는 꽤 난처한 처지입니다.

+0

'coalesce()'없이 시도 했습니까? – gsamaras

+0

@gsamaras 아니요! 그러나 그것은 연합없이 일하는 것처럼 보입니다. 무슨 일 이니? – cscan

답변

5

작업자가 아니라 드라이버에만있는 SparkContext 개체에 액세스하려고하면 작업자가 NullPointerException을 던집니다.

coalesce() 귀하의 데이터를 다시 분할하십시오. 한 파티션 만 요청하면 모두 한 파티션의 데이터를 압축하려고 시도합니다 *. 이는 응용 프로그램의 메모리 보행에 많은 압력을 가할 수 있습니다.

일반적으로 1에서 파티션을 축소하지 않는 것이 좋습니다.

자세한 내용은 Spark NullPointerException with saveAsTextFilethis을 참조하십시오. 경우


+0

우리가 coalesce (1)을 사용하는 이유는 많은 파일 대신 단일 파일에 모든 데이터를 쓰는 것이 었습니다. 이 작업을 수행 할 수있는 다른 방법이 있습니까? – cscan

+0

@cscan no. 어쩌면 메모리 설정을 늘리면 응용 프로그램이 하나의 파티션에서 작동 할 수 있지만 내가 게시 한 오류가 그와 같은 것을 나타내지는 않습니다. 왜 그들이 1 파일에 있기를 원하는 이유가 있습니까? – gsamaras

+1

이 오류는 다섯 개의 레코드로 테스트했을 때 발생했습니다. 메모리 사용과 관련이 없다고 생각합니다. – cscan

관련 문제