2013-10-04 3 views
4

주 - Go2의 신참.채널 멀티플렉서

은 채널 배열의 출력을 하나로 병합해야하는 멀티플렉서를 작성했습니다. 건설적인 비판에 만족합니다.

func Mux(channels []chan big.Int) chan big.Int { 
    // Count down as each channel closes. When hits zero - close ch. 
    n := len(channels) 
    // The channel to output to. 
    ch := make(chan big.Int, n) 

    // Make one go per channel. 
    for _, c := range channels { 
     go func() { 
      // Pump it. 
      for x := range c { 
       ch <- x 
      } 
      // It closed. 
      n -= 1 
      // Close output if all closed now. 
      if n == 0 { 
       close(ch) 
      } 
     }() 
    } 
    return ch 
} 

나는 그것을 테스트입니다 :

func fromTo(f, t int) chan big.Int { 
    ch := make(chan big.Int) 

    go func() { 
     for i := f; i < t; i++ { 
      fmt.Println("Feed:", i) 
      ch <- *big.NewInt(int64(i)) 
     } 
     close(ch) 
    }() 
    return ch 
} 

func testMux() { 
    r := make([]chan big.Int, 10) 
    for i := 0; i < 10; i++ { 
     r[i] = fromTo(i*10, i*10+10) 
    } 
    all := Mux(r) 
    // Roll them out. 
    for l := range all { 
     fmt.Println(l) 
    } 
} 

하지만 내 출력이 매우 이상하다 :

내 질문에 그래서
Feed: 0 
Feed: 10 
Feed: 20 
Feed: 30 
Feed: 40 
Feed: 50 
Feed: 60 
Feed: 70 
Feed: 80 
Feed: 90 
Feed: 91 
Feed: 92 
Feed: 93 
Feed: 94 
Feed: 95 
Feed: 96 
Feed: 97 
Feed: 98 
Feed: 99 
{false [90]} 
{false [91]} 
{false [92]} 
{false [93]} 
{false [94]} 
{false [95]} 
{false [96]} 
{false [97]} 
{false [98]} 
{false [99]} 

:

  • 가 나는 것이 있습니다 Mux에서 잘못 했나요?
  • 출력 채널에서 마지막 10 개만 가져 오는 이유는 무엇입니까?
  • 왜 먹이가 너무 이상하게 보입니까? (각 입력 채널 중 첫 번째 채널, 마지막 채널 모두, 그 다음에는 아무 것도 입력하지 마십시오.)
  • 더 좋은 방법이 있습니까?

나는 출력 채널과 동일한 권한이 입력 채널의 모든 필요 - 내가 모두에서 등 다음


누구를 위해 그 한 채널의 출력을 모두 가지고 할 수없는, 즉 관심 -이 수정 후 최종 코드와 goroutines의 각 Mux에서 양산 sync.WaitGroup

import (
    "math/big" 
    "sync" 
) 

/* 
    Multiplex a number of channels into one. 
*/ 
func Mux(channels []chan big.Int) chan big.Int { 
    // Count down as each channel closes. When hits zero - close ch. 
    var wg sync.WaitGroup 
    wg.Add(len(channels)) 
    // The channel to output to. 
    ch := make(chan big.Int, len(channels)) 

    // Make one go per channel. 
    for _, c := range channels { 
     go func(c <-chan big.Int) { 
      // Pump it. 
      for x := range c { 
       ch <- x 
      } 
      // It closed. 
      wg.Done() 
     }(c) 
    } 
    // Close the channel when the pumping is finished. 
    go func() { 
     // Wait for everyone to be done. 
     wg.Wait() 
     // Close. 
     close(ch) 
    }() 
    return ch 
} 

답변

2

의 정확한 (아마도) 사용 이후, 같은 채널에서 당겨 끝 c은 루프 –의 각 반복마다 업데이트되어 c 값을 캡처하지 않습니다.

for _, c := range channels { 
    go func(c <-chan big.Int) { 
     ... 
    }(c) 
} 

이 수정 here을 테스트 할 수 있습니다 : 당신이 그렇게 같은 goroutine에 채널을 통과 할 경우 예상되는 결과를 얻을 수 있습니다.

다른 가능한 문제는 변수를 처리하는 것입니다. GOMAXPROCS != 1으로 실행중인 경우 한 번에 두 개의 goroutines를 업데이트 할 수 있습니다. sync.WaitGroup 유형은 goroutines가 완료 될 때까지 기다리는 것이 더 안전한 방법입니다.

+0

고맙습니다 - 내 문제를 정확하게 설명합니다. 그 결과 모든 건축물에서 일관되게 모든 채널에 동등한 권리가 부여됩니까? – OldCurmudgeon

+0

'ch '에게 먹이를주는 각 goroutine이 공정하게 계획 될지 묻고 있습니까? 그것이 정의되어 있는지 아닌지 나는 모른다. 특정 결과의 인터리빙이 필요하면 더 많은 것을 필요로 할 수 있습니다. –

+0

일부 환경에서는 다음 채널이 룩 - 인되기 전에 각 채널이 고갈 될 수도 있습니다. 그것은 피해야합니다. 특정 순서는 필요하지 않지만 모든 채널간에 균형이 필요합니다. – OldCurmudgeon

2

조금 뒤늦게 알았지 만, 일반적인 Multiplex 기능을 구현 한 패키지를 작성했습니다. 리플렉션 패키지에서 "선택"호출을 사용하여 잠금 또는 대기 그룹을 필요로하지 않고 효율적이고 균형 잡힌 멀티플렉싱을 보장합니다.

1

James Hentridge 대답에 다시 assignement 문제를 처리하는 관용적 방법을 구축하려면 위태로운 가치 :

for _, c := range channels { 
    c := c 
    go func() { 
    ... 
    }() 
}