2016-08-15 2 views
3

, 내가이 불꽃 RDD으로 일부 대형 텍스트 파일을로드하는 PySpark 작업을스파크. ~ 1 억 행. 크기가 Integer.MAX_VALUE를 초과합니까?

(이 작은 세 개의 기계 아마존 EMR 클러스터에서 실행되는 스파크 2.0이다)을 성공적으로 158598155를 반환 카운트()한다.

그런 다음 작업은 각 행을 pyspark.sql.Row 인스턴스로 구문 분석하고 DataFrame을 빌드 한 다음 다른 수를 계산합니다. DataFrame의 두 번째 count()는 Spark 내부 코드 Size exceeds Integer.MAX_VALUE에서 예외를 발생시킵니다. 이는 더 적은 양의 데이터와 함께 작동합니다. 누군가가 왜/어떻게 이런 일이 일어날 지 설명 할 수 있습니까?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 1.0 failed 4 times, most recent failure: Lost task 22.3 in stage 1.0 (TID 77, ip-172-31-97-24.us-west-2.compute.internal): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) 
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) 
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) 
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439) 
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:604) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

PySpark 코드 :

raw_rdd = spark_context.textFile(full_source_path) 

# DEBUG: This call to count() is expensive 
# This count succeeds and returns 158,598,155 
logger.info("raw_rdd count = %d", raw_rdd.count()) 
logger.info("completed getting raw_rdd count!!!!!!!") 

row_rdd = raw_rdd.map(row_parse_function).filter(bool) 
data_frame = spark_sql_context.createDataFrame(row_rdd, MySchemaStructType) 

data_frame.cache() 
# This will trigger the Spark internal error 
logger.info("row count = %d", data_frame.count()) 
+0

두 번째'counts()'의 예상 결과는 무엇입니까? – gsamaras

+0

오류가 발생한 스 니펫을 공유하십시오. – javadba

+0

@ gsamaras, 기본적으로 첫 번째 계산과 동일합니다. – clay

답변

0

오류가 자체 data_frame.count()에서하지 오는 아니라 row_parse_function를 통해 행을 구문 분석하기 때문에 MySchemaStructType에 지정된 정수 타입에 맞지 않는 일부 정수를 산출한다.

스키마의 정수 유형을 pyspark.sql.types.LongType()으로 늘리거나 스키마를 생략하여 spark이 유형을 추정하도록하십시오 (그러나 속도가 느려질 수 있습니다).

+0

이제'row_parse_function'은 바운드 값을 체크하고 있습니다. 범위를 벗어난 구문 분석 오류로 인해 의미가없는 'FileChannelImpl.map'에서 예외가 발생합니다. – clay

+0

@clay'row_parse_function'과'MySchemaStructType'을 게시 할 수 있습니까? – antonislav

관련 문제