나는 여러 스레드에 url을 전달하기 위해 hadoop을 사용하는 기본적인 웹 파서를 만들었다. 이것은 입력 파일의 끝에 도달하기 전까지는 잘 작동합니다. Hadoop은 아직 실행중인 스레드가있는 동안 자신을 선언합니다. 이로 인해 org.apache.hadoop.fs.FSError 오류가 발생합니다. java.io.IOException : Stream Closed. 스레드가 끝날 수있을만큼 스트림을 길게 열어 둘 수 있습니까? (합리적인 정확성으로 스레드가 단일 URL에 대해 소비 할 최대 시간을 예측할 수 있습니다.)하프 루프 스트림이 닫히는 것을 방지하는 방법은 무엇입니까?
을 Heres 내가 스레드에게
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
private Text word = new Text();
private URLPile pile = new URLPile();
private MSLiteThread[] Threads = new MSLiteThread[16];
private boolean once = true;
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter) {
String url = value.toString();
StringTokenizer urls = new StringTokenizer(url);
Config.LoggerProvider = LoggerProvider.DISABLED;
System.out.println("In Mapper");
if (once) {
for (MSLiteThread thread : Threads) {
System.out.println("created thread");
thread = new MSLiteThread(pile);
thread.start();
}
once = false;
}
while (urls.hasMoreTokens()) {
try {
word.set(urls.nextToken());
String currenturl = word.toString();
pile.addUrl(currenturl, output);
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
스레드를 실행하는 방법을 자신이
public void run(){
try {
sleep(3000);
while(!done()){
try {
System.out.println("in thread");
MSLiteURL tempURL = pile.getNextURL();
String currenturl = tempURL.getURL();
urlParser.parse(currenturl);
urlText.set("");
titleText.set(currenturl+urlParser.export());
System.out.println(urlText.toString()+titleText.toString());
tempURL.getOutput().collect(urlText, titleText);
pile.doneParsing();
sleep(30);
} catch (Exception e) {
pile.doneParsing();
e.printStackTrace();
continue;
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Thread done");
}
같은 URL과 urlpile의 관련 방법을 얻을 내가 추측 할 수 있듯이
public synchronized void addUrl(String url,OutputCollector<Text, Text> output) throws InterruptedException {
while(queue.size()>16){
System.out.println("queue full");
wait();
}
finishedParcing--;
queue.add(new MSLiteURL(output,url));
notifyAll();
}
private Queue<MSLiteURL> queue = new LinkedList<MSLiteURL>();
private int sent = 0;
private int finishedParcing = 0;
public synchronized MSLiteURL getNextURL() throws InterruptedException {
notifyAll();
sent++;
//System.out.println(queue.peek());
return queue.remove();
}
그 dec 실제로 내가 원하는 것을 정확하게 타임 아웃하는 경우 작업이 실패로 끝나지 만 제대로 된 것처럼 보입니다. – Chenab
아! 나는 그 질문에서 세부 사항 중 일부를 놓쳤을 수도있다. 단일 맵 작업에서 실행중인 스레드가 있고 그 맵이 입력을 처리하면 Hadoop이 종료된다고 말하는 것입니까? –
다소 차이가 있습니다. 스레드는 각 입력을 처리하기 위해 잠시 동안 기다린다. 그래서 나는 그 중 하나 이상을 가지고있다. 그러나 일단 hadoop이 완료되면 맵 태스크를 선언하면 스레드는 출력을 배치 할 위치를 갖지 않습니다. – Chenab