2017-09-27 1 views
0

을 바탕으로 집계하고 피봇 팅 나는 불꽃불꽃 스칼라 - 시간주기

지금 현재로

, 내가는 SqlContext을 사용하고 SQL 내에서 모든 변환을 적용하고있어에서 SQL 서버와 유사한 선회 구현하기 위해 노력했다. SQL Server에서 직접 끌어 오기를 수행하고 spark를 사용하여 피벗 펑션을 구현할 수 있는지 알고 싶습니다. | 다음은

내가

create table #temp(ID Int, MonthPrior int, Amount float);

insert into #temp values (100,1,10),(100,2,20),(100,3,30),(100,4,10),(100,5,20),(100,6,60),(200,1,10),(200,2,20),(200,3,30),(300,4,10),(300,5,20),(300,6,60);

select * from #temp;

below- SQL Server 쿼리를 achieve- 노력하고있어의 예입니다 ID | MonthPrior | 금액 |
| ------- | ---------- | ------ |
| 100 | 1 | 10 |
| 100 | 2 | 20 |
| 100 | 3 | 30 |
| 100 | 4 | 10 |
| 100 | 5 | 20 |
| 100 | 6 | 60 |
| 200 | 1 | 10 |
| 200 | 2 | 20 |
| 200 | 3 | 30 |
| 300 | 4 | 10 |
| 300 | 5 | 20 |
| 300 | 6 | 60 |

