각 파티션으로부터 N 값을 복용I는 다음과 같은 데이터를 구비하고 가정 스파크
scala>DataSortRDD.glom().take(2).head
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4))
scala>DataSortRDD.glom().take(2).tail
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1)))
그것은 가정 그런 모든 파티션 데이터에 sortWithinPartitions(col("src").desc,col("rank").desc)
(데이터 프레임의 경우이지만 설명하기위한 것)과 같은 것을 사용하여 이미 정렬되어 있습니다.
각 파티션에서 원하는 것은 처음 두 개의 값 (두 개 이상의 값이있는 경우)을 얻습니다. 따라서이 예제에서 각 파티션의 결과는 다음과 같아야합니다
scala>HypotheticalRDD.glom().take(2).head
Array(("a",5),("b",13),("b",2),("c",4))
scala>HypotheticalRDD.glom().take(2).tail
Array(Array(("a",1),("b",15),("c",3),("c",2)))
나는 내가 각 파티션의 값을 반복 할 수 방법 mapPartition
기능을하지만 내 마음은 명확하지를 사용하여 첫 번째를 얻을 수 있는지 알고 2. 팁이 있습니까?
: 좀 더 정확하게 말하십시오. 나는 각 파티션에서 데이터가 이미 '문자'에 의해 처음부터 끝까지 '카운트'에 의해 정렬된다는 것을 알고 있습니다. 제 생각으로는 mapPartition
의 입력 기능은 파티션을 반복하고 각 문자의 처음 두 값은 yield
입니다. 그리고 이것은 매번 .next()
값을 반복하여 검사하여 수행 할 수 있습니다. 당신은에 의해 키에 파티션 ID를 통합하는 mapPartitionsWithIndex
을 사용할 수 있습니다
def limit_on_sorted(iterator):
oldKey = None
cnt = 0
while True:
elem = iterator.next()
if not elem:
return
curKey = elem[0]
if curKey == oldKey:
cnt +=1
if cnt >= 2:
yield None
else:
oldKey = curKey
cnt = 0
yield elem
DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None)
는 최종 결과가 _partitioned_을 어떻게 문제를 수행합니다 당신이 셔플을 방지하고 각 파티션 내에서 모든 작업을 수행 할 경우? 다른 말로하면 - 동일한 결과를 얻었지만 다른 방식으로 파티션을 나누면 여전히 괜찮은가요? 필터링은 예상대로 원래의 파티션을 기반으로합니다. –