2010-04-26 3 views
6

나는 하나의 매핑에 N 개의 줄을 보내기 위해 Hadoop을 사용하려고 노력 해왔다. 나는 이미 줄이 쪼개 질 필요가 없다.한 줄로 된 여러 줄의 텍스트

NLineInputFormat을 사용하려고했지만 데이터에서 각 매퍼에 한 줄에 한 줄씩 텍스트를 전송합니다. [N 번째 줄 이후에 단절].

나는 옵션을 설정하려고 노력하며 각 맵에 한 번에 한 줄에 보내기 입력의 N 라인을 취 내가 LineRecordReader을 무시 나를 추천 메일 링리스트를 발견했습니다

job.setInt("mapred.line.input.format.linespermap", 10); 

:: 그 다음으로 내부 데이터 멤버가 모두 비공개이므로 간단하지 않습니다.

NLineInputFormat의 소스를 확인한 후 LineReader를 하드 코드하므로 무시가 도움이되지 않습니다.

또한 btw Amazon EC2 MapReduce와의 호환성을 위해 Hadoop 0.18을 사용하고 있습니다.

+0

왜이 작업을 수행하려고? 어떤 의미에서 여러 줄이 하나의 레코드를 구성합니까? –

+0

나는 N 개의 랜덤 라인이 [세트로] 필요하지만, 결과적으로 살 수 있습니다. 오른쪽 감속기로 보내야합니다. – monksy

+0

질문에 대답하려면 예. – monksy

답변

7

은 사용자 입력 형식을 구현해야합니다. 당신은 또한 자신의 레코드 리더를 정의 할 수 있습니다.

불행히도 getSplits() - 메서드를 정의해야합니다. 내 의견으로는 이것은 레코드 판독기를 구현하는 것보다 더 어려울 것이다.이 방법은 입력 데이터를 청크 (chunk)하기위한 로직을 구현해야한다. "하둡 - 확실한 가이드"

는에서 다음 발췌 참조 (내가 항상 추천 좋은 책을!) :

public interface InputFormat<K, V> { 
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; 
    RecordReader<K, V> getRecordReader(InputSplit split, 
            JobConf job, 
            Reporter reporter) throws IOException; 
} 

JobClient가 getSplits()를 호출 방법 : 여기

인터페이스입니다 원하는 수의 맵 태스크 을 numSplits 인수로 전달하십시오. 이 숫자는 힌트로 취급됩니다. InputFormat이 numSplits에 지정된 숫자로 다른 수의 스플릿을 반환 할 수 있습니다. 스플릿을 계산하면 클라이언트가 작업 추적자에게 메시지를 보냅니다.이 작업자는 저장 위치를 ​​사용하여 맵 작업을 예약하여 작업 추적자에서 처리합니다.

tasktracker에서 맵 태스크는 InputFormat의 getRecordReader() 메소드에 분할을 전달하여 해당 분할에 대한 RecordReader를 가져옵니다. RecordReader는 레코드에 대한 반복자가 이상이며, 맵 태스크는 레코드 키 - 값 쌍인 을 생성하여 맵 기능에 전달합니다. (MapRunner의 코드 기준) 코드 조각 아이디어를 보여

K key = reader.createKey(); 
V value = reader.createValue(); 
while (reader.next(key, value)) { 
    mapper.map(key, value, output, reporter); 
} 
+0

그건 다소 효과가 있습니다. 그러나 그것은 정말로 그 질문에 답하지 않습니다. 18.3에서 새로운 InputFormats를 추가 할 때 문제가 있습니다. – monksy

+2

오케이 죄송합니다. 실제로 물음표가 보이지 않기 때문에 진짜 질문은 없습니다. -P 그래서 더 구체적으로 알아야 할 것이 있습니까? –

1

