2011-02-02 4 views
4

TryScan을 사용하는 방법에 대한 예제를 찾으려고했지만 발견되지 않았습니다. 도와 주실 수 있습니까?F #에서 TryScan을 올바르게 사용하는 방법

내가 뭘하고 싶은지 (아주 단순화 된 예) : MailboxProcessor은 두 종류의 메시지를받습니다.

  • 첫 번째 GetState은 현재 상태를 반환합니다. GetState 메시지가 상당히 자주 전송됩니다.

  • 기타 UpdateState은 매우 비쌉니다. 인터넷에서 무언가를 다운로드하고 이에 따라 상태를 업데이트합니다. UpdateState은 거의 호출되지 않습니다.

내 문제는 - 메시지 GetState 차단 및 UpdateState을 이전하는 것은 제공 될 때까지 기다려야한다. 그래서 TryScan을 사용하여 모든 메시지를 처리하려고했지만 운이 없었습니다.

내 예제 코드 : 코드를 실행하려고하면이 결과를 기다립니다 때문에

type Msg = GetState of AsyncReplyChannel<int> | UpdateState 
let mbox = MailboxProcessor.Start(fun mbox -> 
      let rec loop state = async { 
       // this TryScan doesn't work as expected 
       // it should process GetState messages and then continue 
       mbox.TryScan(fun m -> 
        match m with 
        | GetState(chnl) -> 
         printfn "G processing TryScan" 
         chnl.Reply(state) 
         Some(async { return! loop state}) 
        | _ -> None 
       ) |> ignore 

       let! msg = mbox.Receive() 
       match msg with 
       | UpdateState -> 
        printfn "U processing" 
        // something very time consuming here... 
        async { do! Async.Sleep(1000) } |> Async.RunSynchronously 
        return! loop (state+1) 
       | GetState(chnl) -> 
        printfn "G processing" 
        chnl.Reply(state) 
        return! loop state 
      } 
      loop 0 
) 

[async { for i in 1..10 do 
      printfn " U" 
      mbox.Post(UpdateState) 
      async { do! Async.Sleep(200) } |> Async.RunSynchronously 
}; 
async { // wait some time so that several `UpdateState` messages are fired 
     async { do! Async.Sleep(500) } |> Async.RunSynchronously 
     for i in 1..20 do 
      printfn "G" 
      printfn "%d" (mbox.PostAndReply(GetState)) 
}] |> Async.Parallel |> Async.RunSynchronously 

, 당신은, GetState 메시지를 거의 처리되지 것을 볼 수 있습니다. 반면에 UpdateState은 화재와 잊기에 효과적이므로 상태를 효과적으로 차단하지 못합니다. 나를 위해 작동

편집

현재 솔루션이 하나입니다 의견에

type Msg = GetState of AsyncReplyChannel<int> | UpdateState 
let mbox = MailboxProcessor.Start(fun mbox -> 
      let rec loop state = async { 
       // this TryScan doesn't work as expected 
       // it should process GetState messages and then continue 
       let! res = mbox.TryScan((function 
        | GetState(chnl) -> Some(async { 
          chnl.Reply(state) 
          return state 
         }) 
        | _ -> None 
       ), 5) 

       match res with 
       | None -> 
        let! msg = mbox.Receive() 
        match msg with 
         | UpdateState -> 
          async { do! Async.Sleep(1000) } |> Async.RunSynchronously 
          return! loop (state+1) 
         | _ -> return! loop state 
       | Some n -> return! loop n 
      } 
      loop 0 
) 

반응 : 병렬 UpdateState을 실행 다른 MailboxProcessor 또는 ThreadPool와 아이디어가 중대하다, 그러나 나는 그것을 현재 필요로하지 않는다. 내가 원했던 것은 모두 GetState 메시지를 모두 처리 한 후 다른 메시지를 처리하는 것뿐이었습니다. 내가 처리하는 동안 걱정하지 않아 UpdateState 에이전트가 차단되었습니다.

// GetState messages are delayed 500 ms - see do! Async.Sleep(500) 
// each UpdateState is sent after 200ms 
// each GetState is sent immediatelly! (not real example, but illustrates the problem) 
U   200ms <-- issue UpdateState 
U processing   <-- process UpdateState, it takes 1sec, so other 
U   200ms  5 requests are sent; sent means, that it is 
U   200ms  fire-and-forget message - it doesn't wait for any result 
          and therefore it can send every 200ms one UpdateState message 
G      <-- first GetState sent, but waiting for reply - so all 
          previous UpdateState messages have to be processed! = 3 seconds 
          and AFTER all the UpdateState messages are processed, result 
          is returned and new GetState can be sent. 
U   200ms 
U   200ms  because each UpdateState takes 1 second 
U   200ms 
U processing 
U 
U 
U 
U 
U processing 
G processing   <-- now first GetState is processed! so late? uh.. 
U processing   <-- takes 1sec 
3 
G 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
G processing   <-- after MANY seconds, second GetState is processed! 
10 
G 
G processing 
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent 
+0

