Akka HTTP를 사용하여 기존 Sink (Kafka 반응 스트림 포함)에 연결하는 REST 서비스를 만들고 싶습니다.하지만 HTTP 스트림을 Akka 스트림 싱크에 연결하는 방법을 알 수 없습니다.Akka HTTP를 Akka 스트림에 연결합니다.
흐름을 사용하는 저급 Akka HTTP API를 사용해야합니까?
- 배압을 전체 흐름에
- 200 응답 코드를 모든 이벤트는 싱크 카프카에 의해 인정되는 경우
- 500 배압이 너무 높은 :
내 요구 사항이 있나요? 가능한가?
다음은 이것에 대해가는 몇 가지 방법이 있습니다
// flow to split group of lines into lines
val splitLines = Flow[String].mapConcat(_.split("\n").toList)
// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String](topic, _))
.toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)
val routes = {
path("ingest") {
post {
logger.info("starting ingestion")
entity(as[GenericEvent]) { eventIngest =>
????
}~
entity(as[GenericEventList]) { eventIngestList =>
????
}
}
}
}
Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)
[이벤트]와 같이 스트림을 언 마샬링 할 수있는 방법이 있습니까? – vgkowski
예, 카프카 싱크에 대해 언 마샬링 된 이벤트를 항상 실행할 수 있습니다. 나는 대답을 –
에 또 다른 예를 넣었습니다. 이것은 Akka HTTP 흐름의 새로운 흐름입니다. 쓸모없는 오버 헤드를 도입하지 않았습니까? – vgkowski