2013-11-04 2 views
0

Observable.Count<TSource> Method이 실제로 어떻게 작동하는지 보여주는 예제가 있습니까? 내가 생각해내는 예제는 예상 카운트 대신 관찰 가능으로 싸인 카운트를 반환하는 것처럼 보입니다. 예를 들어IEnumerable <T> .Count()가 실제로 IObservable <T>에서 작동합니까?

, 나는 1이에서 반환 될 것으로 예상 :

System.Diagnostics.Debug.WriteLine((Observable.Return<string>("Hello world!")).Count()); 
(결국은 비동기 순서 때문에)

미래에 반환 1 것인가 ? 아니면 몇 가지 근본을 놓치고 있습니까? 이 글을 쓰는 시점에서 저는 실제로 .Count()이 결과가 푸시 된만큼 시간이 지남에 따라 T의 결과를 반환한다고 생각합니다. 정말? 예. 수신에

+4

당신은 그것을 시도 할 수 있습니다. – Magus

+0

어, 오 : 그것은 작동하고있는 것 같습니다 :'var count = await (Observable.Return ("Hello world!")) Count();' – rasx

답변

5

집계 연산자는 LINQ보다 약간 다르게 작동 - 그들은 바로 값을 반환하지 않는, 그들은 미래의 결과를 반환 (즉, 우리는 최종 카운트가 관찰 완료되면 무엇인지 알 수 있습니다).

당신은 쓰기 그래서 경우 : 결국은 비동기 순서

이 실제로 완전히 사실이 아니다이다

Observable.Return("foo").Count().Subscribe(x => Console.WriteLine(x)); 
>>> 1 

때문이다. 여기 누군가가 Subscribe에게 전화하자마자 즉시 모든 것이 실행됩니다. 위의 코드에 대해 비동기적인 것은 없습니다. 추가 스레드가 없으며 모든 것이 구독에서 발생합니다.

4

나는 곧바로 을 반환하는 observable을 사용하는 것은 rasx가 async/await 구문을 사용하여 주석에서했던 것처럼 너무 많은 것을 혼란스럽게 생각한다고 생각한다.

의 다시 하나를 오는 5 개 요소 스트림을 만들어 보자 매 초마다 다음 완성 :

private IObservable<long> StreamWith5Elements() 
{ 
    return Observable.Interval(TimeSpan.FromSeconds(1)) 
        .Take(5); 
} 

우리는 비동기를 사용하여 호출 할 수 있습니다 /이 LINQPad 친화적 예에서와 같이 마법을 기다리고 있습니다 :

void Main() 
{ 
    CountExampleAsync().Wait(); 
} 

private async Task CountExampleAsync() 
{ 
    int result = await StreamWith5Elements().Count(); 
    Console.WriteLine(result); 
} 

하지만 여기에 무슨 일이 일어나고 있는지 오해하는 것입니다 - Count()IObservable<int>을 반환하지만 Rx는 await으로 슈퍼 친화적이며 결과 스트림을 Task<int>으로 변환합니다. 그런 다음 해당 작업의 int 결과를 반환합니다.

IObservable<T>에 대한 대기 시간을 사용하면 관찰 대상이 OnNext()에 전화를 걸어 단일 결과로 전화 한 다음 OnComplete()으로 전화한다고 암시 적으로 말합니다. 실제로 일어난 일은 스트림이 종료되기 전에 마지막으로 값을 반환하는 Task<T>을 얻을 수 있다는 것입니다. (AsyncSubject<T>이 동작하는 것과 유사).

이는 모든 스트림을 Task에 매핑 할 수 있다는 것을 의미하기 때문에 유용하지만 약간의 신중을 기해야합니다.

지금, 위의 예는 다음과 같은 전통적인 수신하는 것과 같습니다

여기
void Main() 
{ 
    PlainRxCountExample(); 
} 

private void PlainRxCountExample() 
{ 
    IObservable<int> countResult = StreamWith5Elements().Count(); 
    countResult.Subscribe(count => Console.WriteLine(count)); 

    /* block until completed for the sake of the example */ 
    countResult.Wait(); 
} 

당신이 Count() 실제로 INT의 스트림을 반환하는 것을 볼 수는 - 비동기 수를 제공 할 수 있습니다. 소스 스트림이 완료 될 때만 리턴합니다.

Rx 초기에 Count()는 실제로 동기식이었습니다.

그러나 "Exit the Monad"이후로는 매우 유용한 상태가 아닙니다. 즉 IObservable<T>에서 나와서 Rx 운영자와 더 이상 구성하지 못하게합니다.

"스트림에서 생각하기"를 시작하면 Count()의 비동기 특성은 실제로 매우 직관적입니다. 물론 끝나면 스트림 수만 제공 할 수 있습니다. :)

+0

은'Observable이 아닙니다.''Task.Factory .StartNew '? 조세프 알바 하리 (Joseph Albahari)는 우리의 스트림에 하나의 관측 만 가능할 때 'Task '을 사용하는 것을 고려해야한다고 제안했다. – rasx

+1

글쎄, Return은 비동기 적이 지 않지만, 일반적으로 'IObservable '은 'Task '이 될 후보입니다. 그것은 모두 당신이 그것을 사용하려는 방법에 따라 다릅니다. 때로는 'IObservable '에 머무르는 구성상의 이점이 작업으로 변환하는 것보다 더 유용 할 때가 있습니다. 때로는 다른 방식으로 사용하기도합니다. 시간이 지남에 따라 하나 이상의 결과가 필요할 때, 전자는 이점이 분명합니다. 또한 Rx는 스케줄러를 사용하여 시간을 매개 변수화하는 작업을보다 잘 수행하므로 유용한 테스트 이점이 있습니다. –

관련 문제