2014-12-15 1 views
-1
I am learning hadoop mapreduce framework ,i am trying to join 2 data sets have first record(Text) in the line as the Key , i tried to search in stackoverflow previous posts but nothing worked out.Here i am trying to customize the InputFormat and trying to join with the ID which is first record in the each line of data set. 

내 입력 파일 1 : cdd8dde3-0349-4f0d-b97a-7ae84b687f9c, 에스더, 가너, 4071 헤이븐 레인, 오키 모스, MI 81a43486-07e1- 4b92-b92b-03d0caa87b5f, 디모데, 던컨, 753 스타디움 드라이브, 톤턴, MA맵리 듀스 하둡 - mapside은 customInputFormat를 사용하여이 데이터 세트에 합류

있는 File2 : cdd8dde3-0349-4f0d-b97a-7ae84b687f9c, 517-706-9565, EstherJGarner @ teleworm.us, Waskepter38, NoL2ieghie, MasterCard, 5305687295670850 81a43486-07e1-4b92-b92b-03d0caa87b5f, 508-307-3433, TimothyDDuncan @ ein rot.com, Conerse, Gif4Edeiba, 마스터 카드, 5265896533330445

**Driver class:** 
     conf.setInputFormat(CompositeInputFormat.class); 
     String strJoinStmt = CompositeInputFormat.compose("inner", 
     KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData); 
     conf.set("mapred.join.expr", strJoinStmt); 
     conf.setNumReduceTasks(0); 
     dirOutput.getFileSystem(conf).delete(dirOutput); 
     TextOutputFormat.setOutputPath(conf, dirOutput); 
     conf.setOutputKeyClass(Text.class); 
     conf.setOutputValueClass(Text.class); 
     conf.setOutputFormat(TextOutputFormat.class); 

**Custom RecordReader class:** 

public class KeyValueLongLineRecordReader implements 
RecordReader<Text, Text> { 
private final LineRecordReader lineRecordReader; 
private byte separator = (byte) ','; 
private LongWritable dummyKey; 
private Text innerValue; 
public Class getKeyClass() { 
return Text.class; 
} 
public Text createKey() { 
return new Text(""); 
} 
public Text createValue() { 
return new Text(); 
} 
public KeyValueLongLineRecordReader(Configuration job, FileSplit split) 
throws IOException { 
lineRecordReader = new LineRecordReader(job, split); 
dummyKey = lineRecordReader.createKey(); 
innerValue = lineRecordReader.createValue(); 
String sepStr = job.get("key.value.separator.in.input.line", ","); 
this.separator = (byte) sepStr.charAt(0); 
} 
public static int findSeparator(byte[] utf, int start, int length, byte sep) { 
for (int i = start; i < (start + length); i++) { 
if (utf[i] == sep) { 
return i; 
} 
} 
return -1; 
} 
/** Read key/value pair in a line. */ 
public synchronized boolean next(Text key, Text value) 
throws IOException { 
Text tKey = key; 
Text tValue = value; 
byte[] line = null; 
int lineLen = -1; 
if (!lineRecordReader.next(dummyKey, innerValue)) { 
    return false; 
} 
else 
    line = innerValue.getBytes(); 
lineLen = innerValue.getLength(); 

if (line == null) 
return false; 
int pos = findSeparator(line, 0, lineLen, this.separator); 
if (pos == -1) { 
tKey.set(new String(line, 0, lineLen)); 
tValue.set(""); 
} else { 
int keyLen = pos; 
byte[] keyBytes = new byte[keyLen]; 
System.arraycopy(line, 0, keyBytes, 0, keyLen); 
int valLen = lineLen - keyLen - 1; 
byte[] valBytes = new byte[valLen]; 
System.arraycopy(line, pos + 1, valBytes, 0, valLen); 
tKey.set(new String(keyBytes)); 
tValue.set(valBytes); 
} 
return true; 
} 

} 로그지도 입력 레코드에서

**InputFormat class:** 

public class KeyValueLongInputFormat extends 
FileInputFormat<Text, Text> implements JobConfigurable { 
private CompressionCodecFactory compressionCodecs = null; 
@Override 
public void configure(JobConf conf) { 
compressionCodecs = new CompressionCodecFactory(conf); 
} 
protected boolean isSplitable(FileSystem fs, Path file) { 
return compressionCodecs.getCodec(file) == null; 
} 
@Override 
public RecordReader<Text, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter) 
throws IOException { 
reporter.setStatus(genericSplit.toString()); 
return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit); 
} 
} 

**Finally Mapper class:**  
    enter code here 

public class MapperMapSideJoinLargeDatasets extends MapReduceBase implements 
Mapper<Text, TupleWritable, Text, Text> { 
Text txtKey = new Text(""); 
Text txtValue = new Text(""); 
@Override 
public void map(Text key, TupleWritable value, 
OutputCollector<Text, Text> output, Reporter reporter) 
throws IOException { 
if (value.toString().length() > 0) { 
txtKey.set(key.toString()); 
String arrEmpAttributes[] = value.get(0).toString().split(","); 
String arrDeptAttributes[] = value.get(1).toString().split(","); 
txtValue.set(arrEmpAttributes[1].toString() + "\t" 
+ arrEmpAttributes[2].toString() + "\t" 
+ arrDeptAttributes[0].toString()); 
output.collect(txtKey, txtValue); 
} 
} 

는 .Someone 날이를 이해하는 데 도움이 바랍니다 0.No 출력이 HDFS에서 볼 수있다 발행물. 감사합니다

답변

2

문제는 드라이버 서식 파일에 strJoinStmt 속성에 KeyValueLongInputFormat.class로 언급 된 InputFormat에서 실제로 LongWritable 키와 텍스트 값으로 작동하는 문제입니다.

대신 KeyValueTextInputFormat.class은 키와 값이 모두 텍스트 유형 인 경우 사용할 수 있습니다.

입력이 쉼표로 구분 된 파일이기 때문에 Driver 클래스에서 다음과 같이 작업 구성 객체의 속성을 설정하여 쉼표로 사용자 정의 구분 기호를 지정하십시오. 자세한 내용은
conf.set("key.value.separator.in.input.line",",");

은 아래의 예를 확인하십시오 :
https://github.com/sudha-pn/CompositeInputFormat

+0

Sudha을, 나는 keyvalueTextInputFormat을 시도하지만 난 .I 그렇게하려고합니다 설정의 conf를 놓친 ... 감사합니다 – dkovi

관련 문제