2012-12-07 5 views
0

저는 Hadoop과 짧은 시간에 작업 중이며 Java로 조인을 구현하려고합니다. 지도 측 또는 축소 측은 문제가되지 않습니다. 구현하기가 쉬워 졌기 때문에 Reduce-Side Join을했습니다. 나는 Java가 조인, 집계 등에 최상의 선택이 아니며, 내가 이미 한 하이브 (Hive) 또는 돼지 (Pig)를 더 잘 선택해야한다는 것을 알고있다. 그러나 저는 연구 프로젝트를 진행하고 있으며, 비교를 제공하기 위해이 3 개 언어를 모두 사용해야합니다.자바로 Hadoop에 가입하십시오.

어쨌든, 나는 구조가 다른 두 개의 입력 파일을 가지고 있습니다. 하나는 키 값이고 다른 하나는 키 | 값 1, 값 2, 값 3, 값 4입니다.

  • 입력 1 : 각 입력 파일에서 하나 개의 기록은 다음과 같습니다 1;2010-01-10T00:00:01
  • 입력 2를 : 나는 하둡 Definitve 가이드 북의 예를 따라,하지만 나를 위해 작동하지 않았다

1;23;Blue;2010-01-11T00:00:01;9999-12-31T23:59:59 . 내 Java 파일을 여기에 게시하므로 잘못된 것을 확인할 수 있습니다.


public class Joiner extends Configured implements Tool { 

public static final String DATA_SEPERATOR =";";          //Define the symbol for seperating the output data 
public static final String NUMBER_OF_REDUCER = "1";         //Define the number of the used reducer jobs 
public static final String COMPRESS_MAP_OUTPUT = "true";        //if the output from the mapping process should be compressed, set COMPRESS_MAP_OUTPUT = "true" (if not set it to "false") 
public static final String 
      USED_COMPRESSION_CODEC = "org.apache.hadoop.io.compress.SnappyCodec"; //set the used codec for the data compression 
public static final boolean JOB_RUNNING_LOCAL = true;        //if you run the Hadoop job on your local machine, you have to set JOB_RUNNING_LOCAL = true 
                        //if you run the Hadoop job on the Telefonica Cloud, you have to set JOB_RUNNING_LOCAL = false 
public static final String OUTPUT_PATH = "/home/hduser"; //set the folder, where the output is saved. Only needed, if JOB_RUNNING_LOCAL = false 



public static class KeyPartitioner extends Partitioner<TextPair, Text> { 
    @Override 
    public int getPartition(/*[*/TextPair key/*]*/, Text value, int numPartitions) { 
     System.out.println("numPartitions: " + numPartitions); 
      return (/*[*/key.getFirst().hashCode()/*]*/ & Integer.MAX_VALUE) % numPartitions; 
     } 
} 

private static Configuration hadoopconfig() { 
    Configuration conf = new Configuration(); 

    conf.set("mapred.textoutputformat.separator", DATA_SEPERATOR); 
    conf.set("mapred.compress.map.output", COMPRESS_MAP_OUTPUT); 
    //conf.set("mapred.map.output.compression.codec", USED_COMPRESSION_CODEC); 
    conf.set("mapred.reduce.tasks", NUMBER_OF_REDUCER); 
    return conf; 
} 

@Override 
public int run(String[] args) throws Exception { 
    // TODO Auto-generated method stub 
    if ((args.length != 3) && (JOB_RUNNING_LOCAL)) { 

     System.err.println("Usage: Lookup <CDR-inputPath> <Attribute-inputPath> <outputPath>"); 
     System.exit(2); 
    } 

    //starting the Hadoop job 
    Configuration conf = hadoopconfig(); 
    Job job = new Job(conf, "Join cdrs and attributes"); 
    job.setJarByClass(Joiner.class); 

    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CDRMapper.class); 
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, AttributeMapper.class); 
    //FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //expecting a folder instead of a file 

    if(JOB_RUNNING_LOCAL) 
     FileOutputFormat.setOutputPath(job, new Path(args[2])); 
    else 
     FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); 


    job.setPartitionerClass(KeyPartitioner.class); 
    job.setGroupingComparatorClass(TextPair.FirstComparator.class); 
    job.setReducerClass(LookupReducer.class); 

    job.setMapOutputKeyClass(TextPair.class); 
    job.setMapOutputValueClass(Text.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 

public static void main(String[] args) throws Exception { 

    int exitCode = ToolRunner.run(new Joiner(), args); 
    System.exit(exitCode); 

} 
} 

