2013-01-31 3 views
3

취소, 뭔가 같은 : Switch가에의 가입을 처분 할 때 있도록긴 실행 수신 스트림

inputStream.Select(n => Task.Run(() => 
{ 
    // Long running operation 
    Thread.Sleep(TimeSpan.FromSeconds(5)); 

    return n * n; 
}).ToObservable()) 
.Switch() 
.Subscribe(result => 
{ 
    // Use result in some way 
    Console.WriteLine(result); 
}); 

어떻게 Task.Run 전화의 내부 CancellationToken를 얻을 수 있습니다 -flight 계산, CancellationToken을 취소 된 것으로 설정하여 계산을 중단합니다.

답변

3

Observable.StartAsync 방법을 사용할 수 있습니다. 여러 값을 생성 할 수 있을지

inputStream.Select(n => Observable.StartAsync((token => Task.Run(() => 
{ 
    if (token.IsCancellationRequested) 
    { 
     // .. don't need to do anything 
     return 0; 
    } 
    else 
    { 
     Thread.Sleep(TimeSpan.FromSeconds(1)); 
     return n * n; 

    }     
})))) 
.Switch() 
.Subscribe(Console.WriteLine); 

또는, CancellationToken을 얻을 수 Task와 함께 작동 Observable.Create 과부하를 사용할 수 있습니다. 예 : 귀하의 작업의 내부

inputStream.Select(n => Observable.Create<int>((observer, token) => Task.Run(() => 
{ 
    while (!token.IsCancellationRequested) 
    { 
     Thread.Sleep(TimeSpan.FromSeconds(1)); 
     observer.OnNext(n * n); 
    } 

    observer.OnCompleted(); 
}))) 
.Switch() 
.Subscribe(Console.WriteLine); 

, 당신은 가치를 생산하는 OnNext를 호출해야합니다. 태스크의 리턴 값이있는 경우, 리턴 값은 무시됩니다.