2017-09-05 1 views
0

동적 스키마 생성을 사용하여 Dataframe을 만들려고합니다. 여기 코드는 다음과 같습니다RDD 용 DataFrame을 만들 수 없습니다.

내가 런타임에 이러한 예외가 계속하지만
def mapMetricList(row: Row): Seq[Metric] = ??? 

val fields = Seq("Field1", "Field2") 

case class Metric(name: String, count: Long) 
def convertMetricList(df: DataFrame): DataFrame = { 
    val outputFields = df.schema.fieldNames.filter(f => fields.contains(f)) 

    val rdd = df.rdd.map(row => { 
    val schema = row.schema 
    val metrics = mapMetricList(row) 
    val s = outputFields.map(name => row.get(schema.fieldIndex(name))) 
    Row.fromSeq(s ++ Seq(metrics)) 
    }) 

    val nonMetricsSchema = outputFields.map(f => df.schema.apply(f)) 
    val metricField = StructField("total",ArrayType(ScalaReflection.schemaFor[Metric].dataType.asInstanceOf[StructType]),nullable=true) 
    val schema = StructType(nonMetricsSchema ++ Seq(metricField)) 
    schema.printTreeString() 
    val dff = spark.createDataFrame(rdd, schema) 
    dff 
} 

: 나는 불꽃 2.1.0을 사용하고

Caused by: java.lang.RuntimeException: Metric is not a valid external type for schema of struct<name:string,count:bigint> 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfCondExpr3$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr4$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290) 

+0

"Metric"클래스가 내부에 있으면 이러한 오류가 발생할 수 있습니다. "Metric"클래스를 자신의 파일로 이동하십시오. – pasha701

+0

케이스 클래스를 별도의 파일로 이동하려고 시도했지만 오류가 여전히 있습니다. –

답변

0

내 컴퓨터에 스파크 1.6 괜찮 았는데, I "convertMetricList"함수의 결과를 인쇄합니다. "metricField"필드의 "count"유형에서 문제가 발생할 수 있습니다. 언급 당신의 추적 "BIGINT"에서, 내 ENV 유형에 "LongType"입니다 :

StructField(total,ArrayType(
    StructType(StructField(name,StringType,true), 
    StructField(count,LongType,false) 
),true),true) 

당신은 당신의 ENV의 "metricField"유형을 확인할 수 있습니다. 다른 경우, 해결 방법은 Metric 구조를 하드 코딩하는 것입니다.

+0

답해 주셔서 감사합니다. Spark 1.6에서 코드를 테스트했으며 작동합니다. 2.0에서 변경된 것이 작동을 멈추게하는 것은 무엇인지 모르겠습니다. –

+0

분명히 이것은 1.6에서 실수로 구현되었으며 2.0에서 제거되었습니다. https://issues.apache.org/jira/browse/SPARK-15507 –

+0

나는 또한 동일한 문제에 직면하고 있습니다. 이러한 문제를 해결하기위한 해결책이나 해결 방법 –

관련 문제