단어 수를 줄일 수있는 작업입니다. 내 자신의 InputFormat 있습니다.MapReduce 작업에서 감속 작업이 호출되지 않습니다.
JobExecutor :
val job = new Job(new Configuration())
job.setMapperClass(classOf[CountMapper])
job.setReducerClass(classOf[CountReducer])
job.setJobName("tarun-test-1")
job.setInputFormatClass(classOf[MyInputFormat])
FileInputFormat.setInputPaths(job, new Path(args(0)))
FileOutputFormat.setOutputPath(job, new Path(args(1)))
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[LongWritable])
job.setNumReduceTasks(1)
println("status: " + job.waitForCompletion(true))
매퍼 :
class CountMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
private val valueOut = new LongWritable(1L)
override def map(k: LongWritable, v: Text, context: Mapper[LongWritable, Text, Text, LongWritable]#Context): Unit = {
val str = v.toString
str.split(",").foreach(word => {
val keyOut = new Text(word.toLowerCase.trim)
context.write(keyOut, valueOut)
})
}
}
감속기 :
class CountReducer extends Reducer[Text, LongWritable, Text, LongWritable] {
override def reduce(k: Text, values: Iterable[LongWritable], context: Reducer[Text, LongWritable, Text, LongWritable]#Context): Unit = {
println("Inside reduce method..")
val valItr = values.iterator()
var sum = 0L
while (valItr.hasNext) {
sum = sum + valItr.next().get()
}
context.write(k, new LongWritable(sum))
println("done reducing.")
}
}
매퍼가 호출되고 있으며 RecordReader 제대로 로그를 기반으로 분할을 읽고 있습니다. 그러나 감속기가 호출되지 않습니다.
당신은 당신 자신의 InputFormat을 가지고 있다는 것을 무엇을 의미합니까? 어디 있니? 그리고 감축이 호출되지 않는다는 것을 무엇을 의미합니까? 어떻게 알았어? 모든 입력/출력? 카운터? 오류? 로그? – vefthym
MyInputFormat은 내 자신의 InputFormat입니다. InputFormat가 예상대로 작동하고 있는데, 매퍼의 입력 (키, 값)이 RecordReader에서 올바르게 읽혀지고 있음을 알 수 있습니다. 지도 작업에 로깅을 추가했으며 예상대로 로깅했습니다. 그러나 축소 로그는 인쇄되지 않으며 최종 상태는 false입니다. –