2013-08-27 1 views
2

나는 Rx에 처음 왔어. 그래서 나와 곰.Rx - 작업 <T>에서 IObservable <T>을 만드는 방법. 탈퇴로 인해 작업이 취소됩니까?

IObservable<T>Task<T>을 포함하고 싶습니다. 지금까지 너무 좋은 :

var cancel = new CancellationToken(); 
Task<T> task = Task.Factory.StartNew(..., cancel); 

IObservable<T> obs = task.ToObservable(); //there should be a way to tie the cancel token 
              //to the IObservable (?) 

IDisposable disposable = obs.Subscribe(...); 
Thread.Sleep(1000); 
disposable.Dispose(); // this should signal the task to cancel 

어떻게 그렇게합니까 :

이제
Task<T> task = Task.Factory.StartNew(...); 
IObservable<T> obs = task.ToObservable(); 

는, 내가 원하는 것은 관찰자의 구독 취소 때 취소 할 수있는 작업을 신호하는 것입니다? Observable.Create를 사용하여, 여기에 Rx and tasks - cancel running task when new task is spawned?

+0

'Observable.Create' 오버로드 중 하나를 사용하여'Func '를 전달하는 것이 가장 간단 할 수 있습니다. 하지만이 작업은 이전보다 구독시 새 작업을 시작합니다. –

+0

구독 할 때 작업을 시작하는 것이 실제로 * 내 경우에는 올바른 일이라고 생각합니다. –

+0

또한 Observable.Create() 오버로드가 취소 토큰을 사용하여 우연히 발견되었지만이를 사용하여 매력적인 예제를 아직 찾지 못했습니다. 당신은 하나를 쓸 수 있습니까 (아니면 알고 있다면 하나에 대한 링크)? –

답변

2

내가 생각할 수있는 가장 간단한 방법은 다음과 같습니다 :

static IObservable<int> SomeRxWork() 
{ 
    return Observable.Create<int>(o => 
    { 
     CancellationTokenSource cts = new CancellationTokenSource(); 
     IDisposable sub = SomeAsyncWork(cts.Token).ToObservable().Subscribe(o); 
     return new CompositeDisposable(sub, new CancellationDisposable(cts)); 
    }); 
} 

static Task<int> SomeAsyncWork(CancellationToken token); 

내가 코멘트에 암시 초기 방법은 실제로 다소 장황

FWIW는 여기 탄젠트를 생성 한 시나리오입니다 :

static IObservable<int> SomeRxWork() 
{ 
    return Observable.Create<int>(async (o, token) => 
    { 
     try 
     { 
      o.OnNext(await SomeAsyncWork(token)); 
      o.OnCompleted(); 
     } 
     catch (OperationCanceledException) 
     { 
     } 
     catch (Exception ex) 
     { 
      o.OnError(ex); 
     } 
    }); 
} 
+0

키는'새로운 CompositeDisposable (sub, new CancellationDisposable (cts))'를 반환합니다. 멋지다! 깨끗한 답변 주셔서 감사합니다. –

2

하면이 가정이 같은 방법

Task<Gizmo> GetGizmoAsync(int id, CancellationToken cancellationToken); 

이것을 IObservable<Gizmo>으로 바꿀 수 있습니다. 구독은 Task<Gizmo>으로 시작하고 수신 거부는 다음을 사용하여 취소합니다.

IObservable<Gizmo> observable = Observable.FromAsync(
    cancellationToken => GetGizmoAsync(7, cancellationToken)); 

// Starts the task: 
IDisposable subscription = observable.Subscribe(...); 

// Cancels the task if it is still running: 
subscription.Dispose(); 
관련 문제