2017-05-18 1 views
0

첫 번째 항목 이후에 Accka-Stream (오늘 2.11_2.5-SNAPSHOT) Source을 취소 하겠지만 다음과 같은 (단순화 된) 소비자가 있습니다. onNext은 여전히 ​​4 배 : 그 요청을 감안할 때Akka-Stream 소스를 열심히 중지하십시오.

static Subscriber<Object> println() { 
    return new Subscriber<Object>() { 

     Subscription s; 

     int n; 

     @Override 
     public void onSubscribe(Subscription s) { 
      this.s = s; 
      s.request(5); 
     } 

     @Override 
     public void onNext(Object t) { 
      System.out.println(Thread.currentThread().getName() 
        + ": " + t + " - " + (++n)); 
      if (s != null) { 
       s.cancel(); 
       s = null; 
      } 
     } 

     @Override 
     public void onError(Throwable t) { 
      t.printStackTrace(); 
     } 

     @Override 
     public void onComplete() { 
      System.out.println(Thread.currentThread().getName() + ": DONE"); 
     } 
    }; 
} 

public static void main(String[] args) throws Exception { 
    Config cfg = ConfigFactory.parseResources(
     AkkaRange.class, "/akka-streams.conf").resolve(); 
    ActorSystem actorSystem = ActorSystem.create("sys", cfg); 

    ActorMaterializer materializer = ActorMaterializer.create(actorSystem); 

    Source<Integer, NotUsed> source = Source.repeat(1); 

    Publisher<Integer> p = source.runWith(Sink.asPublisher(
     AsPublisher.WITH_FANOUT), materializer); 

    p.subscribe(println()); 

    Thread.sleep(1000); 

    actorSystem.terminate(); 
} 

아직 단지 4 호출이 이루어지는 5, 나는 기본 메시징 아키텍처가 취소 (또는 추가 요청) 메시지에 대한 메시지 큐를 확인하기 전에 4 일괄 요청에 응답 가정합니다.

취소를 더 열심히하기위한 설정이 있습니까?

유스 케이스 1-2 원 요소 이후 원하는 결과를 하류이 경우, 스트림을 취소 할 수있는 계산 집약적 인 단계 (MAP)가 인 상호 운용성 연산과 같은 것이다. 문제는이 4 배치로 인해 계산이 나머지 2-3 요소에 대해서도 실행된다는 것입니다.

+0

공정성 문제입니다. 'map'에서 장시간 실행되는 연산을 수행하면 일부 신호가 열정적으로 등록되지 않을 수 있습니다. 하나는'mapAsync'를 대신 사용하고''actor.stream.materializer.sync-processing-limit''를 낮추어 외부 시그널이 더 빨리 처리되도록합니다 (처리량에 불리하게 작용할 것입니다) ,). – jrudolph

+0

고마워, 그들은 합리적인 선택 인 것 같다. 처리량이 필요한 다른 Akka 스트림이 없으므로 지금 옵션 2를 사용합니다. 답변으로 의견을 게시 할 수 있습니까? – akarnokd

답변

0

인터페이스는 a part of이고 반응 스트림 사양은 많은 라이브러리 (akka-streams 포함)가 구현됩니다. 본 명세서의 다음 상태 일 :있을 경우 가입자가 Subscription.cancel (호출 한 후에 하나 개 이상의 onNext 신호를 수신 할 준비가되어 있어야

)는 여전히 계류중인 요소 [3.12 참조] 요청. Subscription.cancel()은 기본 클리닝 작업을 즉시 수행 할 것을 보장하지 않습니다.

따라서 구독자에서 수동으로 처리해야합니다. 그렇지 않으면 사양을 위반하여 규격을 구현하는 라이브러리와 함께 사용할 수 없습니다.

+0

나는 편지에 대한 명세를 알고 있으며, 그것이 내 구독자 인 경우에만 다음을 무시할 것이다. 문제는 무거운 계산이 한 번이 아닌 네 번 실행되는 일반 map() 작업에서 Akka Stream 측에서 수행된다는 것입니다. map() 함수의 구현을 제어 할 권한이 없습니다. 또한 Rx와 같은 취소에 열심 인 것은 사양을 위반하지 않습니다. – akarnokd

+0

@akarnokd는 귀하의 가입자에게'요청 (5)'을 발행 한 다음'취소 '를 발행 했습니까? –

관련 문제