문제 설명 : Integers의 무한 스트림에서 두 개의 연속 정수를 식별합니다.이 정수는 여러 생산자가 생산하지만 단일 소비자는 같은 번호가 다시 반복 될 때 경고를 발생시킵니다.동일한 ExecutorService에서 여러 생산자 및 단일 소비자
나는 복수 Producers
과 하나의 Consumer
이 있습니다. 소비자가 동일한 ExecutorService
에 제출하면 소비자가 시작되지 않았습니다. 하지만 Consumer를 별도의 Thread로 실행하면 Consumer 스레드가 예상대로 시작됩니다.
코드 :
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class FixedBlockingQueue {
final BlockingQueue<Integer> queue;
private int capacity;
public FixedBlockingQueue(int capacity){
super();
this.capacity = capacity;
queue = new ArrayBlockingQueue<Integer>(capacity);
System.out.println("Capactiy:"+this.capacity);
}
public void addElement(Integer element){
try{
queue.put(element);
}catch(Exception err){
err.printStackTrace();
}
}
public void startThreads(){
ExecutorService es = Executors.newFixedThreadPool(1);
for (int i =0; i < 10; i++){
es.submit(new MyProducer(this));
}
//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(this)).start();
}
public BlockingQueue<Integer> getQueue(){
return queue;
}
public static void main(String args[]){
FixedBlockingQueue f = new FixedBlockingQueue(1);
f.startThreads();
}
}
class MyProducer implements Runnable{
private FixedBlockingQueue queue;
public MyProducer(FixedBlockingQueue queue){
this.queue = queue;
}
public void run(){
for (int i=1; i< 5; i++){
queue.addElement(new Integer(i));
System.out.println("adding:"+i);
}
}
}
class MyConsumer implements Runnable{
private BlockingQueue<Integer> queue;
Integer firstNumber = 0;
private final ReentrantLock lock = new ReentrantLock();
public MyConsumer(FixedBlockingQueue fQueue){
this.queue = fQueue.getQueue();
}
/* TODO : Compare two consecutive integers in queue are same or not*/
public void run(){
Integer secondNumber = 0;
while (true){
try{
lock.lock();
System.out.println("queue size:"+queue.size());
if (queue.size() > 0) {
secondNumber = queue.remove();
System.out.println("Removed:"+secondNumber);
System.out.println("Numbers:Num1:Num2:"+firstNumber+":"+secondNumber);
if (firstNumber.intValue() == secondNumber.intValue()){
System.out.println("Numbers matched:"+firstNumber);
}
firstNumber = secondNumber;
}
Thread.sleep(1000);
}catch(Exception err){
err.printStackTrace();
}finally{
lock.unlock();
}
}
}
}
출력 :
Capactiy:1
adding:1
내가
에es.submit(new MyConsumer(queue));
//new Thread(new MyConsumer(queue)).start();
의 코드를 변경하는 경우
//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(queue)).start();
소비자 스레드가 정상적으로 시작되었습니다.
출력 :
Capactiy:1
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:0:1
adding:2
queue size:1
Removed:2
Numbers:Num1:Num2:1:2
adding:3
queue size:1
Removed:3
Numbers:Num1:Num2:2:3
adding:4
queue size:1
Removed:4
Numbers:Num1:Num2:3:4
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:4:1
adding:2
queue size:1
Removed:2
adding:3
Numbers:Num1:Num2:1:2
queue size:1
Removed:3
Numbers:Num1:Num2:2:3
첫 번째 방법에서 :
나는 그 숫자가없는 소비자에 의해 소비 알고 있지만, 이상적으로 다른 Producer
작업의 제출을 차단해서는 안된다.
이 경우 단순한 Threads
대신에 ExecutorService
을 사용하면 100 %를 얻을 수 없습니까?
도착 순서대로 요소를 삽입해야하므로 ReentrantLock을 사용하여 단일 스레드를 선택해야합니다. 100 개 이상의 스레드가 정수를 생성하므로 도착 순서에 따라 두 개의 연속 정수를 확인해야합니다. –
@Ravindrababu, "BlockingQueue's의 어느 부분이 thread-safe한지"추가 잠금은 여러분이'BlockingQueue'에서 자동으로 얻지 못하는 것을 제공한다고 생각할 수 있습니까? –
나중에 Producer와 동일한 클래스를 허용하는 Consumer 파트를 수정했습니다. –