public class Attribute { 

public static final String ATT_TAG = "1"; 


public static class AttributeMapper 
extends Mapper<LongWritable, Text, TextPair, Text>{ 

    private static Text values = new Text(); 
    //private Object output = new Text(); 

    @Override 
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     //partition the input line by the separator semicolon 
     String[] attributes = value.toString().split(";"); 
     String valuesInString = ""; 

     if(attributes.length != 5) 
      System.err.println("Input column number not correct. Expected 5, provided " + attributes.length 
        + "\n" + "Check the input file"); 
     if(attributes.length == 5) 
     { 
      //setting the values with the input values read above 
      valuesInString = attributes[1] + ";" + attributes[2] + ";" + attributes[3] + ";" + attributes[4]; 
      values.set(valuesInString); 
     //writing out the key and value pair 
     context.write(new TextPair(new Text(String.valueOf(attributes[0])), new Text(ATT_TAG)), values); 
      } 
    } 
} 

} 

public class CDR { 


public static final String CDR_TAG = "0"; 

public static class CDRMapper 
    extends Mapper<LongWritable, Text, TextPair, Text>{ 

     private static Text values = new Text(); 
     private Object output = new Text(); 

    @Override 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     //partition the input line by the separator semicolon 
    String[] cdr = value.toString().split(";"); 

    //setting the values with the input values read above 
    values.set(cdr[1]); 
    //output = CDR_TAG + cdr[1]; 

    //writing out the key and value pair 
    context.write(new TextPair(new Text(String.valueOf(cdr[0])), new Text(CDR_TAG)), values); 
     } 


    } 

} 

내가 기쁠

