2016-09-23 5 views
0

저는 apache spark와 scala를 처음 사용하고 예제를 통해 그것을 배우려고합니다. 나는 시내 버스 위치 (행 번호, 시간, 경도, latitute)의 간단한 세트있어 : 몇 가지 변환 후RDD에서 가장 가까운 기록을 찾는 방법

9, 23/09/16 10:20, 123.3, 123.3 
9, 23/09/16 10:21, 125.3, 125.3 

을 나는 개체의 RDD를 얻을 :

,571,996 :
class BusPosition(val line: String, val time: DateTime, val position: Point) 

다음으로 내가 좋아하는, 프레임의 RDD을 갖고 싶어3210

각 프레임은 가장 가까운 두 개의 시간 기록을 결합합니다. 누구든지 그런 세트를 만들고 가장 가까운 이웃을 찾는 법을 알고 있습니까? 검색했지만 적절한 답변을 찾을 수 없습니다.

+0

일부 옵션 : a) 파티션 재 분류, 파티션 분류 및 선형 스캔 수행, b) 래그/리드와 함께 창 기능 사용. – zero323

+0

고맙지 만 완전히 얻지는 못했습니다. 간단한 코드 예제를 알려주시겠습니까? – Hejwo

답변

1

이 문제에 접근 할 수있는 방법이 많이 있습니다.

import java.sql.Timestamp 

case class Point(longitude: Double, latitute: Double) 
case class BusPosition(line: String, time: Timestamp, position: Point) 

case class BusFrame(
    line: String, time1: Timestamp, time2: Timestamp, 
    position1: Point, position2: Point) 

val data = Seq(
    BusPosition(
    "9", Timestamp.valueOf("2016-09-23 10:20:00"), Point(123.3, 123.3)), 
    BusPosition(
    "9", Timestamp.valueOf("2016-09-23 10:21:00"), Point(125.3, 125.3)), 
    BusPosition(
    "7", Timestamp.valueOf("2015-08-01 00:20:12"), Point(123.9, 122.9)), 
    BusPosition(
    "7", Timestamp.valueOf("2015-08-01 00:00:22"), Point(124.0, 122.6)) 
).toDS() 
  1. 윈도우 함수 :

    import org.apache.spark.sql.expressions.Window 
    
    val w = Window.partitionBy("line").orderBy("time1") 
    
    val time2 = lag($"time1", 1).over(w).alias("time2") 
    val position2 = lag($"position1", 1).over(w).alias("position2") 
    
    data.toDF("line", "time1", "position1") 
        .select($"*", time2, position2) 
        .na.drop(Array("time2", "position2")) 
        .as[BusFrame] 
    
  2. 슬라이딩 창

    import org.apache.spark.mllib.rdd.RDDFunctions._ 
    
    data.orderBy("line", "time").rdd.sliding(2).collect { 
        case Array(BusPosition(l1, t1, p1), BusPosition(l2, t2, p2)) if l1 == l2 => 
        BusFrame(...) 
    } 
    
  3. 사용자 정의 파티션 프로그램 및

    주문 먼저 스파크 SQL과 더 나은 상호 운용성을 위해 클래스를 조정할 수 있습니다
    import org.apache.spark.Partitioner 
    import scala.math.Ordering 
    
    class LineTimestampPartitioner(n: Int) extends Partitioner { 
        def numPartitions: Int = n 
        def getPartition(key: Any): Int = ??? // Partition based on line 
    } 
    
    // Order by line first, timestamp second 
    implicit val lineTimestampOrd: Ordering[(String, java.sql.Timestamp)] = ??? 
    
    data.rdd 
        .keyBy(bp => (bp.line, bp.time)) 
        .repartitionAndSortWithinPartitions(new LineTimestampPartitioner(n)) 
        .values 
        .mapPartitions(_.sliding(2).collect { 
        ??? // Like for mllib sliding 
        }) 
    
+0

제로 323을 고맙습니다 !!! 신난다 답변 – Hejwo

관련 문제