2014-04-07 2 views
0

독립 실행 형 모드에서 실행할 때 완벽하게 실행되는 map reduce 프로그램이 있지만 학교에서 Hadoop Cluster에서 실행할 때 감속기에서 예외가 발생합니다. 나는 그것이 어떤 예외인지 전혀 모른다. 나는 이것을 줄이기 위해 감속기를 잡을 때와 같이 알지만, 작업은 통과하지만 빈 출력이됩니다. try/catch를 유지하지 않으면 작업이 실패합니다. 학교 클러스터이므로 취업 활동 기록이나 다른 파일에 액세스 할 수 없습니다. 내가 찾을 수있는 것은 프로그래밍 방식으로 만 가능합니다. 거기에 런타임 동안 hadoop에 무슨 예외가 일어 났는지 찾을 수있는 방법이 있나요? 다음클러스터에서 실행할 때 하둡의 감속기에서 예외가 발생했습니다

public static class RowMPreMap extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 

    private Text keyText = new Text(); 
    private Text valText = new Text(); 

    public void map(LongWritable key, Text value, 
      OutputCollector<Text, Text> output, Reporter reporter) 
      throws IOException { 

     // Input: (lineNo, lineContent) 

     // Split each line using seperator based on the dataset. 
     String line[] = null; 

     line = value.toString().split(Settings.INPUT_SEPERATOR); 

     keyText.set(line[0]); 
     valText.set(line[1] + "," + line[2]); 

     // Output: (userid, "movieid,rating") 
     output.collect(keyText, valText); 
    } 
} 

public static class RowMPreReduce extends MapReduceBase implements 
     Reducer<Text, Text, Text, Text> { 

    private Text valText = new Text(); 

    public void reduce(Text key, Iterator<Text> values, 
      OutputCollector<Text, Text> output, Reporter reporter) 
      throws IOException { 

     // Input: (userid, List<movieid, rating>) 

     float sum = 0.0F; 
     int totalRatingCount = 0; 

     ArrayList<String> movieID = new ArrayList<String>(); 
     ArrayList<Float> rating = new ArrayList<Float>(); 

     while (values.hasNext()) { 
      String[] movieRatingPair = values.next().toString().split(","); 
      movieID.add(movieRatingPair[0]); 
      Float parseRating = Float.parseFloat(movieRatingPair[1]); 
      rating.add(parseRating); 

      sum += parseRating; 
      totalRatingCount++; 
     } 

     float average = ((float) sum)/totalRatingCount; 

     for (int i = 0; i < movieID.size(); i++) { 
      valText.set("M " + key.toString() + " " + movieID.get(i) + " " 
        + (rating.get(i) - average)); 
      output.collect(null, valText); 
     } 

     // Output: (null, <M userid, movieid, normalizedrating>) 
    } 
} 

예외는 위의 감속기에서 일어나는 내 코드의 조각이다.

Counters counters = job.getCounters(); 

을하고 로컬 콘솔에 카운터의 집합을 덤프 : 서버에 액세스 할 수없는 경우에도 다음은 설정

public void normalizeM() throws IOException, InterruptedException { 
    JobConf conf1 = new JobConf(UVDriver.class); 
    conf1.setMapperClass(RowMPreMap.class); 
    conf1.setReducerClass(RowMPreReduce.class); 
    conf1.setJarByClass(UVDriver.class); 

    conf1.setMapOutputKeyClass(Text.class); 
    conf1.setMapOutputValueClass(Text.class); 

    conf1.setOutputKeyClass(Text.class); 
    conf1.setOutputValueClass(Text.class); 

    conf1.setKeepFailedTaskFiles(true); 

    conf1.setInputFormat(TextInputFormat.class); 
    conf1.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.addInputPath(conf1, new Path(Settings.INPUT_PATH)); 
    FileOutputFormat.setOutputPath(conf1, new Path(Settings.TEMP_PATH + "/" 
      + Settings.NORMALIZE_DATA_PATH_TEMP)); 

    JobConf conf2 = new JobConf(UVDriver.class); 
    conf2.setMapperClass(ColMPreMap.class); 
    conf2.setReducerClass(ColMPreReduce.class); 
    conf2.setJarByClass(UVDriver.class); 

    conf2.setMapOutputKeyClass(Text.class); 
    conf2.setMapOutputValueClass(Text.class); 

    conf2.setOutputKeyClass(Text.class); 
    conf2.setOutputValueClass(Text.class); 

    FileInputFormat.addInputPath(conf2, new Path(Settings.TEMP_PATH + "/" 
      + Settings.NORMALIZE_DATA_PATH_TEMP)); 
    FileOutputFormat.setOutputPath(conf2, new Path(Settings.TEMP_PATH + "/" 
      + Settings.NORMALIZE_DATA_PATH)); 

    Job job1 = new Job(conf1); 
    Job job2 = new Job(conf2); 

    JobControl jobControl = new JobControl("jobControl"); 
    jobControl.addJob(job1); 
    jobControl.addJob(job2); 
    job2.addDependingJob(job1); 
    handleRun(jobControl); 

} 
+0

포트 : 8808에 액세스 할 수 있습니까? http : // 에서 작업 상태를 모니터링하고 스택 추적을 볼 수 있습니다. 8088 – anu

+0

@Anupam, 포트 또는 추적기 또는 내 프로그램을 실행하는 것 이외의 다른 액세스 권한이 없습니다. – TechCrunch

답변

0

, 당신은 작업에 대한 카운터를 얻을 수 있습니다. 이 카운터는 매퍼 (Mappers) 및 리듀서 (Reduceer)에서 입력 및 기록 된 레코드 수에 대한 카운트를 표시합니다. 값이 0 인 카운터는 워크 플로의 문제 위치를 나타냅니다. 자신의 카운터를 계측하여 플로우를 디버그/모니터 할 수 있습니다.

1

감속기에서 예외를 발견하고 스택 추적을 파일 시스템의 파일에 기록합니다. 나는 이것이 이것을하기의 더러운 가능한 방법다는 것을 알고있다, 그러나 나는이 시점에서 아무 선택권도 없다. 다음에 코드가 있으면 도움이된다. catch 블록에 코드를 넣으십시오.

   String valueString = ""; 
       while (values.hasNext()) { 
        valueString += values.next().toString(); 
       } 

       StringWriter sw = new StringWriter(); 
       e.printStackTrace(new PrintWriter(sw)); 
       String exceptionAsString = sw.toString(); 

       Path pt = new Path("errorfile"); 
       FileSystem fs = FileSystem.get(new Configuration()); 
       BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); 
       br.write(exceptionAsString + "\nkey: " + key.toString() + "\nvalues: " + valueString); 
       br.close(); 

입력 깨끗한 방법으로이 작업을 수행하실 수 있습니다.

마지막으로 나는 NumberFormatException이라는 것을 발견했다. 카운터가 나를 식별하는 데 도움이되지 못했습니다. 나중에 독립형 및 클러스터에서 입력을 분할하는 형식이 다른 방식으로 발생했음을 알았습니다. 아직 그 이유를 찾을 수 없습니다.

+0

필자는 감속기와 매퍼 로직 주위에 try catch 블록을 두었습니다. 예외를 기록하는 대신 그룹 "Exception"및 카운터 이름 e.getClass(). getPackage() + ""에 대한 카운터도 증가시킵니다. + e.getClass(). getName(). 즉, 카운터를 사용하여 각 종류의 오류 수를 드라이버 클래스 (내 대답 당)에보고합니다. 그게 내가 무엇이 잘못되었는지 알 수있게 해줍니다. –

관련 문제