1 주일 이상 문제를 겪고 나서 문제의 원인을 찾은 것 같습니다.
동일한 문제로 어려움을 겪고있는 경우 시작하는 것이 좋은 점은 Spark 인스턴스가 올바르게 구성되어 있는지 확인하는 것입니다. 그것에 대해 큰 cloudera blog post가 있습니다.
그러나 구성에 문제가없는 경우 (나와 같은 경우) 문제는 코드 내의 어딘가에 있습니다. 문제는 때로는 다른 이유로 인해 (조인, 데이터 소스의 고르지 않은 파티션 등) 작업중인 RDD가 2-3 개의 파티션에 많은 데이터를 가져오고 나머지 파티션에 데이터가 거의 없음으로 인해 발생합니다.
네트워크 전반의 데이터 셔플을 줄이기 위해 Spark은 각 실행 프로그램이 해당 노드에서 로컬로 상주하는 데이터를 처리하도록 시도합니다. 따라서 2-3 명의 집행자는 오랫동안 일하고 있으며 나머지 집행자는 몇 밀리 초 만에 데이터를 처리합니다. 그래서 위의 질문에서 설명한 문제가 발생했습니다.
이 문제를 디버깅하는 방법은 먼저 RDD의 파티션 크기를 확인하는 것입니다. 하나 이상의 파티션이 다른 파티션과 비교하여 매우 큰 경우 다음 단계는 큰 파티션에서 레코드를 찾는 것입니다. 특히 비뚤어진 조인의 경우 어떤 키가 왜곡되는지 알 수 있습니다. ,
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
그것은 나에게 최소 및 최대 파티션 크기를 제공하고,이 둘 사이의 차이가 5 회 이상이면, 그것은 가장 큰 파티션의 5 개 요소를 인쇄 :이 디버깅 할 수있는 작은 함수를 작성했습니다 무슨 일이 일어나고 있는지 대략적인 아이디어를 줄 수 있어야합니다.
문제가 파티션에 왜곡되어 있다고 생각되면 비뚤어진 키를 없애거나 데이터 프레임을 다시 파티션하여 균등하게 분산되도록 할 수 있습니다. 이제 모든 유언 집행자가 동등한 시간 동안 일하게 될 것이고 훨씬 덜 무서운 OOM 오류를 보게 될 것이고 처리 또한 상당히 빨라질 것입니다.
이것은 스파크 초보자로서의 나의 두 센트입니다. 스파크 전문가들은이 문제에 좀 더 많은 것을 추가 할 수 있기를 바랍니다. 스파크 세계에서 많은 새내기가 비슷한 종류의 문제에 직면 해 있다고 생각하기 때문입니다.