2016-06-13 2 views
2

문제 설명 : Integers의 무한 스트림에서 두 개의 연속 정수를 식별합니다.이 정수는 여러 생산자가 생산하지만 단일 소비자는 같은 번호가 다시 반복 될 때 경고를 발생시킵니다.동일한 ExecutorService에서 여러 생산자 및 단일 소비자

나는 복수 Producers과 하나의 Consumer이 있습니다. 소비자가 동일한 ExecutorService에 제출하면 소비자가 시작되지 않았습니다. 하지만 Consumer를 별도의 Thread로 실행하면 Consumer 스레드가 예상대로 시작됩니다.

코드 :

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.locks.ReentrantLock; 
import java.util.Iterator; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ExecutorService; 

public class FixedBlockingQueue { 
    final BlockingQueue<Integer> queue; 
    private int capacity; 

    public FixedBlockingQueue(int capacity){ 
     super(); 
     this.capacity = capacity; 
     queue = new ArrayBlockingQueue<Integer>(capacity); 
     System.out.println("Capactiy:"+this.capacity); 
    } 
    public void addElement(Integer element){ 
     try{ 
      queue.put(element); 
     }catch(Exception err){ 
      err.printStackTrace(); 
     } 
    } 
    public void startThreads(){ 
     ExecutorService es = Executors.newFixedThreadPool(1); 
     for (int i =0; i < 10; i++){ 
      es.submit(new MyProducer(this)); 
     } 
     //es.submit(new MyConsumer(queue)); 
     new Thread(new MyConsumer(this)).start(); 
    } 
    public BlockingQueue<Integer> getQueue(){ 
     return queue; 
    } 
    public static void main(String args[]){ 
     FixedBlockingQueue f = new FixedBlockingQueue(1); 
     f.startThreads(); 
    } 
} 

class MyProducer implements Runnable{ 

    private FixedBlockingQueue queue; 
    public MyProducer(FixedBlockingQueue queue){ 
     this.queue = queue;  
    } 
    public void run(){ 
     for (int i=1; i< 5; i++){ 
      queue.addElement(new Integer(i)); 
      System.out.println("adding:"+i); 
     } 
    } 
} 

class MyConsumer implements Runnable{ 
    private BlockingQueue<Integer> queue; 
    Integer firstNumber = 0; 
    private final ReentrantLock lock = new ReentrantLock(); 

    public MyConsumer(FixedBlockingQueue fQueue){ 
     this.queue = fQueue.getQueue(); 
    } 
    /* TODO : Compare two consecutive integers in queue are same or not*/ 
    public void run(){ 
     Integer secondNumber = 0; 
     while (true){ 
      try{ 
       lock.lock(); 
       System.out.println("queue size:"+queue.size()); 
       if (queue.size() > 0) { 
        secondNumber = queue.remove(); 
        System.out.println("Removed:"+secondNumber); 
        System.out.println("Numbers:Num1:Num2:"+firstNumber+":"+secondNumber); 
        if (firstNumber.intValue() == secondNumber.intValue()){ 
         System.out.println("Numbers matched:"+firstNumber); 
        } 
        firstNumber = secondNumber; 
       } 
       Thread.sleep(1000); 
      }catch(Exception err){ 
       err.printStackTrace(); 
      }finally{ 
       lock.unlock(); 
      } 
     } 
    } 
} 

출력 :

Capactiy:1 
adding:1 

내가

es.submit(new MyConsumer(queue)); 
//new Thread(new MyConsumer(queue)).start(); 

의 코드를 변경하는 경우

//es.submit(new MyConsumer(queue)); 
new Thread(new MyConsumer(queue)).start(); 

소비자 스레드가 정상적으로 시작되었습니다.

출력 :

Capactiy:1 
adding:1 
queue size:1 
Removed:1 
Numbers:Num1:Num2:0:1 
adding:2 
queue size:1 
Removed:2 
Numbers:Num1:Num2:1:2 
adding:3 
queue size:1 
Removed:3 
Numbers:Num1:Num2:2:3 
adding:4 
queue size:1 
Removed:4 
Numbers:Num1:Num2:3:4 
adding:1 
queue size:1 
Removed:1 
Numbers:Num1:Num2:4:1 
adding:2 
queue size:1 
Removed:2 
adding:3 
Numbers:Num1:Num2:1:2 
queue size:1 
Removed:3 
Numbers:Num1:Num2:2:3 

첫 번째 방법에서 :

나는 그 숫자가없는 소비자에 의해 소비 알고 있지만, 이상적으로 다른 Producer 작업의 제출을 ​​차단해서는 안된다.

이 경우 단순한 Threads 대신에 ExecutorService을 사용하면 100 %를 얻을 수 없습니까?

답변

2

단일 스레드 및 고정 용량 1의 BlockingQueue으로 스레드 풀을 생성합니다. 그런 다음 풀에 3 개의 작업을 제출합니다. 먼저 각각 5 개의 값을 각각 대기열에 넣으려고 시도하고 나머지 하나는 값을 대기열에서 제외합니다. 유효한.

고정 크기 풀에는 스레드가 하나뿐이므로 제출 한 작업은 병렬이 아닌 순차적으로 실행됩니다. 먼저 생산자 작업을 제출하여 먼저 실행합니다. 그러나 일단 첫 번째 숫자를 대기열에 넣으면 큐가 가득 차서 더 이상 진행할 수 없습니다. 생성자 작업이 완료해야 풀링 된 스레드가 소비자와 같은 다른 작업에 사용 가능해지기 때문에 대기열은 으로 끝납니다.이 영원히 남아 있습니다.

스레드 풀을 직접 사용하는 이유는 확실하지 않습니다. 특히 스레드가 이미 Runnable을 구현하고 있기 때문에 스레드 관리를 직접 수행하기가 어렵지 않을 수 있습니다. 풀을 사용하는 경우 모두 개의 작업을 동시에 수용 할 수있는 충분한 스레드가 있는지 확인하십시오.

BlockingQueue 구현은 스레드로부터 안전해야하며 표준 라이브러리에서 제공하는 모든 것은 그렇습니다. 따라서 addElement()에서 자신의 잠금을 수행 할 필요가 없습니다. 또한, 만약 당신이 자신의 잠금을 수행 할 필요가 있었다면, 당신은 엘리먼트를 큐에 넣을 때뿐만 아니라 엘리먼트를 큐에서 제거 할 때도해야 할 것입니다.

또한 프로듀서 작업이 FixedBlockingQueue 인스턴스를 통해 간접적으로 기본 큐에 요소를 추가하지만 사용자 작업이 기본 큐로 곧바로 이동한다는 것은 이상합니다.

FixedBlockingQueue 클래스의 이름은 클래스가 BlockingQueue을 구현한다는 것을 의미하기 때문에 잘못 선택되었지만 실제로는 그렇게하지 않습니다.

+0

도착 순서대로 요소를 삽입해야하므로 ReentrantLock을 사용하여 단일 스레드를 선택해야합니다. 100 개 이상의 스레드가 정수를 생성하므로 도착 순서에 따라 두 개의 연속 정수를 확인해야합니다. –

+0

@Ravindrababu, "BlockingQueue's의 어느 부분이 thread-safe한지"추가 잠금은 여러분이'BlockingQueue'에서 자동으로 얻지 못하는 것을 제공한다고 생각할 수 있습니까? –

+0

나중에 Producer와 동일한 클래스를 허용하는 Consumer 파트를 수정했습니다. –

관련 문제