개별 필드 (열)에서 작동하는 UserDefinedAggregateFunctions
과 달리 Aggregtors
은 전체/값을 필요로합니다.
스 니펫에서 사용할 수있는 Aggregator
을 원하면 열 이름으로 매개 변수화하고 값 유형으로 을 사용해야합니다.
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, Row}
case class Max(col: String)
extends Aggregator[Row, Int, Int] with Serializable {
def zero = Int.MinValue
def reduce(acc: Int, x: Row) =
Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero))
def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2)
def finish(acc: Int) = acc
def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
사용 예제는 정적으로 Dataset<Row>
보다 Datasets
을 입력과 결합 될 때
val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z")
@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)"))
df.agg(exprs.head, exprs.tail: _*)
+------+------+------+
|max(x)|max(y)|max(z)|
+------+------+------+
| 4| 5| 3|
+------+------+------+
는 틀림없이 Aggregators
훨씬 더 의미합니다.
당신은 Seq[_]
누적를 사용하여 하나의 패스를 또한 요구 사항에 집계 여러 열을 수 따라 단일 merge
전화에 전체 (기록) 처리.
어 그리 게이터 코드를 표시하십시오. 당신이하려고하는 것을 설명하십시오. –
@AssafMendelson, 실제로 우리는 다양한 데이터 유형에 대해 다양한 통계를위한 많은 맞춤 애그리 게이터를 계획하고 있습니다. 가장 짧은 문자열과 가장 긴 문자열을 얻기 위해 애그리 게이터 (aggregator)로 작게 시작하고 있습니다. class ShortestLongestAggregator()는 Aggregator [String, (String, String), (String, String)]를 확장합니다. 지금은 임의의 데이터 프레임의 모든 열에 대해 (가장 짧은, 가장 긴) 모든 쌍을 갖기를 원할 것입니다 (문자열 열만있는 경우). – mathieu