2016-10-31 3 views
2

webSocketClientFlow에있는 doc에 따라 클라이언트 측 websocket을 시도하고 있습니다.Akka-HTTP client websocket을 사용하는 방법

샘플 코드는 다음과 같습니다 연결이 업그레이드했다

import akka.actor.ActorSystem 
import akka.Done 
import akka.http.scaladsl.Http 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.model._ 
import akka.http.scaladsl.model.ws._ 

import scala.concurrent.Future 

object WebSocketClientFlow { 
    def main(args: Array[String]) = { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    import system.dispatcher 

    // Future[Done] is the materialized value of Sink.foreach, 
    // emitted when the stream completes 
    val incoming: Sink[Message, Future[Done]] = 
     Sink.foreach[Message] { 
     case message: TextMessage.Strict => 
      println(message.text) 
     } 

    // send this as a message over the WebSocket 
    val outgoing = Source.single(TextMessage("hello world!")) 

    // flow to use (note: not re-usable!) 
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org")) 

    // the materialized value is a tuple with 
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that 
    // completes or fails when the connection succeeds or fails 
    // and closed is a Future[Done] with the stream completion from the incoming sink 
    val (upgradeResponse, closed) = 
     outgoing 
     .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] 
     .toMat(incoming)(Keep.both) // also keep the Future[Done] 
     .run() 

    // just like a regular http request we can access response status which is available via upgrade.response.status 
    // status code 101 (Switching Protocols) indicates that server support WebSockets 
    val connected = upgradeResponse.flatMap { upgrade => 
     if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
     Future.successful(Done) 
     } else { 
     throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
     } 
    } 

    // in a real application you would not side effect here 
    connected.onComplete(println) 
    closed.foreach(_ => println("closed")) 
    } 
} 

후, 어떻게 서버 측 웹 소켓하는 connection 메시지 보내기를 사용할 수 있나요?

내가이 문서에서주의 :이 방법에 의해 반환되는

흐름은 한 번만 구체화 될 수있다. 각 요청에 대해 메소드를 다시 호출하여 새 플로우를 확보해야합니다. 우리가 alrady 준비가 업그레이드 된 연결 이후 흐름을 여러 번 구축이 필요한 이유

은 여전히 ​​혼란.

+1

죄송합니다, 당신이 요구하는지 정말 명확하지 않다. 메시지를 각 스트림으로 푸시하여 웹 소켓 연결을 통해 메시지를 전송합니다. 특정 경우에 하나의 메시지 ('TextMessage ("hello world!")')만이 서버로 보내질 것이고 나가는 스트림은 그 후에 닫힐 것이다. ('Sink.foreach'에서 생성 된) 들어오는 스트림은 서버가 스트림을 닫을 때까지 또는 수신 타임 아웃이 시작될 때까지 계속 메시지를받습니다. 원한다면 적절한'Source'를 구성해야합니다. 보다 복잡한 방식으로 보내는 내용을 제어 할 수 있습니다. –

+0

@VladimirMatveev는 상기시켜 주셔서 감사합니다. akka-http가 어떻게 작동하는지에 대한 오해가 있었고 깊이있는 문서를 읽으려고합니다. 정말 고맙습니다! – xring

답변

4

액터 기반 소스를 만들고 설정된 웹 소켓 연결을 통해 새 메시지를 보낼 수 있습니다.

val req = WebSocketRequest(uri = "ws://127.0.0.1/ws") 
    val webSocketFlow = Http().webSocketClientFlow(req) 

    val messageSource: Source[Message, ActorRef] = 
     Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail) 

    val messageSink: Sink[Message, NotUsed] = 
     Flow[Message] 
      .map(message => println(s"Received text message: [$message]")) 
      .to(Sink.ignore) 

    val ((ws, upgradeResponse), closed) = 
     messageSource 
      .viaMat(webSocketFlow)(Keep.both) 
      .toMat(messageSink)(Keep.both) 
      .run() 

    val connected = upgradeResponse.flatMap { upgrade => 
     if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
      Future.successful(Done) 
     } else { 
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
     } 
    } 

    ws ! TextMessage.Strict("Hello World") 
    ws ! TextMessage.Strict("Hi") 
    ws ! TextMessage.Strict("Yay!") 

`

관련 문제