2017-12-27 1 views
0

안녕하세요. 왜이 코드가 너무 많은 시간이 걸리는지 이해가 가지 않습니다.Spark read.parquet에 너무 많은 시간이 걸립니다.

val newDataDF = sqlContext.read.parquet("hdfs://192.168.111.70/u01/dw/prod/stage/br/ventas/201711*/*") 

드라이버 프로그램으로 전송되는 바이트가없는 것 같습니다. 맞습니까? read.parquet은 어떻게 작동합니까?

내가 Spark 웹 UI에서 볼 수있는 것은 read.spark가 약 4000 개의 작업 (해당 폴더 내부에 많은 쪽모퉁 파일이 있음)입니다.

답변

2

대부분의 문제는 파일 인덱싱이 DataFrame을로드하는 첫 번째 단계로 발생해야한다는 것입니다. spark.read.parquet은 4000 개의 작업을 실행하므로 많은 파티션 폴더가있을 것이라고 말했습니까? Spark은 HDFS 디렉토리 목록을 얻고 각 폴더의 모든 파일을 재귀 적으로 FileStatus (크기 및 분할)으로 가져옵니다. 효율성을 위해 Spark는 파일을 병렬로 인덱싱하므로 최대한 빨리 코어를 만들 수 있도록하려는 것입니다. 읽으려는 폴더에서 더 명시적일 수도 있고, 파킹 (parquet) 데이터 소스 테이블을 데이터 위에 정의하여로드 할 때마다 파티션 검색을 피할 수 있습니다. 이 시점에서

spark.sql(""" 
create table mydata 
using parquet 
options(
    path 'hdfs://192.168.111.70/u01/dw/prod/stage/br/ventas/201711*/*' 
) 
""") 

spark.sql("msck repair table mydata") 

, 당신은 데이터를 쿼리 할 때 더 이상 파티션 검색을 할 필요가 없습니다,하지만 여전히 당신은 쿼리 폴더 내의 파일에 대한 FileStatus을 얻을해야합니다. 새 파티션을 추가하는 경우 파티션을 명시 적으로 추가하여 repair table

spark.sql(""" 
alter table mydata add partition(foo='bar') 
location 'hdfs://192.168.111.70/u01/dw/prod/stage/br/ventas/201711/foo=bar' 
""") 
관련 문제