2017-12-18 2 views
1

Spark에서 누적 합계를하고 싶습니다.Spark의 누적 합계

+---------------+-------------------+----+----+----+ 
|  product_id|   date_time| ack|val1|val2| 
+---------------+-------------------+----+----+----+ 
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52| 
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52| 
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52| 
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52| 
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1| 
+---------------+-------------------+----+----+----+ 

하이브 쿼리 :

select *, SUM(val1) over (Partition by product_id, ack order by date_time rows between unbounded preceding and current row) val1_sum, SUM(val2) over (Partition by product_id, ack order by date_time rows between unbounded preceding and current row) val2_sum from test 

출력 : 스파크 로직을 사용

+---------------+-------------------+----+----+----+-------+--------+ 
|  product_id|   date_time| ack|val1|val2|val_sum|val2_sum| 
+---------------+-------------------+----+----+----+-------+--------+ 
|4008607333T.upf|2017-12-13:02:27:01|3-46| 53| 52|  53|  52| 
|4008607333T.upf|2017-12-13:02:27:08|3-46| 53| 52| 106|  104| 
|4008607333T.upf|2017-12-13:02:28:07|3-46| 15| 1| 121|  105| 
|4008607333T.upf|2017-12-13:02:27:03|3-47| 53| 52|  53|  52| 
|4008607333T.upf|2017-12-13:02:28:01|3-47| 53| 52| 106|  104| 
+---------------+-------------------+----+----+----+-------+--------+ 

, 나는 점점 오전 같은 위의 출력 :

import org.apache.spark.sql.expressions.Window 
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time) 
import org.apache.spark.sql.functions._ 

val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w) 
newDf.show 
다음 레지스터 테이블 (입력)입니다

그러나이 logi를 시도하면 스파크 클러스터 val_sum 값은 누적 합계의 절반이고 다른 시간 값입니다. 스파크 클러스터에서 왜 이런 일이 일어나고 있는지 모르겠습니다. 그것은 파티션 때문입니까?

스파크 클러스터의 누적 합계를 어떻게 할 수 있습니까?

답변

1

DataFrame API를 사용하여 누적 합계를 얻으려면 rowsBetween 창 메서드를 설정해야합니다. Spark 2.1 및 이후 버전에서 :

val w = Window.partitionBy($"product_id", $"ack") 
    .orderBy($"date_time") 
    .rowsBetween(Window.unboundedPreceding, Window.currentRow) 

이렇게하면 Spark에 파티션의 시작 부분부터 현재 행까지의 값을 사용하게됩니다. 이전 버전의 스파크를 사용하는 경우 동일한 효과로 rowsBetween(Long.MinValue, 0)을 사용하십시오.

창을 사용하려면 이전과 같은 방법을 사용하십시오. 나는.

val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w)) 
    .withColumn("val2_sum", sum($"val2").over(w)) 
+0

당신은 https://stackoverflow.com/questions/47908545/how-to-remember-the-previous-batch-of-spark-streaming-to-calculate-cumulative-su에 대한 답을 적어주세요 수 있습니다 – lucy