2016-07-05 1 views
0

3 개의 열이있는 테이블이 있다고 가정합니다. user, time, place. 각 사용자에 대한 전환 시간이 특정 임계 값 이하인 경우 place 전환 그래프를 만들고 싶습니다. 즉 사용자에 의해 그룹화하고 시간 순서에 따라 연속적인 행 (i, j)으로부터 에지 place_i 내지 place_j의 유향 그래프를 생성하고, (place_i, place_j)의 각 인스턴스에 대한 에지의 가중치를 증가시킨다. 소스 테이블의 행은 특별한 순서가 아닙니다. 이것이 파이썬 API로 가능합니까? 그렇지 않다면 스칼라에서 어떻게 할 수 있습니까?시계열 테이블에서 스파크 그래프 만들기

샘플 테이블 : 우리는 시간 임계 값 제한을 무시하면

user,time,place 
joe,1,A 
jack,1,B 
joe,2,B 
jack,3,C 
joe,4,D 
jane,5,A 
jane,1,B 

이 그래프는, {(A, B), (B에서 4 개 정점 (A, B, C, D)와 가장자리가 있어야 C), (B, D), (B, A)}.

답변

0

groupBy 다음에 flatMapGroups을 사용했습니다. 지도 내부에서 iterator를 정렬하기 위해 목록에 인스턴스화했습니다. 그런 다음 sliding을 사용하여 쌍으로 목록을 반복하고 가장자리를 만들었습니다.

ds.groupBy(_.user).flatMapGroups((uid, iter) => 
val result = ListBuffer[MySchema]() 
iter.toList.sortBy(_.time).sliding(2).foreach { case List(x,y => 
    result += MySchema(uid, x.place, if (y.time - x.time < Threshold) y.place else 0) 
} 
result.toList 
}.as[AggSchema].groupBy($"src, $"dst).count.as[Schema] 
0

geodata 작업에 관한 내용은 Advanced Analytics with Spark입니다. 8 장입니다. 저자는 질문에 관련된 세션 화에 관한 토론을 시작했습니다. 당신이 시간에 서로 가까이에 각 특정 사용자의 모든 전환을위한

, 샌디 RYZA 통화 완전히 스파크 코어에서 구현되지 않습니다 SecondarySort, 그러나 그는 그와 당신이 할 수있는 몇 가지 코드를 제공 그것을 찾을 수 있습니다 here

나는 당신이 그 장을 읽고 코드를 따라한다면, 당신은 당신이 찾고있는 것을 얻을 것이라고 믿습니다.