2016-10-09 5 views
0

나는 (Key, (Timestamp, Value)) 항목으로 구성된 Pair RDD를 가지고 있습니다.스파크가 타임 스탬프에서 갭을 발견했습니다

데이터를 읽을 때 엔트리는 타임 스탬프에 따라 정렬되므로 RDD의 각 파티션은 타임 스탬프에 따라 정렬되어야합니다. 내가 원하는 것은 2 개의 순차적 타임 스탬프 사이의 가장 큰 차이 인 모든 키를 찾아내는 것입니다.

나는이 문제에 대해 오랫동안 생각하고 있으며, 스파크가 제공하는 기능을 고려할 때 어떻게 실현 될지 모르겠다. 내가 보는 문제는 : 단순한지도를 만들 때 주문 정보가 느슨해지기 때문에 가능성이 없다. 나에게도 특정 키에 대한 항목이 너무 많아서 groupByKey가 실패한 것으로 보입니다. 수행하려고하면 java.io.IOException: No space left on device

이 방법에 대한 도움은 대단히 도움이됩니다.

+1

정렬. 그런 다음 데이터를 선형으로 통과합니다. –

+1

나는 스파크에서 이것을하지 않을 것입니다. 본질적으로 선형 패스 인 것은 스파크에 잘 맞지 않습니다. 아마도 DF로 변환하고 창을 사용할 수는 있지만 결코 해보지 않았습니다. https://cran.r-project.org/web/packages/dplyr/vignettes/window-functions.html 관련성이 있습니다. –

+0

@TheArchetypalPaul 더 적합한 인기 기술을 가르쳐 주시겠습니까? – amaik

답변

2

The Archetypal Paul에서 제안한대로 DataFrame과 창 기능을 사용할 수 있습니다. 먼저 필요한 수입 :

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.lag 

다음 데이터는 DataFrame로 변환 할 수 있습니다

val keyTimestampWindow = Window.partitionBy("key").orderBy("timestamp") 

:

val df = rdd.mapValues(_._1).toDF("key", "timestamp") 

우리가 윈도우 정의가 필요합니다 lag 기능을 사용할 수 있으려면 선택할 수 있습니다 :

val withGap = df.withColumn(
    "gap", $"timestamp" - lag("timestamp", 1).over(keyTimestampWindow) 
) 

마지막으로 groupBymax로 :

withGap.groupBy("key").max("gap") 

당신은 키와 타임 스탬프에 의해 종류의 The Archetypal Paul하여 두 번째 조언을 할 수 있습니다 후.

import org.apache.spark.mllib.rdd.RDDFunctions._ 

sorted.sliding(2).collect { 
    case Array((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1) 
}.reduceByKey(Math.max(_, _)) 

같은 생각의 또 다른 변종 종류의 첫번째 다시 분할하고 :

val partitionedAndSorted = rdd 
    .mapValues(_._1) 
    .repartitionAndSortWithinPartitions(
    new org.apache.spark.HashPartitioner(rdd.partitions.size) 
) 
데이터와

val sorted = rdd.mapValues(_._1).sortBy(identity) 

은 키에 의해 슬라이딩 줄임으로써 각 키에 대해 최대 간격을 찾을 수있는이 같은 배열

데이터를 변환 할 수 있습니다.

val lagged = partitionedAndSorted.mapPartitions(_.sliding(2).collect { 
    case Seq((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1) 
}, preservesPartitioning=true) 
012 3,516,

reduceByKey : 키와 타임 스탬프에 의해

lagged.reduceByKey(Math.max(_, _)) 
관련 문제