Akka 2.4.3을 사용하여 TCP 스트림을 다른 싱크로 리디렉션하거나 전달하려고합니다. 프로그램은 서버 소켓을 열고 들어오는 연결을 수신 한 다음 TCP 스트림을 소비해야합니다. 우리의 보낸 사람은 우리의 응답을 기대하거나 받아들이지 않으므로 결코 아무것도 돌려 보내지 않습니다. 우리는 단지 그 흐름을 소비합니다. tcp 스트림을 프레이밍 한 후에 바이트를 더 유용한 것으로 변환하여 싱크 (Sink)로 보내야합니다.TCP 스트림을 소비하고 다른 싱크 (Akka 스트림 포함)로 리디렉션
나는 지금까지 시도했지만 특히 송신기로 tcp 패킷을 보내지 않고 싱크를 올바르게 연결하는 방법에 대해 고심한다.
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow
object TcpConsumeOnlyStreamToSink {
implicit val system = ActorSystem("stream-system")
private val log = Logging(system, getClass.getName)
//The Sink
//In reality this is of course a real Sink doing some useful things :-)
//The Sink accept types of "SomethingMySinkUnderstand"
val mySink = Sink.ignore;
def main(args: Array[String]): Unit = {
//our sender is not interested in getting replies from us
//so we just want to consume the tcp stream and never send back anything to the sender
val (address, port) = ("127.0.0.1", 6000)
server(system, address, port)
}
def server(system: ActorSystem, address: String, port: Int): Unit = {
implicit val sys = system
import system.dispatcher
implicit val materializer = ActorMaterializer()
val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
println("Client connected from: " + conn.remoteAddress)
conn handleWith Flow[ByteString]
//this is neccessary since we use a self developed tcp wire protocol
.via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
//here we want to map the raw bytes into something our Sink understands
.map(msg => new SomethingMySinkUnderstand(msg.utf8String))
//here we like to connect our Sink to the Tcp Source
.to(mySink) //<------ NOT COMPILING
}
val tcpSource = Tcp().bind(address, port)
val binding = tcpSource.to(handler).run()
binding.onComplete {
case Success(b) =>
println("Server started, listening on: " + b.localAddress)
case Failure(e) =>
println(s"Server could not bind to $address:$port: ${e.getMessage}")
system.terminate()
}
}
class SomethingMySinkUnderstand(x:String) {
}
}
업데이트 : 필요 deps
libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
다른 사람이 다시 만들 수 있도록 build.sbt 파일 또는 적어도 종속성을 표시하는 데 도움이 될 수 있습니다. – Brian
질문에 위의 deps를 추가했습니다 – salyh