Oracle AQ에서 리 액티브 확장을 사용하려고합니다. Oracle Queue에 메시지가 오면 소비자에게 메시지가 있음을 알리는 "OracleAQMessageAvailableEvent"를 발생시킵니다. OracleAQMessageAvailableEventHandler에서 소비자는 OracleAQQueue.Dequeue()를 호출하여 메시지를 검색합니다.반응성 확장에서 뜨거운 관측 가능 생성 방법
위와 같이 RX를 사용하고 있습니다. 다음은 내가 사용한 코드입니다.
var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
messages.subscribe(....)
문제는 메시지를 구독 할 경우 모든 작품 일단 있다는 것입니다하지만 난 메시지를 구독 할 경우 여러 번 (즉, 내 응용 프로그램 내부의 여러 소비자)를 모든 소비자는 "_queue.Dequeue()"모든 전화를 시도 할 것이다 새 메시지가 없으면 첫 번째 호출 이후 호출이 실패합니다.
아무에게도 가이드를 보내 줄 수 없습니까? 내 시나리오는 Hot Observable에 대한 시나리오이지만 주위를 고민하기 위해 고심하고 있습니다.
의견을 주셔서 감사합니다. 그러나 여전히 여러 구독자로부터 _queue.Dequeue를 호출하려고합니다. 어떤 아이디어? – tangokhi
Observable 대신 Subject를 사용해야합니다 .FromEventPattern. 사용자는 IObservable로 노출 된 제목을 구독 할 수 있습니다. OracleAQ에서 메시지를 받고 핸들러가 시작되면 메시지를 한 번 대기열에서 제거하고 여러 가입자를 대상으로하는 Subject.OnNext (NewMessage)를 호출 할 수 있습니다. – tangokhi
비록 당신이 지금 괜찮 았다고 생각 하긴하지만 (대답은 아래에 나와 있습니다) 나는 대답 할 것이라고 생각했습니다. 아니, 나는 당신이 주제를 사용해야한다고 생각하지 않습니다. 나는 누구도 주제를 사용해야한다고 생각하지 않는다. 일반적으로 설계 결함을 지적합니다. –