2013-07-19 3 views
1

나는 여러 스레드에 url을 전달하기 위해 hadoop을 사용하는 기본적인 웹 파서를 만들었다. 이것은 입력 파일의 끝에 도달하기 전까지는 잘 작동합니다. Hadoop은 아직 실행중인 스레드가있는 동안 자신을 선언합니다. 이로 인해 org.apache.hadoop.fs.FSError 오류가 발생합니다. java.io.IOException : Stream Closed. 스레드가 끝날 수있을만큼 스트림을 길게 열어 둘 수 있습니까? (합리적인 정확성으로 스레드가 단일 URL에 대해 소비 할 최대 시간을 예측할 수 있습니다.)하프 루프 스트림이 닫히는 것을 방지하는 방법은 무엇입니까?

을 Heres 내가 스레드에게

public static class Map extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 
     private Text word = new Text(); 
     private URLPile pile = new URLPile(); 
     private MSLiteThread[] Threads = new MSLiteThread[16]; 
     private boolean once = true; 

     @Override 
     public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, Reporter reporter) { 

      String url = value.toString(); 
      StringTokenizer urls = new StringTokenizer(url); 
      Config.LoggerProvider = LoggerProvider.DISABLED; 
      System.out.println("In Mapper"); 
      if (once) { 
       for (MSLiteThread thread : Threads) { 
        System.out.println("created thread"); 
        thread = new MSLiteThread(pile); 
        thread.start(); 
       } 
       once = false; 
      } 

      while (urls.hasMoreTokens()) { 
       try { 
        word.set(urls.nextToken()); 
        String currenturl = word.toString(); 
        pile.addUrl(currenturl, output); 

       } catch (Exception e) { 
        e.printStackTrace(); 
        continue; 
       } 

      } 

     } 

스레드를 실행하는 방법을 자신이

public void run(){ 
      try { 
      sleep(3000); 
       while(!done()){ 
        try { 
        System.out.println("in thread"); 
         MSLiteURL tempURL = pile.getNextURL(); 
         String currenturl = tempURL.getURL(); 
         urlParser.parse(currenturl); 
         urlText.set(""); 
         titleText.set(currenturl+urlParser.export()); 
         System.out.println(urlText.toString()+titleText.toString()); 
         tempURL.getOutput().collect(urlText, titleText); 
         pile.doneParsing(); 
        sleep(30); 
        } catch (Exception e) { 
          pile.doneParsing(); 
        e.printStackTrace(); 
         continue; 
        } 
       } 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      System.out.println("Thread done"); 

     } 

같은 URL과 urlpile의 관련 방법을 얻을 내가 추측 할 수 있듯이

public synchronized void addUrl(String url,OutputCollector<Text, Text> output) throws InterruptedException { 
     while(queue.size()>16){ 
      System.out.println("queue full"); 
      wait(); 
     } 
     finishedParcing--; 
     queue.add(new MSLiteURL(output,url)); 
     notifyAll(); 
    } 

    private Queue<MSLiteURL> queue = new LinkedList<MSLiteURL>(); 
    private int sent = 0; 
    private int finishedParcing = 0; 
    public synchronized MSLiteURL getNextURL() throws InterruptedException { 

     notifyAll(); 
     sent++; 
     //System.out.println(queue.peek()); 
     return queue.remove(); 

    } 

답변

1

있습니다 아래의 주석에서 map() 함수를 사용하여 쉽게이 작업을 수행 할 수 있습니다. 유휴 스레드를 미리 만들기 위해 다음을 수행하는 것을 보았습니다. 이 한 번 초기화받을 수 있음을, 그래서,에

if (once) { 
    for (MSLiteThread thread : Threads) { 
    System.out.println("created thread"); 
    thread = new MSLiteThread(pile); 
    thread.start(); 
    } 
once = false; 
} 

public static class Map extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 
    @Override 
    public void configure(JobConf job) { 
     for (MSLiteThread thread : Threads) { 
     System.out.println("created thread"); 
     thread = new MSLiteThread(pile); 
     thread.start(); 
     } 
    } 

    @Override 
    public void map(LongWritable key, Text value, 
     OutputCollector<Text, Text> output, Reporter reporter) { 
    } 

} 

