스파크 작업 (2.0, 하프 2.7.2)을 제출하려고하지만 어떤 이유로 인해 EMR에서 약간의 NPE가 발생합니다. 스칼라 프로그램으로 모든 것이 잘 실행되므로 문제의 원인을 정확히 알 수 없습니다. org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.agg_doAggregateWithKeys $에서 작업 에게 java.lang.NullPointerException이 을 중단 (- 91 :스파크 작업으로 제출할 때 Spark RDD 맵에서 NullPointerException이 발생했습니다.
18 : 02 : 55,271 ERROR의 Utils 여기에 스택 추적입니다 알 수없는 소스) at org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext (알 수없는 소스) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext (BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext (WholeStageCodegenExec.scala : 370) at scala.collection.Iterator $$ anon $ 12.hasNext (Iterator.scala : 438)org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply $ mcV $ sp (WriterContainer.scala : 253)에서 at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $ $ anonfun $ writeRows $ 1.apply (WriterContainer.scala : 252) org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply (WriterContainer.scala : 252) at org.apache. spark.util.Utils $ .tryWithSafeFinallyAndFailureCallbacks (Utils.scala : 1325) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows (WriterContainer.scala : 258) at org.apache.spark.sql.execution .datasources.InsertIntoHadopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (InsertIntoHadopFsRelationCommand.scala : 143) at org.apache.spark.sql.execution.datasource s.InsertIntoHadoopFsRelationCommand $$ anonfun $ 실행 anonfun $를 적용 $ (1) $$ $ MCV $ SP $ 1.apply (InsertIntoHadoopFsRelationCommand.scala : 143) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala : 70)에서 에서 org.apache.spark.scheduler.Task.run (Task.scala : 85) at org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala : 274) at java.util.concurrent.ThreadPoolExecutor. java.lang.Thread.run에서 java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) 에서 runWorker (ThreadPoolExecutor.java:1142) (Thread.java:745)
우리가 알 수있는 한 다음과 같은 방법으로 발생합니다 :
def process(dataFrame: DataFrame, S3bucket: String) = {
dataFrame.map(row =>
"text|label"
).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
우리는이 같은지도 기능으로 좁혀 한 스파크 작업으로 제출하면 작동합니다
def process(dataFrame: DataFrame, S3bucket: String) = {
dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
사람이 문제의 원인이 될 수있는 어떤 생각을 가지고 있습니까? 또한 어떻게 해결할 수 있습니까? 우리는 꽤 난처한 처지입니다.
'coalesce()'없이 시도 했습니까? – gsamaras
@gsamaras 아니요! 그러나 그것은 연합없이 일하는 것처럼 보입니다. 무슨 일 이니? – cscan