0
하나의 mapreduce 작업을 실행할 때 문제가 있습니다. 내 맵 감속 작업의 일환으로 여러 맵 메서드와 단일 감속기 메서드가 포함 된 mapreduce 조인을 사용하고 있습니다.(Hadoop) : mapreduce 작업을 실행하는 중에 reduce 메소드가 실행/호출되지 않습니다.
내 맵 메서드가 모두 실행되고 있지만 내 감속기가 실행되지 않거나 내 드라이버 클래스에서 호출되지 않습니다.
이 때문에 최종 출력에는지도 단계에서 수집 된 데이터 만 있습니다.
감축 단계에서 잘못된 입력 및 출력 값을 사용하고 있습니까? 지도와 축소 단계 사이에 입/출력 불일치가 있습니까?
도와주세요.
다음내 코드는 ...
public class CompareInputTest extends Configured implements Tool {
public static class FirstFileInputMapperTest extends Mapper<LongWritable,Text,Text,Text>{
private Text word = new Text();
private String keyData,data,sourceTag = "S1$";
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
String[] values = value.toString().split(";");
keyData = values[1];
data = values[2];
context.write(new Text(keyData), new Text(data+sourceTag));
}
}
public static class SecondFileInputMapperTest extends Mapper<LongWritable,Text,Text,Text>{
private Text word = new Text();
private String keyData,data,sourceTag = "S2$";
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String[] values = value.toString().split(";");
keyData = values[1];
data = values[2];
context.write(new Text(keyData), new Text(data+sourceTag));
}
}
public static class CounterReducerTest extends Reducer
{
private String status1, status2;
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
System.out.println("in reducer");
for(Text value:values)
{
String splitVals[] = currValue.split("$");
System.out.println("in reducer");
/*
* identifying the record source that corresponds to a commonkey and
* parses the values accordingly
*/
if (splitVals[0].equals("S1")) {
status1 = splitVals[1] != null ? splitVals[1].trim(): "status1";
} else if (splitVals[0].equals("S2")) {
// getting the file2 and using the same to obtain the Message
status2 = splitVals[2] != null ? splitVals[2].trim(): "status2";
}
}
context.write(key, new Text(status1+"$$$"));
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new CompareInputTest(),
args);
System.exit(res);
}
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "count");
job.setJarByClass(CompareInputTest.class);
MultipleInputs.addInputPath(job,new Path(args[0]),TextInputFormat.class,FirstFileInputMapperTest.class);
MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,SecondFileInputMapperTest.class);
job.setReducerClass(CounterReducerTest.class);
//job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
return (job.waitForCompletion(true) ? 0 : 1);
}
}
에 하둡의 버전을
의 정의를 변경? –