2014-06-19 3 views
0

단어의 빈도별로 WordCount 예제를 정렬하려고합니다. 일부 게시물을 찾아서 MapReduce에서 값순으로 정렬 할 수 없다는 사실을 알았습니다. 그래서 두 개의 맵 축소 작업을 별도로 수행하기로했습니다. 따라서 첫 번째 단어는 원래 단어 수이고 두 번째 단어는 첫 번째 MapReduce의 출력을 읽고 빈도별로 단어를 정렬합니다.Hadoop> WordCount 값으로 정렬

제 MapReduce를 사용하는 입력 파일 등인

1 사과

2 공

1 만화 (제 MapReduce의 출력 인) 다음

4 일

그리고 여기 있습니다. 두 번째 mapreduce 코드의 코드. 여기 불필요하다고 생각되는 Pair 클래스를 만들어 키로 사용했습니다.

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.Comparator; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.io.WritableUtils; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.Reducer.Context; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; 
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; 
import org.apache.hadoop.util.GenericOptionsParser; 


import org.apache.hadoop.util.*; 

//Sorting by valuel 
public class Sorting 
{ 
//Using custom writable 
public static class Pair implements WritableComparable<Pair> 
{ 
    private Text t; 
    private IntWritable i; 

    public void set(Text t, IntWritable i) 
    { 
     this.t = t; 
     this.i = i; 
    } 

    public Text getFirst() { return t; } 
    public IntWritable getSecond() { return i; } 


    public Pair() 
    { 
     set(new Text(), new IntWritable()); 
    } 

    public Pair(Text t, IntWritable i) 
    { 
     set(t, i); 
    } 

    public int compareTo(Pair p) 
    { 
     int cmp = t.compareTo(p.t); 
     if(cmp != 0) 
     { 
      return cmp; 
     } 
     return i.compareTo(p.i); 
    } 

    public void write(DataOutput out) throws IOException 
    { 
     t.write(out); 
     i.write(out); 
    } 

    public void readFields(DataInput in) throws IOException 
    { 
     t.readFields(in); 
     i.readFields(in); 
    } 
} 

//public class RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext contxt) 

public static class SortingMapper extends Mapper<IntWritable, Text, Pair, NullWritable> 
{ 
    String[] output1 = null; 
    //private Text word = new Text(); 

    @Override 
    public void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException 
    { 
     output1 = value.toString().split(" "); 

     Text word = new Text(output1[0]); 
     IntWritable freq = new IntWritable(Integer.parseInt(output1[1])); 

     context.write(new Pair(word, freq), NullWritable.get()); 
    } 
      //.write() is the method inherited from interface org.apache.hadoop.mapreduce.TaskInputOutputContext   
} 



public static class FirstPartitioner extends Partitioner<Pair, NullWritable> 
{ 
    @Override 
    public int getPartition(Pair p, NullWritable n, int numPartitions) 
    { 
     System.out.println("Partitioner"); 
     String word = p.getFirst().toString(); 

     char first = word.charAt(0); 
     char middle = 'n'; 

     if(middle < first) 
     { 
      return 0; 
     } 
     else 
      return 1 % numPartitions; //why does % need??? 
    } 
} 

public static class KeyComparator extends WritableComparator 
{ 

    protected KeyComparator() 
    { 
     super(Pair.class, true); 
    } 

    @Override 
    public int compare(WritableComparable w1, WritableComparable w2) 
    { 
     System.out.println("keyComparator"); 
     Pair v1 = (Pair) w1; 
     Pair v2 = (Pair) w2; 

     /* 
     * since we already count word in the first MR we only need to sort the list by frequency 
     * so no need to compare Text again 
     int cmp = Pair.compare(v1.getFirst(), v2.getFirst()); 
     if(cmp != 0) { return cmp; } 
     */ 

     return -1 * v1.compareTo(v2); 
     //possible error: it compares Text first and then compare IntWritable 
    } 
} 

public static class GroupComparator extends WritableComparator 
{ 
    protected GroupComparator() 
    { 
     super(Pair.class, true); 
    } 

