2017-05-08 1 views
0

wholeTextFiles을 사용하여 디렉토리의 각 파일을 읽습니다. 그 후을 사용하여 rdd의 각 요소에 대한 함수를 호출합니다. 전체 프로그램은 각 파일의 단지 50 줄만 사용합니다. 코드는 다음과 같습니다 :apache spark : 디렉토리에서 큰 파일 읽기

def processFiles(fileNameContentsPair): 
    fileName= fileNameContentsPair[0] 
    result = "\n\n"+fileName 
    resultEr = "\n\n"+fileName 
    input = StringIO.StringIO(fileNameContentsPair[1]) 
    reader = csv.reader(input,strict=True) 

    try: 
     i=0 
     for row in reader: 
     if i==50: 
      break 
     // do some processing and get result string 
     i=i+1 
    except csv.Error as e: 
    resultEr = resultEr +"error occured\n\n" 
    return resultEr 
    return result 



if __name__ == "__main__": 
    inputFile = sys.argv[1] 
    outputFile = sys.argv[2] 
    sc = SparkContext(appName = "SomeApp") 
    resultRDD = sc.wholeTextFiles(inputFile).map(processFiles) 
    resultRDD.saveAsTextFile(outputFile) 

디렉토리의 각 파일의 크기는 내 경우에는 매우 클 수 때문에 wholeTextFiles API의 이러한 이유로 사용의 경우 비효율적 일 것입니다. 이 작업을 수행하는 효율적인 방법이 있습니까? 디렉토리의 각 파일에 대해 하나씩 반복하는 것을 생각할 수 있지만 비효율적 인 것으로 보입니다. 나는 불꽃에 처음이다. 이 작업을 효율적으로 수행 할 수 있는지 알려 주시기 바랍니다.

+1

각 파일의 크기는 얼마나 큽니까? 파일을 더 작은 파일로 분할 할 수 없습니까? –

+0

@DatTran 각 파일의 크기는 몇 GBS에있을 수 있으며 디렉토리의 파일 수는 100 개가 넘을 수 있습니다. 파일 분할에 대해 생각할 수있는 한 가지 방법은 각 파일을 하나씩 나누고 각 파일에서 첫 번째 분할을 가져 와서 유지하는 것입니다 임시 디렉터리에. 그 후에 우리는'wholeTextFiles'을 그 임시 디렉토리에 적용 할 수 있습니다. 이 방법으로 파일을 분할하는 것이 좋습니다? 그렇지 않다면 파일 분할 방법을 알려주십시오. – mcurious

답변

1

오케이 내가 제안하는 것은 작은 덩어리로 파일을 먼저 분할하는 것입니다. 몇 Gbs는 지연의 주요 원인을 읽기에는 너무 큽니다. 데이터가 HDFS에있는 경우 각 파일에 대해 64MB와 같은 것을 사용할 수 있습니다. 그렇지 않으면 파일 실행자의 수에 따라 파일 크기를 시험해야합니다. 따라서 더 작은 조각이 있다면 병렬성을 높이기 위해이 조각을 늘릴 수 있습니다. 마찬가지로 processFiles 함수가 CPU를 많이 사용하지 않는 것처럼 파티션을 조정하여 조정할 수도 있습니다. 많은 executor의 유일한 문제점은 I/O가 증가하지만 파일 크기가 작 으면 문제가 많지 않아야한다는 것입니다.

그런데 임시 디렉토리가 필요하지 않습니다. wholeTextFiles*과 같은 와일드 카드를 지원합니다. 또한 S3를 파일 시스템으로 사용하면 작은 파일이 너무 많으면 큰 파일 대신 읽을 때 병목 현상이 발생할 수 있습니다. 그래서 이것은 사소한 것이 아닙니다.

희망이 도움이됩니다.