2014-10-12 2 views
0

하나의 mapreduce 작업을 실행할 때 문제가 있습니다. 내 맵 감속 작업의 일환으로 여러 맵 메서드와 단일 감속기 메서드가 포함 된 mapreduce 조인을 사용하고 있습니다.(Hadoop) : mapreduce 작업을 실행하는 중에 reduce 메소드가 실행/호출되지 않습니다.

내 맵 메서드가 모두 실행되고 있지만 내 감속기가 실행되지 않거나 내 드라이버 클래스에서 호출되지 않습니다.

이 때문에 최종 출력에는지도 단계에서 수집 된 데이터 만 있습니다.

감축 단계에서 잘못된 입력 및 출력 값을 사용하고 있습니까? 지도와 축소 단계 사이에 입/출력 불일치가 있습니까?

도와주세요.

다음

내 코드는 ...

public class CompareInputTest extends Configured implements Tool { 

public static class FirstFileInputMapperTest extends Mapper<LongWritable,Text,Text,Text>{ 


    private Text word = new Text(); 
    private String keyData,data,sourceTag = "S1$"; 

    public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ 

     String[] values = value.toString().split(";"); 
     keyData = values[1]; 
     data = values[2]; 

     context.write(new Text(keyData), new Text(data+sourceTag)); 


    } 
} 

public static class SecondFileInputMapperTest extends Mapper<LongWritable,Text,Text,Text>{ 
    private Text word = new Text(); 
    private String keyData,data,sourceTag = "S2$"; 
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 

     String[] values = value.toString().split(";"); 
     keyData = values[1]; 
     data = values[2]; 


     context.write(new Text(keyData), new Text(data+sourceTag)); 

    } 

       } 

public static class CounterReducerTest extends Reducer 
{ 
    private String status1, status2; 

    public void reduce(Text key, Iterable<Text> values, Context context) 
     throws IOException, InterruptedException { 
     System.out.println("in reducer"); 

     for(Text value:values) 
      { 
      String splitVals[] = currValue.split("$"); 
     System.out.println("in reducer"); 
     /* 
     * identifying the record source that corresponds to a commonkey and 
     * parses the values accordingly 
     */ 
     if (splitVals[0].equals("S1")) { 
     status1 = splitVals[1] != null ? splitVals[1].trim(): "status1"; 
     } else if (splitVals[0].equals("S2")) { 
      // getting the file2 and using the same to obtain the Message 
      status2 = splitVals[2] != null ? splitVals[2].trim(): "status2"; 
     } 
      } 

     context.write(key, new Text(status1+"$$$")); 
    } 






public static void main(String[] args) throws Exception { 


    int res = ToolRunner.run(new Configuration(), new CompareInputTest(), 
      args); 
System.exit(res); 

    } 

}

public int run(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = new Job(conf, "count"); 
    job.setJarByClass(CompareInputTest.class); 
    MultipleInputs.addInputPath(job,new Path(args[0]),TextInputFormat.class,FirstFileInputMapperTest.class); 
    MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,SecondFileInputMapperTest.class); 
    job.setReducerClass(CounterReducerTest.class); 
    //job.setNumReduceTasks(1); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 




    FileOutputFormat.setOutputPath(job, new Path(args[2])); 



    return (job.waitForCompletion(true) ? 0 : 1); 

} 

}

+0

에 하둡의 버전을

public static class CounterReducerTest extends Reducer 

의 정의를 변경? –

답변

1

그냥 감속기 클래스의 프로토 타입을 확인한다. 만회 입력으로 가져 와서 출력 텍스트로 방출하기 때문에 귀하의 경우에는

extends Reducer<KEY, VALUE, KEY,VALUE> 

public static class CounterReducerTest extends Reducer<Text,Text,Text,Text> 
관련 문제