2013-06-05 3 views
11

그래서 나는 한 소비자와 많은 생산자를 Go에서 구현하는 많은 방법을 보았습니다 - Concurrency in Go의 고전적인 fanIn 기능.이동 : 하나의 생산자가 많은 소비자

내가 원하는 것은 fanOut 함수입니다. 값을 읽는 채널을 매개 변수로 사용하고이 값의 사본을 쓰는 채널 조각을 반환합니다.

이것을 구현하는 올바른/권장 방법이 있습니까?

답변

13

당신이 할 수있는 가장 좋은 방법은 꽤 많이 설명되었지만, 여기에는 작은 코드 샘플이 있습니다.

이동 놀이터 : 입력 채널이 소진되면 우리는 출력 채널을 종료하는 방법 https://play.golang.org/p/jwdtDXVHJk

package main 

import (
    "fmt" 
    "time" 
) 

func producer(iters int) <-chan int { 
    c := make(chan int) 
    go func() { 
     for i := 0; i < iters; i++ { 
      c <- i 
      time.Sleep(1 * time.Second) 
     } 
     close(c) 
    }() 
    return c 
} 

func consumer(cin <-chan int) { 
    for i := range cin { 
     fmt.Println(i) 
    } 
} 

func fanOut(ch <-chan int, size, lag int) []chan int { 
    cs := make([]chan int, size) 
    for i, _ := range cs { 
     // The size of the channels buffer controls how far behind the recievers 
     // of the fanOut channels can lag the other channels. 
     cs[i] = make(chan int, lag) 
    } 
    go func() { 
     for i := range ch { 
      for _, c := range cs { 
       c <- i 
      } 
     } 
     for _, c := range cs { 
      // close all our fanOut channels when the input channel is exhausted. 
      close(c) 
     } 
    }() 
    return cs 
} 

func fanOutUnbuffered(ch <-chan int, size int) []chan int { 
    cs := make([]chan int, size) 
    for i, _ := range cs { 
     // The size of the channels buffer controls how far behind the recievers 
     // of the fanOut channels can lag the other channels. 
     cs[i] = make(chan int) 
    } 
    go func() { 
     for i := range ch { 
      for _, c := range cs { 
       c <- i 
      } 
     } 
     for _, c := range cs { 
      // close all our fanOut channels when the input channel is exhausted. 
      close(c) 
     } 
    }() 
    return cs 
} 

func main() { 
    c := producer(10) 
    chans := fanOutUnbuffered(c, 3) 
    go consumer(chans[0]) 
    go consumer(chans[1]) 
    consumer(chans[2]) 
} 

중요한 부분을 주목해야 할 것이다. 또한 출력 채널 중 하나가 전송에서 차단되면 다른 출력 채널에서 전송을 보류합니다. 우리는 채널의 버퍼 크기를 설정하여 지연의 양을 제어합니다.

+1

! 고맙습니다. 그것은 나를 망쳐 놓고 있었던 채널의 폐쇄였다. 앞으로이 기능이 필요한 사람들에게 감사 드리며 실행 버전이 있습니다. http://play.golang.org/p/jwdtDXVHJk – Carl

2

아래이 솔루션은 약간 인위적인이지만, 그것은 나를 위해 작동 : 우수한

package main 

import (
    "fmt" 
    "time" 
    "crypto/rand" 
    "encoding/binary" 
) 

func handleNewChannels(arrchangen chan [](chan uint32), 
         intchangen chan (chan uint32)) { 
    currarr := []chan uint32{} 
    arrchangen <- currarr 
    for { 
     newchan := <-intchangen 
     currarr = append(currarr, newchan) 
     arrchangen <- currarr 
    } 
} 

func sendToChannels(arrchangen chan [](chan uint32)) { 
    tick := time.Tick(1 * time.Second) 
    currarr := <-arrchangen 
    for { 
     select { 
     case <-tick: 
      sent := false 
      var n uint32 
      binary.Read(rand.Reader, binary.LittleEndian, &n) 
      for i := 0 ; i < len(currarr) ; i++ { 
       currarr[i] <- n 
       sent = true 
      } 
      if sent { 
       fmt.Println("Sent generated ", n) 
      } 
     case newarr := <-arrchangen: 
      currarr = newarr 
     } 
    } 
} 
func handleChannel(tchan chan uint32) { 
    for { 
     val := <-tchan 
     fmt.Println("Got the value ", val) 
    } 
} 

func createChannels(intchangen chan (chan uint32)) { 
    othertick := time.Tick(5 * time.Second) 
    for { 
     <-othertick 
     fmt.Println("Creating new channel! ") 
     newchan := make(chan uint32) 
     intchangen <- newchan 
     go handleChannel(newchan) 
    } 
} 

func main() { 
    arrchangen := make(chan [](chan uint32)) 
    intchangen := make(chan (chan uint32)) 
    go handleNewChannels(arrchangen, intchangen) 
    go sendToChannels(arrchangen) 
    createChannels(intchangen) 
} 
관련 문제