The Archetypal Paul에서 제안한대로 DataFrame
과 창 기능을 사용할 수 있습니다. 먼저 필요한 수입 :
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
다음 데이터는 DataFrame
로 변환 할 수 있습니다
val keyTimestampWindow = Window.partitionBy("key").orderBy("timestamp")
:
val df = rdd.mapValues(_._1).toDF("key", "timestamp")
우리가 윈도우 정의가 필요합니다 lag
기능을 사용할 수 있으려면 선택할 수 있습니다 :
val withGap = df.withColumn(
"gap", $"timestamp" - lag("timestamp", 1).over(keyTimestampWindow)
)
마지막으로 groupBy
max
로 :
withGap.groupBy("key").max("gap")
당신은 키와 타임 스탬프에 의해 종류의 The Archetypal Paul하여 두 번째 조언을 할 수 있습니다 후.
import org.apache.spark.mllib.rdd.RDDFunctions._
sorted.sliding(2).collect {
case Array((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1)
}.reduceByKey(Math.max(_, _))
같은 생각의 또 다른 변종 종류의 첫번째 다시 분할하고 :
val partitionedAndSorted = rdd
.mapValues(_._1)
.repartitionAndSortWithinPartitions(
new org.apache.spark.HashPartitioner(rdd.partitions.size)
)
데이터와 val sorted = rdd.mapValues(_._1).sortBy(identity)
은 키에 의해 슬라이딩 줄임으로써 각 키에 대해 최대 간격을 찾을 수있는이 같은 배열
데이터를 변환 할 수 있습니다.
val lagged = partitionedAndSorted.mapPartitions(_.sliding(2).collect {
case Seq((key1, val1), (key2, val2)) if key1 == key2 => (key1, val2 - val1)
}, preservesPartitioning=true)
012 3,516,및 reduceByKey
: 키와 타임 스탬프에 의해
lagged.reduceByKey(Math.max(_, _))
정렬. 그런 다음 데이터를 선형으로 통과합니다. –
나는 스파크에서 이것을하지 않을 것입니다. 본질적으로 선형 패스 인 것은 스파크에 잘 맞지 않습니다. 아마도 DF로 변환하고 창을 사용할 수는 있지만 결코 해보지 않았습니다. https://cran.r-project.org/web/packages/dplyr/vignettes/window-functions.html 관련성이 있습니다. –
@TheArchetypalPaul 더 적합한 인기 기술을 가르쳐 주시겠습니까? – amaik