subscribe()
을 호출하기 전에 Subscription
을 취소하고 싶지는 않습니다. 메서드를 사용하여 Subscription
을 만들고 해당 신호를 체인 위로 전파하여 데이터 방출을 시작합니다.
취소하려는 특정 구독을 찾는 방법이 필요하기 때문에 모든 구독이있는 중앙 집중식 장소가 없습니다. 체인의 각 운영자가 중간 구독도 ...).
일부 운영자는 사용자를 대신하여 구독을 취소합니다.
는
Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println);
출력됩니다 : O 을하지만 :
14:17:48.729 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(1)
1
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(2)
2
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | cancel()
아, 나는 그 운영자에 대해 알고하지 않았다 즉 한 번에 충분한 항목을 상향 삭제됩니다 예를 들어
take(int)
의 경우, 방출 된입니다 나를 위해 가장 중요한 것은 Cancellation 객체가 OnCancel 시그널을 보내고 적절하게 처리 할 수 있다는 것입니다. 하지만 문제 해결 방법을 찾았습니다. Flux에 예외를 던져 스트림을 취소합니다. 그래서 나쁘지는 않습니다. – Kapitalny'Cancellation' 객체의 사용을 선호 할 것을 권합니다. 3.1에서 'Disposable'이 될 것입니다 (어느 지점에서'cancel() '대신'dispose()'를 호출해야합니다). 또는 원하는대로 자연스럽게 일치 할 수있는 연산자를 조사하고 필요에 따라 취소 할 수 있습니다. 유즈 케이스에 따라 예외를 던져도 해결책이 될 수 없습니다. –
@ SimonBaslé 귀하의 코드를 실행하고'take (2)'가'log()'보다 앞에 있다면'cancel()'신호는 출력되지 않습니다. 왜? 당신은'take '연산자가 소스가 아닌 플럭스를 취소한다고 말했다. 반대로 –