2012-01-01 3 views
3

여러 개의 장기 실행 작업을 병렬로 실행해야하며 어떤 방식 으로든 진행 상황을보고하고 싶습니다. 필자의 초기 연구에서 IObservable은이 모델에 적합하다고 보입니다. 아이디어는 int가보고 된 int를 리턴하는 메소드를 호출한다는 것입니다. int가보고 된 비율은 완전합니다. 메소드가 종료되면 즉시 병렬 실행이 시작됩니다.이 관찰 가능 항목은 모든 관찰자가 특정 시점에서 동일한 진행 정보를 학습 할 수 있도록 핫 관찰 가능해야합니다. , 예. 늦은 가입자는 전체 실행이 완료되었고 추적 할 진도가 더 이상 없다는 것을 알 수 있습니다..Net RX : 병렬 실행 진행 추적

Observable.ForkJoin 및 Observable.Start를 사용하여이 문제에 가장 가까운 접근 방법을 찾았지만 메서드에서 반환 할 수있는 단일 관찰 가능 항목을 만드는 방법을 이해할 수 없습니다.

가 달성 할 수있는 방법의 아이디어를 공유하거나 어쩌면 닷넷 RX를 사용하여이 문제에 대한 또 다른 접근 방식이있다하십시오.

답변

3

입니다. 예제를 원한다면 끝으로 건너 뜁니다. 이 답변의 나머지 부분에서는 단계를 설명합니다. 나는 당신의 장기 실행 작업이 자신의 방법이없는이 답변을 위해 가정합니다

는 비동기 적으로 호출합니다. 그럴 경우 다음 단계는 조금 다를 수 있습니다. 다음 작업은 IScheduler을 사용하여 다른 스레드로 작업을 보내는 것입니다. 원하는 경우 호출자가 스케줄러를 매개 변수로 사용하는 오버로드를 수행하여 작업이 이루어지는 위치를 선택할 수 있습니다 (이 경우 기본 스케줄러를 선택하지 않는 오버로드). IScheduler.Scheduler의 몇 가지 오버로드가 있습니다. 그 중 몇 가지가 확장 메소드이므로 일부 상황에 가장 적합한 것을 확인해야합니다. 여기에 Action 만 걸립니다. 모두 병렬로 실행할 수있는 여러 작업이있는 경우 scheduler.Schedule 번을 여러 번 호출 할 수 있습니다.

이의 가장 어려운 부분

아마 진행이 주어진 시점에서 어떤 결정됩니다. 한 번에 여러 작업을 수행하는 경우 현재 진행 상황을 파악하기 위해 완료된 작업 수를 추적해야 할 수 있습니다. 당신이 제공 한 정보로, 나는 그것보다 구체적 일 수는 없습니다. 당신의 작업을 취소 할 경우

마지막으로, 당신은 매개 변수로 CancellationToken을 할 수 있습니다. 이를 사용하여 시작하기 전에 스케쥴러 대기열에있는 동안 작업을 취소 할 수 있습니다. 작업 코드를 올바르게 작성하면 취소를 위해 토큰을 사용할 수 있습니다.

IObservable<int> DoStuff(/*args*/, 
         CancellationToken cancel, 
         IScheduler scheduler) 
{ 
    BehaviorSubject<int> progress; 
    //if you don't take it as a parameter, pick a scheduler 
    //IScheduler scheduler = Scheduler.ThreadPool; 

    var disp = scheduler.Schedule(() => 
    { 
     //do stuff that needs to run on another thread 

     //report progres 
     porgress.OnNext(25); 
    }); 
    var disp2 = scheduler.Schedule(...); 

    //if the operation is cancelled before the scheduler has started it, 
    //you need to dispose the return from the Schedule calls 
    var allOps = new CompositeDisposable(disp, disp2); 
    cancel.Register(allOps.Dispose); 

    return progress; 
} 
+0

정말 좋은 샘플과 설명입니다. 고맙습니다. 나는 한 가지 더 질문 만하고있다. 오랫동안 실행중인 작업이 여러 그룹으로 분리되어 있고 그룹 내에서 작업을 동시에 실행하는 동안 그룹을 연속적으로 실행해야하는 경우 어떻게해야합니까? 이 경우 스케줄러 대신 ContinueWith를 통해 이런 종류의 연결을 허용하지만 BehaviorSubject의 사용을 유지하는 태스크 라이브러리를 사용해야합니까? – andriys

+1

@andriys 그 하나의 옵션입니다. 또 다른 옵션은 각 그룹을 IObservable을 반환하는 별도의 함수로 나누는 것입니다. 이 함수들은'Observable.Concat'을 통해 함께 연결될 수 있습니다. 그러나 체인점에서 처음 관찰 할 수있는 모든 것을 차갑게 만들고 싶을 것입니다. '관찰 가능하다.Defer'는 뜨거운 관측 대상을 차가운 관측 대상으로 변환합니다. –

+0

정말 대단합니다. 안내해 주셔서 감사합니다! – andriys

0

여기에 아마 반환 값과 작업 진행 상황을보고하는 방식으로 BehaviorSubject를 사용하는 방법으로 시작 것, 뜨거운가 관찰하려면 한 가지 방법

// setup a method to do some work, 
// and report it's own partial progress 
Func<string, IObservable<int>> doPartialWork = 
    (arg) => Observable.Create<int>(obsvr => { 
     return Scheduler.TaskPool.Schedule(arg,(sched,state) => { 
      var progress = 0; 
      var cancel = new BooleanDisposable(); 
      while(progress < 10 && !cancel.IsDisposed) 
      { 
       // do work with arg 
       Thread.Sleep(550); 
       obsvr.OnNext(1); //report progress 
       progress++; 
      } 
      obsvr.OnCompleted(); 
      return cancel; 
     }); 
    }); 

var myArgs = new[]{"Arg1", "Arg2", "Arg3"}; 

// run all the partial bits of work 
// use SelectMany to get a flat stream of 
// partial progress notifications 
var xsOfPartialProgress = 
     myArgs.ToObservable(Scheduler.NewThread) 
       .SelectMany(arg => doPartialWork(arg)) 
        .Replay().RefCount(); 

// use Scan to get a running aggreggation of progress 
var xsProgress = xsOfPartialProgress 
        .Scan(0d, (prog,nextPartial) 
          => prog + (nextPartial/(myArgs.Length*10d))); 
+0

함수 감기 관찰을 만듭니다. 이 작업은'doPartialWork'에 대한 초기 호출로 시작하지 않고 오히려 반환되는 관찰 대상에 대한 모든 * subscription에 대해서만 시작됩니다. –

+0

@ 기드온 좋은 지적 - 수정 된 답변 –

+0

더 나은,하지만 여전히 감기. –