2010-12-03 9 views
6

나는 1000 개의 관측 값을 가지고 있다고 가정 해 보겠습니다. 이제 모든 이벤트가 OnNext를 트리거 한 새로운 관찰 가능으로 집계하여 다른 이벤트를 보냈 으면합니다. Rx를 사용하여 가장 좋은 방법은 무엇입니까?다수의 관측 값을 새로운 관측 값으로 집계

업데이트 : Rx 포럼에서 특히 Dave Sexton의 위대한 피드백. 그는 여러 관측 값을 취하는 Zip 확장 메서드를 만드는 방법을 보여주었습니다. http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/

+0

1000 관측치의 모든 유형이 동일합니까? 당신은 집합 관측치의 유형을 무엇입니까? –

+0

1000 개의 관측치가 모두 같은 유형이므로 새 집계가 새로운 유형이 될 수 있습니다. 예 : 이벤트는 AggregateEvent가됩니다. – lukebuehler

+0

최신 값만 결합 하시겠습니까? I.E. Observable이 두 개의 이벤트를 발생시키고 Observable이 하나만 발생시키는 경우 a에서 첫 번째 이벤트를, b의 이벤트에서 a에서 마지막 이벤트를 집계 하시겠습니까? –

답변

2

F #에 MailboxProcessor가 있습니다. 동일한 목적으로 C#에서 SynchronizationContext를 사용합니다. 몇 분만 기다려주세요. 예를 쓸 것입니다.

참고 사항 : 여기에 F #의 코드가 있습니다. 비슷한 점이 있습니다 ... C#에서 Rx를 사용하면 훨씬 더 많은 노력이 가능하지만 여전히 가능합니다.

open System.Diagnostics 

let numWorkers = 20 
let asyncDelay = 100 

type MessageForMailbox = 
    | DataMessage of AsyncReplyChannel<unit> 
    | GetSummary of AsyncReplyChannel<unit> 

let main = 
    let actor = 
     MailboxProcessor.Start(fun inbox -> 
     let rec loop acc = 
      async { 
       let! message = inbox.Receive() 
       match message with 
       | DataMessage replyChannel -> replyChannel.Reply(); return! loop acc 
       | GetSummary replyChannel -> replyChannel.Reply(); return! loop acc 
      } 

     loop 0 // seed for acc 
    ) 

    let codeBlocks = [for i in 1..numWorkers -> 
         async { 
          do! Async.Sleep asyncDelay 
          return! actor.PostAndAsyncReply DataMessage 
         } ] 

    while true do 
     printfn "Concurrent started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore 
     actor.PostAndReply GetSummary 
     sw.Stop() 
     printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * 100)/sw.ElapsedMilliseconds) 

     printfn "Synchronous started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore 
     sw.Stop() 
     printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100)/sw.ElapsedMilliseconds) 

main 
+0

흠. 그래서 SynchronizationContext.Send()를 사용하여 이벤트를 만드는 모든 관측 가능 항목을 동기화하는 줄을 따라 뭔가를 의미합니까? 나는 당신의 F # 코드가하는 일을 좀 보았지만 그것을 완전히 이해할만큼 정통하지는 않습니다. – lukebuehler

+0

나는 그것을 얻은 것 같아요. RunSynchronously는 비동기 워크 플로로 ForkJoin을 구현합니다. – GregC

+0

+1 : 전에는 MailboxProcessor의 좋은 예를 본 적이 없습니다. :) –

관련 문제