2012-02-27 4 views
14

Java BlockingQueue와 동일한 데이터 구조에 관심이 있습니다. 단, 큐의 객체를 일괄 처리 할 수 ​​있어야한다는 점이 다릅니다. 즉, 생산자가 객체를 대기열에 넣을 수 있지만 대기열이 특정 크기 (배치 크기)에 도달 할 때까지 소비자 블록을 take()에 두는 것이 좋습니다.Java BlockingQueue with batching?

큐가 배치 크기에 도달하면 생산자는 대기열의 모든 요소를 ​​소비 할 때까지 put()을 차단해야합니다. 생산자가 생산을 다시 시작하고 소비자 블록이 배치까지 다시 도달 함).

비슷한 데이터 구조가 존재합니까? 또는 나는 그것을 써야 하는가 (나는 상관하지 않는다), 나는 거기에 뭔가가 있다면 내 시간을 낭비하고 싶지 않다.


UPDATE 아마 것들을 조금 명확하게 다음과 같이

상황은 항상있을 것입니다. 여러 생산자가 대기열에 항목을 추가 할 수 있지만 하나 이상의 소비자가 대기열에서 항목을 가져 오지는 않습니다.

이제 병렬 및 직렬로 이러한 설정이 여러 개 있습니다. 즉, 생산자는 여러 대기열에 대한 항목을 생산하지만, 자신의 권리를 지닌 소비자도 생산자가 될 수 있습니다. 이것은 생산자, 소비자 생산자 및 최종 소비자의 감독 그래프로보다 쉽게 ​​생각할 수 있습니다.

대기열이 비어있을 때까지 생산자가 차단해야하는 이유는 (@Peter Lawrey) 각각이 스레드에서 실행되기 때문입니다. 공간을 사용할 수있게되면 단순히 생성하도록 남겨두면 너무 많은 스레드를 한꺼번에 처리하려고하는 상황이 발생하게됩니다.

아마도 이것을 실행 서비스와 결합하면 문제가 해결 될 수 있습니까?

답변

9

BlockingQueue.drainTo(Collection, int)을 사용하는 것이 좋습니다. take()와 함께 사용하면 최소한의 요소 수를 확보 할 수 있습니다.

이 접근법을 사용하면 작업량에 따라 일괄 처리 크기가 동적으로 커지고 소비자가 바쁠 때 생성자를 차단할 필요가 없다는 이점이 있습니다. 즉, 대기 시간 및 처리량을 위해 자체 최적화됩니다. 질문으로


바쁜 소비 스레드와 SynchronousQueue는 사용할 수 있습니다 (I 나쁜 아이디어라고 생각한다)을 정확하게 구현합니다.

즉 소비 스레드는 소비자가 바쁜 그 어느 때

list.clear(); 
while(list.size() < required) list.add(queue.take()); 
// process list. 

생산자가 차단됩니다 않습니다.

+0

소비자가 바쁠 때 생산자를 차단하고 싶습니다. –

+1

재미있게, 대부분의 시스템은 이것을 피하기 위해 아주 오래갑니다. ;) 두 번째 제안은 정확히 수행 할 것입니다. 제작자를 차단하지 않으려면 왜 여러 스레드를 사용하고 있습니까? "프로듀서"가 프로세서/소비자 일뿐만 아니라 동시에 실행하고 싶지 않은 경우에도 더 간단하지 않습니다. –

+0

내 업데이트를 참조하십시오. 설계는 생산자가 너무 블록되어 실행중인 스레드의 수가 적게 유지되어야합니다. 또한 생산자와 소비자 간의 의존성 문제를 해결합니다. –

1

알고 있지 않습니다. 제대로 이해한다면 큐를 채울 때까지 생산자가 작업을하고 (생산자가 차단하는 동안) 대기열을 지울 때까지 (소비자가 차단 된 동안) 대기열을 지울 때까지 생산자가 작업하기를 원할 것입니다. 그렇다면 데이터 구조가 필요 없지만 한 쪽을 차단하는 메커니즘이 있고 다른 쪽은 뮤텍스 방식으로 작업하고있는 것이 좋습니다. 당신은 그 객체를 잠글 수 있으며, 내부적으로 전체 또는 비어있는 논리를 가지고 자물쇠를 풀어 상대방에게 전달할 수 있습니다. 따라서 간단히 말해서 직접 작성해야합니다.

1

RingBuffer가 LMAX Disruptor 패턴에서 작동하는 것처럼 들립니다. 자세한 내용은 http://code.google.com/p/disruptor/을 참조하십시오.

매우 거친 설명은 주요 데이터 구조가 RingBuffer입니다. 생산자는 데이터를 링 버퍼에 순서대로 넣고 소비자는 생산자가 버퍼에 넣은만큼의 데이터를 가져올 수 있습니다 (본질적으로 일괄 처리). 버퍼가 꽉 차면 생산자는 소비자가 끝내고 버퍼의 슬롯을 비울 때까지 블록합니다.

2

여기에 귀하의 요청에 맞을 것으로 생각되는 빠른 (= 간단하지만 완전히 테스트되지 않은) 구현이 있습니다. 필요한 경우 전체 대기열 인터페이스를 지원하도록 확장 할 수 있어야합니다.

public class BatchBlockingQueue<T> { 

    private ArrayList<T> queue; 
    private Semaphore readerLock; 
    private Semaphore writerLock; 
    private int batchSize; 

    public BatchBlockingQueue(int batchSize) { 
     this.queue = new ArrayList<>(batchSize); 
     this.readerLock = new Semaphore(0); 
     this.writerLock = new Semaphore(batchSize); 
     this.batchSize = batchSize; 
    } 

    public synchronized void put(T e) throws InterruptedException { 
     writerLock.acquire(); 
     queue.add(e); 
     if (queue.size() == batchSize) { 
      readerLock.release(batchSize); 
     } 
    } 

    public synchronized T poll() throws InterruptedException { 
     readerLock.acquire(); 
     T ret = queue.remove(0); 
     if (queue.isEmpty()) { 
      writerLock.release(batchSize); 
     } 
     return ret; 
    } 

} 

당신이 그것을 유용 희망 .. 대신 사용 "동기화"키워드의 ReentrantLock와로 전환 할 수있는 성능을 증가시킵니다.