2017-04-13 1 views
1

나는 4 스레드에서 TopicProcessor로 메시지를 게시하고 구독자에서 메시지를 단순히 컬렉션에 추가하는 간단한 테스트를 실행하고 있습니다. 코드는 다음과 같다 :Reactor 3의 TopicProcessor에서 누락 된 메시지

@Test 
public void testProcessingMessages() throws Exception { 
    int numberOfMessages = 1000; 

    TopicProcessor<Integer> processor = TopicProcessor.create(); 

    ExecutorService executorService = Executors.newFixedThreadPool(4); 

    Queue<Integer> messages = new ConcurrentLinkedQueue<>(); 

    processor.subscribe(messages::add); 

    AtomicInteger counter = new AtomicInteger(0); 
    for (int i = 0; i < numberOfMessages; i++) { 
     executorService.submit(() -> { 
      processor.onNext(counter.incrementAndGet()); 
     }); 
    } 

    Thread.sleep(10000); 

    assertEquals(numberOfMessages, messages.size()); 
} 

그러나 결국 주장은 일반적으로 약 980-990 실제 메시지 대신 예상은 1000 내가 뭔가를 놓치고 실패?

답변

3

문제는 TopicProcessor.create가 단일 스레드에서 게시를 예상하는 프로세서를 만드는 것이 었습니다. TopicProcessor.share은 여러 스레드에서 생성 할 때 사용해야합니다.

관련 문제