'TryScan' 호출 후에'|> ignore'를한다는 사실은 당신이 API를 잘못 사용하고 있다는 사실을 경고해야합니다. (지금 완전한 대답을위한 시간이 없어, 누군가가 나를 때릴 수 있기를 바랍니다.) – Brian

+0

나는 그것을 잘못된 방식으로 사용한다는 것을 알고 있습니다. 그러나 나는 그것을 전혀 사용하는 것에 대해 어떤 게시물도 찾지 못했습니다. – stej

+0

'TryScan'과'Scan'의 요점은 메시지를 기다리는 것이고 수신되지 않으면 타임 아웃이라고 생각합니다. 두 가지의 유일한 차이점은 제한 시간 동안 TryScan에서 옵션이 반환되는 반면 Scan에서 예외가 발생한다는 것입니다. – gradbot

답변

4

나는 TryScan 방법은이 시나리오에 도움이 될 것입니다 생각하지 않습니다 :

나는 출력에 문제가 무엇인지 보여 줄게. 메시지를 기다리는 동안 사용할 타임 아웃을 지정할 수 있습니다. 일부 메시지가 수신되면 메시지 처리가 시작됩니다 (시간 초과는 무시함). 예를 들어

, 당신은 확인 다른 일부 특정 메시지를 기다리는,하지만 수행하기를 원한다면 매 초마다 당신은 쓸 수 (기다리는 동안) :

let loop() = async { 
    let! res = mbox.TryScan(function 
    | ImportantMessage -> Some(async { 
      // process message 
      return 0 
     }) 
    | _ -> None) 
    match res with 
    | None -> 
     // perform some check & continue waiting 
     return! loop() 
    | Some n -> 
     // ImportantMessage was received and processed 
} 

를 사서함 프로세서 때를 막지 않도록 할 수있는 일 UpdateState 메시지를 처리하고 있습니까? 사서함 프로세서는 (논리적으로) 단일 스레드이므로 UpdateState 메시지의 처리를 취소하고 싶지 않으므로 백그라운드에서 처리를 시작하고 처리가 완료 될 때까지 기다리는 것이 가장 좋습니다.그런 다음 UpdateState을 처리하는 코드는 메일 함으로 다시 메시지를 보낼 수 있습니다 (예 : UpdateStateCompleted).

let rec loop (state) = async { 
    let! msg = mbox.Receive() 
    match msg with 
    | GetState(repl) -> 
     repl.Reply(state) 
     return! scanning state 
    | UpdateState -> 
     async { 
     // complex calculation (runs in parallel) 
     mbox.Post(UpdateStateCompleted newState) } 
     |> Async.Start 
    | UpdateStateCompleted newState -> 
     // Received new state from background workflow 
     return! loop newState } 

은 이제 백그라운드 작업이 병렬로 실행되고 있는지, 당신은 변경 가능한 상태에 대한주의해야 : 여기

이 모습 수있는 방법을 스케치합니다. 또한 메시지를 처리 ​​할 수있는 것보다 더 빨리 UpdateState 메시지를 보내면 문제가 발생할 수 있습니다. 예를 들어 이전 요청을 이미 처리하고있을 때 요청을 무시하거나 대기열로 묶어서 수정할 수 있습니다.

+0

Thx, Tomáš. 나는 여전히'TryScan'을 타임 아웃과 함께 사용하려고 노력할 것이고, 문제가 생길 경우에는 다른'MailboxProcessor'를 사용할 가능성이 여전히 있습니다. – stej

+0

@stej 'UpdateState' 메시지가 이미 시작 되었다면 타임 아웃은 UpdateState 메시지의 처리를 취소하지 않으며, 메시지 처리 중에는 다른 메시지를 병렬로 처리 할 수 ​​없다는 점을 명심하십시오. –

+0

예, 이해합니다. 'UpdateState'를 취소하고 싶지 않습니다.사실, 나는 타임 아웃을 필요로하지 않는다. 내가 원한 것은 큐에있는 모든 GetState 메시지를 처리하고 다른 프로세스 (현재는'UpdateState' 만) 메시지 이후에 처리하는 것이다. 나는 약간의 질문을 업데이트 할 것이다. – stej

2

Tomas는 MailboxProcessor가 단일 스레드임을 언급했습니다. 상태 가져 오기 도구의 별도 스레드에서 업데이트를 실행하려면 다른 MailboxProcessor가 필요합니다.

#nowarn "40" 

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | UpdateState 

let runner_UpdateState = MailboxProcessor.Start(fun mbox -> 
    let rec loop = async { 
     let! state = mbox.Receive() 
     printfn "U start processing %d" !state 
     // something very time consuming here... 
     do! Async.Sleep 100 
     printfn "U done processing %d" !state 
     state := !state + 1 
     do! loop 
    } 
    loop 
) 

let mbox = MailboxProcessor.Start(fun mbox -> 
    // we need a mutiple state if another thread can change it at any time 
    let state = ref 0 

    let rec loop = async { 
     let! msg = mbox.Receive() 

     match msg with 
     | UpdateState -> runner_UpdateState.Post state 
     | GetState chnl -> chnl.Reply !state 

     return! loop 
    } 
    loop) 

