2013-02-06 2 views
0

파일 저장소에 저장하고 싶지 않은 돼지 저장소 클래스를 작성 중입니다. 일부 타사 데이터 저장소 (API 호출 부족)로 보낼 계획입니다.Hadoop 돼지 출력 디렉토리가 설정되지 않았습니다

참고 : Cloudera의 VirtualBox 이미지에서 실행하고 있습니다.

내가 id.pig 스크립트 아래에서 사용하고 내 자바 클래스 (아래)와 생성 mystore.jar 쓴 :

store B INTO 'mylocation' USING MyStore('mynewlocation') 

돼지이 스크립트를 실행하는 동안, 나는 오류가 아래 참조 : 오류 6000 : 출력 위치 확인에 실패했습니다 : 'file : //home/cloudera/test/id.out 따라야 할 추가 정보 : 출력 디렉터리가 설정되지 않았습니다.

or.apache.pig.impl.plan.VisitorException: ERROR 6000: 
at or.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95) 

도와주세요!

-------------------- MyStore.java ----------------------

public class MyStore extends StoreFunc { 
    protected RecordWriter writer = null; 
    private String location = null; 


    public MyStore() { 
     location= null; 
    } 

    public MyStore (String location) { 
     this.location= location; 
    } 

    @Override 
    public OutputFormat getOutputFormat() throws IOException { 
     return new MyStoreOutputFormat(location); 
    } 

    @Override 
    public void prepareToWrite(RecordWriter writer) throws IOException { 
     this.writer = writer; 
    } 

    @Override 
    public void putNext(Tuple tuple) throws IOException { 
     //write tuple to location 

     try { 
      writer.write(null, tuple.toString()); 
     } catch (InterruptedException e) {   
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void setStoreLocation(String location, Job job) throws IOException { 
     if(location!= null) 
      this.location= location; 
    } 

} 

-------------------- MyStoreOutputFormat.java --------------------- -

import java.io.DataOutputStream; 
import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.pig.data.Tuple; 

public class MyStoreOutputFormat extends 
     TextOutputFormat<WritableComparable, Tuple> { 
    private String location = null; 

    public MyStoreOutputFormat(String location) { 

     this.location = location; 
    } 

    @Override 
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
      TaskAttemptContext job) throws IOException, InterruptedException { 

     Configuration conf = job.getConfiguration(); 

     String extension = location; 
     Path file = getDefaultWorkFile(job, extension);  
     FileSystem fs = file.getFileSystem(conf); 

     FSDataOutputStream fileOut = fs.create(file, false); 

     return new MyStoreRecordWriter(fileOut); 
    } 

    protected static class MyStoreRecordWriter extends 
      RecordWriter<WritableComparable, Tuple> { 

     DataOutputStream out = null; 

     public MyStoreRecordWriter(DataOutputStream out) { 
      this.out = out; 
     } 

     @Override 
     public void close(TaskAttemptContext taskContext) throws IOException, 
       InterruptedException { 
      // close the location 
     } 

     @Override 
     public void write(WritableComparable key, Tuple value) 
       throws IOException, InterruptedException { 

      // write the data to location 
      if (out != null) { 
       out.writeChars(value.toString()); // will be calling API later. let me first dump to the location! 
      } 
     } 

    } 
} 

여기에 아무 것도 없습니다.

+0

도와주세요. 나는 그것을 급하게 필요로한다. 감사! –

답변

1

첫째, 난 당신이 오히려 일을 계획 할 때는 setStoreLocation 방법에서 지역 변수 '위치'에 대한 과제를 호출

변수 인스턴스 (instance)보다, 위치 값을 저장하는 작업 구성을 사용해야한다고 생각 그러나 실행 단계까지는 getOutputFormat 호출이 이루어지지 않을 수 있습니다.이 단계에서는 위치 변수가 더 이상 설정되지 않을 수 있습니다 (클래스의 새 인스턴스가 생성되었을 수 있음).

@Override 
public void setStoreLocation(String location, Job job) throws IOException { 
    job.getConfiguration().set("mapred.textoutputformat.separator", ""); 
    FileOutputFormat.setOutputPath(job, new Path(location)); 

    if("true".equals(job.getConfiguration().get("output.compression.enabled"))) { 
     FileOutputFormat.setCompressOutput(job, true); 
     String codec = job.getConfiguration().get("output.compression.codec"); 
     try { 
      FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(codec)); 
     } catch (ClassNotFoundException e) { 
      throw new RuntimeException("Class not found: " + codec); 
     } 
    } else { 
     // This makes it so that storing to a directory ending with ".gz" or ".bz2" works. 
     setCompression(new Path(location), job); 
    } 
} 

그래서 난 당신이 작업 변수의 위치를 ​​저장해야한다고 생각 : 당신이 PigStorage.setStoreLocation의 소스를 보면

, 당신은 그들이 작업 구성 (2 선)의 위치를 ​​저장하는 것을주의해야 :

@Override 
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
     TaskAttemptContext job) throws IOException, InterruptedException { 

    Configuration conf = job.getConfiguration(); 

    String extension = conf.get("mylocation"); 
    Path file = getDefaultWorkFile(job, extension);  
    FileSystem fs = file.getFileSystem(conf); 

    FSDataOutputStream fileOut = fs.create(file, false); 

    return new MyStoreRecordWriter(fileOut); 
} 
: 사용자 정의 출력 형식은 다음 createRecordReader 방법으로 추출 할 수 있습니다
@Override 
public void setStoreLocation(String location, Job job) throws IOException { 
    if(location!= null) 
     job.getConfiguration().set("mylocation", location); 
} 

마지막으로 (실제 오류의 원인이 될 수 있습니다.) 출력 형식이 TextOutputFormat을 확장하고 레코드 라이터에 getDefaultWorkFile 메서드를 사용합니다.이 메서드는 파일을 HDFS로 출력하는 위치를 알아야합니다. setStoreLocation 메서드에서 FileOutputFormat.setOutputPath(job, new Path(location));을 호출하지 않았습니다 (이전에 붙여 넣은 PigStorage.setStoreLocation 메서드 참조). 따라서 기본 작업 파일을 만들 위치를 알 수 없기 때문에 오류가 발생합니다.

+0

크리스에게 감사드립니다. "FileOutputFormat.setOutputPath (job, new Path (location));"가 누락되었습니다. " 요구. ur 입력에 따라 코드를 변경했습니다. –

관련 문제