2012-10-18 2 views
5

Reactive Extensions를 사용하여 RSS 항목을 가져 오는 실험을하고 있습니다. Tim Greenfield의 블로그 게시물을 기반으로합니다 : Silverlight Rx DataClient within MVVM.Rx 재시도()가 예상대로 작동하지 않습니다.

데스크톱 응용 프로그램 내에서 사용하고 있지만 코드는 비슷합니다.

내가 겪고있는 문제는 Retry() 기능을 이해하는 데 있습니다. 그것은 내가 기대하는 것과 기대하고있는 것을 수행하지 않는 것 같습니다.

var items = new List<RssItem>(); 
WebHelper.DownloadXmlFileAsync<RssItem>(new Uri(URI), "item") 
    .Retry(2) 
    .Finally(PublishResults) 
    .Subscribe(items.Add, ProcessError,() => ProcessCompleted(items)); 

유효한 URI를 전달하면 문제없이 작동합니다. URI에 오타를 만들면 ProcessError() 함수를 통해 404 오류가보고되지만 예상 한대로 오류는 한 번만보고됩니다. 나는 그것이이 오류를 두 번 나타낼 것으로 예상했을 것이다.

내 웹 요청에서 Retry() 기능이 작동하지 않는 것 같지만 실제로는 Subscribe()에 전달되는 기능에 적용되는 것처럼 보입니다. 나는 여기에서 틀릴 수 있었다.

Retry() 통화가 웹 요청에 적용되는지 어떻게 확인할 수 있습니까?

추가 코드 :

public static class WebHelper 
{ 
    public static HttpWebRequest CreateHttp(Uri uri) 
    { 
     return CreateHttp(uri, "GET"); 
    } 

    public static HttpWebRequest CreateHttp(Uri uri, string method) 
    { 
     if (uri.Scheme != Uri.UriSchemeHttp && uri.Scheme != Uri.UriSchemeHttps) 
     { 
      throw new ArgumentException("The specified URI does not use HTTP or HTTPS.", "uri"); 
     } 

     var request = (HttpWebRequest)WebRequest.Create(uri); 
     request.Method = method; 

     return request; 
    } 

    public static IObservable<T> DownloadXmlFileAsync<T>(Uri uri, string elementName) where T : class 
    { 
     return (from request in Observable.Return(CreateHttp(uri)) 
       from response in Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)() 
       let stream = response.GetResponseStream() 
       where stream != null 
       from item in XmlReader.Create(stream).GetXmlItem<T>(elementName).ToObservable() 
       select item); 
    } 
} 

public static class XmlExtensions 
{ 
    public static IEnumerable<T> GetXmlItem<T>(this XmlReader reader, string elementName) where T : class 
    { 
     var serializer = new XmlSerializer(typeof (T)); 
     while (reader.GoToElement(elementName)) 
     { 
      yield return serializer.Deserialize(reader) as T; 
     } 
    } 

    public static bool GoToElement(this XmlReader reader, string elementName) 
    { 
     do 
     { 
      if (reader.NodeType == XmlNodeType.Element && reader.Name == elementName) 
      { 
       return true; 
      } 
     } while (reader.Read()); 

     return false; 
    } 
} 

XmlRoot("item")] 
public class RssItem 
{ 
    [XmlElement("description")] 
    public string Description { get; set; } 

    [XmlElement("link")] 
    public string Link { get; set; } 

    [XmlElement("pubDate")] 
    public string PublishDate { get; set; } 

    [XmlElement("title")] 
    public string Title { get; set; } 

    public override string ToString() 
    { 
     return string.Format("Title: {0}", Title); 
    } 
} 

답변

11

시퀀스의 수신 문법은 다음과 같이 정의된다 :

OnNext * (의 OnError | OnCompleted)?

는 어느 쪽 OnError 또는 파이프 라인의 순서 및 가입의 종료가 예상되는 OnCompleted 신호 철거되는 수신. 연산자의 맥락에서

:

observable.Retry(n)이다 :는 OnError가 수신 될 때, N 개까지의 시간, observable에 재 - 가입.

observable.Finally(action)은 다음과 같습니다 재시도 추운 관찰 가능한 함께 사용하기위한 것입니다 OnError|OnCompleted

