파일 저장소에 저장하고 싶지 않은 돼지 저장소 클래스를 작성 중입니다. 일부 타사 데이터 저장소 (API 호출 부족)로 보낼 계획입니다.Hadoop 돼지 출력 디렉토리가 설정되지 않았습니다
참고 : Cloudera의 VirtualBox 이미지에서 실행하고 있습니다.
내가 id.pig 스크립트 아래에서 사용하고 내 자바 클래스 (아래)와 생성 mystore.jar 쓴 :
이store B INTO 'mylocation' USING MyStore('mynewlocation')
돼지이 스크립트를 실행하는 동안, 나는 오류가 아래 참조 : 오류 6000 : 출력 위치 확인에 실패했습니다 : 'file : //home/cloudera/test/id.out 따라야 할 추가 정보 : 출력 디렉터리가 설정되지 않았습니다.
or.apache.pig.impl.plan.VisitorException: ERROR 6000:
at or.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95)
도와주세요!
-------------------- MyStore.java ----------------------
public class MyStore extends StoreFunc {
protected RecordWriter writer = null;
private String location = null;
public MyStore() {
location= null;
}
public MyStore (String location) {
this.location= location;
}
@Override
public OutputFormat getOutputFormat() throws IOException {
return new MyStoreOutputFormat(location);
}
@Override
public void prepareToWrite(RecordWriter writer) throws IOException {
this.writer = writer;
}
@Override
public void putNext(Tuple tuple) throws IOException {
//write tuple to location
try {
writer.write(null, tuple.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
if(location!= null)
this.location= location;
}
}
-------------------- MyStoreOutputFormat.java --------------------- -
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.data.Tuple;
public class MyStoreOutputFormat extends
TextOutputFormat<WritableComparable, Tuple> {
private String location = null;
public MyStoreOutputFormat(String location) {
this.location = location;
}
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
String extension = location;
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
return new MyStoreRecordWriter(fileOut);
}
protected static class MyStoreRecordWriter extends
RecordWriter<WritableComparable, Tuple> {
DataOutputStream out = null;
public MyStoreRecordWriter(DataOutputStream out) {
this.out = out;
}
@Override
public void close(TaskAttemptContext taskContext) throws IOException,
InterruptedException {
// close the location
}
@Override
public void write(WritableComparable key, Tuple value)
throws IOException, InterruptedException {
// write the data to location
if (out != null) {
out.writeChars(value.toString()); // will be calling API later. let me first dump to the location!
}
}
}
}
여기에 아무 것도 없습니다.
도와주세요. 나는 그것을 급하게 필요로한다. 감사! –