    @Override 
    public int compare(WritableComparable w1, WritableComparable w2) 
    { 
     System.out.println("group Comparator"); 
     Pair v1 = (Pair) w1; 
     Pair v2 = (Pair) w2; 
     return v1.getFirst().compareTo(v2.getFirst()); 
     //this compareTo is under binarycomparable 
    } 
} 

public static class SortingReducer extends Reducer<Pair, NullWritable, Pair, NullWritable> 
{ 
    @Override 
    public void reduce(Pair p, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException 
    { 
     System.out.println("sortingReducer"); 
     context.write(p, NullWritable.get()); 
    } 
} 

public static void main(String[] args) throws Exception 
{ 

    Configuration conf2 = new Configuration(); 
    //String[] otherArgs2 = new GenericOptionsParser(conf1, args).getRemainingArgs(); 

    ControlledJob cJob2 = new ControlledJob(conf2); 
    conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " "); 
    cJob2.setJobName("Sorting"); 

    Job job2 = cJob2.getJob(); 

    job2.setJarByClass(Sorting.class); 

    job2.setInputFormatClass(KeyValueTextInputFormat.class); 

    job2.setMapperClass(SortingMapper.class); 
    job2.setPartitionerClass(FirstPartitioner.class); 
    job2.setSortComparatorClass(KeyComparator.class); 
    job2.setGroupingComparatorClass(GroupComparator.class); 
    job2.setReducerClass(SortingReducer.class); 

    job2.setOutputKeyClass(Pair.class); 
    job2.setOutputValueClass(NullWritable.class); 

    job2.setOutputFormatClass(TextOutputFormat.class); 

    FileInputFormat.addInputPath(job2, new Path("hdfs:///tmp/inter/part-r-00000")); 
    FileOutputFormat.setOutputPath(job2, new Path(args[0])); 

    job2.waitForCompletion(true); 

} 

은} 나는 오류를 캐스팅있어 위의 코드를 실행합니다.

FAILED 
Error: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to  org.apache.hadoop.io.IntWritable 
    at Sorting$SortingMapper.map(Sorting.java:1) 
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) 
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340) 
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:415) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557) 
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162) 

나는 단지 hadoop을 배우기 시작했고, 나는 스스로 만든 첫 번째 작품이다. 이 오류를 해결하는 방법을 알려주십시오. 전체 코드에 대한 의견을 보내 주시면 감사하겠습니다.

+0

예 보조 정렬에 대해 읽었지만 mapreduce 하나만으로이 문제를 해결할 수 있습니까? 나는 그렇게 할 수 없다고 생각하여 chaining mapreduce를 사용하려고 시도했다. – user3067818

+0

나는 나의 의견을 되 찾을 것이다. 2 차적 정렬은 전체 키가 아닌 각 키와 연관된 값을 정렬합니다. 나는 당신의 문제 진술을 잘못 이해했다. –

답변

0

은 논리의 나머지 부분은 동일하게 유지 있도록 어쨌든 매퍼의 키를 사용하지 않기 때문에 고원이 실행 얻을 다음에 매퍼 정의를 변경하십시오 Text 객체가 IntWritable 캐스트 할 수없는, 당신이 얻을 이유

public static class SortingMapper extends Mapper<**Text**, Text, Pair, NullWritable> 
    { 
    String[] output1 = null; 
//private Text word = new Text(); 

    @Override 
public void map(**Text** key, Text value, Context context) throws IOException,  InterruptedException { 
/////// 
    } 
+0

나는 그것을 시도해 주셔서 감사합니다. – user3067818

+0

하지만 내가 사용하는 입력은 (intWritable, Text) (2, ball)과 같은 형식이므로 IntWritable을 키로 사용해야한다고 생각했습니다. 매퍼가 주어진 입력 파일에서 키와 값을 어떻게 구별하는지 완전히 이해하지 못할 수도 있습니다. 매퍼가 키와 값을 인식하는 방법을 설명 할 수 있습니까? 다시 한 번 감사합니다 – user3067818

+0

모두 사용중인 입력 형식에 따라 다릅니다.이 경우 FileInputFormat을 확장하는 KeyValueTextInputFormat을 사용합니다 기본적으로 매퍼가 가져야 할 키가 무엇인지 알려주면 Hadoop의 최종 가이드를보다 효과적으로 얻을 수 있습니다. 만약 당신이 미래에 대해 생각하고 있다면 – dpsdce

0

귀하의 SortingMapper.map() 방법은 IntWrtiable을 키로 사용하지만 KeyValueTextInputFormatText 개체를 생성합니다.

Error: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.IntWritable 
+0

그런 다음 IntWritable을 키로 사용하도록 새 클래스를 만들어야합니까? 사실 나는이 [link] (https://developer.yahoo.com/hadoop/tutorial/module5.html#fileformat)이 내가해야 할 일이라는 것을 알았습니까 ?? – user3067818