수신에 action 실행 (Lee Campbella good post on this있다) 가입이 본질적으로 시작하는 소스를 일으키는 곳.

마찬가지로 RepeatOnCompleted을 수신하면 다시 구독한다는 점을 제외하고는 정확히 Retry입니다.

이 동작을 보려면 처음 n 시간 동안 "실패"하고 성공한 관측 가능성을 만들 수 있습니다. 이제 몇 가지 코드 :

private static IObservable<int> ErrorProducer(int i) 
    { 
     int count = 0; 
     return Observable.Create<int>(observer => 
     { 
      Console.WriteLine("Doing work"); 

      if (count++ < i) 
      { 
       Console.WriteLine("Failed"); 
       observer.OnError(new Exception()); 
      } 
      else 
      { 
       Console.WriteLine("Done"); 
       observer.OnNext(count); 
       observer.OnCompleted();      
      } 
      return Disposable.Empty; 
     }); 
    } 

프로듀서 항상 실패

 print(ErrorProducer(3).Retry(2)); 

을 제공합니다 :

Doing work <-- Subscription 
Failed 
Doing work <-- Resubscription 
Failed 
OnError(System.Exception) 
Finally 

를 결국 성공 프로듀서를 들어 :

print(ErrorProducer(2).Retry(3)); 

Doing work 
Failed 
Doing work 
Failed 
Doing work 
Done 
OnNext(3) <-- Succeeded 
OnCompleted() 
Finally 

을 당신이 원하면 우리의 프로세스 오류 함수는 재 시도만큼 여러 번 호출해야하므로 Retry 앞에 위치해야합니다.

즉, seq.Do(value => { }, exception => { }).Retry(n)

당신은 냉/관찰 가능한을 사용하고 이해를 명확히하기 위해 수신와 비동기 패턴을 사용하여 읽을 수 있습니다.

+2

당신의 대답은 좋은 통찰력을 제공하고 또한 일부 특정 키워드 http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/96a06e27-9c02-4177의 결과로 인터넷을 검색 할 날 허용 -ae6a-04b8a7f966e5는 관측소가 작동하는 방식에 대해 좀 더 통찰력을주었습니다. – Jensen

+0

@JensenSomers 다행스럽게도 도움이 될 수 있습니다. 문제의 정확한 원인에 대해 구체적이지 않은 것에 대해 죄송합니다. Rx를 적용하는 방법에 관한 문서는 부족합니다. 앞으로 Rx를 배우려고하는 사람들에게 일반적인 대답이 유용 할 수 있기를 바랍니다. – Asti

+0

내가 그렇듯이 모든 사람들이 Rx에 관해서 빠져 나올 것이라면, 적절한 문서와 더 큰 유스 케이스의 예가 곧 나타날 것입니다. :-) – Jensen

4

아 스티의 답변에 자리입니다. 단일 논리 시퀀스에 대해 여러 오류를 표시하는 방법을 알고 싶을 때를 대비하여 추가 정보를 추가하려고했습니다. 아 스티가 지적한대로

한 번만 시퀀스를 종료 할 수 있습니다. 이 종료는 오류 또는 완료 (OnError | OnCompleted) 일 수 있습니다.

그러나 아무것도 당신이 관찰 시퀀스를 중첩 한이 중지되지 않습니다! 여러 개의 오류 메시지를보고 싶다면 IObservable<IObservable<T>>을 반환 한 시나리오를 고려하십시오. 내부 시퀀스는 데이터 시퀀스 (현재 가지고있는 시퀀스)입니다. 이 시퀀스가 ​​오류가 발생하면 더 이상 사용할 수 없기 때문에 외부 시퀀스가 ​​새로운 내부 데이터 시퀀스를 생성 할 수 있습니다.

조금 이상하게 보일 수 있지만 병합 등의 연산자로 수신에서 지원되는 개념이며, 이러한 중첩 된 시퀀스 이미 전환 맞추. 수신의이 스타일은 Sequences of Coincidence

에서 자세히 다시 Nested Sequences 단락에서 내 책, IntroToRx에서의 감동하고 나는 이것이 미래에 수신하는 방법의 다른 가능성을보고하는 데 도움이 기대하고있다.

관련 문제