2014-11-04 5 views
11

그래서 파이크 (Pyspark)를 사용하여 스파크를 배우려고합니다. 나는 기능이 mapPartitions 어떻게 작동하는지 알고 싶다. 그것이 Input이 취하는 것과 Output이주는 것입니다. 인터넷에서 적절한 예를 찾을 수 없었습니다. 말하자면, 아래 목록과 같은 목록을 포함하는 RDD 객체가 있습니다.pyspark mapPartitions 함수는 어떻게 작동합니까?

[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ] 

그리고 나는 모든 목록에서 요소 (2)를 제거하려면, 어떻게 내가 mapPartitions를 사용하는 것을 달성 할 것입니다.

답변

17

mapPartition은 파티션의 맵 작업이 아닌 파티션의 맵 조작으로 간주되어야합니다. 그것은 입력의 현재 파티션의 집합입니다 출력은 다른 파티션의 집합이 될 것입니다.

당신이지도는 mapPartition이 RDD 유형의 반복자를 수행해야 통과하여 RDD

기능의 개별 요소를 가지고 다른 또는 같은 종류의 반환과 반복 가능해야한다 전달하는 기능. 귀하의 경우에는

당신은 아마 그냥 그것은 yield를 사용하여 발전기 기능을 mapPartitions를 사용하는 것이 더 쉽습니다

def filterOut2FromPartion(list_of_lists): 
    final_iterator = [] 
    for sub_list in list_of_lists: 
    final_iterator.append([x for x in sub_list if x != 2]) 
    return iter(final_iterator) 

filtered_lists = data.mapPartition(filterOut2FromPartion) 
+0

filterOut2FromPartition f에서 아무 것도 반환하지 않는 이유는 무엇입니까? 기름 부음. 둘째, 파이썬에서 마지막으로 키워드가 있습니까? 제 말은 final_iterator 대신 final.iterator = []라고 말한 것 같습니다. – MetallicPriest

+0

문제를 해결했습니다. – bearrito

+0

이것을 구현하려고했지만 "목록 객체가 반복자가 아닙니다"라는 오류가 발생합니다. 또한, 당신이 [x = 2 인 경우 x에 대해 x에 대해 썼을 때]라고 생각합니다. x! = 2이면 [x는 목록에서 x를 의미합니다.]라고 생각합니다. 거기에 목록을 사용했습니다. – MetallicPriest

18

될 것이다 당신이 mapPartition을 사용하기를 원한다면

def filterOut2(line): 
    return [x for x in line if x != 2] 

filtered_lists = data.map(filterOut2) 

같은 것을하고 싶어 구문 :

def filter_out_2(partition): 
    for element in partition: 
     if element != 2: 
      yield element 

filtered_lists = data.mapPartition(filter_out_2) 
+0

목록을 반환하는 것보다 빠릅니다. – cgreen

+1

@cgreen 파티션에 모든 데이터가 들어 있습니다. 모든 데이터를 목록에로드하려고하는지 확신 할 수 없습니다. 생성자는 데이터를 반복 할 때 목록보다 우선합니다. – Narek

+0

@cgreen 생성기는 처음에는 객체의 전체 목록을 생성하는 대신 필요한 각 항목을 생성하기 때문에 메모리를 덜 사용합니다. 그래서 확실히 적은 메모리를 사용하므로 더 빠를 것입니다. [다음은 Python의 생성자에 대한 좋은 설명입니다.] (https://medium.freecodecamp.org/python-list-comprehensions-vs-generator-expressions-cef70ccb49db). –

관련 문제