public class LookupReducer extends Reducer<TextPair,Text,Text,Text> { 


private String result = ""; 
private String msisdn; 
private String attribute, product; 
private long trans_dt_long, start_dt_long, end_dt_long; 
private String trans_dt, start_dt, end_dt; 

@Override 
public void reduce(TextPair key, Iterable<Text> values, Context context) 
     throws IOException, InterruptedException { 

    context.progress(); 
    //value without key to remember 

    Iterator<Text> iter = values.iterator(); 

for (Text val : values) { 

Text recordNoKey = val;  //new Text(iter.next()); 

String valSplitted[] = recordNoKey.toString().split(";"); 

//if the input is coming from CDR set corresponding values 

    if(key.getSecond().toString().equals(CDR.CDR_TAG)) 
    { 
     trans_dt = recordNoKey.toString(); 
     trans_dt_long = dateToLong(recordNoKey.toString()); 
    } 
    //if the input is coming from Attributes set corresponding values 
    else if(key.getSecond().toString().equals(Attribute.ATT_TAG)) 
    { 
     attribute = valSplitted[0]; 
     product = valSplitted[1]; 
     start_dt = valSplitted[2]; 
     start_dt_long = dateToLong(valSplitted[2]); 
     end_dt = valSplitted[3]; 
     end_dt_long = dateToLong(valSplitted[3]);; 
    } 

     Text record = val; //iter.next(); 
     //System.out.println("RECORD: " + record); 
     Text outValue = new Text(recordNoKey.toString() + ";" + record.toString());  

if(start_dt_long < trans_dt_long && trans_dt_long < end_dt_long) 
     { 
    //concat output columns 
     result = attribute + ";" + product + ";" + trans_dt;  

    context.write(key.getFirst(), new Text(result)); 
    System.out.println("KEY: " + key); 
     } 
    } 
} 

private static long dateToLong(String date){ 
    DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    Date parsedDate = null; 
    try { 
     parsedDate = formatter.parse(date); 
    } catch (ParseException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
    long dateInLong = parsedDate.getTime(); 

    return dateInLong; 

} 

public static class TextPair implements WritableComparable<TextPair> { 

    private Text first; 
    private Text second; 

    public TextPair(){ 
     set(new Text(), new Text()); 
    } 

    public TextPair(String first, String second){ 
     set(new Text(first), new Text(second)); 
    } 

    public TextPair(Text first, Text second){ 
     set(first, second); 
    } 

    public void set(Text first, Text second){ 
     this.first = first; 
     this.second = second; 
    } 

    public Text getFirst() { 
     return first; 
    } 

    public void setFirst(Text first) { 
     this.first = first; 
    } 

    public Text getSecond() { 
     return second; 
    } 

    public void setSecond(Text second) { 
     this.second = second; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     // TODO Auto-generated method stub 
     first.readFields(in); 
     second.readFields(in); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     // TODO Auto-generated method stub 
     first.write(out); 
     second.write(out); 
    } 

    @Override 
    public int hashCode(){ 
     return first.hashCode() * 163 + second.hashCode(); 
    } 

    @Override 
    public boolean equals(Object o){ 
     if(o instanceof TextPair) 
     { 
      TextPair tp = (TextPair) o; 
      return first.equals(tp.first) && second.equals(tp.second); 
     } 
     return false; 
    } 

    @Override 
    public String toString(){ 
     return first + ";" + second; 
    } 

    @Override 
    public int compareTo(TextPair tp) { 
     // TODO Auto-generated method stub 
     int cmp = first.compareTo(tp.first); 
     if(cmp != 0) 
      return cmp; 
     return second.compareTo(tp.second); 
    } 


    public static class FirstComparator extends WritableComparator { 

     protected FirstComparator(){ 
      super(TextPair.class, true); 
     } 

     @Override 
     public int compare(WritableComparable comp1, WritableComparable comp2){ 
      TextPair pair1 = (TextPair) comp1; 
      TextPair pair2 = (TextPair) comp2; 
      int cmp = pair1.getFirst().compareTo(pair2.getFirst()); 

      if(cmp != 0) 
       return cmp; 

      return -pair1.getSecond().compareTo(pair2.getSecond()); 
     } 
    } 

    public static class GroupComparator extends WritableComparator { 
     protected GroupComparator() 
     { 
      super(TextPair.class, true); 
     } 

     @Override 
     public int compare(WritableComparable comp1, WritableComparable comp2) 
     { 
      TextPair pair1 = (TextPair) comp1; 
      TextPair pair2 = (TextPair) comp2; 

      return pair1.compareTo(pair2); 
     } 
    } 

} 

} 

경우 사람 수 적어도 포스트 튜토리얼에 대한 링크 또는 조인 기능이 구현되는 간단한 예. 나는 많은 것을 수색했으나, 코드가 완전하지 않았거나 충분한 설명이 없었다.

+0

위의 각 형식에서 하나의 레코드가 주어지면 무엇을 만들고 싶습니까? 형식은 무엇입니까? –

답변

2

솔직히 말해서 코드가 무엇을 하려는지는 모르겠지만 다른 방법으로 사용하고 사용하고있는 API에 익숙하지 않기 때문일 수 있습니다. 다음과 같이

나는 처음부터 시작할 것 :

  • 이 파일 중 하나를 읽어 매퍼를 만듭니다. 읽어내는 각 행에 대해서, 문맥에 키값 페어를 기입합니다. 키는 키에서 작성된 텍스트이고 값은 "1"을 전체 입력 레코드와 연결하여 작성된 또 다른 텍스트입니다.
  • 다른 파일에 대한 다른 매퍼를 만듭니다. 이 매퍼는 첫 번째 매퍼와 같이 작동하지만 값은 "2"를 전체 입력 레코드와 연결하여 만든 텍스트입니다.
  • 결합을 수행하는 감속기를 작성하십시오. reduce() 메소드는 특정 키에 대해 작성된 모든 레코드를 가져옵니다. 값이 "1"또는 "2"로 시작하는지 확인하여 어느 입력 파일 (따라서 레코드의 데이터 형식)을 알 수 있습니다. 한 레코드 유형, 다른 레코드 유형 또는 두 레코드 유형이 있는지 여부를 알게되면 하나 또는 두 개의 레코드에서 데이터를 병합하는 데 필요한 논리를 작성할 수 있습니다.

그런데 MultipleInputs 클래스를 사용하면 작업/드라이버 클래스에 둘 이상의 매퍼를 구성 할 수 있습니다.