2016-10-15 4 views
0

파일 세트에서 구문을 찾기 위해 hadoop에서 mapreduce 프로그램을 작성하고 있습니다. 나는 mapreduce 작업 2 개를 사용하고 있습니다. 그러나 문제는 두 번째 작업이 종료되지 않는다는 것입니다. 아래에 표시된 것처럼 "감속기 작업 실행자 완료"를 보여줍니다. 그러나 그것은 종결되지 않습니다.mapreduce 작업 체인의 두 번째 작업이 하둡에서 종료되지 않습니다.

16/10/15 19:04:16 INFO mapred.Task: Task:attempt_local1574338353_0002_r_000000_0 is done. And is in the process of committing 
16/10/15 19:04:16 INFO mapred.LocalJobRunner: 1/1 copied. 
16/10/15 19:04:16 INFO mapred.Task: Task attempt_local1574338353_0002_r_000000_0 is allowed to commit now 
16/10/15 19:04:16 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1574338353_0002_r_000000_0' to hdfs://localhost:54310/gopal/output/_temporary/0/task_local1574338353_0002_r_000000 
16/10/15 19:04:16 INFO mapred.LocalJobRunner: reduce > reduce 
16/10/15 19:04:16 INFO mapred.Task: Task 'attempt_local1574338353_0002_r_000000_0' done. 
16/10/15 19:04:16 INFO mapred.LocalJobRunner: Finishing task: attempt_local1574338353_0002_r_000000_0 
16/10/15 19:04:16 INFO mapred.LocalJobRunner: reduce task executor complete. 

다음은 나의 코드입니다.

public int run(String[] arg0) throws Exception 
{ 

    Configuration conf = getConf(); 
    conf.set("mapred.textoutputformat.separator", ";"); 
    System.out.println("********* First Job Started **********"); 

    JobConf job = new JobConf(conf, WordCount.class);   


    Path in = new Path(arg0[0]); 
    Path out = new Path("/gopal/temp"); 


    FileInputFormat.addInputPath(job, in); 
    FileOutputFormat.setOutputPath(job, out); 

    job.setJobName("Inverted Index");  

    job.setMapperClass(TokenizerMapper.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(MyPair.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(MyArrayWritable.class); 

    String phrase = ""; 
    phrase = phrase + "Sachin " + "Tendulkar"; 

    JobConf job2 = null; 

    System.out.println("Time taken by first map task: "+elapsedTime1); 
    System.out.println("Time taken by first reduce task: "+elapsedTime2); 

    System.out.println("********* Second Job Started **********"); 

    Configuration conf2 = getConf(); 

    job2 = new JobConf(conf2, WordCount.class); 

    Path out2 = new Path(arg0[1]); 

    FileInputFormat.addInputPath(job2, out); 
    FileOutputFormat.setOutputPath(job2, out2); 

    job2.setJobName("SearchQuery Mapper"); 

    job2.set("PhraseSearch", phrase); 


    job2.setMapperClass(QuerySearchMapper.class); 
    job2.setReducerClass(QuerySearchReducer.class); 

    job2.setMapOutputKeyClass(Text.class); 
    job2.setMapOutputValueClass(MyArrayWritable.class); 

    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(MyArrayWritable.class); 

    Job j1 = new Job(job); 
    Job j2 = new Job(job2); 

    JobControl jbcntrl = new JobControl ("jbcntrl"); 

    jbcntrl.addJob(j1); 
    jbcntrl.addJob(j2); 

    j2.addDependingJob(j1); 
    jbcntrl.run();   

    System.out.println("********* Second Job Completed**********");  

    return 0; 
} 

public static void main(String[] args) throws Exception 
{ 
    if (args.length != 2) 
    { 
     System.err.println("Enter valid number of arguments <Inputdirectory> <Outputlocation>"); 
     System.exit(0); 
    } 

    int res = ToolRunner.run(new Configuration(), new WordCount(), args); 
    System.exit(res); 
} 

완료 두 번째 작업이 인쇄 점점되지 메시지.

답변

0

동일한 문제가 발생했습니다. 문제를 해결하기 위해 아래에 언급 된 코드를 사용했습니다. 이를 사용하여 작업 제어 대기 시간을 추가합니다. 그 시간 동안 대기하고 종료됩니다.

public class JobHandler { 

    public static void handleRun(JobControl control) { 
     JobRunner runner = new JobRunner(control); 
     Thread t = new Thread(runner); 
     t.start(); 

     while (!control.allFinished()) { 
      System.out.println("Still running..."); 
      try { 
       Thread.sleep(5000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

} 


JobHandler.handleRun(jobControl); 
관련 문제