2015-02-06 2 views
0

나는 PipedInputStreamPipedOutputStream에 관련된 문제가 있고, 나는 이러한 클래스의 디자인을 잘못 이해 한 경우 모르거나 내가 PipedInputStream을 이해 지금까지 PipedInputStream.javaPipedInputStream.java 디자인에 버그가 있습니까? 아니면 그 디자인을 오해 했습니까?

에서 자바 코드에 버그가있는 경우와없는 PipedOutputStream은 두 개의 서로 다른 스레드간에 스트림을 생성하는 데 사용할 수있는 메커니즘을 구현합니다. 제작자 스레드는 PipedOutputStream에 뭔가를 쓰고 소비자 스레드는 연결된 PipedInputStream에서 스레드를 읽습니다. 버퍼링 된 통신을 허용하는 내부 버퍼가 있습니다. 기본적으로이 버퍼의 크기는 1024 바이트입니다.

소비자 스레드가 PipedInputStream을 읽고 버퍼가 비어 있으면 스레드가 대기합니다. 제작자 스레드가 PipedOutputStream을 쓰고 버퍼가 가득차면 스레드도 대기합니다.

PipedInputStream은 내부 버퍼를 유지합니다. PipedOutputStreamPipedInputStream에 선언 된 함수 만 사용합니다.

모든 PipedInputStream의 내부 (원) 버퍼와 관련된 필드합니다 (byte [] buffer 상기 int inint outPipedInputStream.java 볼 수 -as)는 모든 protected 선언된다. PipedInputStream은 2 개의 다른 PipedInputStream.receive 기능을 사용하여 데이터를 주입합니다.

모든 입력 스트림에는 read()read(byte [], int, int)의 두 가지 읽기 버전이 있습니다. 모든 OutputStream에는 2 개의 기입 버젼 write(byte b)write(byte [], int, int)가 있습니다. 모두 단일 바이트 버전과 다중 바이트 버전이 있습니다. PipedInputStreamPipedOutputStream에는 이러한 기능이 있습니다.

PipedOutputStream.write(byte b)PipedInputStream에 연결된 바이트를 삽입하는 기능을 사용합니다 (PipedInputStream.receive(int b)). 이 수신 기능은 protected으로 선언되어 있으므로이 기능을 오버로드하여 PipedOutputStream에서 PipedInputStream으로 바이트 주입을 차단할 수 있습니다.

PipedOutputStream.write(byte b[], int offset, int len)은 에 연결된 바이트 배열을 삽입하기 위해 PipedInputStream.receive(byte [] b, int offset, int len)을 사용합니다.

그리고 내 문제는 다음과 같습니다 receive(int)PipedInputStream.receive(byte [], int, int), 멀티 바이트 대응 부분은 receive(int)으로 보호되지 않았다고 선언되지 않습니다. 기본 가시성 (패키지 가시성)이 있습니다. 따라서이 기능에 과부하가 걸리지 않고 PipedOutputStream에서 PipedInputStream으로 멀티 바이트 주입을 차단할 수 없습니다.

PipedInputStream.write(byte b[], int offset, int len)PipedInputStream.write(int b)을 호출하지 않습니다. 따라서 을 사용할 때 receive(int)을 오버로드해도 아무런 영향이 없습니다.

내가 이해하는 한 PipedInputStream.receive(byte[], int, int)protected이어야하며 PipedInputStream.receive(int)이어야합니다.선언은 :

