2016-09-01 4 views
-1

나는 스칼라의 DataFrame에서 작동하는 함수를 정의하려고 시도 해왔다. 스칼라의 입력은 스칼라 세트를 입력으로 사용하고 정수를 출력한다.Spark sqlContext 집합에 작용하는 UDF

// generate sample data 
case class Dummy(x:Array[Integer]) 
val df = sqlContext.createDataFrame(Seq(
    Dummy(Array(1,2,3)), 
    Dummy(Array(10,20,30,40)) 
)) 

// define the UDF 
import org.apache.spark.sql.functions._ 
def setSize(A:Set[Integer]):Integer = { 
    A.size 
} 
// For some reason I couldn't get it to work without this valued function 
val sizeWrap: (Set[Integer] => Integer) = setSize(_) 
val sizeUDF = udf(sizeWrap) 

// this produces the error 
df.withColumn("colSize", sizeUDF('x)).show 

내가 여기 실종 무엇 :

여기
org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 25.0 failed 1 times, most recent failure: Lost task 20.0 in stage 25.0 (TID 473, localhost): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Set 

문제의 핵심을 제공하는 간단한 코드이다 : 나는 다음과 같은 오류를 받고 있어요? 어떻게 작동시킬 수 있습니까? RDD로 전송하여이 작업을 수행 할 수 있지만 RDD와 DataFrames간에 앞뒤로 가고 싶지 않습니다.

답변

1

사용 Seq :

val sizeUDF = udf((x: Seq) => setSize(x.toSet)) 
+0

감사합니다. 그것은 작동합니다. 나는 또한 입력으로 2 세트로 일반화 할 수 있었다 (나는 원래 필요했다) – Avision