2016-07-27 2 views
0

각 파티션으로부터 N 값을 복용I는 다음과 같은 데이터를 구비하고 가정 스파크

scala>DataSortRDD.glom().take(2).head 
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4)) 
scala>DataSortRDD.glom().take(2).tail 
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1))) 

그것은 가정 그런 모든 파티션 데이터에 sortWithinPartitions(col("src").desc,col("rank").desc) (데이터 프레임의 경우이지만 설명하기위한 것)과 같은 것을 사용하여 이미 정렬되어 있습니다.

각 파티션에서 원하는 것은 처음 두 개의 값 (두 개 이상의 값이있는 경우)을 얻습니다. 따라서이 예제에서 각 파티션의 결과는 다음과 같아야합니다

scala>HypotheticalRDD.glom().take(2).head 
Array(("a",5),("b",13),("b",2),("c",4)) 
scala>HypotheticalRDD.glom().take(2).tail 
Array(Array(("a",1),("b",15),("c",3),("c",2))) 

나는 내가 각 파티션의 값을 반복 할 수 방법 mapPartition 기능을하지만 내 마음은 명확하지를 사용하여 첫 번째를 얻을 수 있는지 알고 2. 팁이 있습니까?

: 좀 더 정확하게 말하십시오. 나는 각 파티션에서 데이터가 이미 '문자'에 의해 처음부터 끝까지 '카운트'에 의해 정렬된다는 것을 알고 있습니다. 제 생각으로는 mapPartition의 입력 기능은 파티션을 반복하고 각 문자의 처음 두 값은 yield입니다. 그리고 이것은 매번 .next() 값을 반복하여 검사하여 수행 할 수 있습니다. 당신은에 의해 키에 파티션 ID를 통합하는 mapPartitionsWithIndex을 사용할 수 있습니다

def limit_on_sorted(iterator): 
    oldKey = None 
    cnt = 0 
    while True: 
     elem = iterator.next() 
     if not elem: 
      return 
     curKey = elem[0] 
     if curKey == oldKey: 
      cnt +=1 
      if cnt >= 2: 
       yield None 
     else: 
      oldKey = curKey 
      cnt = 0 
     yield elem 

DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None) 
+0

는 최종 결과가 _partitioned_을 어떻게 문제를 수행합니다 당신이 셔플을 방지하고 각 파티션 내에서 모든 작업을 수행 할 경우? 다른 말로하면 - 동일한 결과를 얻었지만 다른 방식으로 파티션을 나누면 여전히 괜찮은가요? 필터링은 예상대로 원래의 파티션을 기반으로합니다. –

답변

1

당신이 정말로 결과의 분할에 대해 걱정하지 않는다 가정 : 이것은 내가 파이썬에서 그것을 쓸 수있는 방법입니다 이는 당신이 groupBy, 당신은 쉽게 이러한 각 키의 처음 두 항목을 수행 할 수 있습니다

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitionsWithIndex { 
    // add the partition ID into the "key" of every record: 
    case (partitionId, itr) => itr.map { case (k, v) => ((k, partitionId), v) } 
    } 
    .groupByKey() // groups by letter and partition id 
    // take only first two records, and drop partition id 
    .flatMap { case ((k, _), itr) => itr.take(2).toArray.map((k, _)) } 

println(result.collect().toList) 
// prints: 
// List((a,5), (b,15), (b,13), (b,2), (a,1), (c,4), (c,3)) 

주의 마십시오 최종 결과가 (groupByKey이 파티션을 변경하는 경우) 이라고 가정하면이라고 가정합니다.이 작업은 사용자가 수행하려고 시도하는 작업에 중요하지 않습니다 (솔직히 말해서, 나를 벗어납니다).

편집 :

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitions(_.toList.groupBy(_._1).mapValues(_.take(2)).values.flatten.iterator, true) 
+0

답변 해 주셔서 감사합니다. 어쩌면 나는 그 질문에서 언급해야 할 것이다. 이유는'mapPartition'을 사용하고자하는 이유는 효율적인 이유로 파티션 간의 셔플을 피하고자하기 때문입니다. 'groupByKey'를 가진 당신의 솔루션에는 셔플이 있습니다. –

+0

알겠습니다. 셔플 링없이 솔루션을 포함하도록 내 대답 편집 (파티션 보존) –

+0

답변이 정확합니다. 내 관심사는'groupBy (_._1)'에 관한 것이다. 값이 문자별로 정렬되고 카운트 된 후에 그룹화해야하는 이유는 무엇입니까? 나는 내 생각을 더 분명하게하기 위해 나의 질문을 업데이트했다. –

관련 문제