2017-10-30 2 views
1

하위 그룹 내에서 해당 열의 첫 번째 값을 가져올 수 있습니까?Dataframe은 해당 열의 첫 번째 및 마지막 값을 가져옵니다.

import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.{Window, WindowSpec} 

object tmp { 
    def main(args: Array[String]): Unit = { 
    val spark = SparkSession.builder().master("local").getOrCreate() 
    import spark.implicits._ 

    val input = Seq(
     (1235, 1, 1101, 0), 
     (1235, 2, 1102, 0), 
     (1235, 3, 1103, 1), 
     (1235, 4, 1104, 1), 
     (1235, 5, 1105, 0), 
     (1235, 6, 1106, 0), 
     (1235, 7, 1107, 1), 
     (1235, 8, 1108, 1), 
     (1235, 9, 1109, 1), 
     (1235, 10, 1110, 0), 
     (1235, 11, 1111, 0) 
    ).toDF("SERVICE_ID", "COUNTER", "EVENT_ID", "FLAG") 

    lazy val window: WindowSpec = Window.partitionBy("SERVICE_ID").orderBy("COUNTER") 
    val firsts = input.withColumn("first_value", first("EVENT_ID", ignoreNulls = true).over(window.rangeBetween(Long.MinValue, Long.MaxValue))) 
    firsts.orderBy("SERVICE_ID", "COUNTER").show() 

    } 
} 

출력 싶습니다.

먼저 (또는 이전) FLAG에 기초하여 열 EVENT_ID 값 = FLAG에 기초하여 열 EVENT_ID 1 그리고 마지막 (또는 다음) 값 = SERVICE_ID 1 개 파티션 카운터 정렬

+----------+-------+--------+----+-----------+-----------+ 
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value|last_value| 
+----------+-------+--------+----+-----------+-----------+ 
|  1235|  1| 1101| 0|   0|  1103| 
|  1235|  2| 1102| 0|   0|  1103| 
|  1235|  3| 1103| 1|   0|  1106| 
|  1235|  4| 1104| 0|  1103|  1106| 
|  1235|  5| 1105| 0|  1103|  1106| 
|  1235|  6| 1106| 1|   0|  1108| 
|  1235|  7| 1107| 0|  1106|  1108| 
|  1235|  8| 1108| 1|   0|  1109| 
|  1235|  9| 1109| 1|   0|  1110| 
|  1235|  10| 1110| 1|   0|   0| 
|  1235|  11| 1111| 0|  1110|   0| 
|  1235|  12| 1112| 0|  1110|   0| 
+----------+-------+--------+----+-----------+-----------+ 

답변

0

우선 데이터 프레임을 그룹으로 구성해야합니다. 새로운 그룹은 "시간"열이이 작업을 수행하려면 1. 동일마다 시작, 먼저 dataframe에 열 "ID"를 추가 : 이제

+----------+-------+--------+----+---+ 
|SERVICE_ID|COUNTER|EVENT_ID|FLAG| ID| 
+----------+-------+--------+----+---+ 
|  1235|  1| 1111| 1| 1| 
|  1235|  2| 1112| 0| 1| 
|  1235|  3| 1114| 0| 1| 
|  1235|  4| 2221| 1| 2| 
|  1235|  5| 2225| 0| 2| 
|  1235|  6| 2226| 0| 2| 
|  1235|  7| 2227| 1| 3| 
+----------+-------+--------+----+---+ 

을 우리 것을 :

lazy val window: WindowSpec = Window.partitionBy("SERVICE_ID").orderBy("COUNTER") 
val df_flag = input.filter($"FLAG" === 1) 
    .withColumn("ID", row_number().over(window)) 
val df_other = input.filter($"FLAG" =!= 1) 
    .withColumn("ID", lit(0)) 

// Create a group for each flag event 
val df = df_flag.union(df_other) 
    .withColumn("ID", max("ID").over(window.rowsBetween(Long.MinValue, 0))) 
    .cache() 

df.show() 준다 이벤트를 분리하는 열이 있으면 올바른 "EVENT_ID"(이름이 "first_value"로 바뀜)를 각 이벤트에 추가해야합니다. "first_value"외에도 다음에 플래그가 지정된 이벤트의 ID 인 두 번째 열 "last_value"를 계산하고 추가하십시오.

val df_event = df.filter($"FLAG" === 1) 
    .select("EVENT_ID", "ID", "SERVICE_ID", "COUNTER") 
    .withColumnRenamed("EVENT_ID", "first_value") 
    .withColumn("last_value", lead($"first_value",1,0).over(window)) 
    .drop("COUNTER") 

