2017-02-06 3 views
0

웹 소켓을 통해 클라이언트에 알림을 보내려고합니다. 이 알림은 배우가 생성하므로 서버 시작시 배우의 메시지 스트림을 만들고이 스트림 (구독 이후에 방출 된 알림 만 전송)에 대한 websockects 연결을 구독하려고합니다.Akka Streams 및 Akka HTTP를 사용하여 액터의 메시지에 웹 소켓을 등록하는 방법은 무엇입니까?

Source.actorRef으로 소스를 만들 수 있습니다. 배우 메시지.

val ref = Source.actorRef[Weather](Int.MaxValue, fail) 
       .filter(!_.raining) 
       .to(Sink foreach println) 
       .run() 

ref ! Weather("02139", 32.0, true) 

하지만 어떻게 가입 할 수 있습니다 (akka HTTP *)이 소스에 WebSocket을 연결을 이미 구체화 된 경우? Akka의 HTTP에서

* WebSocket을 연결이 흐름 요구 [메시지, 메시지, 임의]

난 할 노력하고있어

// at server startup 
val notifications: Source[Notification,ActorRef] = Source.actorRef[Notificacion](5,OverflowStrategy.fail) 
val ref = notifications.to(Sink.foreach(println(_))).run() 
val notificationActor = system.actorOf(NotificationActor.props(ref)) 

// on ws connection 
val notificationsWS = path("notificationsWS") { 
    parameter('name) { name ⇒ 
    get { 
     onComplete(flow(name)){ 
     case Success(f) => handleWebSocketMessages(f) 
     case Failure(e) => throw e 
     } 
    } 
    } 
} 

def flow(name: String) = { 
    val messages = notifications filter { n => n.name equals name } map { n => TextMessage.Strict(n.data) } 
    Flow.fromSinkAndSource(Sink.ignore, notifications) 
} 

같은이 doensn't의 작품 알림 소스 때문에 실현 된 것이 아니므로 어떤 요소도 방출하지 않습니다.

참고 : 나는 Source.actorPublisher를 사용하고 그것을 작동하지만 discourages his usage을 ktoso 또한 나는이 오류가 발생 하였다 :

java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0. 

답변

1

당신은 mapMaterializedValue를 사용하여 일부 외부 라우터 배우로 구체화 actorRef 노출 될 수 있습니다.

Flow.fromSinkAndSourceMat(Sink.ignore, notifications)(Keep.right) 
    .mapMaterializedValue(srcRef => router ! srcRef) 

라우터는 그들에게 메시지를 앞으로 (물건을 정리하는 데 도움이 임종을 지켜 봄) 당신의 소스 actorrefs 추적 할 수 있습니다.

NB : 아마도 이미 알고있을 것입니다 만, Source.actorRef을 사용하여 유량을 공급할 경우 유량은 배압을 인식하지 못합니다 (선택한 방식으로 부하가 걸리는 경우).

관련 문제