그래서 나는 한 소비자와 많은 생산자를 Go에서 구현하는 많은 방법을 보았습니다 - Concurrency in Go의 고전적인 fanIn 기능.이동 : 하나의 생산자가 많은 소비자
내가 원하는 것은 fanOut 함수입니다. 값을 읽는 채널을 매개 변수로 사용하고이 값의 사본을 쓰는 채널 조각을 반환합니다.
이것을 구현하는 올바른/권장 방법이 있습니까?
그래서 나는 한 소비자와 많은 생산자를 Go에서 구현하는 많은 방법을 보았습니다 - Concurrency in Go의 고전적인 fanIn 기능.이동 : 하나의 생산자가 많은 소비자
내가 원하는 것은 fanOut 함수입니다. 값을 읽는 채널을 매개 변수로 사용하고이 값의 사본을 쓰는 채널 조각을 반환합니다.
이것을 구현하는 올바른/권장 방법이 있습니까?
당신이 할 수있는 가장 좋은 방법은 꽤 많이 설명되었지만, 여기에는 작은 코드 샘플이 있습니다.
이동 놀이터 : 입력 채널이 소진되면 우리는 출력 채널을 종료하는 방법 https://play.golang.org/p/jwdtDXVHJk
package main
import (
"fmt"
"time"
)
func producer(iters int) <-chan int {
c := make(chan int)
go func() {
for i := 0; i < iters; i++ {
c <- i
time.Sleep(1 * time.Second)
}
close(c)
}()
return c
}
func consumer(cin <-chan int) {
for i := range cin {
fmt.Println(i)
}
}
func fanOut(ch <-chan int, size, lag int) []chan int {
cs := make([]chan int, size)
for i, _ := range cs {
// The size of the channels buffer controls how far behind the recievers
// of the fanOut channels can lag the other channels.
cs[i] = make(chan int, lag)
}
go func() {
for i := range ch {
for _, c := range cs {
c <- i
}
}
for _, c := range cs {
// close all our fanOut channels when the input channel is exhausted.
close(c)
}
}()
return cs
}
func fanOutUnbuffered(ch <-chan int, size int) []chan int {
cs := make([]chan int, size)
for i, _ := range cs {
// The size of the channels buffer controls how far behind the recievers
// of the fanOut channels can lag the other channels.
cs[i] = make(chan int)
}
go func() {
for i := range ch {
for _, c := range cs {
c <- i
}
}
for _, c := range cs {
// close all our fanOut channels when the input channel is exhausted.
close(c)
}
}()
return cs
}
func main() {
c := producer(10)
chans := fanOutUnbuffered(c, 3)
go consumer(chans[0])
go consumer(chans[1])
consumer(chans[2])
}
중요한 부분을 주목해야 할 것이다. 또한 출력 채널 중 하나가 전송에서 차단되면 다른 출력 채널에서 전송을 보류합니다. 우리는 채널의 버퍼 크기를 설정하여 지연의 양을 제어합니다.
먼저, 관련 질문 What is the neatest idiom for producer/consumer in Go? 및 One thread showing interest in another thread (consumer/producer)을 참조하십시오. 또한 producer-consumer problem을 살펴보십시오. 동시성 정보는 how to achieve concurrency In Google Go을 참조하십시오.
아래이 솔루션은 약간 인위적인이지만, 그것은 나를 위해 작동 : 우수한
package main
import (
"fmt"
"time"
"crypto/rand"
"encoding/binary"
)
func handleNewChannels(arrchangen chan [](chan uint32),
intchangen chan (chan uint32)) {
currarr := []chan uint32{}
arrchangen <- currarr
for {
newchan := <-intchangen
currarr = append(currarr, newchan)
arrchangen <- currarr
}
}
func sendToChannels(arrchangen chan [](chan uint32)) {
tick := time.Tick(1 * time.Second)
currarr := <-arrchangen
for {
select {
case <-tick:
sent := false
var n uint32
binary.Read(rand.Reader, binary.LittleEndian, &n)
for i := 0 ; i < len(currarr) ; i++ {
currarr[i] <- n
sent = true
}
if sent {
fmt.Println("Sent generated ", n)
}
case newarr := <-arrchangen:
currarr = newarr
}
}
}
func handleChannel(tchan chan uint32) {
for {
val := <-tchan
fmt.Println("Got the value ", val)
}
}
func createChannels(intchangen chan (chan uint32)) {
othertick := time.Tick(5 * time.Second)
for {
<-othertick
fmt.Println("Creating new channel! ")
newchan := make(chan uint32)
intchangen <- newchan
go handleChannel(newchan)
}
}
func main() {
arrchangen := make(chan [](chan uint32))
intchangen := make(chan (chan uint32))
go handleNewChannels(arrchangen, intchangen)
go sendToChannels(arrchangen)
createChannels(intchangen)
}
! 고맙습니다. 그것은 나를 망쳐 놓고 있었던 채널의 폐쇄였다. 앞으로이 기능이 필요한 사람들에게 감사 드리며 실행 버전이 있습니다. http://play.golang.org/p/jwdtDXVHJk – Carl