설명서에서 ... 브로드 캐스트는 모든 소비자에게 요소를 방출 (전송)합니다. 저울은 첫 번째 사용 가능한 소비자에게만 방출됩니다.
발광
broadcast N 출력의 각 각 수신 소자
.
balance
팬 아웃 여러 스트림 스트림. 각 업스트림 요소는 첫 번째 사용 가능한 다운 스트림 소비자에게 방출 된 입니다. 댓글에서
편집 : 당신의 요지를
, 당신은 두 averageCarrierDelay 기능, Z
및 F
각 하나를해야한다. 그러면 각각에 전송 된 모든 요소를 볼 수 있습니다.
val averageCarrierDelayZ =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"Z Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
val averageCarrierDelayF =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"F Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
편집 2 : 향후 상황을 확인하려면 진행 상황을 볼 수 있도록 스트림 스테이지 용 일반 로거를 권합니다.
D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
당신은 당신이 또는 기대되지 않을 수 있습니다 이상한 행동에 대한 그래프의 영역을 확인할 수 있습니다이 방법 : 이렇게
는
def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
당신이 그런 짓을 할 수 있습니다. 다른 이미 말했듯이 균형 배압에 기초 하나의 출력 포트에 입력을 방출하면서
브로드 캐스트는 단순히 메시지를 취하여 메시지가있는 모든 출력 포트로 보냅니다. 잔액은 메시지를 취하여 해당 출력 포트의 가용성 및 배압에 따라 출력 중 하나에만 보냅니다. –
@ Brian Pendleton하지만 D가 builder.add 인 경우 (Z <~ D, F <~ D) 동시에 두 개의 다운 스트림 소비자가있는 경우 어떻게해야합니까? (잔액 [T] (2))? – pacman
'Z' 또는'F'가 메시지를받습니다. 둘 다 아닙니다. 둘 다 모든 메시지를 처리하기를 원하면'Broadcast'를 사용하십시오. '잔액'은 'Z'와 'F'가 동일한 처리 단계 일 때 사용되지만 여러 액터에 걸쳐 작업을 분할하기를 원할 때 사용됩니다. 여기를보세요 : builder.add (Balance [T] (2))를 쓸 때 http://boc.akka.io/docs/akka/2.4.11/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers –