2014-12-16 2 views
0

SpringRactor Stream API를 사용하여 ServiceA가 반환 한 값을 사용하여 ServiceC를 호출하는 예제를 프로토 타입 화하려고했습니다. 그래서 ServiceA 및 ServiceC에 전화를 관련된 대기 시간을 시뮬레이션하기 위해이Spring Reactor를 통해 미래의 HotStream 생성하기

final ExecutorService executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); 
     Streams.defer(executor.submit(new CallToRemoteServiceA())) 
       .flatMap(s -> Streams.defer(executor.submit(new CallToRemoteServiceC(s)))) 
        .consume(s -> System.out.println("End Result : " + s)); 

과 같은 코드를 작성() CallToRemoteServiceA 및 CallToRemoteServiceC의 방법에 Thread.sleep() 메소드가 있습니다. 문제는 Thread.sleep() 메서드를 주석 처리 할 때 즉, 서비스 메소드 호출이 실제 메소드에서 true가 아닌 대기 시간을 가지면 소비 메소드가 호출된다는 것입니다. Thread.sleep() 메소드를 그대로두면 소비 메소드가 호출되지 않습니다. 나는 Streams.defer()가 콜드 스트림을 돌려 주므로 등록 후 허용 된 항목에 대한 소비 메소드 만 실행하지만 ExecutorService가 반환 한 Future에서 HotStream을 어떻게 생성 할 수 있는지 궁금합니다.

답변

0

내가 원하는 것은 Streams.just을 사용하는 것입니다. 원하는 경우 .dispatchOn(Dispatcher)을 지정할 수 있지만 이미 스레드 풀의 스레드에 있으므로 동기화 Dispatcher을 사용하려고합니다. 다음은 설명하기위한 빠른 테스트입니다.

@Test 
public void streamsDotJust() throws InterruptedException { 
    ExecutorService executor = Executors.newSingleThreadExecutor(); 

    Streams 
      .just(executor.submit(() -> "Hello World!")) 
      .map(f -> { 
       try { 
        return f.get(); 
       } catch (Exception e) { 
        throw new IllegalStateException(e); 
       } 
      }) 
      .consume(System.out::println); 

    Thread.sleep(100); 
} 
1

나는 이것이 reactor.rx.stream.FutureStream.subscribe() 메소드의 버그로 생각됩니다. 이 라인에서 :이 경우

try { 
     // Bug in the line below since unit is never null 
     T result = unit == null ? future.get() : future.get(time, unit); 

     buffer.complete(); 

     onNext(result); 
     onComplete(); 

} catch (Throwable e) { 
     onError(e); <-- With default constructor this gets called if time == 0 and 
         future has as yet not returned 
} 

기본 FutureStream (미래) 생성자는 장치라고 결코 널 (null) 따라서 위의 코드는 항상 호출하는 Future.get (0, TimeUnit.SECONDS) 즉시로 이어지는 catch (Throwable) 블록의 타임 아웃 예외. 여러분이이 버그에 동의한다면이 문제에 대한 픽스를 요청할 수 있습니까?

관련 문제