I am learning hadoop mapreduce framework ,i am trying to join 2 data sets have first record(Text) in the line as the Key , i tried to search in stackoverflow previous posts but nothing worked out.Here i am trying to customize the InputFormat and trying to join with the ID which is first record in the each line of data set.
내 입력 파일 1 : cdd8dde3-0349-4f0d-b97a-7ae84b687f9c, 에스더, 가너, 4071 헤이븐 레인, 오키 모스, MI 81a43486-07e1- 4b92-b92b-03d0caa87b5f, 디모데, 던컨, 753 스타디움 드라이브, 톤턴, MA맵리 듀스 하둡 - mapside은 customInputFormat를 사용하여이 데이터 세트에 합류
있는 File2 : cdd8dde3-0349-4f0d-b97a-7ae84b687f9c, 517-706-9565, EstherJGarner @ teleworm.us, Waskepter38, NoL2ieghie, MasterCard, 5305687295670850 81a43486-07e1-4b92-b92b-03d0caa87b5f, 508-307-3433, TimothyDDuncan @ ein rot.com, Conerse, Gif4Edeiba, 마스터 카드, 5265896533330445
**Driver class:**
conf.setInputFormat(CompositeInputFormat.class);
String strJoinStmt = CompositeInputFormat.compose("inner",
KeyValueLongInputFormat.class, dirEmployeesData, dirSalaryData);
conf.set("mapred.join.expr", strJoinStmt);
conf.setNumReduceTasks(0);
dirOutput.getFileSystem(conf).delete(dirOutput);
TextOutputFormat.setOutputPath(conf, dirOutput);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setOutputFormat(TextOutputFormat.class);
**Custom RecordReader class:**
public class KeyValueLongLineRecordReader implements
RecordReader<Text, Text> {
private final LineRecordReader lineRecordReader;
private byte separator = (byte) ',';
private LongWritable dummyKey;
private Text innerValue;
public Class getKeyClass() {
return Text.class;
}
public Text createKey() {
return new Text("");
}
public Text createValue() {
return new Text();
}
public KeyValueLongLineRecordReader(Configuration job, FileSplit split)
throws IOException {
lineRecordReader = new LineRecordReader(job, split);
dummyKey = lineRecordReader.createKey();
innerValue = lineRecordReader.createValue();
String sepStr = job.get("key.value.separator.in.input.line", ",");
this.separator = (byte) sepStr.charAt(0);
}
public static int findSeparator(byte[] utf, int start, int length, byte sep) {
for (int i = start; i < (start + length); i++) {
if (utf[i] == sep) {
return i;
}
}
return -1;
}
/** Read key/value pair in a line. */
public synchronized boolean next(Text key, Text value)
throws IOException {
Text tKey = key;
Text tValue = value;
byte[] line = null;
int lineLen = -1;
if (!lineRecordReader.next(dummyKey, innerValue)) {
return false;
}
else
line = innerValue.getBytes();
lineLen = innerValue.getLength();
if (line == null)
return false;
int pos = findSeparator(line, 0, lineLen, this.separator);
if (pos == -1) {
tKey.set(new String(line, 0, lineLen));
tValue.set("");
} else {
int keyLen = pos;
byte[] keyBytes = new byte[keyLen];
System.arraycopy(line, 0, keyBytes, 0, keyLen);
int valLen = lineLen - keyLen - 1;
byte[] valBytes = new byte[valLen];
System.arraycopy(line, pos + 1, valBytes, 0, valLen);
tKey.set(new String(keyBytes));
tValue.set(valBytes);
}
return true;
}
} 로그지도 입력 레코드에서
**InputFormat class:**
public class KeyValueLongInputFormat extends
FileInputFormat<Text, Text> implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
@Override
public void configure(JobConf conf) {
compressionCodecs = new CompressionCodecFactory(conf);
}
protected boolean isSplitable(FileSystem fs, Path file) {
return compressionCodecs.getCodec(file) == null;
}
@Override
public RecordReader<Text, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
}
}
**Finally Mapper class:**
enter code here
public class MapperMapSideJoinLargeDatasets extends MapReduceBase implements
Mapper<Text, TupleWritable, Text, Text> {
Text txtKey = new Text("");
Text txtValue = new Text("");
@Override
public void map(Text key, TupleWritable value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
if (value.toString().length() > 0) {
txtKey.set(key.toString());
String arrEmpAttributes[] = value.get(0).toString().split(",");
String arrDeptAttributes[] = value.get(1).toString().split(",");
txtValue.set(arrEmpAttributes[1].toString() + "\t"
+ arrEmpAttributes[2].toString() + "\t"
+ arrDeptAttributes[0].toString());
output.collect(txtKey, txtValue);
}
}
는 .Someone 날이를 이해하는 데 도움이 바랍니다 0.No 출력이 HDFS에서 볼 수있다 발행물. 감사합니다
Sudha을, 나는 keyvalueTextInputFormat을 시도하지만 난 .I 그렇게하려고합니다 설정의 conf를 놓친 ... 감사합니다 – dkovi