F #에 MailboxProcessor가 있습니다. 동일한 목적으로 C#에서 SynchronizationContext를 사용합니다. 몇 분만 기다려주세요. 예를 쓸 것입니다.
참고 사항 : 여기에 F #의 코드가 있습니다. 비슷한 점이 있습니다 ... C#에서 Rx를 사용하면 훨씬 더 많은 노력이 가능하지만 여전히 가능합니다.
open System.Diagnostics
let numWorkers = 20
let asyncDelay = 100
type MessageForMailbox =
| DataMessage of AsyncReplyChannel<unit>
| GetSummary of AsyncReplyChannel<unit>
let main =
let actor =
MailboxProcessor.Start(fun inbox ->
let rec loop acc =
async {
let! message = inbox.Receive()
match message with
| DataMessage replyChannel -> replyChannel.Reply(); return! loop acc
| GetSummary replyChannel -> replyChannel.Reply(); return! loop acc
}
loop 0 // seed for acc
)
let codeBlocks = [for i in 1..numWorkers ->
async {
do! Async.Sleep asyncDelay
return! actor.PostAndAsyncReply DataMessage
} ]
while true do
printfn "Concurrent started..."
let sw = new Stopwatch()
sw.Start()
codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
actor.PostAndReply GetSummary
sw.Stop()
printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
printfn "efficiency: %d%%" (int64 (asyncDelay * 100)/sw.ElapsedMilliseconds)
printfn "Synchronous started..."
let sw = new Stopwatch()
sw.Start()
for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
sw.Stop()
printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100)/sw.ElapsedMilliseconds)
main
1000 관측치의 모든 유형이 동일합니까? 당신은 집합 관측치의 유형을 무엇입니까? –
1000 개의 관측치가 모두 같은 유형이므로 새 집계가 새로운 유형이 될 수 있습니다. 예 : 이벤트는 AggregateEvent가됩니다. – lukebuehler
최신 값만 결합 하시겠습니까? I.E. Observable이 두 개의 이벤트를 발생시키고 Observable이 하나만 발생시키는 경우 a에서 첫 번째 이벤트를, b의 이벤트에서 a에서 마지막 이벤트를 집계 하시겠습니까? –