2016-08-31 3 views
3

Akka 2.4.3을 사용하여 TCP 스트림을 다른 싱크로 리디렉션하거나 전달하려고합니다. 프로그램은 서버 소켓을 열고 들어오는 연결을 수신 한 다음 TCP 스트림을 소비해야합니다. 우리의 보낸 사람은 우리의 응답을 기대하거나 받아들이지 않으므로 결코 아무것도 돌려 보내지 않습니다. 우리는 단지 그 흐름을 소비합니다. tcp 스트림을 프레이밍 한 후에 바이트를 더 유용한 것으로 변환하여 싱크 (Sink)로 보내야합니다.TCP 스트림을 소비하고 다른 싱크 (Akka 스트림 포함)로 리디렉션

나는 지금까지 시도했지만 특히 송신기로 tcp 패킷을 보내지 않고 싱크를 올바르게 연결하는 방법에 대해 고심한다.

import scala.util.Failure 
import scala.util.Success 

import akka.actor.ActorSystem 
import akka.event.Logging 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import akka.stream.scaladsl.Tcp 
import akka.stream.scaladsl.Framing 
import akka.util.ByteString 
import java.nio.ByteOrder 
import akka.stream.scaladsl.Flow 

object TcpConsumeOnlyStreamToSink { 
    implicit val system = ActorSystem("stream-system") 
    private val log = Logging(system, getClass.getName)  

    //The Sink 
    //In reality this is of course a real Sink doing some useful things :-) 
    //The Sink accept types of "SomethingMySinkUnderstand" 
    val mySink = Sink.ignore; 

    def main(args: Array[String]): Unit = { 
    //our sender is not interested in getting replies from us 
    //so we just want to consume the tcp stream and never send back anything to the sender 
    val (address, port) = ("127.0.0.1", 6000) 
    server(system, address, port) 
    } 

    def server(system: ActorSystem, address: String, port: Int): Unit = { 
    implicit val sys = system 
    import system.dispatcher 
    implicit val materializer = ActorMaterializer() 
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn => 
     println("Client connected from: " + conn.remoteAddress) 

     conn handleWith Flow[ByteString] 
     //this is neccessary since we use a self developed tcp wire protocol 
     .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
     //here we want to map the raw bytes into something our Sink understands 
     .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) 
     //here we like to connect our Sink to the Tcp Source 
     .to(mySink) //<------ NOT COMPILING 
    } 


    val tcpSource = Tcp().bind(address, port) 
    val binding = tcpSource.to(handler).run() 

    binding.onComplete { 
     case Success(b) => 
     println("Server started, listening on: " + b.localAddress) 
     case Failure(e) => 
     println(s"Server could not bind to $address:$port: ${e.getMessage}") 
     system.terminate() 
    } 

    } 

    class SomethingMySinkUnderstand(x:String) { 

    } 
} 

업데이트 : 필요 deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
+0

다른 사람이 다시 만들 수 있도록 build.sbt 파일 또는 적어도 종속성을 표시하는 데 도움이 될 수 있습니다. – Brian

+0

질문에 위의 deps를 추가했습니다 – salyh

답변

4

handleWith 즉 연결되지 않은 입구와 연결되지 않은 콘센트에 박스, Flow 기대를 얻으려면 build.sbt 파일이 추가. 작업을 사용하여 FlowSink으로 연결했기 때문에 실제로는 Source을 제공합니다.

난 당신이 다음을 수행 할 수 있다고 생각 :

conn.handleWith(
    Flow[ByteString] 
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String)) 
    .alsoTo(mySink) 
    .map(_ => ByteString.empty) 
    .filter(_ => false) // Prevents sending anything back 
) 
+0

이것은 나를 위해 작동하지 않습니다. 그것은 영업을 위해 작동 했습니까? – tapasvi

+0

예, 저를 위해 일했습니다. – salyh