2014-07-23 2 views
6

정말 원시 InputStream을 사용하여 FSDataInputStream을 인스턴스화하는 방법은 무엇입니까?

Resource resource = new ClassPathResource("somefile"); 
InputStream bla = resource.getInputStream(); 
FSDataInputStream inputStream = new FSDataInputStream (bla); 

는 FS 라인에서 던지는 ... 내가 시크하고 PositionedReadable입니다 같은 inputStream을을 만드는 방법을 이해하지 않는다 : 나는 모의 객체를 할 필요가

java.lang.IllegalArgumentException: In is not an instance of Seekable or PositionedReadable 

그리고 이것은이다 나를위한 차단제.

답변

2

FSDataInputStream 생성자 FSDataInputStream.java 정의 아래와 같이은 InputStream 파라미터 solution 도움 후속하는 instanceSeekable 또는 PositionedReadable

public FSDataInputStream(InputStream in) throws IOException 
{ 
    super(in); 
    if(!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { 
     throw new IllegalArgumentException(
      "In is not an instance of Seekable or PositionedReadable"); 
    } 
} 

희망 될 것으로 예상 너 너야.

import java.io.*; 

import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.PositionedReadable; 
import org.apache.hadoop.fs.Seekable; 
import org.springframework.core.io.ClassPathResource; 
import org.springframework.core.io.Resource; 

public class SeekableTest { 

    public static void main(String[] args) throws IOException 
    { 
     Resource resource = new ClassPathResource("somefile"); 
     InputStream in = resource.getInputStream(); 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     byte buf[] = new byte[1024]; 
     int read; 

     while ((read = in.read(buf)) > 0) 
      baos.write(buf, 0, read); 

     byte data[] = baos.toByteArray(); 
     SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); 
     FSDataInputStream in2 = new FSDataInputStream(bais); 
    } 

    static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable { 

     public SeekableByteArrayInputStream(byte[] buf) 
     { 
      super(buf); 
     } 
     @Override 
     public long getPos() throws IOException{ 
      return pos; 
     } 

     @Override 
     public void seek(long pos) throws IOException { 
      if (mark != 0) 
      throw new IllegalStateException(); 

      reset(); 
      long skipped = skip(pos); 

      if (skipped != pos) 
      throw new IOException(); 
     } 

     @Override 
     public boolean seekToNewSource(long targetPos) throws IOException { 
      return false; 
     } 

     @Override 
     public int read(long position, byte[] buffer, int offset, int length) throws IOException { 

      if (position >= buf.length) 
      throw new IllegalArgumentException(); 
      if (position + length > buf.length) 
      throw new IllegalArgumentException(); 
      if (length > buffer.length) 
      throw new IllegalArgumentException(); 

      System.arraycopy(buf, (int) position, buffer, offset, length); 
      return length; 
     } 

     @Override 
     public void readFully(long position, byte[] buffer) throws IOException { 
      read(position, buffer, 0, buffer.length); 

     } 

     @Override 
     public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { 
      read(position, buffer, offset, length); 
     } 
    } 
} 

참조 : accumulo

관련 문제