Select ID,coalesce([1],0) as Amount1Mth, coalesce([1],0)+coalesce([2],0)+coalesce([3],0) as Amount1to3Mth, coalesce([1],0)+coalesce([2],0)+coalesce([3],0)+coalesce([4],0)+coalesce([5],0)+coalesce([6],0) as Amount_AllMonths from (select * from #temp) A pivot (sum(Amount) for MonthPrior in ([1],[2],[3],[4],[5],[6])) as Pvt

| ID | 금액 1Mth | Amount1to3Mth | Amount_AllMonths |
| ------- | ------- | ------- | --- |
| 100 | 10 | 60 | 150 |
| 200 | 10 | 60 | 60 |
| 300 | 0 | 0 | 90 |

답변

1

경우를 Decimal 유형의 경우 대응하는 인수 유형으로 java.math.BigDecimal을 사용하는 것이 가장 좋습니다. +sum 메서드는 더 이상 적용 할 수 없으므로 각각 addreduce으로 대체됩니다.

import org.apache.spark.sql.functions._ 
import java.math.BigDecimal 

val df = Seq(
    (100, 1, new BigDecimal(10)), 
    (100, 2, new BigDecimal(20)), 
    (100, 3, new BigDecimal(30)), 
    (100, 4, new BigDecimal(10)), 
    (100, 5, new BigDecimal(20)), 
    (100, 6, new BigDecimal(60)), 
    (200, 1, new BigDecimal(10)), 
    (200, 2, new BigDecimal(20)), 
    (200, 3, new BigDecimal(30)), 
    (300, 4, new BigDecimal(10)), 
    (300, 5, new BigDecimal(20)), 
    (300, 6, new BigDecimal(60)) 
).toDF("ID", "MonthPrior", "Amount") 

// UDF to combine 2 array-type columns to map 
def arrayToMap = udf(
    (a: Seq[Int], b: Seq[BigDecimal]) => (a zip b).toMap 
) 

// Create array columns which get zipped into a map 
val df2 = df.groupBy("ID").agg(
    collect_list(col("MonthPrior")).as("MonthList"), 
    collect_list(col("Amount")).as("AmountList") 
).select(
    col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap") 
) 

// UDF to sum map values for keys from 1 thru n (0 for all) 
def sumMapValues = udf(
    (m: Map[Int, BigDecimal], n: Int) => 
    if (n > 0) 
     m.collect{ case (k, v) => if (k <= n) v else new BigDecimal(0) }.reduce(_ add _) 
    else 
     m.collect{ case (k, v) => v }.reduce(_ add _) 
) 

val df3 = df2.withColumn("Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1))). 
    withColumn("Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3))). 
    withColumn("Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0))). 
    select(col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths")) 

df3.show(truncate=false) 
+---+--------------------+--------------------+--------------------+ 
| ID|   Amount1Mth|  Amount1to3Mth| Amount_AllMonths| 
+---+--------------------+--------------------+--------------------+ 
|300|    0E-18|    0E-18|90.00000000000000...| 
|100|10.00000000000000...|60.00000000000000...|150.0000000000000...| 
|200|10.00000000000000...|60.00000000000000...|60.00000000000000...| 
+---+--------------------+--------------------+--------------------+ 
1

한 가지 방법은 MonthPriorAmount의 배열에서지도 형 열을 생성 할 수 있으며, 정수 매개 변수를 기반으로지도 값의 합계 UDF 적용됩니다 :

Amount 열입니다
val df = Seq(
    (100, 1, 10), 
    (100, 2, 20), 
    (100, 3, 30), 
    (100, 4, 10), 
    (100, 5, 20), 
    (100, 6, 60), 
    (200, 1, 10), 
    (200, 2, 20), 
    (200, 3, 30), 
    (300, 4, 10), 
    (300, 5, 20), 
    (300, 6, 60) 
).toDF("ID", "MonthPrior", "Amount") 

import org.apache.spark.sql.functions._ 

// UDF to combine 2 array-type columns to map 
def arrayToMap = udf(
    (a: Seq[Int], b: Seq[Int]) => (a zip b).toMap 
) 

// Aggregate columns into arrays and apply arrayToMap UDF to create map column 
val df2 = df.groupBy("ID").agg(
    collect_list(col("MonthPrior")).as("MonthList"), 
    collect_list(col("Amount")).as("AmountList") 
).select(
    col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap") 
) 

// UDF to sum map values for keys from 1 thru n (0 for all) 
def sumMapValues = udf(
    (m: Map[Int, Int], n: Int) => 
    if (n > 0) m.collect{ case (k, v) => if (k <= n) v else 0 }.sum else 
     m.collect{ case (k, v) => v }.sum 
) 

// Apply sumMapValues UDF to the map column 
val df3 = df2.withColumn("Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1))). 
    withColumn("Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3))). 
    withColumn("Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0))). 
    select(col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths")) 

df3.show 
+---+----------+-------------+----------------+ 
| ID|Amount1Mth|Amount1to3Mth|Amount_AllMonths| 
+---+----------+-------------+----------------+ 
|300|   0|   0|    90| 
|100|  10|   60|    150| 
|200|  10|   60|    60| 
+---+----------+-------------+----------------+ 
+0

감사합니다. @LeoC 이 접근 방식을 분석하겠습니다. 일하는 것 같아요. –

+0

도움이 된 것을 기쁘게 생각합니다.문제가 해결되면 답변을 수락하여 질문을 마무리하십시오. –

+0

(a : Seq [Int], b : Seq [Int]) => (zip b) .toMap에서 문제가 발생했습니다. 이 부분은 'UDF (col_1, col_2) '데이터 형식이 일치하지 않아서 해결할 수 없습니다. 인수 2에는 배열 형식이 필요하지만'col_2 '는 배열 형식입니다. ' udf에서 숫자/십진수를 사용하여 시도했습니다. 여전히 작동하지 않았습니다 –

0

감사합니다. @LeoC. 솔루션 위에 효과가있다. 또한 다음을 시도했습니다.

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Column 


lazy val months = (((df select ($"MonthPrior") distinct) sort 
($"MonthPrior".asc)).rdd map (_.getAs[Int](0)) collect).toList 

lazy val sliceSpec = List((0, 2, "1-2"), (0, 3, "1-3"), (0, 4, "1-4"), (0, 5, "1-5"), (0, 6, "1-6")) 

lazy val createGroup: List[Any] => ((Int, Int, String) => Column) = sliceMe => (start, finish, aliasName) => 
    sliceMe slice (start, finish) map (value => col(value.toString)) reduce (_ + _) as aliasName 

lazy val grouper = createGroup(months).tupled 

lazy val groupedCols = sliceSpec map (group => grouper(group)) 

val pivoted = df groupBy ($"ID") pivot ("MonthPrior") agg (sum($"Amount")) 

val writeMe = pivoted select ((pivoted.columns map col) ++ (groupedCols): _*) 

z.show(writeMe sort ($"ID".asc))