2016-11-08 4 views
5

나는이 스파크 작업 (마루)에 다음과 같은 입력 데이터 :스파크 dataframe 느린 범위에 가입

Person (millions of rows) 
+---------+----------+---------------+---------------+ 
| name | location |  start  |  end  | 
+---------+----------+---------------+---------------+ 
| Person1 |  1230 | 1478630000001 | 1478630000010 | 
| Person2 |  1230 | 1478630000002 | 1478630000012 | 
| Person2 |  1230 | 1478630000013 | 1478630000020 | 
| Person3 |  3450 | 1478630000001 | 1478630000015 | 
+---------+----------+---------------+---------------+ 


Event (millions of rows) 
+----------+----------+---------------+ 
| event | location | start_time | 
+----------+----------+---------------+ 
| Biking |  1230 | 1478630000005 | 
| Skating |  1230 | 1478630000014 | 
| Baseball |  3450 | 1478630000015 | 
+----------+----------+---------------+ 

나는 다음과 같은 예상되는 결과로 변환 할 필요가 :

[{ 
    "name" : "Biking", 
    "persons" : ["Person1", "Person2"] 
}, 
{ 
    "name" : "Skating", 
    "persons" : ["Person2"] 
}, 
{ 
    "name" : "Baseball", 
    "persons" : ["Person3"] 
}] 

단어 : 결과는이 이벤트에 참여한 사람의 목록이있는 각 이벤트의 목록입니다.

사람은 내가 다른 접근을 시도

Person.start < Event.start_time 
&& Person.end > Event.start_time 
&& Person.location == Event.location 

경우 참가자로 계산하지만, 실제로 작동하는 것 같다 유일한 하나/두 dataframes 다음 그룹에 가입 이벤트에 의해 그들을 집계 입니다. 그러나 참여가 매우 느리며 여러 CPU 코어에서 잘 분산되지 않습니다. 참가에 대한

현재 코드 :이 차이가 있는지

final DataFrame fullFrame = persons.as("persons") 
    .join(events.as("events"), col("persons.location").equalTo(col("events.location")) 
       .and(col("events.start_time").geq(col("persons.start"))) 
       .and(col("events.start_time").leq(col("persons.end"))), "inner"); 

//count to have an action 
fullFrame.count(); 

하는 나는, 스파크 독립 및 Java를 사용하고 있습니다.

누구나 Spark 1.6.2로이 문제를 해결하는 방법을 더 잘 알고 있습니까?

답변

1

범위 조인은 다음 필터 단계에서 교차 제품으로 수행됩니다. 잠재적으로 더 나은 해결책은 브로드 캐스트에 작은 events 테이블을 매핑 한 다음 persons 테이블을 매핑하는 것입니다.지도 내부에서 조인 조건을 확인하고 각각의 결과를 생성하십시오.

+0

실제로 "브로드 캐스트 조인"을 사용하면이 기능이 많이 향상되었습니다. 이벤트 테이블을 메모리에 들어있는 여러 개의 작은 청크로 나누어 하나씩 추가해야했습니다. –

관련 문제