1

Oracle AQ에서 리 액티브 확장을 사용하려고합니다. Oracle Queue에 메시지가 오면 소비자에게 메시지가 있음을 알리는 "OracleAQMessageAvailableEvent"를 발생시킵니다. OracleAQMessageAvailableEventHandler에서 소비자는 OracleAQQueue.Dequeue()를 호출하여 메시지를 검색합니다.반응성 확장에서 뜨거운 관측 가능 생성 방법

위와 같이 RX를 사용하고 있습니다. 다음은 내가 사용한 코드입니다.

var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h) 
       .Where(x => x.EventArgs.AvailableMessages > 0) 
       .Select(x => 
       { 
        OracleAQMessage msg = _queue.Dequeue(); 
        return (UpdateMsg) msg.Payload; 
       }); 
messages.subscribe(....) 

문제는 메시지를 구독 할 경우 모든 작품 일단 있다는 것입니다하지만 난 메시지를 구독 할 경우 여러 번 (즉, 내 응용 프로그램 내부의 여러 소비자)를 모든 소비자는 "_queue.Dequeue()"모든 전화를 시도 할 것이다 새 메시지가 없으면 첫 번째 호출 이후 호출이 실패합니다.

아무에게도 가이드를 보내 줄 수 없습니까? 내 시나리오는 Hot Observable에 대한 시나리오이지만 주위를 고민하기 위해 고심하고 있습니다.

답변

3

당신은 당신이 Hot Observable을 찾고 있다고 생각합니다. 코드를 따라 가면 _queue.Dequeue();이 여러 번 호출되는 이유를 더 분명히 알 수 있습니다.

먼저 당신은 오라클

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h) 

에서 이벤트에 가입이 당신이 미리 수신 세계에있는 것 같은 이벤트 핸들러를 접선 같다. 수신자 (구독자)는 모두 동일한 이벤트를 수신합니다. 이벤트가 발생한 후 구독하는 경우 이벤트를 놓친 것입니다.

그런 다음 빈 세트를 필터링합니다.

.Where(x => x.EventArgs.AvailableMessages > 0) 

특별한 것은 없습니다.

그런 다음 쿼리 내부에서 부작용을 수행합니다.

.Select(x => 
    { 
     OracleAQMessage msg = _queue.Dequeue(); 
     return (UpdateMsg) msg.Payload; 
    }); 

여기 부작용은 파괴적인 읽기 (Dequeue)를 만들고 있다는 것입니다. 모든 구독자가 업스트림 _queue.MessageAvailable에서 이벤트를 푸시하면 모두 Dequeue()에 전화를 겁니다.

구독자가 모두 부작용을 일으키지 않도록하려면 원하는대로 시퀀스를 HOT로 만들 수 있습니다. 이렇게하려면 Publish() 연산자를 살펴보십시오.

Publish() 연산자는 Connect() 메서드를 추가하여 IObservable<T>으로 확장되는 IConnectableObservable<T>을 반환합니다. 이렇게하면 구독 논리가 실행될 때 세밀하게 제어 할 수 있습니다. 그러나 이것은 아마도 너에게 너무 많은 통제 일 것이며 아마도 RefCount()이 필요한 것일 것입니다.

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h) 
.Where(x => x.EventArgs.AvailableMessages > 0) 
.Select(x => 
    { 
     OracleAQMessage msg = _queue.Dequeue(); 
     return (UpdateMsg) msg.Payload; 
    }) 
.Publish() 
.Refcount(); 

지금 당신의 가입자는 각각 동일한 메시지를 발송되고 Dequeue() 부작용은 이벤트 당 한 번만 호출 (가입자가있는 동안에 만)됩니다.

온천과 차가운 관찰의이 here

+0

의견을 주셔서 감사합니다. 그러나 여전히 여러 구독자로부터 _queue.Dequeue를 호출하려고합니다. 어떤 아이디어? – tangokhi

+0

Observable 대신 Subject를 사용해야합니다 .FromEventPattern. 사용자는 IObservable로 노출 된 제목을 구독 할 수 있습니다. OracleAQ에서 메시지를 받고 핸들러가 시작되면 메시지를 한 번 대기열에서 제거하고 여러 가입자를 대상으로하는 Subject.OnNext (NewMessage)를 호출 할 수 있습니다. – tangokhi

+0

비록 당신이 지금 괜찮 았다고 생각 하긴하지만 (대답은 아래에 나와 있습니다) 나는 대답 할 것이라고 생각했습니다. 아니, 나는 당신이 주제를 사용해야한다고 생각하지 않습니다. 나는 누구도 주제를 사용해야한다고 생각하지 않는다. 일반적으로 설계 결함을 지적합니다. –

-1

리 캠벨, 죄송합니다 나의 나쁜 적용됩니다. 언급 한 솔루션이 효과가 있습니다. 사실, 나는 그것을 잘못 사용하고 있었다. 메시지라는 속성이있는 클래스 호출 QueueWrapper가 있습니다. 나는 메시지

public IObservable<UpdateMsg> Messages { 
     get { return Observable.FromEventPattern<OracleAQMessageAvailableEventHandler,   OracleAQMessageAvailableEventArgs> (
     h => _queue.MessageAvailable += h, 
     h => _queue.MessageAvailable -= h) 
     .Where(x => x.EventArgs.AvailableMessages > 0) 
     .Select(x => 
     { 
      OracleAQMessage msg = _queue.Dequeue(); 
      return (UpdateMsg) msg.Payload; 
     }) 
     .Publish() 
     .Refcount(); 
}} 

의 구현을했고 내 클라이언트 코드는 메시지 속성이 새로운 IObservable을 반환 된 각 가입에 대한 그래서이

// First Subscription 
_queueWrapper.Messages.Subscribe(....) 

// Second Subscription 
_queueWrapper.Messages.Subscribe(....) 

같은 메시지 속성을 사용하여 가입했다. 이 문제를 해결하려면, 나는 다음 코드 QueueWrapper의 즉의 생성자에 관찰의 초기화를 이동 :

public QueueWrapper() { 
    _messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
     h => _queue.MessageAvailable += h, 
     h => _queue.MessageAvailable -= h) 
    .Where(x => x.EventArgs.AvailableMessages > 0) 
    .Select(x => 
     { 
      OracleAQMessage msg = _queue.Dequeue(); 
      return (UpdateMsg) msg.Payload; 
     }) 
    .Publish() 
    .Refcount(); 
} 

그냥 _messages를 돌려 내 메시지 속성;

public IObservable<UpdateMsg> Messages { get { return _messages; } } 

그 후 모든 것이 예상대로 작동하기 시작했습니다.

관련 문제