2016-07-20 5 views
0

Spark-Shell에서는 제대로 작동하지만 Scala 플러그인에서는 Eclipse에서는 작동하지 않는 작은 스칼라 코드가 있습니다. 나는 또 다른 파일을 작성 시도 플러그인을 사용하여 HDFS에 액세스 할 수 있으며 일 ..스파크 쉘에서 코드가 작동하지 않습니다.

FirstSpark.scala

package bigdata.spark 
import org.apache.spark.SparkConf 
import java. io. _ 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 

object FirstSpark { 

    def main(args: Array[String])={ 
    val conf = new SparkConf().setMaster("local").setAppName("FirstSparkProgram") 
    val sparkcontext = new SparkContext(conf) 
    val textFile =sparkcontext.textFile("hdfs://pranay:8020/spark/linkage") 
    val m = new Methods() 
    val q =textFile.filter(x => !m.isHeader(x)).map(x=> m.parse(x)) 
    q.saveAsTextFile("hdfs://pranay:8020/output") } 
} 

에게

package bigdata.spark 
import java.util.function.ToDoubleFunction 

class Methods { 
def isHeader(s:String):Boolean={ 
    s.contains("id_1") 
} 
def parse(line:String) ={ 
    val pieces = line.split(',') 
    val id1=pieces(0).toInt 
    val id2=pieces(1).toInt 
    val matches=pieces(11).toBoolean 
    val mapArray=pieces.slice(2, 11).map(toDouble) 
    MatchData(id1,id2,mapArray,matches) 
    } 
def toDouble(s: String) = { 
    if ("?".equals(s)) Double.NaN else s.toDouble 
} 
} 
case class MatchData(id1: Int, id2: Int, 
scores: Array[Double], matched: Boolean) 

오류 메시지 Methods.scala :

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:335) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:334) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) 

을 수 아무도 이것으로 나를 도와주세요

답변

0

class Methods { .. }object Methods { .. }으로 변경해보십시오.

문제는 val q =textFile.filter(x => !m.isHeader(x)).map(x=> m.parse(x))에 있다고 생각합니다. Spark이 filtermap 함수를 보면 전달 된 함수 (x => !m.isHeader(x)x=> m.parse(x))를 직렬화하여 모든 실행자에게 실행 작업을 전달할 수 있습니다 (참조 된 작업 임). 그러나이 개체는 함수 내에서 참조되므로 (이 메서드는 두 개의 익명 메서드를 닫음) 은 직렬화 할 수 없기 때문에 m을 serialize해야합니다. Methods 클래스에 extends Serializable을 추가 할 수 있지만이 경우에는 object이 더 적절합니다 (이미 직렬화 가능).

+0

알렉에 감사드립니다. 그것은 효과가 있었다. – Pranay

+0

@Pranay 다행스럽게 도와 줬어! 완전성을 위해, 여러분이'x =>! (new Methods()). isHeader (x)'를 사용했다하더라도, Spark은 정의를 얻기 위해'메소드'전체를 직렬화하려고 시도 했으므로 여전히 동일한 문제를 겪었을 것입니다 isHeader의. – Alec

관련 문제