2016-06-21 2 views
0

Spark 2.0 미리보기와 함께 Databricks Community Edition을 사용하고 있습니다. 내가 할 내가SparkSQL Aggregator : 형식 불일치 오류

val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect 

를 사용하는 경우

error: type mismatch;
found: org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
required: org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
val g_c1 = teams.groupByKey(_.f1).aggSeq[C1]

: 나는 다음과 같은 오류 메시지가

import org.apache.spark.sql.expressions.Aggregator 
import org.apache.spark.sql.Encoder 
import java.util.Calendar 
import spark.implicits._ 

case class C1(f1: String, f2: String, f3: String, f4:java.sql.Date, f5: Double) 
val teams = sc.parallelize(Seq(C1("hash1", "NLC", "Cubs", Java.sql.Date.valueOf("2016-01-23"), 3253.21), C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88), C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72))).toDS 

object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] { 
    def zero: Seq[C1] = Seq.empty[C1] //Nil 
    def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a 
    def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2 
    def finish(r: Seq[C1]): Seq[C1] = r 

    override def bufferEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1] 
    override def outputEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1] 
} 
val g_c1 = teams.groupByKey(_.f1).agg[Seq[C1]](C1Agg.toColumn).collect 

: 나는 다음 (단순) 코드를 시도

error: type mismatch;
found: org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
required: org.apache.spark.sql.TypedColumn[C1,?]
val g_c1 = teams.groupByKey(_.f1).aggSeq[C1]

힌트가 있습니까?

답변

0

이유를 찾았습니다. 이것은 하나의 셀 (노트북)에서 사례 클래스를 선언하고 다른 후속 셀에서 사용하기 때문에 발생합니다.

동일한 셀에 전체 코드를 넣으면이 문제가 해결됩니다. (불행히도 지금은 다른 문제가 있음 MissingRequirementError)