2014-11-19 2 views
16

나는 HDFS에 디렉토리의 디렉토리를 가지고 있으며 디렉토리를 반복하고 싶다. SparkContext 객체를 사용하여 Spark에서 쉽게이 작업을 수행 할 수 있습니까?Spark iterate HDFS 디렉토리

+0

당신이 내 하위 디렉토리와 파일의 목록을 얻을처럼 '으로 반복'을 의미? 또는 모든 하위 디렉토리에서 모든 파일을 가져 오는 중입니까? – maasg

+0

목록에서와 같이 모든 하위 디렉토리를 반복합니다. 각 하위 디렉토리에는 여러 가지 방법으로 처리하려는 텍스트 파일이 들어 있습니다. – Jon

답변

30

org.apache.hadoop.fs.FileSystem을 사용할 수 있습니다. 특히,

그리고 스파크와 FileSystem.listFiles([path], true) ...

FileSystem.get(sc.hadoopConfiguration()).listFiles(..., true) 

+1

완벽한, 감사합니다. – Jon

+0

정말 멋지다! [나는이 질문이 있었다] (http://stackoverflow.com/questions/34738296/spark-spark-submit-jars-arguments-wants-comma-list-how-to-declare-a-directory/35550151#35550151), 부여 된, 나는 이것이 원래의 spark-submit 호출에서 작동하지 않을 것이라고 생각한다. – JimLohse

+0

이것이 생성하는 RemoteIterator를 사용하여 파일 목록을 어떻게 만들 수 있습니까? – horatio1701d

5
import org.apache.hadoop.fs.{FileSystem,Path} 

FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("hdfs:///tmp")).foreach(x => println(x.getPath)) 

이 나를 위해 일했다.

스파크 버전 1.5.0 - cdh5.5.2

2
당신은뿐만 아니라 누군가가 관심이 있다면 여기에
val listStatus = org.apache.hadoop.fs.FileSystem.get(new URI(url), sc.hadoopConfiguration).globStatus(new org.apache.hadoop.fs.Path(url)) 

     for (urlStatus <- listStatus) { 
     println("urlStatus get Path:"+urlStatus.getPath()) 
} 
4

이 PySpark 버전의 globStatus 상태로 시도 할 수 있습니다

:

hadoop = sc._jvm.org.apache.hadoop 

fs = hadoop.fs.FileSystem 
conf = hadoop.conf.Configuration() 
path = hadoop.fs.Path('/hivewarehouse/disc_mrt.db/unified_fact/') 

for f in fs.get(conf).listStatus(path): 
    print f.getPath() 

이 특히 나는 disk_mrt.unified_fact 하이브 테이블을 구성하는 모든 파일의 목록을 얻습니다.() 파일 크기를 얻기 위해 여기에 설명되어 있습니다 getLen 같은 FileStatus 객체의

다른 방법 :

Class FileStatus