스파크 SQL은 나에게 꽤 분명하다. 그러나, 나는 spark의 RDD API로 시작하고 있습니다. spark apply function to columns in parallel가 지적 하듯이 나를 의사 코드에서스파크 SQL에서 RDD API로 변환
def handleBias(df: DataFrame, colName: String, target: String = this.target) = {
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
df.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
}
}
에 대한 느린 셔플을 제거 할 수 있도록해야합니다 df foreach column (handleBias(column)
그래서 최소한의 데이터 프레임
val input = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
)
val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
을로드하지만 제대로
를 매핑하는 데 실패
val rdd1_inputDf = inputDf.rdd.flatMap { x => {(0 until x.size).map(idx => (idx, x(idx)))}}
rdd1_inputDf.toDF.show
이 질문에서 설명한 문제의 예제는 https://github.com/geoHeil/sparkContrastCodinghttps://github.com/geoHeil/sparkContrastCoding/blob/master/src/main/scala/ColumnParallel.scala입니다.
.Pipeline이고 출력 단계는 DataFrame이며 "스키마 손실"입니다. 패턴 일치를 사용해야합니까? 이 올바른지? 그러나 꽤 많은 칼럼들이 있습니다. 다소 복잡합니다. ( –
'DF => RDD' 변환은 안타깝게도 스키마를 사용하지 않습니다. (그렇다고 생각하지 않습니다.) 강제로 사용하는 좋은 방법이다.) 그러나 새로운 'Dataset' 예제를 살펴 보자 : 중개자'Dataframe '을 사용할 필요가 없으며'DataSet'이 유형을 멋지게 추론하는 것처럼 보인다 (Spark 2.0에서 나는 생각한다. DF로 할 수있는 일은 –
@GeorgHeiler (^^^^에 대한 통보를 받았는지 확실하지 않음) –