val df_final = df.join(df_event, Seq("ID", "SERVICE_ID")) 
    .drop("ID") 
    .withColumn("first_value", when($"FLAG" === 1, lit(0)).otherwise($"first_value")) 

df_final.show()

우리를 제공합니다

+----------+-------+--------+----+-----------+----------+ 
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value|last_value| 
+----------+-------+--------+----+-----------+----------+ 
|  1235|  1| 1111| 1|   0|  2221| 
|  1235|  2| 1112| 0|  1111|  2221| 
|  1235|  3| 1114| 0|  1111|  2221| 
|  1235|  4| 2221| 1|   0|  2227| 
|  1235|  5| 2225| 0|  2221|  2227| 
|  1235|  6| 2226| 0|  2221|  2227| 
|  1235|  7| 2227| 1|   0|   0| 
+----------+-------+--------+----+-----------+----------+ 
+0

유용. 6 백만 행의 클러스터에서이를 실행하지 않습니다. 하지만 그 전에는 플래그 (편집 내 원래 게시물)에 대한 마지막 (또는 다음) 값에 대한 또 다른 열을 추가해야합니다. – xstack2000

+0

@ xstack2000 이미 답변을 얻은 후에 너무 많이 추가하지 않으려하면 일부 답변이 무용지물이되어 미래의 방문자를 혼란스럽게 할 수 있습니다. 그러나 원하는 열을 추가했으며, 나중에 창의 행을 보는'lead' 함수를 사용합니다. – Shaido

+0

Shaido 귀하의 답변은 매우 도움이됩니다. 내 원래 게시물에 더 추가했지만 귀하의 회신에 감사드립니다. 다른 누군가가 볼 수 있다면 추가 한 연속적인 레코드에 2 개의 플래그가 있는지 처리하는 방법은 여전히 ​​필요합니다. 하지만 답장으로 답장을드립니다. 감사. – xstack2000

0

는 두 단계로 해결 될 수있다 "FLAG"== 1이 이벤트에 대한 유효 범위

  1. 얻을 이벤트;
  2. 입력과 함께 1. 범위로. 일부 열은 개명 가시성 포함

이 단축 될 수있다 :

val window = Window.partitionBy("SERVICE_ID").orderBy("COUNTER").rowsBetween(Window.currentRow, 1) 
val eventRangeDF = input.where($"FLAG" === 1) 
    .withColumn("RANGE_END", max($"COUNTER").over(window)) 
    .withColumnRenamed("COUNTER", "RANGE_START") 
    .select("SERVICE_ID", "EVENT_ID", "RANGE_START", "RANGE_END") 
eventRangeDF.show(false) 

val result = input.where($"FLAG" === 0).as("i").join(eventRangeDF.as("e"), 
    expr("e.SERVICE_ID=i.SERVICE_ID And i.COUNTER>e.RANGE_START and i.COUNTER<e.RANGE_END")) 
    .select($"i.SERVICE_ID", $"i.COUNTER", $"i.EVENT_ID", $"i.FLAG", $"e.EVENT_ID".alias("first_value")) 
    // include FLAG=1 
    .union(input.where($"FLAG" === 1).select($"SERVICE_ID", $"COUNTER", $"EVENT_ID", $"FLAG", lit(0).alias("first_value"))) 

result.sort("COUNTER").show(false) 

출력 :

+----------+--------+-----------+---------+ 
|SERVICE_ID|EVENT_ID|RANGE_START|RANGE_END| 
+----------+--------+-----------+---------+ 
|1235  |1111 |1   |4  | 
|1235  |2221 |4   |7  | 
|1235  |2227 |7   |7  | 
+----------+--------+-----------+---------+ 

+----------+-------+--------+----+-----------+ 
|SERVICE_ID|COUNTER|EVENT_ID|FLAG|first_value| 
+----------+-------+--------+----+-----------+ 
|1235  |1  |1111 |1 |0   | 
|1235  |2  |1112 |0 |1111  | 
|1235  |3  |1114 |0 |1111  | 
|1235  |4  |2221 |1 |0   | 
|1235  |5  |2225 |0 |2221  | 
|1235  |6  |2226 |0 |2221  | 
|1235  |7  |2227 |1 |0   | 
+----------+-------+--------+----+-----------+ 
+0

감사합니다. Pasha. 나는 이것을 클러스터에 대해 측정해야하고 응답으로 돌아 가야한다고 생각합니다. 그런데 원래 게시물을 편집했습니다. 더 많은 것을 추가하는 것에 대해 유감스럽게 생각합니다. – xstack2000

관련 문제