2016-11-09 1 views
0

큰 데이터의 구성 요소 연결을 처리 할 때 스파크에서 병합하는 것이 매우 어렵습니다.데이터가 너무 큰 경우 Spark의 구성 요소를 연결하는 방법

내 연구의 데이터 구조는 RDD[Array[Int]]으로 단순화 될 수 있습니다. 예를 들어 : 그들은 교차로, 어떤 교차로없이 배열 결말 설정 한 경우

RDD[Array(1,2,3), Array(1,4), Array(5,6), Array(5,6,7,8), Array(9), Array(1)]

목적은 두 개의 배열을 병합하는 것입니다. 따라서 병합 후에는 같아야

RDD[Array(1,2,3,4), Array(5,6,7,8), Array(9)]

문제 그래프 너 한테의 프레 겔 골격에서 연결 요소의 일종이다. 한 가지 해결책은 Array 두 개 사이의 가장자리 연결을 찾아 데카르트 제품을 사용하여 병합 한 다음 병합하는 것입니다. 그러나 제 경우에는 300KArray이고 전체 크기는 1G입니다. 따라서 시간과 메모리 복잡성은 대략 300K * 300K입니다. Mac Pro에서 spark 프로그램을 실행할 때 완전히 멈추었습니다.

Baiscally, 그것은 같은 것입니다 :

enter image description here

감사

+1

미안하지만, 1G는 빅 데이터는 아니지만 ... 어떻게이 너무 커서? 대부분의 Mac에는 요즘 8G RAM이 있습니다. 맞습니까? –

+0

@ cricket_007 예, 1G는 큰 데이터가 아니지만 결국에는 알고리즘을 큰 데이터로 확장합니다. 내 Mac에는 진정으로 8G RAM이 있습니다. 따라서 Mac에서이 데이터 세트를 실행할 수 없다면 알 고가 충분히 바람직하지 않다는 것을 의미합니다. –

+0

나는 특별한 코드를 염두에두고 있지는 않지만, 스파크가 실제로이 특정 알고리즘/태스크를위한 것인지 확실하지는 않습니다. 필자는 데이터 프레임과 SparkSQL과 비슷한 작업을 수행하고 Windowing 함수를 사용했지만 연속 번호가 보장되었습니다. –

답변

0

여기 내 솔루션입니다. 괜찮은 편이 아니 겠지만, 작은 데이터에 대해서만 작동합니다. 대용량 데이터에 적용 할 수 있는지 여부는 추가적인 증거가 필요합니다.

def mergeCanopy(canopies:RDD[Array[Int]]):Array[Array[Int]] = { /* try to merge two canopies */ val s = Set[Array[Int]]() val c = canopies.aggregate(s)(mergeOrAppend, _++_) return c.toArray

def mergeOrAppend(disjoint: Set[Array[Int]], cluster: Array[Int]):Set[Array[Int]] = { var disjoints = disjoint for (clus <- disjoint) { if (clus.toSet.&(cluster.toSet) != Set()) { disjoints += (clus.toSet++cluster.toSet).toArray disjoints -= clus return disjoints } } disjoints += cluster return disjoints }

관련 문제