내가 귀하의 경우에는 위임 패턴을 따를 수 및 (필요한 방법은 다음 즉, 우선 LineRecordReader 주위에 래퍼를 구현하는 것이 생각) (또는 새 API의 nextKeyValue()를 사용하여 값을 한 줄이 아닌 N 줄의 연결로 설정합니다.

나는 ParagraphRecordReader의 모범적 인 구현을 봤는데, EOF 나 공백 행이 발생할 때까지 LineRecordReader를 사용하여 입력 데이터를 한 줄씩 읽고 연결했다. 그런 다음 쌍을 반환합니다. 여기서 value는 단락 (한 줄 대신)입니다. 또한이 ParagraphRecordReader의 ParagraphInputFormat은 표준 TextInputFormat만큼 간단합니다.

이 구현에 필요한 링크와 다음 게시물에 대한 몇 가지 단어를 찾을 수 있습니다. http://hadoop-mapreduce.blogspot.com/2011/03/little-more-complicated-recordreaders.html.

2

베스트 단순히 NLineInputFormat을 무시하고 사용자 정의 MultiLineRecordReader 대신 기본 LineReader를 구현하는 내 자신의 InputFormat을 만들어 최근에이 문제를 해결했다.

NLineInputFormat을 확장하기로 한 이유는 분할 당 정확히 N 줄의 동일한 보장이 있기 때문입니다.

이 기록 독자는 내가 수정 http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

유일한 것들에서와 같이 거의 이제 새 API를 사용 maxLineLength의 속성, 그리고 하드 코딩되는 NLineInputFormat의 setNumLinesPerSplit() INSEAD에서 읽을됩니다 NLINESTOPROCESS의 값은 촬영 (더 많은 유연성을 위해). 여기

은 결과입니다

public class MultiLineInputFormat extends NLineInputFormat{ 
    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) { 
     context.setStatus(genericSplit.toString()); 
     return new MultiLineRecordReader(); 
    } 

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{ 
     private int NLINESTOPROCESS; 
     private LineReader in; 
     private LongWritable key; 
     private Text value = new Text(); 
     private long start =0; 
     private long end =0; 
     private long pos =0; 
     private int maxLineLength; 

     @Override 
     public void close() throws IOException { 
      if (in != null) { 
       in.close(); 
      } 
     } 

     @Override 
     public LongWritable getCurrentKey() throws IOException,InterruptedException { 
      return key; 
     } 

     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (start == end) { 
       return 0.0f; 
      } 
      else { 
       return Math.min(1.0f, (pos - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException { 
      NLINESTOPROCESS = getNumLinesPerSplit(context); 
      FileSplit split = (FileSplit) genericSplit; 
      final Path file = split.getPath(); 
      Configuration conf = context.getConfiguration(); 
      this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE); 
      FileSystem fs = file.getFileSystem(conf); 
      start = split.getStart(); 
      end= start + split.getLength(); 
      boolean skipFirstLine = false; 
      FSDataInputStream filein = fs.open(split.getPath()); 

      if (start != 0){ 
       skipFirstLine = true; 
       --start; 
       filein.seek(start); 
      } 
      in = new LineReader(filein,conf); 
      if(skipFirstLine){ 
       start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start)); 
      } 
      this.pos = start; 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (key == null) { 
       key = new LongWritable(); 
      } 
      key.set(pos); 
      if (value == null) { 
       value = new Text(); 
      } 
      value.clear(); 
      final Text endline = new Text("\n"); 
      int newSize = 0; 
      for(int i=0;i<NLINESTOPROCESS;i++){ 
       Text v = new Text(); 
       while (pos < end) { 
        newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength)); 
        value.append(v.getBytes(),0, v.getLength()); 
        value.append(endline.getBytes(),0, endline.getLength()); 
        if (newSize == 0) { 
         break; 
        } 
        pos += newSize; 
        if (newSize < maxLineLength) { 
         break; 
        } 
       } 
      } 
      if (newSize == 0) { 
       key = null; 
       value = null; 
       return false; 
      } else { 
       return true; 
      } 
     } 
    } 

} 
관련 문제