1

문자열 용 사용자 정의 Aggregator[]을 만들었습니다.여러 열에 사용자 정의 Spark Aggregator 적용 (Spark 2.0)

DataFrame의 모든 열에 적용하고 싶습니다. 모든 열은 문자열이지만 열 번호는 임의입니다.

올바른 표현을 쓰는 데 어려움이 있습니다. 다음과 같이 쓰고 싶습니다 :

df.agg(df.columns.map(c => myagg(df(c))) : _*) 

이것은 다양한 인터페이스에서 분명히 잘못되었습니다.

나는 RelationalGroupedDataset.agg(expr: Column, exprs: Column*) 코드를 보았지만 표현 조작에 익숙하지 않습니다.

아이디어가 있으십니까?

+3

어 그리 게이터 코드를 표시하십시오. 당신이하려고하는 것을 설명하십시오. –

+0

@AssafMendelson, 실제로 우리는 다양한 데이터 유형에 대해 다양한 통계를위한 많은 맞춤 애그리 게이터를 계획하고 있습니다. 가장 짧은 문자열과 가장 긴 문자열을 얻기 위해 애그리 게이터 (aggregator)로 작게 시작하고 있습니다. class ShortestLongestAggregator()는 Aggregator [String, (String, String), (String, String)]를 확장합니다. 지금은 임의의 데이터 프레임의 모든 열에 대해 (가장 짧은, 가장 긴) 모든 쌍을 갖기를 원할 것입니다 (문자열 열만있는 경우). – mathieu

답변

5

개별 필드 (열)에서 작동하는 UserDefinedAggregateFunctions과 달리 Aggregtors은 전체/값을 필요로합니다.

스 니펫에서 사용할 수있는 Aggregator을 원하면 열 이름으로 매개 변수화하고 값 유형으로 을 사용해야합니다.

import org.apache.spark.sql.expressions.Aggregator 
import org.apache.spark.sql.{Encoder, Encoders, Row} 

case class Max(col: String) 
    extends Aggregator[Row, Int, Int] with Serializable { 

    def zero = Int.MinValue 
    def reduce(acc: Int, x: Row) = 
    Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero)) 

    def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2) 
    def finish(acc: Int) = acc 

    def bufferEncoder: Encoder[Int] = Encoders.scalaInt 
    def outputEncoder: Encoder[Int] = Encoders.scalaInt 
} 

사용 예제는 정적으로 Dataset<Row>보다 Datasets을 입력과 결합 될 때

val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z") 

@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)")) 

df.agg(exprs.head, exprs.tail: _*) 
+------+------+------+ 
|max(x)|max(y)|max(z)| 
+------+------+------+ 
|  4|  5|  3| 
+------+------+------+ 

는 틀림없이 Aggregators 훨씬 더 의미합니다.

당신은 Seq[_] 누적를 사용하여 하나의 패스를 또한 요구 사항에 집계 여러 열을 수 따라 단일 merge 전화에 전체 (기록) 처리.