2017-04-05 6 views
1

나는 직설적 인 것처럼 느껴지지만 놀랍게도 어려운 일을하려고합니다.비동기 구독자 기능으로 관찰 할 수있는 RxJS

나는 RabbitMQ 대기열에 가입하는 기능이 있습니다. 구체적으로 여기 Channel.consume 함수입니다 : http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

서브 스크립 션 ID로 해결되는 약속 - 나중에 구독을 취소하는 데 필요함 - 또한 메시지가 대기열에서 빠져 나올 때 호출 할 콜백 인수가 있음 .

대기열에서 수신 거부하고 싶을 때 여기 Channel.cancel 함수를 사용하여 소비자를 취소해야합니다 (http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel). 이것은 이전에 리턴 된 subscription ID를 사용합니다.

관찰 가능 항목을 구독 할 때 큐에 구독하는 Observable에서이 모든 내용을 감싸고 싶습니다. 관찰 가능 항목의 구독이 취소되었을 때 구독을 취소합니다. 그러나 이는 호출의 '이중 비동기'특성으로 인해 다소 어려움이 있습니다 (콜백과 약속 모두 반환한다는 의미입니다).

적으로는, 내가 쓸 수 있도록하고 싶습니다 코드입니다 :이 생성자는 비동기 가입자의 기능 또는 해체의 논리를 지원하지 않는

return new Rx.Observable(async (subscriber) => { 
    var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message)); 
    return async() => { 
    await channel.cancel(consumeResult.consumerTag); 
    }; 
}); 

그러나 이것은 불가능하다.

나는 이것을 이해하지 못했습니다. 내가 여기서 뭔가를 놓치고 있니? 왜 그렇게 어려운거야?

건배, 알렉스

답변

1

만든 channel.consume 약속 해결하기 위해 단지 함수 내에서 호출되는 관찰자 (이 통과있어 관찰자가 아닌 가입자)로, 기다릴 필요가 없습니다 관찰 당신이 제공합니다.

그러나 사용자가 반환하는 수신 거부 기능은 해당 약정이 해결 될 때까지 기다려야합니다. 그리고 내부적으로 다음과 같이 할 수 있습니다.

return new Rx.Observable((observer) => { 
    var consumeResult = channel.consume(queueName, (message) => observer.next(message)); 
    return() => { 
    consumeResult.then(() => channel.cancel(consumeResult.consumerTag)); 
    }; 
}); 
+0

답장을 보내 주셔서 감사합니다. 요청하기 전에 제안을 고려해 봤지만 문제는 채널을 기다리는 것이 아무 것도 없다는 것입니다. 해결할 것을 약속합니다. 그래서 channel.cancel에 대한 호출이 3 초 후에 만 ​​해결된다고 가정 해 봅시다. 해당 채널에서 새 메시지가 수신 될 수 있지만 Rx 옵저버는 이미 구독 취소되어 있으므로 이러한 메시지는 에테르로 손실됩니다. 이것이 내가 피하려고하는 것입니다. 이 문제를 해결할 제안이 있습니까? – AlexC

+0

그것이 어떻게 문제가되는지 알지 못합니다. [관찰 가능한 계약] (http://reactivex.io/documentation/contract.html)의 * Subscributing and Unsubscribing * 섹션에 따르면 : * 관찰자가 Observable에 대한 수신 거부 통지를 발행하면 Observable은 발행 중단을 시도합니다 관찰자에게 통보. 그러나 관찰자가 관찰자가 수신 거부 알림을 발행 한 후 Observable이 관찰자에게 알림을 전송하지는 않을 수도 있습니다 * – cartant

+0

따라서 취소가 해결 될 때까지 채널에서 메시지가 계속 전송되면 관찰자는이를 수신해야합니다. 즉, 관찰자 ​​구현은 더 이상의 메시지가 방출 될 것으로 기대해서는 안됩니다. – cartant

관련 문제