2016-11-25 6 views
2

올바르게 작동하는 Python으로 Spark 프로그램을 작성했습니다.값이 RDD에 있는지 확인하십시오.

그러나 메모리 소비면에서 비효율적입니다. & 그것을 최적화하려고합니다. AWS EMR에서 실행 중이며 EMR은 너무 많은 메모리를 사용하여 작업을 종료합니다.

Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

나는이 메모리 문제는 나중 단계에서, 내가 어떤 값이 목록에 있는지 테스트 할 필요가 있기 때문에 (즉 .collect 사용하여()) 내 RDDs를 수집하고 여러 인스턴스에 있다는 사실 때문이라고 생각 그 RDD로 만들어 졌는지 아닌지.

그래서, 현재 내 코드는 다음과 같습니다 : 언젠가 나중에 코드

if word in myrdd: 
    mylist.append(word) 

myrdd2 = data2.map(lambda word: (word,1))  \ 
     .reduceByKey(lambda a,b: a+b) \ 
     .filter(lambda (a, b): b >= 5) \ 
     .map(lambda (a,b) : a)   \ 
     .collect() 

if word in myrdd2: 
    mylist2.append(word) 

후 나는이 패턴을 반복하고 여러 번에

myrdd = data.map(lambda word: (word,1))  \ 
     .reduceByKey(lambda a,b: a+b) \ 
     .filter(lambda (a, b): b >= 5) \ 
     .map(lambda (a,b) : a)   \ 
     .collect() 

하고 있습니다.

먼저 RDD를 수집하지 않고 작동

if word in myrdd: 
    do something 

을 할 수있는 방법이 있나요?

rdd.contains()와 같은 기능이 있습니까?

추신 : 나는 메모리에 아무것도 캐싱하지 않을 것입니다. 내 스파크 상황은 다음과 같습니다 YARN에서

jobName = "wordcount" 
sc = SparkContext(appName = jobName) 

...... 
...... 

sc.stop() 
+2

이 .collect 사용하지 않는 : RDD 그냥이 값을 기준으로 필터링 예를 들어, count 또는 first를 사용하여 확인 후 일부 값이 포함되어 있는지 확인하려면, http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey

마지막 : numPartitions 매개 변수를 봐() 그것은 더 큰 데이터 세트를 가지고 있다면 문제를 만들 모든 데이터를 드라이버로 가져올 것입니다. myrdd2.foreachRDD를 사용하여 현재 값이 – Backtrack

+0

word = sc.broadcast ([w1, w2, w3])인지 확인하십시오 valuepresent = myrdd.filter {lambda : x in word} 이것도 이와 비슷한 해결 방법입니다. – Backtrack

답변

1

오류 메시지가 집행 (그리고 운전자가) 메모리 문제가 있기 때문에 collect이 문제가 아니라고 말한다.

먼저 오류 메시지 제안을 따르고 spark.yarn.executor.memoryOverhead을 부스트하십시오. YARN에서 pyspark를 실행할 때 파이썬 작업자가 메모리를 처리하기 위해 조금 더 큰 컨테이너를 할당하도록 Yarn에 지시 할 수 있습니다.

다음으로, 실행 프로그램이 많은 양의 메모리를 필요로하는 조작을보십시오. reduceByKey을 사용합니다. 사용 된 메모리 측면에서 파티션을 더 작게 만들 수 있습니다.

looking_for = "....." 
contains = rdd.filter(lambda a: a == looking_for).count() > 0 
+0

고맙습니다. RDD를 많이 사용하면 집행자에게 부담이됩니까? 예를 들면. myrddalias = myrdd와 같은 일을하면 추억이 늘어나거나 괜찮습니까? – Piyush

+1

그냥 참조를 복사합니다. rdds 자체는 복제되지 않습니다. – Mariusz

+0

문제는 looking_for가 RDD이고 다른 RDD에서 필터를 수행 할 때 하나의 변환을 다른 변환에 넣을 수 없다는 오류가 표시된다는 것입니다. Looking_for는 목록이며 looking_for rdd에 존재하는 특정 값에 기초하여 내 rdd를 정리합니다.정확한 오류 - 예외 : 작업 또는 변환에서 RDD를 브로드 캐스트하거나 RDD를 참조하려고 시도하는 것 같습니다. RDD 변환 및 동작은 드라이버에 의해서만 호출 될 수 있고 다른 변환에서는 호출되지 않을 수 있습니다. – Piyush

관련 문제