2012-04-14 1 views
1

RX를 처음 사용합니다.C# RX (System.Reactive) - 비동기 - 여러 관찰 데이터 처리기에 IEnumerable <DataRow> 게시

IEnumerable을 트래버스하여 각 스레드에서 데이터를 처리하는 다중 DataHandler에 게시하고 싶습니다.

다음은 저의 샘플 프로그램입니다. 게시 작업과 새 스레드가 만들어 지지만 3 개의 RowHandler가 모두 하나의 스레드에서 실행됩니다. 스레드가 3 개 필요합니다. 이것을 구현하는 가장 좋은 방법은 무엇입니까?

class Program 
{ 

    public class MyDataGenerator 
    { 
     public IEnumerable<int> myData() 
     { 
      //Heavy lifting....Don't want to process more than once. 
      yield return 1; 
      yield return 2; 
      yield return 3; 
      yield return 4; 
      yield return 5; 
      yield return 6; 
     } 
    } 

    static void Main(string[] args) 
    { 
     MyDataGenerator h = new MyDataGenerator(); 
     Console.WriteLine("Thread id " + Thread.CurrentThread.ManagedThreadId.ToString()); 
     // 
     var shared = h.myData().ToObservable().Publish(); 
     /////////////////////////////// 
     // Row Handling Requirements 
     // 
     //  1. Single Scan of IEnumerable. 
     //  2. Row handlers process data in their own threads. 
     //  3. OK if scanning thread blocks while data is processed 
     // 

     //Create the RowHandlers 
     MyRowHandler rn1 = new MyRowHandler(); 
     rn1.ido = shared.Subscribe(i => rn1.processID(i)); 
     MyRowHandler rn2 = new MyRowHandler(); 
     rn2.ido = shared.Subscribe(i => rn2.processID(i)); 
     MyRowHandler rn3 = new MyRowHandler(); 
     rn3.ido = shared.Subscribe(i => rn3.processID(i)); 
     // 
     shared.Connect(); 
    } 

    public class MyRowHandler 
    { 
     public IDisposable ido = null; 
     public void processID(int i) 
     { 
      var o = Observable.Start(() => 
             { 
              Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i)); 
              Thread.Sleep(30); 
              Console.WriteLine("Done Thread ID"+Thread.CurrentThread.ManagedThreadId.ToString()); 
             } 
            ); 
      o.First(); 
     } 
    } 
} 

발견 :

코딩 속도 & 코드 품질 향상의 한 성능의 비용으로 올 수신에서 수신합니다. 작업/대리인은 의심의 여지가없는 배수입니다. 즉, Rx에 대해 알아야 할 가장 중요한 것은 Rx를 사용해야 할 때입니다. 다음은 요약 요약 초안입니다. 큰 볼륨의 경우 chuncking, 결합 및 기타 많은 스트림 많은 핸들러 모델에서 Rx에 대한 사용을 볼 수 있습니다. 그러나 기본 Async는 rx를 사용하지 않아야합니다.

내가 매트릭스 가이드 라인과 이미지를 게시 싶지만, 사이트가 방금에 관찰 할 수있는, 내가 제대로 시퀀싱 요구 사항을 이해하고 세 개의 병렬 운전 검사를 원하는 경우 나 이미지를

+0

사용중인 Rx 버전은 무엇입니까? –

+0

귀하의 설명에서 명확하지 않습니다, 당신이 해결하는 문제는 무엇입니까. "무거운 작업을 대기"비동기 호출보다 그것을 할 것입니다. 귀하가 진술 한 것처럼 재 계산을 피하려면 var cachedData = myData(). ToArray()는 Rx없이 수행합니다. –

답변

3

을 게시하지 않습니다 TaskPool 그리고 거기에서 구독하십시오; 끝내야 당신이 다음 비동기 적으로 실행하고 당신의 메인 쓰레드는 스캔을 기다리지 않기 때문에, 프로그램이 프로그램의 마지막에 Console.ReadKey()을 넣어 바로 당신이하지 않는 예를 들면 종료됩니다 것을

... 

//Create the RowHandlers 
MyRowHandler rn1 = new MyRowHandler(); 
rn1.ido = shared.ObserveOn(Scheduler.TaskPool).Subscribe(i => rn1.processID(i)); 

... 

주 .

편집 : 동일한 스레드를 "모든 방법"으로 실행하는 것과 관련하여 이상하게 일정을 잡으려고합니다. 관측 가능 행을 행 처리기에 놓으면 Scheduler.NewThread를 사용하여 좋은 결과를 얻을 수 있습니다.

... 

var rowHandler1 = new MyRowHandler(); 
rowHandler1.ido = shared.ObserveOn(Scheduler.NewThread).Subscribe(rowHandler1.ProcessID); 

... 

public void ProcessID(int i) 
{ 
    Console.WriteLine(String.Format("Start Thread ID {0} Int{1}", Thread.CurrentThread.ManagedThreadId, i)); 
    Thread.Sleep(30); 
    Console.WriteLine("Done Thread ID" + Thread.CurrentThread.ManagedThreadId.ToString(CultureInfo.InvariantCulture)); 
} 

이렇게하면 각 구독에 자체 스레드가 제공되고 그 스레드와 함께 유지됩니다.

+0

굉장합니다. 매력처럼 일했습니다. API 문서 및 샘플에서 ObserveOn을 얻지 못했습니다. .NET 3.5에서 Scheduler.ThreadPool을 사용해야했습니다. TaskPool이 네임 스페이스에있는 것 같습니다. – user1332889

+0

슬프게도 Rx는 스레딩을 수동으로 관리하는 것보다 상당히 느린 것 같습니다. 마치 모든 행이 새로운 ThreadPool 요청을 강제 실행하는 것처럼 거의 매 스레드마다 새 스레드를 해제하고 할당합니다. 사실입니까? 아니면 모든 행이 처리 될 때까지 동일한 스레드를 유지할 수있는 방법이 있습니까? – user1332889

+0

@ user1332889이를 달성하는 방법에 대한 정보가 업데이트되었습니다. –

관련 문제