-1
나는 스칼라의 DataFrame에서 작동하는 함수를 정의하려고 시도 해왔다. 스칼라의 입력은 스칼라 세트를 입력으로 사용하고 정수를 출력한다.Spark sqlContext 집합에 작용하는 UDF
// generate sample data
case class Dummy(x:Array[Integer])
val df = sqlContext.createDataFrame(Seq(
Dummy(Array(1,2,3)),
Dummy(Array(10,20,30,40))
))
// define the UDF
import org.apache.spark.sql.functions._
def setSize(A:Set[Integer]):Integer = {
A.size
}
// For some reason I couldn't get it to work without this valued function
val sizeWrap: (Set[Integer] => Integer) = setSize(_)
val sizeUDF = udf(sizeWrap)
// this produces the error
df.withColumn("colSize", sizeUDF('x)).show
내가 여기 실종 무엇 :
여기org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 25.0 failed 1 times, most recent failure: Lost task 20.0 in stage 25.0 (TID 473, localhost): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Set
문제의 핵심을 제공하는 간단한 코드이다 : 나는 다음과 같은 오류를 받고 있어요? 어떻게 작동시킬 수 있습니까? RDD로 전송하여이 작업을 수행 할 수 있지만 RDD와 DataFrames간에 앞뒤로 가고 싶지 않습니다.
감사합니다. 그것은 작동합니다. 나는 또한 입력으로 2 세트로 일반화 할 수 있었다 (나는 원래 필요했다) – Avision