5
나는이 스파크 작업 (마루)에 다음과 같은 입력 데이터 :스파크 dataframe 느린 범위에 가입
Person (millions of rows)
+---------+----------+---------------+---------------+
| name | location | start | end |
+---------+----------+---------------+---------------+
| Person1 | 1230 | 1478630000001 | 1478630000010 |
| Person2 | 1230 | 1478630000002 | 1478630000012 |
| Person2 | 1230 | 1478630000013 | 1478630000020 |
| Person3 | 3450 | 1478630000001 | 1478630000015 |
+---------+----------+---------------+---------------+
Event (millions of rows)
+----------+----------+---------------+
| event | location | start_time |
+----------+----------+---------------+
| Biking | 1230 | 1478630000005 |
| Skating | 1230 | 1478630000014 |
| Baseball | 3450 | 1478630000015 |
+----------+----------+---------------+
나는 다음과 같은 예상되는 결과로 변환 할 필요가 :
[{
"name" : "Biking",
"persons" : ["Person1", "Person2"]
},
{
"name" : "Skating",
"persons" : ["Person2"]
},
{
"name" : "Baseball",
"persons" : ["Person3"]
}]
단어 : 결과는이 이벤트에 참여한 사람의 목록이있는 각 이벤트의 목록입니다.
사람은 내가 다른 접근을 시도
Person.start < Event.start_time
&& Person.end > Event.start_time
&& Person.location == Event.location
경우 참가자로 계산하지만, 실제로 작동하는 것 같다 유일한 하나/두 dataframes 다음 그룹에 가입 이벤트에 의해 그들을 집계 입니다. 그러나 참여가 매우 느리며 여러 CPU 코어에서 잘 분산되지 않습니다. 참가에 대한
현재 코드 :이 차이가 있는지
final DataFrame fullFrame = persons.as("persons")
.join(events.as("events"), col("persons.location").equalTo(col("events.location"))
.and(col("events.start_time").geq(col("persons.start")))
.and(col("events.start_time").leq(col("persons.end"))), "inner");
//count to have an action
fullFrame.count();
하는 나는, 스파크 독립 및 Java를 사용하고 있습니다.
누구나 Spark 1.6.2로이 문제를 해결하는 방법을 더 잘 알고 있습니까?
실제로 "브로드 캐스트 조인"을 사용하면이 기능이 많이 향상되었습니다. 이벤트 테이블을 메모리에 들어있는 여러 개의 작은 청크로 나누어 하나씩 추가해야했습니다. –