첫 번째 항목 이후에 Accka-Stream (오늘 2.11_2.5-SNAPSHOT) Source
을 취소 하겠지만 다음과 같은 (단순화 된) 소비자가 있습니다. onNext
은 여전히 4 배 : 그 요청을 감안할 때Akka-Stream 소스를 열심히 중지하십시오.
static Subscriber<Object> println() {
return new Subscriber<Object>() {
Subscription s;
int n;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(5);
}
@Override
public void onNext(Object t) {
System.out.println(Thread.currentThread().getName()
+ ": " + t + " - " + (++n));
if (s != null) {
s.cancel();
s = null;
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + ": DONE");
}
};
}
public static void main(String[] args) throws Exception {
Config cfg = ConfigFactory.parseResources(
AkkaRange.class, "/akka-streams.conf").resolve();
ActorSystem actorSystem = ActorSystem.create("sys", cfg);
ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
Source<Integer, NotUsed> source = Source.repeat(1);
Publisher<Integer> p = source.runWith(Sink.asPublisher(
AsPublisher.WITH_FANOUT), materializer);
p.subscribe(println());
Thread.sleep(1000);
actorSystem.terminate();
}
아직 단지 4 호출이 이루어지는 5, 나는 기본 메시징 아키텍처가 취소 (또는 추가 요청) 메시지에 대한 메시지 큐를 확인하기 전에 4 일괄 요청에 응답 가정합니다.
취소를 더 열심히하기위한 설정이 있습니까?
유스 케이스 1-2 원 요소 이후 원하는 결과를 하류이 경우, 스트림을 취소 할 수있는 계산 집약적 인 단계 (MAP)가 인 상호 운용성 연산과 같은 것이다. 문제는이 4 배치로 인해 계산이 나머지 2-3 요소에 대해서도 실행된다는 것입니다.
공정성 문제입니다. 'map'에서 장시간 실행되는 연산을 수행하면 일부 신호가 열정적으로 등록되지 않을 수 있습니다. 하나는'mapAsync'를 대신 사용하고''actor.stream.materializer.sync-processing-limit''를 낮추어 외부 시그널이 더 빨리 처리되도록합니다 (처리량에 불리하게 작용할 것입니다) ,). – jrudolph
고마워, 그들은 합리적인 선택 인 것 같다. 처리량이 필요한 다른 Akka 스트림이 없으므로 지금 옵션 2를 사용합니다. 답변으로 의견을 게시 할 수 있습니까? – akarnokd