2013-08-24 2 views
1

나는 HDFS에서 출력으로 CSV 파일을 생성하는 간단한 하둡 작업을 유지하고있어에 헤더를 추가 할 수 있습니다. 작업은 TextOutputFormat을 사용합니다. 는 내가 CSV 파일 (내가 그들 각각의 헤더를 얻는 경우에 그 문제가되지 않습니다, 일부 파일이 다른 노동자에 의해 만들어집니다 알고)로 선두 헤더 행을 추가합니다. 어떻게 이것을 수행할까요?하둡 TextOutputFormat : CSV 출력

편집 : 캐스 케이 딩은 help하지만 첫눈에 나는 새로운 프레임 워크를

편집 사용을 시작하지 않을 수 있습니다

은 그래서 출력 CSV 파일 헤더를 추가 할. 열 수 은 결정적입니다.

import java.io.IOException; 

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 

public final class Reducer extends Reducer<Text, IntWritable, Text, IntWritable> 
{ 
    private MultipleOutputs<Text, IntWritable> mos; 

    private static final Text KEY_HOLDER = new Text(); 

    private static final IntWritable VALUE_HOLDER = new IntWritable(1); 

    @Override 
    public void setup(final Context context) 
    { 
     mos = new MultipleOutputs<Text, IntWritable>(context); 
    } 

    @Override 
    public void cleanup(final Context context) throws IOException, InterruptedException 
    { 
     mos.close(); 
    } 

    @Override 
    public void reduce(final Text key, final Iterable<IntWritable> values, final Context context) 
      throws IOException, InterruptedException 
    { 
     // [... some business logic ...]   
     mos.write(KEY_HOLDER, VALUE_HOLDER, "myFileName"); 
     context.progress(); 
    } 
} 
+0

왜 downvote? 질문이 의미가 없다면 말해주십시오. – gyorgyabraham

+0

매퍼 또는 감속기에 헤더를 추가 할 수 있습니까? 실제 데이터 앞에 헤더를 출력 할 수 있습니다. – zsxwing

+0

몇 가지 코드를 표시하십시오, 귀하의 설명에서 우리는 당신을 도울 수 없습니다. 맵퍼를 @zsxwing –

답변

0

당신은 당신의 requirement.For 예에 따라 헤더를 추가하는 매퍼/감속기 클래스의 실행()를 재정의 할 수 여기 내 감속기 클래스의 골격이다. ur final o/p에 FisrtName과 LastName을 추가하려면 아래 코드를 참조하십시오.

public void run(Context context) throws IOException, InterruptedException 
    { 
     setup(context); 
     column = new Text("ColumnName") ; 
     values = new Text("FirstName" + "\t" + "LastName") ; 
     context.write(column, values); 
     try 
     { 
      while (context.nextKey()) 
      { 
      reduce(context.getCurrentKey(), context.getValues(), context); 
      Iterator<IntWritable> iter = context.getValues().iterator(); 
      if(iter instanceof ReduceContext.ValueIterator) 
      {    ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();   
      } 
      } 
     } 
     finally 
     { 
      cleanup(context); 
     } 
    }