2013-08-27 2 views
5

Rx로 처리하고 싶은 사용자 상호 작용 시나리오가 있습니다.Rx 및 작업 - 새 작업이 생성 될 때 실행중인 작업을 취소 하시겠습니까?

  • :하지만 나는 또한 필요 - "사용자가 어떤 작업을 수행, 입력을 멈춘 때"

    시나리오 (1) (일반적으로, 사용자가 지금까지 입력 한 것을 검색) 정식 유사하다 (2) 만

  • (3) 새로운 작업 단위를 시작할 때, 가 진행중인 작업을 취소 (내 경우는 CPU
  • 의 (아래 참조) "할 몇 가지 작업"단위의 결과의 최신를 얻을 수 집중 형)

(1) 사용자 이벤트에 IObservable을 사용하고 이벤트 사이의 일시 중지시에만 트리거하는 .Throttle()으로 조절합니다 ("사용자 입력 중지").

여기에서 i .Select(_ => CreateMyTask(...).ToObservable()).

이것은 내측 관찰 대상 각각이 단일 작업을 래핑하는 IObservable<IObservable<T>>을 제공합니다.

(2) 마지막으로 .Switch()을 적용하여 최신 작업 단위에서만 결과를 얻습니다.

보류중인 작업 취소 (3)은 어떻게됩니까? Dispose()에 원인 이전 (들)에서

새로운 내부 IObservable<T>있을 때마다 만약 내가 제대로 이해하고

,의 .Switch() 방법은 구독 및 구독 취소.
어쩌면 취소 할 작업을 실행하는 데 유선이 될 수 있습니까?

답변

3

작업을 수행해야합니까?

당신이 Observables와 함께 일하게되어 기쁘다면, 당신은 자신을 멋지게 할 수 있습니다.

같은 일을보십시오 :

var query = 
    Observable.Create<int>(o => 
    { 
     var cancelling = false; 
     var cancel = Disposable.Create(() => 
     { 
      cancelling = true; 
     }); 
     var subscription = Observable.Start(() => 
     { 
      for (var i = 0; i < 100; i++) 
      { 
       Thread.Sleep(10); //1000 ms in total 
       if (cancelling) 
       { 
        Console.WriteLine("Cancelled on {0}", i); 
        return -1; 
       } 
      } 
      Console.WriteLine("Done"); 
      return 42; 
     }).Subscribe(o); 
     return new CompositeDisposable(cancel, subscription); 
    }); 

이 관찰은 Thread.Sleep(10);와 루프에 대한 몇 가지 노력을하고 있지만, 관찰이 배치 될 때 루프가 종료되고 집중적 인 CPU 작업이 중단. 그런 다음 Switch과 함께 표준 Rx Dispose을 사용하여 진행중인 작업을 취소 할 수 있습니다.

public static IObservable<T> Start<T>(Func<Func<bool>, T> work) 
{ 
    return Observable.Create<T>(o => 
    { 
     var cancelling = false; 
     var cancel = Disposable 
      .Create(() => cancelling = true); 
     var subscription = Observable 
      .Start(() => work(() => cancelling)) 
      .Subscribe(o); 
     return new CompositeDisposable(cancel, subscription); 
    }); 
} 

을 그리고 다음과 같은 기능을 호출 : 당신이 방법에 번들 것을 싶은 경우

,이 시도

Func<Func<bool>, int> work = cancelling => 
{ 
    for (var i = 0; i < 100; i++) 
    { 
     Thread.Sleep(10); //1000 ms in total 
     if (cancelling()) 
     { 
      Console.WriteLine("Cancelled on {0}", i); 
      return -1; 
     } 
    } 
    Console.WriteLine("Done"); 
    return 42; 
}; 

여기이 증명 내 코드입니다 근무 :

var disposable = 
    ObservableEx 
     .Start(work) 
     .Subscribe(x => Console.WriteLine(x)); 

Thread.Sleep(500); 
disposable.Dispose(); 

출력물에 "Cancelled on 50"(언젠가는 "Cancelled on 51")이 출력됩니다.

input.Throttle(...) 
    .Select(_ => Observable.FromAsync(token => CreateMyTask(..., token))) 
    .Switch() 
    .Subscribe(...); 

이 각 작업 단위에 대한 새로운 토큰을 생성하고 새로운 하나마다 그것을 Switch 스위치를 취소합니다

+0

아니요, 작업을 사용할 필요가 없습니다. 처리 집약적 인 작업을 하나의 작업으로 캡슐화하는 것이 자연 스럽습니다. 나는 당신의 솔루션을 잘 살펴볼 것이다 : –

+0

@CristiDiaconescu - 솔직히 말해서 TPL이 그것을 위해 무엇을 할 수 있는지 알 수 있었지만 Rx를 사용하는 나의 솔루션은 항상 훨씬 더 깔끔하고 훨씬 표현력이 강했다. 나는 Rx에 찬성하여 TPL을 피하려고 노력한다. – Enigmativity

+0

Rx를 수용한다면, 제안 된'Start' 메소드는'Start (Func work)'로 이해하기 쉽고'CancellationDisposable'을 사용하여 커스텀 일회용 대신에 토큰을 생성 할 수 있습니다. 또는 Rx :'Start (Func , T> work)'를 사용하면'Start' 메소드가 구독을 처리 할 때 취소 신호를 보내는 AsyncSubject를 제공합니다. 'Func '는 Rx의 정신과는 거리가 멀다. – Brandon

12

당신은 관찰자 unsubcribes 때 취소 토큰을 생성하는 Observable.FromAsync을 사용할 수 있습니다 .

+1

감사합니다. 나는이 Factory 메소드에 대해 몰랐다. 취소 지원 때문에'ToObservable()'연산자보다 훨씬 더 나은 것으로 보인다. –

+0

예, 몇 주 전에 만 발견했습니다. Rx의 단점은 강력한 문서가 없다는 것입니다. – Brandon

+0

@Brandon Lee의 전자 서 *는 * 문서입니다. :) –

관련 문제