2017-11-09 3 views
2

Long 유형의 열에서 RangeBetween을 사용하여 Spark DataFrame에서 창 함수를 수행하려고하는데이 창의 결과가 올바르지 않습니다. 내가 뭔가 잘못하고 있는거야?Spark Window 함수 범위 잘못된 결과 생성 중

val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
     Seq(
     Row("2014-11-01 08:10:10.12345", 141482941L), 
     Row("2014-11-01 09:10:10.12345", 141483301L), 
     Row("2014-11-01 10:10:10.12345", 141483661L), 
     Row("2014-11-02 10:10:10.12345", 141492301L), 
     Row("2014-11-03 10:10:10.12345", 141500941L), 
     Row("2014-11-04 10:10:10.12345", 141509581L), 
     Row("2014-11-05 10:10:10.12345", 141518221L), 
     Row("2014-11-06 10:10:10.12345", 141526861L), 
     Row("2014-11-07 10:10:10.12345", 141535501L), 
     Row("2014-11-08 10:10:10.12345", 141544141L) 
    ) 
    ) 
val schema = new StructType() 
    .add(StructField("dateTime", StringType, true)) 
    .add(StructField("unixTime", LongType, true)) 

val df = spark.createDataFrame(rowsRdd, schema) 
df.show(10, false) 
df.printSchema() 

입니다 : 여기

내 DataFrame입니다

+-------------------------+---------------+ 
|dateTime     |unixTime  | 
+-------------------------+---------------+ 
|2014-11-01 08:10:10.12345|141482941| 
|2014-11-01 09:10:10.12345|141483301| 
|2014-11-01 10:10:10.12345|141483661| 
|2014-11-02 10:10:10.12345|141492301| 
|2014-11-03 10:10:10.12345|141500941| 
|2014-11-04 10:10:10.12345|141509581| 
|2014-11-05 10:10:10.12345|141518221| 
|2014-11-06 10:10:10.12345|141526861| 
|2014-11-07 10:10:10.12345|141535501| 
|2014-11-08 10:10:10.12345|141544141| 
+-------------------------+---------------+ 

스키마 :

root 
|-- dateTime: string (nullable = true) 
|-- unixTime: long (nullable = true) 

첫 번째 열은 이벤트 (문자열의 타임 스탬프는, 우리는하지 않습니다 실제로 그것을 사용한다) 그리고 두 번째 열은 10e-5 초 단위의 타임 스탬프에 해당하는 유닉스 시간이다.

이제 현재 행을 진행하는 창에서 여러 이벤트를 계산하려고합니다. 3 시간의 창 예를 들어 내가 할 : 여기

+-------------------------+---------------+---+ 
|dateTime     |unixTime  |cts| 
+-------------------------+---------------+---+ 
|2014-11-01 08:10:10.12345|141482941|1 | 
|2014-11-01 09:10:10.12345|141483301|2 | 
|2014-11-01 10:10:10.12345|141483661|3 | 
|2014-11-02 10:10:10.12345|141492301|1 | 
|2014-11-03 10:10:10.12345|141500941|1 | 
|2014-11-04 10:10:10.12345|141509581|1 | 
|2014-11-05 10:10:10.12345|141518221|1 | 
|2014-11-06 10:10:10.12345|141526861|1 | 
|2014-11-07 10:10:10.12345|141535501|1 | 
|2014-11-08 10:10:10.12345|141544141|1 | 
+-------------------------+---------------+---+ 

을 6 시간 창에 대한 결과입니다 제대로 반환

val hour: Long = 60*60*100000L 
val w = Window.orderBy(col("unixTime")).rangeBetween(-3*hour, 0) 
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime")) 

. 결과가 모두 0 인 이유는 무엇입니까?

val hour: Long = 60*60*100000L 
val w = Window.orderBy(col("unixTime")).rangeBetween(-6*hour, 0) 
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime")) 

+-------------------------+---------------+---+ 
|dateTime     |unixTime  |cts| 
+-------------------------+---------------+---+ 
|2014-11-01 08:10:10.12345|141482941|0 | 
|2014-11-01 09:10:10.12345|141483301|0 | 
|2014-11-01 10:10:10.12345|141483661|0 | 
|2014-11-02 10:10:10.12345|141492301|0 | 
|2014-11-03 10:10:10.12345|141500941|0 | 
|2014-11-04 10:10:10.12345|141509581|0 | 
|2014-11-05 10:10:10.12345|141518221|0 | 
|2014-11-06 10:10:10.12345|141526861|0 | 
|2014-11-07 10:10:10.12345|141535501|0 | 
|2014-11-08 10:10:10.12345|141544141|0 | 
+-------------------------+---------------+---+ 

12 시간 동안 발생합니다. 결과가 모두 1 인 이유는 무엇입니까?

val hour: Long = 60*60*100000L 
val w = Window.orderBy(col("unixTime")).rangeBetween(-12*hour, 0) 
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime")) 

+-------------------------+---------------+---+ 
|dateTime     |unixTime  |cts| 
+-------------------------+---------------+---+ 
|2014-11-01 08:10:10.12345|141482941|1 | 
|2014-11-01 09:10:10.12345|141483301|1 | 
|2014-11-01 10:10:10.12345|141483661|1 | 
|2014-11-02 10:10:10.12345|141492301|1 | 
|2014-11-03 10:10:10.12345|141500941|1 | 
|2014-11-04 10:10:10.12345|141509581|1 | 
|2014-11-05 10:10:10.12345|141518221|1 | 
|2014-11-06 10:10:10.12345|141526861|1 | 
|2014-11-07 10:10:10.12345|141535501|1 | 
|2014-11-08 10:10:10.12345|141544141|1 | 
+-------------------------+---------------+---+ 

여기 무슨 일이 일어나고 있습니까? 큰 rangeBetween 값에서는 올바르게 작동하지 않습니다.

편집 :이 문 제에 관한 뭔가 2017년 9월 11일

인가? [SPARK-19451][SQL] rangeBetween method should accept Long value as boundary #18540. Spark의 최신 버전에서 이미 구현 되었습니까?

답변

2

실제로 연결된 문제와 관련이 있습니다.

scala> (6 * hour).toInt 
res4: Int = -2134967296 

문제는 현재 마스터 상에 고정되었으며 스파크 2.3 발매합니다 6 * hour보다 큰 Integer.MAX_VALUE (2147483647)에 따라서 정수 오버플 결과이다 2,160,000,000이다.