2013-11-21 2 views
0

텍스트 파일의 총 단어 수를 얻기 위해 간단한 CountReduce 작업 (Word Count 예제에 기반)을 작성했습니다. 나는 한 줄씩 파일을 살펴보고 매핑하기 전에 몇 가지 처리를한다. 그 모든 것은 맵핑하기 전에 라인에서 특정 단어를 삭제하는 것 외에는 효과가있는 것 같습니다.Hadoop : Eclipse 목록 외부에서 데이터가 손실 됨

작업을 시작하기 전에 줄을 매핑하기 전에 삭제해야하는 파일의 단어 목록을 읽었습니다. 나는 그것을 읽은 후에 프로그램이 단어 목록을 출력하게하고 잘 동작한다. 문제는 : 작업을 시작하자마자 단어가 들어있는 ArrayList가 다시 비어있는 것 같습니다. 재밌는 것은, 일식 (jar 파일) 밖에서 프로그램을 시작할 때만 발생합니다. 일식에서 단어가 삭제됩니다. 이클립스 이외의 최종 결과는 총 1340 만 단어 (목록에서 단어를 삭제하지 않고) 여야하지만 1320 만입니다. 일식 안에서 결과는 840 만입니다.

왜 그럴까요? 도와 주셔서 정말 감사합니다! 당신이 그것을위한 클라이언트 측 프로세스를 생성합니다 명령 줄을 통해 작업을 제출하는 경우

import java.io.*; 
import java.util.*; 

import org.apache.hadoop.filecache.DistributedCache; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class WordCount { 

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, NullWritable, IntWritable> { 

     private final static IntWritable one = new IntWritable(1); 
     private final static NullWritable nullKey = NullWritable.get(); 

     public void map(LongWritable key, Text value, OutputCollector< NullWritable, IntWritable> output, Reporter reporter) throws IOException { 

      String processedline = LineProcessor.processLine(value.toString()); 

      StringTokenizer tokenizer = new StringTokenizer(processedline); 
      while (tokenizer.hasMoreTokens()) { 
       tokenizer.nextToken(); 
       output.collect(nullKey, one); 
      } 
     } 

    } 

    public static class Reduce extends MapReduceBase implements Reducer<NullWritable, IntWritable, NullWritable, IntWritable> { 

     private final static NullWritable nullKey = NullWritable.get(); 

     public void reduce(NullWritable key, Iterator<IntWritable> values, OutputCollector<NullWritable, IntWritable> output, Reporter reporter) throws IOException { 

      int sum = 0; 
      while (values.hasNext()) { 
       sum += values.next().get(); 
      } 
      output.collect(nullKey, new IntWritable(sum)); 
     } 

    } 

    public static class LineProcessor{ 
     public static ArrayList<String> stopWordsList = new ArrayList<String>(); 

     public static void initializeStopWords() throws IOException{ 
      Path stop_words = new Path("/user/ds2013/stop_words/english_stop_list.txt"); 
      FileSystem fs = FileSystem.get(new Configuration()); 
      BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(stop_words))); 
      String stopWord; 
      stopWord = br.readLine(); 

      while (stopWord != null){ 
       //addToStopWords 
       stopWordsList.add(stopWord); 
       stopWord = br.readLine(); 
      } 
     } 

     public static String processLine(String line) { 
      line = line.toLowerCase(); 
      //delete some punctuation 
      char[] remove = {'.', ',','"'}; 
      for (char c : remove) { 
       line = line.replace(""+c, ""); 
      } 
      //Replace "-" with Space 
      line = line.replace("-", " "); 

      //delete stop Words 
      StringTokenizer tokenizer = new StringTokenizer(line); 
      String nextWord = tokenizer.nextToken(); 
      while (tokenizer.hasMoreTokens()) {  
       if(stopWordsList.contains(nextWord)){ 
        line = line.replace(nextWord, ""); 
       } 
       nextWord = tokenizer.nextToken(); 
      } 

      return line; 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     JobConf conf = new JobConf(WordCount.class); 
     conf.setJobName("wordcount"); 
     conf.setMapOutputKeyClass(NullWritable.class); 
     conf.setMapOutputValueClass(IntWritable.class); 
     conf.setOutputKeyClass(NullWritable.class); 
     conf.setOutputValueClass(IntWritable.class); 

     conf.setMapperClass(Map.class); 
     conf.setCombinerClass(Reduce.class); 
     conf.setReducerClass(Reduce.class); 

     conf.setInputFormat(TextInputFormat.class); 
     conf.setOutputFormat(TextOutputFormat.class); 
     //initialize List of words that should be deletet 
     LineProcessor.initializeStopWords(); 

     //Directories 

     FileInputFormat.setInputPaths(conf, new Path("/user/ds2013/data/plot_summaries.txt")); 


     Path outputDir = new Path(args[0]); 
     //delete output folder if it already exists 
     FileSystem fs = FileSystem.get(conf); 
     fs.delete(outputDir, true); 
     FileOutputFormat.setOutputPath(conf, outputDir); 


     JobClient.runJob(conf); 

    } 
} 
+0

"일식 외"란 무엇을 의미합니까? 실제 클러스터에서 시작합니까? –

+0

그 말은 내가 항아리로 내 보낸 다음 "hadoop jar wordCount.jar packageName.WordCount/data/wordcount"와 같은 명령을 사용하여 시작한다는 의미입니다. 가상 머신에서는 모두 로컬입니다. – Ben

답변

1

:

여기 내 코드입니다. 따라서 초기화 방법은 기본 방법으로 수행합니다.

LineProcessor.initializeStopWords(); 

은 완전히 다른 프로세스에서 실행됩니다. 당신은 일반적으로 (당신이 그것이 사용하는 기존의 API에) 재정의 할 수 매퍼에서 설정 기능으로이 초기화 물건을 이동 :

public void setup(Context context) { 
    LineProcessor.initializeStopWords(); 
} 
:
public void configure(JobConf job) { 
    LineProcessor.initializeStopWords(); 
} 

또는 새로운 API에이

+0

고맙습니다. 이해가가는 것 같습니다. 나는 집에 도착하자 마자 이것을 시험해 볼 것이다. 따라서 mapper 클래스에 configure 함수를 추가하면 해당 함수 내에서 (ArrayList가 매퍼 클래스 외부에 있더라도) 모든 작업이 동일한 프로세스에 있기 때문에 예상대로 작동합니까? 그리고이 configure 함수는 항상 실제 맵 함수 (그리고 작업 당 한 번만 실행되므로 목록을 1300 만 번 초기화하지 않습니다) 이전에 실행됩니다. – Ben

+0

예, 모든 맵퍼 프로세스 (hadoop 언어의 태스크)와 맵 호출 직전에 한 번씩 실행됩니다. –

+0

좋아, 방금 집에 가서 시험해 보니 완벽하게 작동했다. 정말 고마워 :-) – Ben

관련 문제