다음 코드를 이동할 수 있으며, 그 문제에 대해 더 이상 상태 확인 '번'을 필요하지 않습니다 .

또한 위와 같이 유휴 스레드를 만들 필요가 없습니다. 16 개의 유휴 스레드를 얼마나 많이 생성하는지 알지 못합니다.

가 어쨌든, 여기에 솔루션입니다

당신은 N의 배치 이상에서 URL을 처리하고 완료 될 때까지 차단할 수있는 CountDownLatch를 Read more here 같은 것을 사용할 수 있습니다 (그러나 완벽하지 않을 수 있습니다). 이는 들어오는 각 URL 레코드를 하나의 스레드에 공개하면 다음 url이 즉시 반입되고 동일한 방법으로 마지막 url을 처리 할 때 스레드가 남아 있어도 map() 함수가 반환 될 가능성이 있기 때문입니다. 처리 할 대기열에 있습니다. 필연적으로 언급 한 예외가 발생합니다.

여기 예에서 카운트 다운 래치를 사용하여 차단할 수 있습니다. 당신의 코드를 볼 수

public static class Map extends MapReduceBase implements 
       Mapper<LongWritable, Text, Text, Text> { 

      @Override 
      public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, Reporter reporter) { 

       String url = value.toString(); 
       StringTokenizer urls = new StringTokenizer(url); 
       Config.LoggerProvider = LoggerProvider.DISABLED; 

      //setting countdownlatch to urls.countTokens() to block off that many threads. 
      final CountDownLatch latch = new CountDownLatch(urls.countTokens()); 
      while (urls.hasMoreTokens()) { 
       try { 
        word.set(urls.nextToken()); 
        String currenturl = word.toString(); 
        //create thread and fire for current URL here 
        thread = new URLProcessingThread(currentURL, latch); 
        thread.start(); 
       } catch (Exception e) { 
        e.printStackTrace(); 
        continue; 
       } 

      } 

      latch.await();//wait for 16 threads to complete execution 
      //sleep here for sometime if you wish 

     } 

    } 

마지막으로, URLProcessingThread에서 즉시 URL 래치 카운터를 감소 처리로,

public class URLProcessingThread implments Runnable { 
    CountDownLatch latch; 
    URL url; 
    public URLProcessingThread(URL url, CountDownLatch latch){ 
     this.latch = latch; 
     this.url = url; 
    } 
    void run() { 
     //process url here 
     //after everything finishes decrement the latch 
     latch.countDown();//reduce count of CountDownLatch by 1 

    } 
} 

아마 문제 : pile.addUrl(currenturl, output);에서, 새 URL을 추가, 그 동안에는 모두 012 스레드 버전 객체가 16 스레드로 전달되기 때문에 모든 16 스레드가 업데이트를 얻습니다. (확실하지 않습니다.) 이 동일합니다. 귀하의 URL이 다시 처리되거나 다른 부작용이 발생할 가능성이 있습니다 (그 점에 대해서는 확실하지 않습니다).

다른 제안 : 또한

당신이

mapred.task.timeout

(기본값 = 600000ms) =의 10 분

Description: The number of milliseconds before a task will be terminated if it neither reads an input, writes an output, nor updates its status string.

당신은 추가 할 수 있습니다

/우선이를 사용하여지도 작업 제한 시간을 늘릴 수 있습니다 mapred-site.xml의 속성

+0

그 dec 실제로 내가 원하는 것을 정확하게 타임 아웃하는 경우 작업이 실패로 끝나지 만 제대로 된 것처럼 보입니다. – Chenab

+1

아! 나는 그 질문에서 세부 사항 중 일부를 놓쳤을 수도있다. 단일 맵 작업에서 실행중인 스레드가 있고 그 맵이 입력을 처리하면 Hadoop이 종료된다고 말하는 것입니까? –

+0

다소 차이가 있습니다. 스레드는 각 입력을 처리하기 위해 잠시 동안 기다린다. 그래서 나는 그 중 하나 이상을 가지고있다. 그러나 일단 hadoop이 완료되면 맵 태스크를 선언하면 스레드는 출력을 배치 할 위치를 갖지 않습니다. – Chenab

관련 문제