웹 소켓을 통해 클라이언트에 알림을 보내려고합니다. 이 알림은 배우가 생성하므로 서버 시작시 배우의 메시지 스트림을 만들고이 스트림 (구독 이후에 방출 된 알림 만 전송)에 대한 websockects 연결을 구독하려고합니다.Akka Streams 및 Akka HTTP를 사용하여 액터의 메시지에 웹 소켓을 등록하는 방법은 무엇입니까?
Source.actorRef으로 소스를 만들 수 있습니다. 배우 메시지.
val ref = Source.actorRef[Weather](Int.MaxValue, fail)
.filter(!_.raining)
.to(Sink foreach println)
.run()
ref ! Weather("02139", 32.0, true)
하지만 어떻게 가입 할 수 있습니다 (akka HTTP *)이 소스에 WebSocket을 연결을 이미 구체화 된 경우? Akka의 HTTP에서
* WebSocket을 연결이 흐름 요구 [메시지, 메시지, 임의]
난 할 노력하고있어
// at server startup
val notifications: Source[Notification,ActorRef] = Source.actorRef[Notificacion](5,OverflowStrategy.fail)
val ref = notifications.to(Sink.foreach(println(_))).run()
val notificationActor = system.actorOf(NotificationActor.props(ref))
// on ws connection
val notificationsWS = path("notificationsWS") {
parameter('name) { name ⇒
get {
onComplete(flow(name)){
case Success(f) => handleWebSocketMessages(f)
case Failure(e) => throw e
}
}
}
}
def flow(name: String) = {
val messages = notifications filter { n => n.name equals name } map { n => TextMessage.Strict(n.data) }
Flow.fromSinkAndSource(Sink.ignore, notifications)
}
같은이 doensn't의 작품 알림 소스 때문에 실현 된 것이 아니므로 어떤 요소도 방출하지 않습니다.
참고 : 나는 Source.actorPublisher를 사용하고 그것을 작동하지만 discourages his usage을 ktoso 또한 나는이 오류가 발생 하였다 :
java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0.