단어의 빈도별로 WordCount 예제를 정렬하려고합니다. 일부 게시물을 찾아서 MapReduce에서 값순으로 정렬 할 수 없다는 사실을 알았습니다. 그래서 두 개의 맵 축소 작업을 별도로 수행하기로했습니다. 따라서 첫 번째 단어는 원래 단어 수이고 두 번째 단어는 첫 번째 MapReduce의 출력을 읽고 빈도별로 단어를 정렬합니다.Hadoop> WordCount 값으로 정렬
제 MapReduce를 사용하는 입력 파일 등인
1 사과
2 공
1 만화 (제 MapReduce의 출력 인) 다음
4 일
그리고 여기 있습니다. 두 번째 mapreduce 코드의 코드. 여기 불필요하다고 생각되는 Pair 클래스를 만들어 키로 사용했습니다.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.*;
//Sorting by valuel
public class Sorting
{
//Using custom writable
public static class Pair implements WritableComparable<Pair>
{
private Text t;
private IntWritable i;
public void set(Text t, IntWritable i)
{
this.t = t;
this.i = i;
}
public Text getFirst() { return t; }
public IntWritable getSecond() { return i; }
public Pair()
{
set(new Text(), new IntWritable());
}
public Pair(Text t, IntWritable i)
{
set(t, i);
}
public int compareTo(Pair p)
{
int cmp = t.compareTo(p.t);
if(cmp != 0)
{
return cmp;
}
return i.compareTo(p.i);
}
public void write(DataOutput out) throws IOException
{
t.write(out);
i.write(out);
}
public void readFields(DataInput in) throws IOException
{
t.readFields(in);
i.readFields(in);
}
}
//public class RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext contxt)
public static class SortingMapper extends Mapper<IntWritable, Text, Pair, NullWritable>
{
String[] output1 = null;
//private Text word = new Text();
@Override
public void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException
{
output1 = value.toString().split(" ");
Text word = new Text(output1[0]);
IntWritable freq = new IntWritable(Integer.parseInt(output1[1]));
context.write(new Pair(word, freq), NullWritable.get());
}
//.write() is the method inherited from interface org.apache.hadoop.mapreduce.TaskInputOutputContext
}
public static class FirstPartitioner extends Partitioner<Pair, NullWritable>
{
@Override
public int getPartition(Pair p, NullWritable n, int numPartitions)
{
System.out.println("Partitioner");
String word = p.getFirst().toString();
char first = word.charAt(0);
char middle = 'n';
if(middle < first)
{
return 0;
}
else
return 1 % numPartitions; //why does % need???
}
}
public static class KeyComparator extends WritableComparator
{
protected KeyComparator()
{
super(Pair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2)
{
System.out.println("keyComparator");
Pair v1 = (Pair) w1;
Pair v2 = (Pair) w2;
/*
* since we already count word in the first MR we only need to sort the list by frequency
* so no need to compare Text again
int cmp = Pair.compare(v1.getFirst(), v2.getFirst());
if(cmp != 0) { return cmp; }
*/
return -1 * v1.compareTo(v2);
//possible error: it compares Text first and then compare IntWritable
}
}
public static class GroupComparator extends WritableComparator
{
protected GroupComparator()
{
super(Pair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2)
{
System.out.println("group Comparator");
Pair v1 = (Pair) w1;
Pair v2 = (Pair) w2;
return v1.getFirst().compareTo(v2.getFirst());
//this compareTo is under binarycomparable
}
}
public static class SortingReducer extends Reducer<Pair, NullWritable, Pair, NullWritable>
{
@Override
public void reduce(Pair p, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
{
System.out.println("sortingReducer");
context.write(p, NullWritable.get());
}
}
public static void main(String[] args) throws Exception
{
Configuration conf2 = new Configuration();
//String[] otherArgs2 = new GenericOptionsParser(conf1, args).getRemainingArgs();
ControlledJob cJob2 = new ControlledJob(conf2);
conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
cJob2.setJobName("Sorting");
Job job2 = cJob2.getJob();
job2.setJarByClass(Sorting.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
job2.setMapperClass(SortingMapper.class);
job2.setPartitionerClass(FirstPartitioner.class);
job2.setSortComparatorClass(KeyComparator.class);
job2.setGroupingComparatorClass(GroupComparator.class);
job2.setReducerClass(SortingReducer.class);
job2.setOutputKeyClass(Pair.class);
job2.setOutputValueClass(NullWritable.class);
job2.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job2, new Path("hdfs:///tmp/inter/part-r-00000"));
FileOutputFormat.setOutputPath(job2, new Path(args[0]));
job2.waitForCompletion(true);
}
은} 나는 오류를 캐스팅있어 위의 코드를 실행합니다.
FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.IntWritable
at Sorting$SortingMapper.map(Sorting.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
나는 단지 hadoop을 배우기 시작했고, 나는 스스로 만든 첫 번째 작품이다. 이 오류를 해결하는 방법을 알려주십시오. 전체 코드에 대한 의견을 보내 주시면 감사하겠습니다.
예 보조 정렬에 대해 읽었지만 mapreduce 하나만으로이 문제를 해결할 수 있습니까? 나는 그렇게 할 수 없다고 생각하여 chaining mapreduce를 사용하려고 시도했다. – user3067818
나는 나의 의견을 되 찾을 것이다. 2 차적 정렬은 전체 키가 아닌 각 키와 연관된 값을 정렬합니다. 나는 당신의 문제 진술을 잘못 이해했다. –