[ 
    async { 
     for i in 1..10 do 
      mbox.Post UpdateState 
      do! Async.Sleep 200 
    }; 
    async { 
     // wait some time so that several `UpdateState` messages are fired 
     do! Async.Sleep 1000 

     for i in 1..20 do 
      printfn "G %d" (mbox.PostAndReply GetState) 
      do! Async.Sleep 50 
    } 
] 
|> Async.Parallel 
|> Async.RunSynchronously 
|> ignore 

System.Console.ReadLine() |> ignore 

출력 :

U start processing 0 
U done processing 0 
U start processing 1 
U done processing 1 
U start processing 2 
U done processing 2 
U start processing 3 
U done processing 3 
U start processing 4 
U done processing 4 
G 5 
U start processing 5 
G 5 
U done processing 5 
G 5 
G 6 
U start processing 6 
G 6 
G 6 
U done processing 6 
G 7 
U start processing 7 
G 7 
G 7 
U done processing 7 
G 8 
G U start processing 8 
8 
G 8 
U done processing 8 
G 9 
G 9 
U start processing 9 
G 9 
U done processing 9 
G 9 
G 10 
G 10 
G 10 
G 10 

또한 ThreadPool이를 사용할 수 있습니다.

open System.Threading 

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | SetState of int 
    | UpdateState 

let mbox = MailboxProcessor.Start(fun mbox -> 
    let rec loop state = async { 
     let! msg = mbox.Receive() 

     match msg with 
     | UpdateState -> 
      ThreadPool.QueueUserWorkItem((fun obj -> 
       let state = obj :?> int 

       printfn "U start processing %d" state 
       Async.Sleep 100 |> Async.RunSynchronously 
       printfn "U done processing %d" state 
       mbox.Post(SetState(state + 1)) 

       ), state) 
      |> ignore 
     | GetState chnl -> 
      chnl.Reply state 
     | SetState newState -> 
      return! loop newState 
     return! loop state 
    } 
    loop 0) 

[ 
    async { 
     for i in 1..10 do 
      mbox.Post UpdateState 
      do! Async.Sleep 200 
    }; 
    async { 
     // wait some time so that several `UpdateState` messages are fired 
     do! Async.Sleep 1000 

     for i in 1..20 do 
      printfn "G %d" (mbox.PostAndReply GetState) 
      do! Async.Sleep 50 
    } 
] 
|> Async.Parallel 
|> Async.RunSynchronously 
|> ignore 

System.Console.ReadLine() |>

+0

('state : =! state + 1')의 연산자'!'는 원자입니까? mutable 변수를 피하기 위해'MailboxProcessor'를 사용합니다.이 경우에는 문제가 될 수 있습니다. 스레드 풀은 좋은 소리입니다. – stej

+0

원자가 아닙니다. 불변의 상태를 원하면 여분의 SetState와 ThreadPool을 사용하십시오. – gradbot

3

이 TRYSCAN을 사용하지 마십시오 무시!

불행히도 F # 현재 버전의 TryScan 함수는 두 가지 방법으로 구분됩니다. 첫째, 전체 요점은 타임 아웃을 지정하는 것이지만 구현은 실제로 그것을 존중하지 않습니다. 특히, 관련성이없는 메시지는 타이머를 재설정합니다. 둘째, 다른 Scan 함수와 마찬가지로 메시지 대기열은 검사 기간 동안 다른 스레드가 게시하는 것을 막는 잠금 상태에서 검사되며 임의의 시간이 걸릴 수 있습니다. 따라서 TryScan 함수 자체는 동시 시스템을 잠그는 경향이 있으며 심지어 호출자의 코드가 잠금 내부에서 평가되기 때문에 교착 상태가 발생할 수 있습니다 (예 : 함수 인수에서 또는 TryScan에 게시하면 잠금 대기중인 코드가 대기 중일 때 에이전트가 교착 상태가 될 수 있음) 이미 자물쇠를 얻기 위해).

제 제작 코드 초기 프로토 타입에 TryScan을 사용했는데 문제가 발생하지 않았습니다. 그러나, 나는 그 주위에서 건축가를 관리했고 그 결과 건축물은 실제로 더 좋았다. 본질적으로, 나는 열심히 Receive 모든 메시지와 필터를 내 자신의 로컬 큐를 사용합니다.

+0

꽤 흥미 롭습니다. 그러나 솔직히 말하면 대상 메시지를 찾는 동안 대기열이 잠길 것으로 예상됩니다. 어쨌든, 사실인지, 타임 아웃이 이상한 방식으로 작동 하는지를 알기 위해 goot하십시오. 로컬 큐를 사용하는 코드 (마지막 단락에서 언급 했음)를 표시 할 수 있다면 좋을 것입니다. – stej

+1

메일 박스 (및 TPL)의 구현이 대기 상태 일 것을 기대하고 있습니다. 내 대답에 코드를 추가하겠습니다 ... –

관련 문제