2011-10-30 2 views
1

Reactive Extension을 배우고 있으며 이와 같은 작업에 적합한 지 알아 내려고 노력했습니다.Reactive Extension으로 요청 묶음 처리

요청 단위를 작업 단위로 처리하고 모든 요청이 완료되면 콜백을 호출하는 Process() 메소드가 있습니다.

여기서 중요한 점은 구현에 따라 각 요청이 동기식 또는 비동기식으로 콜백을 호출하므로 배치 프로세서가이 두 가지를 모두 처리 할 수 ​​있어야한다는 것입니다.

그러나 배치 프로세서에서 시작된 스레드가 없으면 필요한 경우 새 스레드 (또는 다른 비동기 실행)가 요청 핸들러 내부에서 시작됩니다. 이것이 rx의 사용 사례와 일치하는지 나는 모른다. 이 같은

내 현재 작업 코드 모양 (거의) :

가 이
public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted) 
{ 
    IUnitOfWork uow = null; 
    try 
    { 
     uow = unitOfWorkFactory.Create(); 

     var responses = new List<IResponse>(); 
     var outstandingRequests = requests.Count; 
     foreach (var request in requests) 
     { 
      var correlationId = request.CorrelationId; 
      Action<IResponse> requestCallback = response => 
      { 
       response.CorrelationId = correlationId; 
       responses.Add(response); 
       outstandingRequests--; 
       if (outstandingRequests != 0) 
        return; 

       uow.Commit(); 
       onCompleted(responses); 
      }; 

      requestProcessor.Process(request, requestCallback); 
     } 
    } 
    catch(Exception) 
    { 
     if (uow != null) 
      uow.Rollback(); 
    } 

    if (uow != null) 
     uow.Commit(); 
}   

방법이 사용 RX를 구현하는 것이? 그것은 합리적입니까?

아직 리턴되지 않은 비동기 요청이 있어도 작업 단위 (UOW)가 동 기적으로 확약되어야한다는 점에 유의하십시오. 당신도 동기 또는 비동기 결과를 반환 할 자유예요으로

답변

3

내 방식 : 당신은 마지막에 반환 항목에 피 감시 항목을 설정 'n'을하므로 완전한 모든 IObservables이 집계는이 작업을 수행 할 때 알아하려면 2 단계입니다.

public static class ObservableEx 
{ 
    public static Func<T, IObservable<R>> FromAsyncCallbackPattern<T, R>(
     this Action<T, Action<R>> call) 
    { 
     if (call == null) throw new ArgumentNullException("call"); 
     return t => 
     { 
      var subject = new AsyncSubject<R>(); 
      try 
      { 
       Action<R> callback = r => 
       { 
        subject.OnNext(r); 
        subject.OnCompleted(); 
       }; 
       call(t, callback); 
      } 
      catch (Exception ex) 
      { 
       return Observable.Throw<R>(ex, Scheduler.ThreadPool); 
      } 
      return subject.AsObservable<R>(); 
     }; 
    } 
} 

다음, IObservable<IResponse> Process(IObservable<IRequest> requests)에 전화 void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)을 설정 :

먼저 Action<T, Action<R>>Func<T, IObservable<R>>로 바뀝니다 범용 운영자 만들

public IObservable<IResponse> Process(IObservable<IRequest> requests) 
{ 
    Func<IRequest, IObservable<IResponse>> rq2rp = 
     ObservableEx.FromAsyncCallbackPattern 
      <IRequest, IResponse>(requestProcessor.Process); 

    var query = (
     from rq in requests 
     select rq2rp(rq)).Concat(); 

    var uow = unitOfWorkFactory.Create(); 
    var subject = new ReplaySubject<IResponse>(); 

    query.Subscribe(
     r => subject.OnNext(r), 
     ex => 
     { 
      uow.Rollback(); 
      subject.OnError(ex); 
     }, 
     () => 
     { 
      uow.Commit(); 
      subject.OnCompleted(); 
     }); 

    return subject.AsObservable(); 
} 

을 지금을뿐만 아니라이 처리 비동기 실행 않는다 그러나 결과의 올바른 순서도 보장합니다.

사실, 당신이 모음으로 시작되기 때문에, 당신도이 작업을 수행 할 수 있습니다 :

var rqs = requests.ToObservable(); 

var rqrps = rqs.Zip(Process(rqs), 
    (rq, rp) => new 
    { 
     Request = rq, 
     Response = rp, 
    }); 

그런 다음 당신은 관찰을했을 것이라고 업을 짝 CorrelationId 속성에 대한 필요없이 각 요청/응답.

이 정보가 도움이되기를 바랍니다.

+0

이것은 실제로 매우 도움이됩니다! 매우 정교한 대답을 주셔서 감사합니다.내가 여기서 진정으로 머리를 맞출 수없는 유일한 방법은 모든 요청에 ​​응답 할 때 어떻게해야합니까? 우편 후에 어떻게 말을해야합니까? 그리고 지금 모든 요청이 응답됨에 따라 이러한 요청/응답 쌍으로이 메소드를 호출하십시오 ... "rqrps"관측 가능에 가입하고 목록에서 응답을 수집하고 방법은 완료되었지만 성공하지 못했습니다 (그리고이 모든 경우에도 목록에서 응답을 수집하는 것은 약간 이상한 것 같습니다). 나는 그것에 관해 약간의 힌트를 정말로 바르게 평가할 것이다 :) – asgerhallas

+0

doh! ... 잘못된 오버로드를 호출했습니다. – asgerhallas

+0

@asgerhallas - 모든 쌍이 돌아 오기를 기다릴 필요는 없습니다. 다시 돌아올 때 처리하기 만하면됩니다. 그렇지 않으면'.ToArray()'observable 연산자를 보자. 그것은'IObservable '을'IObservable '으로 바꾼다. n 개의 값을 관측 할 수있는 하나의 배열을 n 개의 원소로 구성하고 성공할 때에 만 만든다. 귀하의 필요에 맞는 완벽한 운영자처럼 들립니다. – Enigmativity

1

이, 수신의 천재의 일부입니다

public IObservable<int> AddNumbers(int a, int b) { 
    return Observable.Return(a + b); 
} 

public IObservable<int> AddNumbersAsync(int a, int b) { 
    return Observable.Start(() => a + b, Scheduler.NewThread); 
} 

그들은 모두 IObservable 유형이, 그래서 그들은 동일하게 작동합니다. 이것에

IObservable<int> listOfObservables[]; 

listObservables.ToObservable() 
    .Merge() 
    .Aggregate(0, (acc, x) => acc+1) 
    .Subscribe(x => Console.WriteLine("{0} items were run", x)); 
+0

매우 감사합니다. 지금 내가 궁금해, 가장 쉬운 방법은, 내가 callback 걸리고 관찰되는 그 콜백의 호출을 변환 현재 requestProcessor.Process() 걸릴 해야하는지 궁금해? – asgerhallas

+0

오 ... 이것을 찾았습니다. http://stackoverflow.com/questions/5827178/using-rx-framework-for-async-calls-using-the-void-asyncmethodactiont-callback. 그것은 올바른 방향으로 보인다. – asgerhallas