나는 mappers가 Python으로 작성된 단순한 데이터 정리와 R을 사용하는 특정 시계열 분석을 실행하려는 감속기 부분 인 hadoop 스트리밍 작업을 작성하고 있습니다. 그러나 실제로 그렇게 쉬운 것은 아닙니다. mapreduce 작업 자체를 디버그하고 필자는 키와 값이 무엇인지 인식하고 수정되지 않은 결과를 출력하는 방식으로 내 감속기를 작성했습니다. 그러나, 그것은 여전히 작동하지 않습니다 그리고 똑같은 일을 내 파이썬 코드 문제없이 작동합니다.HadoopStreaming R in reducer Fail
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320) at
org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533) at
org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134) at
org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:237) at
org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459) at
org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392) at
org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167) at
java.security.AccessController.doPrivileged(Native Method) at
javax.security.auth.Subject.doAs(Subject.java:415) at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554) at
org.apache.hadoop.mapred.YarnChild.main (YarnChild.java:162)
I :
는 감속기로 R을 사용하여 streaing 작업이 오류 메시지는 다음과 같습니다 때 이 스트리밍 작업을 4 개의 데이터 노드 (각각 64GB mem)의 클러스터에서 실행 중이며 총 500 개의 매퍼와 60 개의 축소자를 생성합니다. 두 언어의 내 감속기 코드가 게시됩니다. 어떤 제안이나 도움이 appreicated입니다!
이
은 reducer.py#!/usr/bin/python
import sys
delimiter = '\t'
for line in sys.stdin:
line = line.strip()
mykey, myvalue = line.split(delimiter)
print delimiter.join([mykey, myvalue])
있는 프로그램이 실패했지만 실행이는 reducer.R는
#!/usr/bin/Rscript
library(dplyr)
library(outliers)
library(zoo)
library(forecast)
#library(tsoutliers)
f <- file("stdin")
open(f, open="r")
options(warn=-1)
mydelimiter <- '\t'
sink('/dev/null')
while(length(line<-readLines(f, n=1)) > 0){
tryCatch(
{
line <- gsub('\n', '', line)
fields <- unlist(strsplit(line, split=mydelimiter))
mykey_new <- fields[1]
myvalue_new <- fields[2]
sink()
cat(mykey_new);cat(mydelimiter);cat(myvalue_new);cat('\n')
sink('/dev/null')
},
error=function(e){}
)
}
close(f)