protected synchronized void receive(byte [] b, int off, int len) throws IOException {

PipeReaderPipeWriter (PipedInputStreamPipedOutputStream의 문자 버전) 버퍼 필드를 선언하고는 보호되지 않는 (패키지 가시성 방법 나타날

synchronized void receive(byte [] b, int off, int len) throws IOException {

같아야 !). Java의 리더/라이터 (JDK1. 1 이후)는, InputStream/OutputStream (JDK1.

그것은 PipedInputStream의 디자인에 실제 버그인가?, 이전 자바 버전에서 상속 PipedInputStream 디자인 사고의 protected visibity은?, 나 아니면 완전히 손실 있어요?

미리 감사드립니다.

PD : 다음은이 문제가 나타나는 예입니다. 이 프로그램은 컴파일되지 않습니다 (언급 된 가시성 문제로). 이 예제에서는 필요한 경우 버퍼의 자동 확장을 허용하는 PipedInputStream 하위 클래스를 만들려고합니다. 따라서 버퍼가 비어 있고 누군가가 스레드를 읽으려고하면 대기합니다. 그러나 버퍼가 꽉 차서 누군가가 (연결된 PipedOutputStream을 사용하여) 쓰기를 시도하면 스레드는 대기하지 않지만 버퍼가 더 많은 바이트를 저장하도록 확장됩니다. 소비자는 기다리지 만 생산자는 기다리지 않습니다.

이 예제의 기능 구현은 있지만 PipedInputStream 하위 클래스로 구현할 수 없는지 알고 싶습니다.

import java.io.IOException; 
import java.io.PipedInputStream; 
import java.io.PipedOutputStream; 

public class ExtensiblePipedInputStream extends PipedInputStream { 

/** 
* Default extensions' size 
*/ 
private static final int DEFAULT_EXTENSION = 1024; 

/** 
* The current extensions' size 
*/ 
protected int extension = DEFAULT_EXTENSION; 


// the same constructors than the super class (PipedInputStream)... 

public ExtensiblePipedInputStream() { 
    super(); 
} 

public ExtensiblePipedInputStream(PipedOutputStream src) throws IOException { 
    super(src); 
} 

public ExtensiblePipedInputStream(int pipeSize) { 
    super(pipeSize); 
} 

public ExtensiblePipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { 
    super(src, pipeSize); 
} 

/** 
* This function ensures the specified capacity in the internal buffer. If 
* the specified capacity is less or equals than the current internal buffer 
* capacity it does nothing. If the specified capacity is greater than the 
* current one, then the buffer is extended to: at least allocate the new 
* capacity. This function extends the buffer using multiple factors of 
* extension size. 
* 
* @param capacity The capacity 
* @throws IOException if an IO error occurs 
* @throws IllegalArgumentException if capacity is negative 
*/ 
public synchronized void ensureCapacity(int capacity) throws IOException, IllegalArgumentException { 
    if (capacity < 0) { 
     throw new IllegalArgumentException("capacity < 0"); 
    } 

    if (capacity > buffer.length) { 
     int additionalSpace = capacity - buffer.length; 
     final int modExtension = additionalSpace % extension; 
     additionalSpace += (modExtension == 0) ? 0 : extension - modExtension; 

     setCapacity(buffer.length + additionalSpace); 
    } 
} 

/** 
* Returns capacity of the internal buffer (the buffer's size). 
* 
* @return The capacity or the internal buffer 
*/ 
public synchronized int getCapacity() { 
    return buffer.length; 
} 

/** 
* Returns the size of the next buffer's extensions. 
* 
* @return The size of the next buffer's extensions. 
*/ 
public synchronized int getExtension() { 
    return extension; 
} 

/** 
* This function extends and invokes PipedInputStream.receive. It only avoid 
* writers block by extending the internal buffer when needed. 
* 
* @param b The byte to be received 
* @throws IOException if an IO error occurs 
*/ 
@Override 
protected synchronized void receive(int b) throws IOException { 
    ensureCapacity(available() + 1); 
    super.receive(b); 
} 

/** 
* MY PROBLEM!!!! 
* 
* this function is not posible! 
* 
* PipedInputStream.receive(byte[], int, int) 
* has not protected visibility, it has package visibility!!!!! 
* 
* Why? 
* 
* @param b The array of bytes to be received 
* @param off The offset in the array of bytes. 
* @param len The number of bytes to be received. 
* @throws IOException If an IO error occurs 
*/ 
@Override 
protected synchronized void receive(byte b[], int off, int len) throws IOException { 
    ensureCapacity(available() + len); 
    super.receive(b, off, len); 
} 

/** 
* Changes the size of the internal buffer. The new size must be greater or 
* equals than the number of bytes stored in the internal buffer 
* (available()) 
* 
* @param capacity The new size of the internal buffer. 
* @throws IOException If an IO error occurs. 
* @throws IllegalArgumentException If capacity < available() 
*/ 
public synchronized void setCapacity(int capacity) throws IOException, IllegalArgumentException { 
    final int available = available(); 

    if (capacity < available) { 
     throw new IllegalArgumentException("capacity < available"); 
    } 

    final byte[] nbuf = new byte[capacity]; 
    if (available > 0) { 
     final int firstTransferAmount = Math.min(available, buffer.length - out); 
     System.arraycopy(buffer, out, nbuf, 0, firstTransferAmount); 
     if (in > 0) { 
      System.arraycopy(buffer, 0, nbuf, firstTransferAmount, in); 
     } 
     out = 0; 
     in = (available == capacity) ? 0 : available; 
    } 

    buffer = nbuf; 
} 

/** 
* Set the size of future extensions. It must be a value greater than 0. 
* 
* @param extension The size of future extensions. 
* @throws IllegalArgumentException If extension <= 0 
*/ 
public synchronized void setExtension(int extension) throws IllegalArgumentException { 
    if (extension <= 0) { 
     throw new IllegalArgumentException("extension <= 0"); 
    } 

    this.extension = extension; 
} 

} 
+0

왜 파이프 스트림을 확장하려고합니까? 당신이 그것을 자랄 필요가 없도록 충분히 커 보지 않으시겠습니까? –

+0

너는 이것에 2 시간을 보냈다, 이것은 최저 임금에 대략 $ 20 일 것입니다. 정말로 필요하지 않은 버퍼에 128KB를 "낭비"하기로 결정했다면. 그것은 0.2 센트의 메모리를 낭비 할 것입니다.) 이들 중 수백만 개를 사용할 계획이라고 말해주십시오.) –

+1

이 클래스는 확장을 위해 설계되지 않았고 자바 초기에는 디자이너가 이러한 우려에 민감하지 않으므로 불필요한 '보호 된'액세스 수준이 . 하위 클래스에 대한 계약서를 명확하게 문서화 한 클래스를 제외하고는 JDK 클래스를 확장하려고하지 마십시오. –

답변

2

나는 파이프의 목적을 잘못 이해했다고 생각합니다. 파이프는 입니다. 두 스레드 간의 통신이 차단되었습니다. 작성자의 속도는 파이프가 메모리 사용에 효율적이지만 처리 속도가 가장 느린 구성 요소의 속도로 제한된다는 독자의 속도로 제한 될 것으로 예상됩니다.

비동기 쓰기를 원할 경우, 큐를 사용해야합니다. java.util.concurrent 패키지의 버전 중 하나가 적합해야합니다.

+0

PipedInputStream에는 읽기/쓰기 작업의 분리가 제한되어 있습니다. 그러나 시간 제한이 일정해야 함은 언급하지 않습니다. PipedInputStream.java 코드는 파이프 크기가 ​​"int protected static final int PIPE_SIZE"선언에서 변경 될 수 있다고 말하면서 다음과 같은 주석이 있습니다 : 이것은 파이프 크기가 ​​변경되기 전에 상수였습니다. 이 필드는 이전 버전과의 호환성을 위해 계속 유지됩니다. – acesargl

0

특정 유스 케이스를 지원하지 않는 API 디자인 (특히 서브 클래 싱을 포함하는 API 디자인)은 "제한된"또는 "제한된"것으로 설명 될 수 있지만 버그라고 부르는 경우가 있습니다.

당신이 구현하려는 것이 가치 있다고 생각한다면, 당신의 분석은 정확하다고 생각합니다. 확장 버퍼를 사용하여 파이프 된 입출력 스트림의 버전을 생성하는 것은 수정자가 다른 경우보다 훨씬 효과적입니다.

하지만 ... 불가능하지 않습니다. 보다 큰 규모의 메소드 오버라이드 (예 : readwrite 메소드)를 통해 원하는대로 달성 할 수도 있고 처음부터 간단히 시작할 수도 있습니다.


다른 점은이 (StackOverflow)가 Java "버그"를보고하거나 향상 요청을하는 적절한 장소가 아니라는 것입니다. 이 점에 대해 정말로 생각한다면 OpenJDK 팀에 제안 된 변경 사항을 적용하여 패치를 개발하여 제출하십시오. 그러나 호환성 (기존 고객 서브 클래스 인 PipedInputStreamPipedOutputStream과의 호환성 포함)을 위반하는 항목은 거부 될 수 있습니다.이것은 이것이 그들이 아직 "고정"되지 않은 이유를 설명 할 수 있습니다!


1 - 배리가 지적 하듯이, 파이프로 연결된 스트림은 디자인에 의해 차단된다. 비 차단 버전은 잠재적으로 위험합니다.

+0

답장을 보내 주신 스티븐 감사합니다. 그러나 PipedInputStream의 하위 클래스가 PipedOutputStream?에 의해 수행 된 receive (byte [], int, int) 호출을 인터셉트 할 수 있습니까? 쓰기 작업은 PipedInputStream에 있지 않으므로 PipedInputStream의 하위 클래스에서 오버로드 할 수 없습니다. PipedInputStream의 하위 클래스 Y를 처리하기 위해 PipedOutputStream의 하위 클래스 X를 생성해야하는 경우 Y는 PipedInputStream이 아니기 때문에 (어떤 PipedOutputStream에서도 사용할 수 없기 때문에) 디자인에서의 문제점은 PipedInputStream 서브 클래스로 할 수 없다는 것입니다. – acesargl

+0

자바 플랫폼 디버거 아키텍처 [JPDA]를 사용하여 숨겨진/금지 된 수신 메소드 호출을 가로 채는 PipedInputStream의 하위 클래스를 생성 할 수 있다고 생각합니다 (새 하위 클래스는 JVM TI에서 디버거로 선언되며 수신 호출을 인터셉트하고, 클래스 자체가 디버깅됩니다!). 그러나 나는 이것이 "합리적인 방법"이 아니라고 생각한다. (그리고 그것은 또한 매우, 아주 못생긴 것이다.) – acesargl

+0

* "PipedInputStream 인터셉트의 하위 클래스는 어떻게 할 수 있습니다 ..."*. 그것은 할 수 없다. JPDA는 작동하지 않습니다. 실행중인 응용 프로그램은 자체에 대해 JPDA를 사용할 수 없습니다 (AFAIK). 다른 방식으로하십시오. 'read'와'write' 메소드를 오버라이드 (override)합니다. –

관련 문제