2017-10-29 1 views
0

보통 우리는 하나의 텍스트 파일을 java 파일의 입력으로 제공합니다 (예 : 간단한 단어 수의 경우). 대신 이제는 100 개의 csv 파일이 있습니다. (모든 파일을 단일 파일로 병합 할 수는 없습니다.) 주어진 100 개 주식의 최대/최소 주가 변동성을 예측하려고하므로 각 CSV 파일은 고유합니다. 그래서 csv 파일의 전체 폴더를 java 프로그램의 입력 스트림으로 지정하는 방법.Hadoop Dfs 및 mapreduce에 입력으로 여러 개의 파일이 있습니다.

+0

맵리 듀스는 이미 ... –

+0

을 받아? /* 연산자를 사용합니까? –

+0

어떻게 맵리 듀스는 폴더를 사용하기 위해 당신은, 당신은 위에 하이브 또는 스파크 쿼리를 사용하여 그대로 HDFS에 CSV를 둘 것, 이상적으로 폴더 – user2336157

답변

4

용액 1이 해결을 위해, 우리는 FileInputFormat.addInputPaths() 메소드를 사용하여, 다수의 입력 쉼표로 구분하여 수행 할 수 있으며 우리는

FileInputFormat.addInputPaths(“file0,file1,....”) 

또는

가정하자로 쓸 수 2 파일을 분석해야하며 Facebook 및 YouTube 서비스를 사용하는 사람들의 목록 (이 중 하나의 출력 파일 필요)

두 개의 파일 facebook.txt 및 youtube.txt가 있습니다.

Path YoutubePath = new Path(args[0]); 
Path FacebookPath = new Path(args[1]); 
Path outputPath = new Path(args[2]); 
MultipleInputs.addInputPath(job, FacebookPath, TextInputFormat.class, JoinFacebookMapper.class); 
MultipleInputs.addInputPath(job, YoutubePath, TextInputFormat.class, YoutubeMapper.class); 
FileOutputFormat.setOutputPath(job, outputPath); 

코드에 다음 행을 추가하면 단일 맵 축소 작업 내에서 여러 파일이 전달됩니다.

또는

당신은 인수 여기

0

HDFS에 많은 파일을 복사하여 병합 내 테스트 코드, 그것은 또한 내가 그에게 도움이 될 것 같아, 다른 파일 형식을 필터링 할 수있다로 전체 폴더를 통과 할 수있다 너!

public class FilesMergeToHDFS { 
private static FileSystem fs = null; 
private static FileSystem local = null; 

public static void main(String[] args) throws IOException, URISyntaxException { 
    // TODO Auto-generated method stub 
    list(); 
} 

private static void list() throws IOException, URISyntaxException { 
    // TODO Auto-generated method stub 

      Configuration conf = new Configuration(); 
      URI uri = new URI("hdfs://xxx:9000");//HDFS address 
      fs = FileSystem.get(uri,conf); 


      local = FileSystem.getLocal(conf); 

      FileStatus[] dirsStatus = local.globStatus(new Path("E://data/73/*"), new RegexExcludePathFilter("^.*svn$")); 
      Path[] dirs = FileUtil.stat2Paths(dirsStatus); 
      FSDataInputStream in = null; 
      FSDataOutputStream out = null; 
      for(Path p:dirs){ 
        //upload 
       String filename = p.getName(); 
       FileStatus[] localStatus = local.globStatus(new Path(p+"/*"),new RegexAcceptPathFilter("^.*txt$")); 
       Path[] listedPaths = FileUtil.stat2Paths(localStatus); 
       //set outputpath 
       Path block = new Path("hdfs://hadoop:9000/mergehdfs/filesmerge/"+filename+".txt"); 
       out =fs.create(block); 
       for(Path path:listedPaths){ 
        in = local.open(path); 
        IOUtils.copyBytes(in, out, 4096, false); // copydata 
        in.close(); 
       } 
       if (out != null) { 
        out.close(); 
       } 
      } 
} 

private static class RegexAcceptPathFilter implements PathFilter { 

private final String regex; 

    public RegexAcceptPathFilter(String regex) { 
     this.regex = regex; 
    } 

    @Override 
    public boolean accept(Path path) { 
     // TODO Auto-generated method stub 
     boolean flag = path.toString().matches(regex); 
     return flag; 
    } 

} 

private static class RegexExcludePathFilter implements PathFilter { 
private final String regex; 
    public RegexExcludePathFilter (String regex) { 
     this.regex = regex; 
    } 

    @Override 
    public boolean accept(Path path) { 
     // TODO Auto-generated method stub 
     boolean flag = path.toString().matches(regex); 
     return !flag; 
    } 
} 
} 
관련 문제