내 입력 폴더에 200 개의 파일이 있습니다. MultipleOutputs는 각 파일 ("map.input.file"을 사용하여 식별 됨)에서 파싱 된 입력을 같은 이름의 출력 파일로 작성하도록합니다. 따라서 수행 할 집계가 없으므로 0 축소 기 옵션 (conf .setNumReduceTasks (0)). 이상적으로, 나는 200 개의 출력 파일을 얻어야한다.MultipleOutputs Zero Reducer
내 출력에는 약 5000 개 이상의 파일이 있습니다. 각 파일에는 스트리밍 출력이 한 줄만 포함됩니다. 분명히 집계가 아닙니다. 내 가정은 이상적으로 제로 감속기 - 매퍼 출력을 집계해야합니다.
도움을 주시면 감사하겠습니다. 감사!
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MultipleOutputEx.class);
conf.setJobName("Duration Count");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setNumReduceTasks(0);
conf.setMapperClass(MultipleOutputExMapper.class);
conf.setReducerClass(MultipleOutputExReducer.class);
conf.setMapOutputKeyClass(NullWritable.class);
MultipleOutputs.addMultiNamedOutput(conf,"mofiles", TextOutputFormat.class, NullWritable.class, Text.class);
JobClient.runJob(conf);
}
그리고 내 매퍼 클래스는,
public class MultipleOutputExMapper extends MapReduceBase implements
Mapper<LongWritable, Text, NullWritable, Text> {
MultipleOutputs mos = null;
Text fileKey = new Text();
String line = "";
private JobConf conf;
@Override
public void configure(JobConf conf) {
this.conf = conf;
mos = new MultipleOutputs(conf);
}
public void map(LongWritable key, Text value,
OutputCollector<NullWritable, Text> output, Reporter reporter)
throws IOException {
try {
String filename = conf.get("map.input.file");
fileKey.set(filename);
OutputCollector<NullWritable, Text> collector = mos.getCollector(
"mofiles", key.toString(), reporter);
collector.collect(NullWritable.get(), value);
} catch (ArrayIndexOutOfBoundsException E) {
E.printStackTrace();
} catch (Exception E) {
System.out.println(line);
E.printStackTrace();
}
}
@Override
public void close() throws IOException {
mos.close();
}
얼마나 많은 고유 키가 있습니까? 각 키에 대해 새 파일 이름을 만드는 것처럼 보입니다. – climbage
고마워요 @Climbage :) – Learner