2016-07-06 3 views
3

Akka Streams를 사용하여 포트에서 수신 대기하고 들어오는 연결을 수락하며 각 연결에서 데이터를 지속적으로 읽는 네트워크 서버를 프로토 타이핑하고 있습니다. 연결된 각 클라이언트는 데이터 만 보내고 서버에서 유용한 정보를 얻지는 않을 것입니다.여러 수신 TCP 연결을 Akka 스트림의 스트림으로 나타내는 방법은 무엇입니까?

개념적으로, 들어오는 이벤트를 실수로 여러 TCP 연결을 통해 전달되는 하나의 단일 스트림으로 모델링하는 것이 적절할 것이라고 생각했습니다. 따라서 각 데이터 메시지를 나타내는 case class Msg(msg: String)이 있다고 가정하면 들어오는 데이터 전체를 Source[Msg, _]으로 나타내야합니다. 이는 플로우 & 싱크를이 소스에 간단하게 연결할 수 있기 때문에 유스 케이스에는 많은 의미가 있습니다. 여기

내가 내 생각 구현하기 위해 쓴 코드입니다 :

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.SourceShape 
import akka.stream.scaladsl._ 
import akka.util.ByteString 
import akka.NotUsed 
import scala.concurrent.{ Await, Future } 
import scala.concurrent.duration._ 

case class Msg(msg: String) 

object tcp { 
    val N = 2 
    def main(argv: Array[String]) { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    val connections = Tcp().bind("0.0.0.0", 65432) 
    val delim = Framing.delimiter(
     ByteString("\n"), 
     maximumFrameLength = 256, allowTruncation = true 
    ) 
    val parser = Flow[ByteString].via(delim).map(_.utf8String).map(Msg(_)) 
    val messages: Source[Msg, Future[Tcp.ServerBinding]] = 
     connections.flatMapMerge(N, { 
     connection => 
      println(s"client connected: ${connection.remoteAddress}") 
      Source.fromGraph(GraphDSL.create() { implicit builder => 
      import GraphDSL.Implicits._ 
      val F = builder.add(connection.flow.via(parser)) 
      val nothing = builder.add(Source.tick(
       initialDelay = 1.second, 
       interval = 1.second, 
       tick = ByteString.empty 
      )) 
      F.in <~ nothing.out 
      SourceShape(F.out) 
      }) 
     }) 
    import scala.concurrent.ExecutionContext.Implicits.global 
    Await.ready(for { 
     _ <- messages.runWith(Sink.foreach { 
     msg => println(s"${System.currentTimeMillis} $msg") 
     }) 
     _ <- system.terminate() 
    } yield(), Duration.Inf) 
    } 
} 

가이 코드는 예상 작품으로, 그러나을, 궁극적으로 들어오는 데이터에 스트림을 결합 flatMapMerge 호출로 전달되는 val N = 2을주의 하나. 실제로 이것은 한 번에 여러 스트림에서만 읽을 수 있음을 의미합니다.

주어진 시간에이 서버에 몇 개의 연결이 만들어 질지 알 수 없습니다. 이상적으로는 최대한 많은 것을 지원하고 싶지만 상한값을 하드 코딩하는 것은 옳은 일처럼 보이지 않습니다.

내 질문은, 긴 마지막입니다 : 내가 어떻게 얻거나 한 번에 연결 고정 된 수보다 더 많은 읽을 수있는 flatMapMerge 무대를 만들 수 있습니까?

+0

당신은 무한 TCP 포트 및 메모리와 기계를 가지고! –

+0

아니, 무한한 기억 장치가 이월되었습니다. 나는 크리스마스까지 가질 수 없습니다. 위 코드의 문제점은 'N'연결을 래치하고 연결할 수 없을 때까지 계속 읽는 것입니다. 다른 모든 연결은 TCP 수신 버퍼를 채우는 동안 남아 있습니다. 따라서 새로운 연결을 수락하지 않거나 'N'열린 연결의 회전 캐스트를 읽어야합니다 (항상 동일한 것은 아니지만). 이게 너에게 의미가 있니? –

+0

정확하게 작동하는 방식이 아닌가요? 선택기와 마찬가지로 [병합]이라고 생각하면 한 번에 한정된 수의 파일 설명자 만 처리 할 수 ​​있습니다. –

답변

0

Viktor Klang의 의견에 따르면 나는 이것이 1 스트림에서 가능하지 않다고 생각합니다. 그러나, 나는 can receive messages after materialization 스트림을 생성하고 그것을 TCP 연결에서 오는 메시지에 대한 "싱크"로 사용할 수있을 것이라고 생각합니다. 메시지를받을 수

val sinkRef = 
    Source 
    .actorRef[Msg](Int.MaxValue, fail) 
    .to(Sink foreach {m => println(s"${System.currentTimeMillis} $m")}) 
    .run() 

이 sinkRef 각 Connection 사용할 수 있습니다 :

첫 번째는 "싱크"스트림을 생성

connections foreach { conn => 
    Source 
    .empty[ByteString] 
    .via(conn.flow) 
    .via(parser) 
    .runForeach(msg => sinkRef ! msg) 
} 
관련 문제