지도 작성 응용 프로그램을 개발하여 사용자가 처음 및 마지막으로 응답 한 시간과 Donald Miner가 쓴 책을 기반으로 해당 사용자의 총 댓글 수를 확인합니다.지도를 사용하여 최소 최대 개수 감소
하지만 내 알고리즘의 문제는 감속기입니다. 사용자 ID를 기반으로 설명을 그룹화했습니다. 내 테스트 데이터에는 각각 다른 날짜에 3 개의 의견을 게시하는 두 개의 사용자 ID가 포함되어 있습니다. 따라서 총 6 행.
내 감속기 출력 결과는 사용자가 처음 및 마지막으로 기록한 시간과 각 사용자 ID에 대한 전체 의견을 보여주는 두 개의 레코드를 인쇄해야합니다.
그러나 내 감속기는 6 개의 레코드를 인쇄 중입니다. 어떤 코드가 다음 코드를 잘못 해석 할 수 있습니까?
protected void reduce(Text userId, Iterable<MinMaxCountTuple> values, Context context) throws IOException, InterruptedException {
은 기본적으로 그냥 Context
하지 org.apache.hadoop.mapreduce.Reducer.Context
지금 출력의 모양을 가지고해야합니다
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.arjun.mapreduce.patterns.mapreducepatterns.MRDPUtils;
import com.sun.el.parser.ParseException;
public class MinMaxCount {
public static class MinMaxCountMapper extends
Mapper<Object, Text, Text, MinMaxCountTuple> {
private Text outuserId = new Text();
private MinMaxCountTuple outTuple = new MinMaxCountTuple();
private final static SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSS");
@Override
protected void map(Object key, Text value,
org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
Map<String, String> parsed =
MRDPUtils.transformXMLtoMap(value.toString());
String date = parsed.get("CreationDate");
String userId = parsed.get("UserId");
try {
Date creationDate = sdf.parse(date);
outTuple.setMin(creationDate);
outTuple.setMax(creationDate);
} catch (java.text.ParseException e) {
System.err.println("Unable to parse Date in XML");
System.exit(3);
}
outTuple.setCount(1);
outuserId.set(userId);
context.write(outuserId, outTuple);
}
}
public static class MinMaxCountReducer extends
Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
private MinMaxCountTuple result = new MinMaxCountTuple();
protected void reduce(Text userId, Iterable<MinMaxCountTuple> values,
org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
result.setMin(null);
result.setMax(null);
result.setCount(0);
int sum = 0;
int count = 0;
for(MinMaxCountTuple tuple: values)
{
if(result.getMin() == null ||
tuple.getMin().compareTo(result.getMin()) < 0)
{
result.setMin(tuple.getMin());
}
if(result.getMax() == null ||
tuple.getMax().compareTo(result.getMax()) > 0) {
result.setMax(tuple.getMax());
}
System.err.println(count++);
sum += tuple.getCount();
}
result.setCount(sum);
context.write(userId, result);
}
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String [] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if(otherArgs.length < 2)
{
System.err.println("Usage MinMaxCout input output");
System.exit(2);
}
Job job = new Job(conf, "Summarization min max count");
job.setJarByClass(MinMaxCount.class);
job.setMapperClass(MinMaxCountMapper.class);
//job.setCombinerClass(MinMaxCountReducer.class);
job.setReducerClass(MinMaxCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MinMaxCountTuple.class);
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
boolean result = job.waitForCompletion(true);
if(result)
{
System.exit(0);
}else {
System.exit(1);
}
}
}
Input:
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-07-30T07:29:33.343" UserId="831878" />
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-01T07:29:33.343" UserId="831878" />
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-02T07:29:33.343" UserId="831878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-06-30T07:29:33.343" UserId="931878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-07-01T07:29:33.343" UserId="931878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-02T07:29:33.343" UserId="931878" />
output file contents part-r-00000:
831878 2011-07-30T07:29:33.343 2011-07-30T07:29:33.343 1
831878 2011-08-01T07:29:33.343 2011-08-01T07:29:33.343 1
831878 2011-08-02T07:29:33.343 2011-08-02T07:29:33.343 1
931878 2011-06-30T07:29:33.343 2011-06-30T07:29:33.343 1
931878 2011-07-01T07:29:33.343 2011-07-01T07:29:33.343 1
931878 2011-08-02T07:29:33.343 2011-08-02T07:29:33.343 1
job submission output:
12/12/16 11:13:52 INFO input.FileInputFormat: Total input paths to process : 1
12/12/16 11:13:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/12/16 11:13:52 WARN snappy.LoadSnappy: Snappy native library not loaded
12/12/16 11:13:52 INFO mapred.JobClient: Running job: job_201212161107_0001
12/12/16 11:13:53 INFO mapred.JobClient: map 0% reduce 0%
12/12/16 11:14:06 INFO mapred.JobClient: map 100% reduce 0%
12/12/16 11:14:18 INFO mapred.JobClient: map 100% reduce 100%
12/12/16 11:14:23 INFO mapred.JobClient: Job complete: job_201212161107_0001
12/12/16 11:14:23 INFO mapred.JobClient: Counters: 26
12/12/16 11:14:23 INFO mapred.JobClient: Job Counters
12/12/16 11:14:23 INFO mapred.JobClient: Launched reduce tasks=1
12/12/16 11:14:23 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=12264
12/12/16 11:14:23 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
12/12/16 11:14:23 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
12/12/16 11:14:23 INFO mapred.JobClient: Launched map tasks=1
12/12/16 11:14:23 INFO mapred.JobClient: Data-local map tasks=1
12/12/16 11:14:23 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10124
12/12/16 11:14:23 INFO mapred.JobClient: File Output Format Counters
12/12/16 11:14:23 INFO mapred.JobClient: Bytes Written=342
12/12/16 11:14:23 INFO mapred.JobClient: FileSystemCounters
12/12/16 11:14:23 INFO mapred.JobClient: FILE_BYTES_READ=204
12/12/16 11:14:23 INFO mapred.JobClient: HDFS_BYTES_READ=888
12/12/16 11:14:23 INFO mapred.JobClient: FILE_BYTES_WRITTEN=43479
12/12/16 11:14:23 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=342
12/12/16 11:14:23 INFO mapred.JobClient: File Input Format Counters
12/12/16 11:14:23 INFO mapred.JobClient: Bytes Read=761
12/12/16 11:14:23 INFO mapred.JobClient: Map-Reduce Framework
12/12/16 11:14:23 INFO mapred.JobClient: Map output materialized bytes=204
12/12/16 11:14:23 INFO mapred.JobClient: Map input records=6
12/12/16 11:14:23 INFO mapred.JobClient: Reduce shuffle bytes=0
12/12/16 11:14:23 INFO mapred.JobClient: Spilled Records=12
12/12/16 11:14:23 INFO mapred.JobClient: Map output bytes=186
12/12/16 11:14:23 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200
12/12/16 11:14:23 INFO mapred.JobClient: Combine input records=0
12/12/16 11:14:23 INFO mapred.JobClient: SPLIT_RAW_BYTES=127
12/12/16 11:14:23 INFO mapred.JobClient: Reduce input records=6
12/12/16 11:14:23 INFO mapred.JobClient: Reduce input groups=2
12/12/16 11:14:23 INFO mapred.JobClient: Combine output records=0
12/12/16 11:14:23 INFO mapred.JobClient: Reduce output records=6
12/12/16 11:14:23 INFO mapred.JobClient: Map output records=6
사용중인 입력 데이터를 게시 할 수 있습니까 (주석이 아닌 원래 질문으로 되돌릴 수 있습니까). –
이 코드 예는 My Book MapReduce 디자인 패턴의 "Numerical Summarizations"에서 가져온 것입니다. 이 문제를 파악하는 데 관심이 있지만 제공된 정보를 실제로 볼 수는 없습니다. 코드를 살펴보고 샘플 데이터를 통해 실행 해 보겠습니다. 보고있는 샘플 입출력을 게시 할 수 있다면 매우 유용 할 것입니다. –
https://github.com/adamjshook/mapreducepatterns/blob/master/MRDP/src/main/java/mrdp/ch2/MinMaxCountDriver.java <--- 원본 코드, 관심있는 사람 –