2011-03-11 3 views
3

TChan에 값을 덤핑 한 다음 소비자가 처리합니다. 그러나 소비자는 따라갈 수 없기 때문에 제작자가 채널에 많은 물건을 버리므로 많은 메모리 사용량이 발생하지만 소비자는 유지하지 않습니다. 채널 큐가 특정 크기가되거나 뭔가가 될 경우 생산자 블록을 보유하는 직접적인 방법이있어 생산자가 소비자가 따라 잡을 때까지 기다릴 수 있습니까?하스켈의 TChan에서 생산자/소비자 상황에서 생산자를 조율하는 방법은 무엇입니까?

+2

첫 번째 검색 결과 : GitHub의/JNB/경계-tchan] (https://github.com/jnb/bounded-tchan) – ephemient

답변

3

John의 답변과 마찬가지로, 한정된 TChan을 직접 작성하는 것이 좋습니다.

  1. 추상화를 추가 (BTChan ADT한다)
  2. 인해 그의 IO의 현재 크기를 읽는 코너 케이스를 제거 :이 때문에 내 코드는 다르다.
  3. 은 독서시 TVar 크기로 썽크를 만들지 않으려 고합니다. 썽크가 "한 가지 깊이"일 수 있기 때문에 작성할 때 덜 중요합니다. 다음 작업은 항상 크기를 평가해야합니다. http://hackage.haskell.org/package/bounded-tchan

참고 :

  • 는 hackage에 지금 내가 당신이라면 솔직히,이 모든 해답을 무시하는 것하고 나쁜 것으로 판명되지 않는 한 단지 (자신의 의견에 연결된 코드 ephemient로 이동 암호). 나는 내가 여기에서하는 것과 똑같이하지만 더 생각할 것입니다. STM 프로그래머 노트의

    {-# LANGUAGE BangPatterns #-} 
    module BTChan 
         (BTChan 
         , newBTChanIO 
         , newBTChan 
         , writeBTChan 
         , readBTChan 
         ) where 
    
    import Control.Concurrent.STM 
    
    data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar Int) 
    
    -- | `newBTChan m` make a new bounded TChan of max size `m` 
    newBTChanIO :: Int -> IO (BTChan a) 
    newBTChanIO m = do 
        szTV <- newTVarIO 0 
        c <- newTChanIO 
        return (BTChan m c szTV) 
    
    newBTChan :: Int -> STM (BTChan a) 
    newBTChan m 
         | m < 1 = error "BTChan's can not have a maximum <= 0!" 
         | otherwise = do 
         szTV <- newTVar 0 
         c <- newTChan 
         return (BTChan m c szTV) 
    
    writeBTChan :: BTChan a -> a -> STM() 
    writeBTChan (BTChan mx c szTV) x = do 
         sz <- readTVar szTV 
         if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x 
    
    readBTChan :: BTChan a -> STM a 
    readBTChan (BTChan _ c szTV) = do 
         x <- readTChan c 
         sz <- readTVar szTV 
         let !sz' = sz - 1 
         writeTVar szTV sz' 
         return x 
    
    sizeOfBTChan :: BTChan a -> STM Int 
    sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV 
    

    어떤 것들은 :

    • 명시 적으로 변경할 수있는 TVar 중 하나 TChan의 상태 기다리고 차단 된 상태에서 하스켈 실을 꿰기, 얻을 것 retry를 호출 다시 시도 할 수 있습니다. 이것은 IO의 값을 검사하지 않고 yield 함수를 사용하는 것을 피하는 방법입니다.
    • MVars와 마찬가지로 TVars는 썽크를 참조 할 수 있습니다. 썽크는 일반적으로 원하는 것이 아닙니다. 아마도 누군가 STVar, STChan, SBTChanBTChan (엄격한 및/또는 제한된 TVars 및 TChan)을 정의하는 패키지를 만들어야합니다.
    • 실제로 new{TVar,TChan}IO의 구현이 unsafePerformIO인데도 atomically이 수행 할 수 없기 때문에 의 구현이 작동하기 때문에 newBTChanIO을 쓰는 것이 필요합니다.

    EDIT : TVar를 독자와 작가로 구분하여 성능을 2 ~ 5 배 향상시킬 수 있습니다 (사용하는 범위에 따라 다름). 기준을 사용하여 확인되었습니다. 개선 된 버전 인 0.2.1은 이미 해킹 중입니다.

  • +0

    'retry'를 명시 적으로 호출하면 항상 항복 할 수있는 참조가 있습니까? –

    +0

    @John 그것은 항상 yeild는 아니지만, 읽은 TVar 중 하나가 다시 쓰여질 때까지 다시 시도하지 않을 것입니다. 즉, 동일한 '재시도'가 다시 호출되도록하는 값이 변경되지 않으면 다시 시도하지 않습니다. SimonPJ는 매우 읽기 쉬운 [STM 논문] (http://research.microsoft.com/en-us/um/people/simonpj/papers/stm/index.htm)에 대한 링크가 있습니다. –

    0

    hackage에 BoundedChan이 있지만 STM이 아닌 MVars가 사용됩니다. 이 코드를 사용하여 직접 작성하는 방법을 배울 수 있습니다. 이는 코드 페이지에 관한 것입니다. cursz 값을 변경할 수 있기 때문에,

    type BoundedChan a = (TChan a, TVar Int, Int) 
    
    writeBoundedChan :: BoundedChan a -> a -> IO() 
    writeBoundedChan [email protected](tchan, tsz, maxsz) x = do 
        cursz' <- readTVarIO tsz 
        if cursz' >= maxsz 
        then yield >> writeBoundedChan bc x 
        else atomically $ do 
         writeTChan tchan a 
         cursz <- readTVar tsz 
         writeTVar tsz (cursz+1) 
    
    readBoundedChan :: BoundedChan a -> IO a 
    readBoundedChan (tchan, tsz, maxsz) = atomically $ do 
        x <- readTChan tchan 
        cursz <- readTVar tsz 
        writeTVar tsz (cursz-1) 
        return x 
    

    참고 여러 업체가있는 경우 최대 크기가 약간 초과 할 수있다 :

    2

    아마 가장 쉬운 해결책은 채널의 요소 수를 나타내는 TVar을 추가하는 것입니다 두 가지 읽기 사이.

    +0

    당신은'cursz을 확인 할 필요가 없습니다 '원자 적으로 IO 이전에. 원자 적으로 모든 것을 한 번 읽어보십시오 :'readTVar >> = \ cursz -> cursz> = maxsz라면 다시 시도해주세요.'거기에서'retry'의 사용을 주목하십시오. –

    +0

    @TomMD - 동일한 'STM' 블록 내에서'다시 시도 '하면 스레드가 반드시 생성되지는 않습니다. hackage docs는 "스레드를 차단할 수 있습니다"라고 말하며, 항상 차단할 것이라고 가정하지 않습니다. 앞으로의 구현에서 동작이 변경되지 않을 것이라고 나는 믿지 않을 것입니다. 왜 비 차단 재시도가 유용 할 수 있는지 알 수 없기 때문에 나는 편집증적일 수 있습니다. 그러나 그것은 필자의 생각입니다. –

    +0

    Yielding은 재 시도가 어떻게 작동하는지, 동일하지 않은 TVars에 대해 동일한 STM 작업을 실행하지 않는지 여부입니다. TVars가 다른 STM 작업에 의해 '재시도'작업과 동시에 변경되면 작업이 수행되지 않기 때문에 "발생할 수 있습니다". 만약 당신이 여전히 불편하다면'writeBoundedChan'은'Bool'을 리턴하는'STM' 연산을 호출 할 수 있고'False'는'yeild'를 리턴하고,'True'는 리턴 할 것입니다 - OK 일 것입니다. 일정 잡을 수있는 스레드가 남아 있기 때문에'재시도 (retry) '보다 힘이 듭니다) 크기를 초과 한 버그를 제거하십시오. –

    1

    나는 게임에 약간 늦었다 고 알고 있지만, 대신 건너 뛰기 채널을 구현할 수 있습니다.이 코드는 채널에 비 블로킹 쓰기를 허용하지만 어떤 것으로도 볼 수 없었던 이전 값을 "덮어 씁니다" 리더. "경계 tchan"에 대한

    import Control.Concurrent.MVar 
    
    data SkipChan a = SkipChan (MVar (a, [MVar()])) (MVar()) 
    
    newSkipChan :: IO (SkipChan a) 
    newSkipChan = do 
        sem <- newEmptyMVar 
        main <- newMVar (undefined, [sem]) 
        return (SkipChan main sem) 
    
    putSkipChan :: SkipChan a -> a -> IO() 
    putSkipChan (SkipChan main _) v = do 
        (_, sems) <- takeMVar main 
        putMVar main (v, []) 
        mapM_ (\sem -> putMVar sem()) sems 
    
    getSkipChan :: SkipChan a -> IO a 
    getSkipChan (SkipChan main sem) = do 
        takeMVar sem 
        (v, sems) <- takeMVar main 
        putMVar main (v, sem:sems) 
        return v 
    
    dupSkipChan :: SkipChan a -> IO (SkipChan a) 
    dupSkipChan (SkipChan main _) = do 
        sem <- newEmptyMVar 
        (v, sems) <- takeMVar main 
        putMVar main (v, sem:sems) 
        return (SkipChan main sem) 
    
    관련 문제