차이

2016-10-24 5 views
2

난, Akka streams에서 팬 전략과 다소 혼란이 I는 Broadcast 읽었 - (1 입력 N 출력) 입력 요소 주어지는 각 출력 출사 Balance 동안 - (1 입력, N 출력)은 출력 요소 중 하나에 출력됩니다.차이

당신이 나를 설명 할 수 :

균형 여러 소비자와 작동합니까 어떻게
  1. ?
  2. 문구의 의미는 "출력 포트 중 하나에 방출"
  3. 포트가 다운 스트림과 동일합니까?
  4. 입력 스트림을 일부 출력 파티션으로 복제 할 때 '균형'이 설정되어 있습니까?
  5. 그래프를 분리하여 균형을 조정할 수 있으며 볼륨을 처리하기 위해 다운 스트림 가입자의 여러 인스턴스를 복제한다는 의미는 무엇입니까?
+1

브로드 캐스트는 단순히 메시지를 취하여 메시지가있는 모든 출력 포트로 보냅니다. 잔액은 메시지를 취하여 해당 출력 포트의 가용성 및 배압에 따라 출력 중 하나에만 보냅니다. –

+0

@ Brian Pendleton하지만 D가 builder.add 인 경우 (Z <~ D, F <~ D) 동시에 두 개의 다운 스트림 소비자가있는 경우 어떻게해야합니까? (잔액 [T] (2))? – pacman

+0

'Z' 또는'F'가 메시지를받습니다. 둘 다 아닙니다. 둘 다 모든 메시지를 처리하기를 원하면'Broadcast'를 사용하십시오. '잔액'은 'Z'와 'F'가 동일한 처리 단계 일 때 사용되지만 여러 액터에 걸쳐 작업을 분할하기를 원할 때 사용됩니다. 여기를보세요 : builder.add (Balance [T] (2))를 쓸 때 http://boc.akka.io/docs/akka/2.4.11/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers –

답변

3

설명서에서 ... 브로드 캐스트는 모든 소비자에게 요소를 방출 (전송)합니다. 저울은 첫 번째 사용 가능한 소비자에게만 방출됩니다.

발광

broadcast N 출력의 각 각 수신 소자

.

balance

팬 아웃 여러 스트림 스트림. 각 업스트림 요소는 첫 번째 사용 가능한 다운 스트림 소비자에게 방출 된 입니다. 댓글에서

편집 : 당신의 요지를

, 당신은 두 averageCarrierDelay 기능, ZF 각 하나를해야한다. 그러면 각각에 전송 된 모든 요소를 ​​볼 수 있습니다.

val averageCarrierDelayZ = 
    Flow[FlightDelayRecord] 
     .groupBy(30, _.uniqueCarrier) 
     .fold(("", 0, 0)){ 
      (x: (String, Int, Int), y:FlightDelayRecord) => { 
      println(s"Z Received Element: ${y}") 
      val count = x._2 + 1 
      val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0) 
      (y.uniqueCarrier, count, totalMins) 
      } 
     }.mergeSubstreams 


val averageCarrierDelayF = 
    Flow[FlightDelayRecord] 
     .groupBy(30, _.uniqueCarrier) 
     .fold(("", 0, 0)){ 
      (x: (String, Int, Int), y:FlightDelayRecord) => { 
      println(s"F Received Element: ${y}") 
      val count = x._2 + 1 
      val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0) 
      (y.uniqueCarrier, count, totalMins) 
      } 
     }.mergeSubstreams 

편집 2 : 향후 상황을 확인하려면 진행 상황을 볼 수 있도록 스트림 스테이지 용 일반 로거를 권합니다.

D ~> logElement[FlightDelayRecord]("F received: ") ~> F 
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z 

당신은 당신이 또는 기대되지 않을 수 있습니다 이상한 행동에 대한 그래프의 영역을 확인할 수 있습니다이 방법 : 이렇게

def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a } 

당신이 그런 짓을 할 수 있습니다. 다른 이미 말했듯이 균형 배압에 기초 하나의 출력 포트에 입력을 방출하면서

+0

D가 builder.add (Balance [T] (2)) 인 그래프가 있습니다 (Z <~ D, F <~ D)),이 상황에서 D는 Z와 F 모두에 값을 보냅니다. 답에 따르면, D는 그 중 하나만 보내야 만합니다. 아니면 내 사고가 잘못 되었습니까? – pacman

+0

전체 코드 파일을 게시 할 수 있습니까? 당신이 말하는 것이 사실이라면, Akka는 그것에 버그가 있습니다. 잔액은 사용 가능한 첫 번째 소비자에게만 지급해야합니다. –

+0

내 전체 코드 파일은 https://gist.github.com/anonymous/4515afdd9c40709b23a19e55beb76b3e입니다. 코드는 자습서에서 수정되었습니다. 이 코드에서 val D는 Z와 F에'Balance '를 보내고 두 사람 모두 diff와 함께 콘솔에 데이터와 프린트 결과를받습니다. 어쩌면 상황에 대한 나의 통찰력이 잘못되었습니다.이 경우 내 실수를 분명히하십시오. – pacman

1

은 방송, 에 입력 모든 출력 포트를 방출한다.

GraphStage을 사용할 때 사용할 출력 포트를 선택해야합니다.이 예제를 고려해보십시오 :

val q1 = Source.queue[Int](10, OverflowStrategy.fail) 
val q2 = Source.queue[Int](10, OverflowStrategy.fail) 
GraphDSL.create(q1, q2)(Keep.both) { implicit b => (input1, input2) => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[Int](2)) 
    val balance = b.add(Balance[Int](2)) 

    val consumer1, consumer2, consumer3, consumer4 = b.add(Sink.foreach[Int](println)) 

    input1 ~> broadcast.in 
    input2 ~> balance.in 

    broadcast.out(0) ~> consumer1 
    broadcast.out(1) ~> consumer2 

    balance.out(0) ~> consumer3 
    balance.out(1) ~> consumer4 

    ClosedShape 
} 

여기서 우리는 하나의 입력을 방송 스테이지에, 하나를 밸런스 스테이지에 연결합니다. 그런 다음 방송 및 균형 단계의 다양한 출력 포트를 각 소비자에게 연결합니다. 만약 스트림을 실행할 때 특정한 경우

은, 첫 번째 입력을 통해 오는 요소는 (두 개의 출력이 여기) 방송 단계는 모든 출력에 복사 입력을하기 때문에, consumer1consumer2 모두에 전달되고 두 번째 입력을 통해 들어오는 요소는 의 속도를 기준으로 consumer3consumer4에 균등하게 배분됩니다 (즉, 속도가 println 인 경우). 기능이 오랫동안 실행될 때 역압이 발생하기 때문입니다.

브로드 캐스트 및 밸런스 단계에는 각각 2 개의 포트 (팩토리 메서드를 호출 할 때)가 있고 어떤 출력 포트를 어떤 소비자에 연결할지 지정했습니다.