동적 스키마 생성을 사용하여 Dataframe을 만들려고합니다. 여기 코드는 다음과 같습니다RDD 용 DataFrame을 만들 수 없습니다.
내가 런타임에 이러한 예외가 계속하지만def mapMetricList(row: Row): Seq[Metric] = ???
val fields = Seq("Field1", "Field2")
case class Metric(name: String, count: Long)
def convertMetricList(df: DataFrame): DataFrame = {
val outputFields = df.schema.fieldNames.filter(f => fields.contains(f))
val rdd = df.rdd.map(row => {
val schema = row.schema
val metrics = mapMetricList(row)
val s = outputFields.map(name => row.get(schema.fieldIndex(name)))
Row.fromSeq(s ++ Seq(metrics))
})
val nonMetricsSchema = outputFields.map(f => df.schema.apply(f))
val metricField = StructField("total",ArrayType(ScalaReflection.schemaFor[Metric].dataType.asInstanceOf[StructType]),nullable=true)
val schema = StructType(nonMetricsSchema ++ Seq(metricField))
schema.printTreeString()
val dff = spark.createDataFrame(rdd, schema)
dff
}
: 나는 불꽃 2.1.0을 사용하고
Caused by: java.lang.RuntimeException: Metric is not a valid external type for schema of struct<name:string,count:bigint>
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfCondExpr3$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr4$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
을
"Metric"클래스가 내부에 있으면 이러한 오류가 발생할 수 있습니다. "Metric"클래스를 자신의 파일로 이동하십시오. – pasha701
케이스 클래스를 별도의 파일로 이동하려고 시도했지만 오류가 여전히 있습니다. –