2016-09-01 2 views
2

나는 Reactor Project를 알아 내려고하고 구독을 취소 할 방법을 찾고 있습니다. 예를 들어 Flux를 구독하면 onCancel 신호를 보낼 때 사용할 수있는 Cancellation 개체에 대한 참조를 얻을 수 있지만 구독을 한 후에야 해당 컬렉션을 Collection에 보관해야합니다.원자로 취소 방법 구독 신청

Cancellation 개체를 가져 오는 더 좋은 방법이 있습니까? 아니면 구독을 취소하기 만하면됩니다. 모든 활성 구독에 대한 참조를 포함하는 일종의 장소 일 수도 있습니다. 예 ... 그게 최고 일 것입니다 ...

답변

2

subscribe()을 호출하기 전에 Subscription을 취소하고 싶지는 않습니다. 메서드를 사용하여 Subscription을 만들고 해당 신호를 체인 위로 전파하여 데이터 방출을 시작합니다.

취소하려는 특정 구독을 찾는 방법이 필요하기 때문에 모든 구독이있는 중앙 집중식 장소가 없습니다. 체인의 각 운영자가 중간 구독도 ...).

일부 운영자는 사용자를 대신하여 구독을 취소합니다.

Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println); 

출력됩니다 : O 을하지만 :

14:17:48.729 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | request(unbounded) 
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(1) 
1 
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(2) 
2 
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | cancel() 
+0

아, 나는 그 운영자에 대해 알고하지 않았다 즉 한 번에 충분한 항목을 상향 삭제됩니다 예를 들어 take(int)의 경우, 방출 된입니다 나를 위해 가장 중요한 것은 Cancellation 객체가 OnCancel 시그널을 보내고 적절하게 처리 할 수 ​​있다는 것입니다. 하지만 문제 해결 방법을 찾았습니다. Flux에 예외를 던져 스트림을 취소합니다. 그래서 나쁘지는 않습니다. – Kapitalny

+1

'Cancellation' 객체의 사용을 선호 할 것을 권합니다. 3.1에서 'Disposable'이 될 것입니다 (어느 지점에서'cancel() '대신'dispose()'를 호출해야합니다). 또는 원하는대로 자연스럽게 일치 할 수있는 연산자를 조사하고 필요에 따라 취소 할 수 있습니다. 유즈 케이스에 따라 예외를 던져도 해결책이 될 수 없습니다. –

+0

@ SimonBaslé 귀하의 코드를 실행하고'take (2)'가'log()'보다 앞에 있다면'cancel()'신호는 출력되지 않습니다. 왜? 당신은'take '연산자가 소스가 아닌 플럭스를 취소한다고 말했다. 반대로 –