2017-01-27 1 views
2

각 파일에는 여러 줄에 걸쳐 텍스트가 포함 된 많은 파일이 하나의 디렉토리에 있습니다. 는 현재 내가 스파크 데이터 세트 (> 2.0)각 파일을 데이터 세트 행에 복사합니다.

val ddf = spark.read.text("file:///input/*") 

그러나,이 각 행은 라인이 아닌 파일 데이터 세트를 생성에 모든 파일을 읽을 다음 코드를 사용합니다. 데이터 집합의 행마다 각 파일을 (문자열로) 갖고 싶습니다.

각 파일을 반복하지 않고 별도로 RDD으로 읽지 않고 어떻게이 작업을 수행 할 수 있습니까?

답변

3

사용 wholeTextFiles()

val rdd: RDD[(String, String)] = spark.sparkContext 
             .wholeTextFiles("file/path/to/read/as/rdd") 
SparkContext

SparkContext.wholeTextFiles는 여러 작은 텍스트 파일이 들어있는 디렉토리를 읽을 수 및 (파일 이름, 내용) 쌍로 각각 반환합니다. 이것은 textfile과는 대조적으로 각 파일에서 한 줄에 하나의 레코드로 을 반환합니다.

+1

아름다운 대답, 내가 찾던 구조를 감안할 때. – Tim

1

@ mrsrinivas의 대답에 대한 대안은 input_file_name을 그룹화하는 것입니다.

[email protected]>~/junk/so> find .   
. 
./d2 
./d2/t.txt 
./d1 
./d1/t.txt 
[email protected]>~/junk/so> cat */*.txt 
d1_1 
d1_2 
d2_1 
d2_2 

우리가 같은 입력 파일을 기반으로 목록을 수집 할 수 있습니다 :

scala> val ddf = spark.read.textFile("file:///home/evan/junk/so/*"). 
    | select($"value", input_file_name as "fName") 
ddf: org.apache.spark.sql.DataFrame = [value: string, fName: string] 

scala> ddf.show(false) 
+-----+----------------------------------+ 
|value|fName        | 
+-----+----------------------------------+ 
|d2_1 |file:///home/evan/junk/so/d2/t.txt| 
|d2_2 |file:///home/evan/junk/so/d2/t.txt| 
|d1_1 |file:///home/evan/junk/so/d1/t.txt| 
|d1_2 |file:///home/evan/junk/so/d1/t.txt| 
+-----+----------------------------------+ 

scala> ddf.groupBy("fName").agg(collect_list($"value") as "value"). 
    | drop("fName").show 
+------------+ 
|  value| 
+------------+ 
|[d1_1, d1_2]| 
|[d2_1, d2_2]| 
+------------+ 
+2

'groupBy'에서'input_file_name'을 사용하는 좋은 방법은 그 기능을 알지 못했습니다 :). 그래도 @mrsrinivas 대답은 조금 깔끔합니다. – Tim

+0

확실하게, 내 대답은'DataFrame' 다. 그러나이 경우에는'RDD'를 사용하는 것이 조금 더 좋다. –

+0

사실, 항상'toDF' 함수가있다. – Tim

관련 문제