2017-02-27 9 views
3

Akka HTTP를 사용하여 기존 Sink (Kafka 반응 스트림 포함)에 연결하는 REST 서비스를 만들고 싶습니다.하지만 HTTP 스트림을 Akka 스트림 싱크에 연결하는 방법을 알 수 없습니다.Akka HTTP를 Akka 스트림에 연결합니다.

흐름을 사용하는 저급 Akka HTTP API를 사용해야합니까?

  • 배압을 전체 흐름에
  • 200 응답 코드를 모든 이벤트는 싱크 카프카에 의해 인정되는 경우
  • 500 배압이 너무 높은 :

    내 요구 사항이 있나요? 가능한가?

다음은 이것에 대해가는 몇 가지 방법이 있습니다

// flow to split group of lines into lines 
    val splitLines = Flow[String].mapConcat(_.split("\n").toList) 

// sink to produce kafka records in kafka 
val kafkaSink = Flow[String] 
    .map(new ProducerRecord[Array[Byte], String](topic, _)) 
    .toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right) 

val routes = { 
    path("ingest") { 
     post { 
     logger.info("starting ingestion") 
     entity(as[GenericEvent]) { eventIngest => 
      ????  
     }~ 
     entity(as[GenericEventList]) { eventIngestList => 
      ???? 
     } 
     } 
    } 
    } 

Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort) 

답변

2

내 코드 현재 코드입니다. 한 가지 제안은 요청 엔터티에서 카프카 싱크로 곧바로 데이터를 스트리밍하는 것입니다. extractDataBytes 지시문을 사용하면 정확히 수행 할 수 있습니다 (more info here).

아래 예제의 내용을 따라 무언가를 시도해보십시오. 케이스 구체화 변환이 요청 엔티티 바이트를 올바르게 분할/변환 할 수 있도록 ??? 플로우를 추가했습니다. Framing.delimiter 등을 사용하여 엔티티 바이트 스트림을 분할 할 수 있습니다 (추가 정보 here). 일부 도메인 객체로 엔티티를 비 정렬 화하려는 경우

(extractDataBytes & extractMaterializer) { (byteSrc, mat) => 
    val f = byteSrc.via(???).runWith(kafkaSink)(mat) 
    onComplete(f){ 
     case Success(value) => complete(s"OK") 
     case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
    } 
    } 

또는, 당신은 카프카의 배압해야, 스트림 것 결코 완료, 당신의 마지막 질문에 와서

(entity(as[Event]) & extractMaterializer) { (event, mat) => 
    val f = Source.single(event).via(???).runWith(kafkaSink)(mat) 
    onComplete(f){ 
     case Success(value) => complete(s"OK") 
     case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
    } 
    } 

처럼 뭔가를 할 수 있습니다. 당신은 (아래 문서를 인용) 구성된 요청 타임 아웃 후 다시 500을 제공하기 위해 서버를 기대한다 :

기본 요청 제한 시간이 구성된 모든 경로에 전 세계적으로 적용되고 될 수 akka.http를 사용하여 .server.request-timeout 설정 ( 의 기본값은 20 초).

+0

[이벤트]와 같이 스트림을 언 마샬링 할 수있는 방법이 있습니까? – vgkowski

+0

예, 카프카 싱크에 대해 언 마샬링 된 이벤트를 항상 실행할 수 있습니다. 나는 대답을 –

+0

에 또 다른 예를 넣었습니다. 이것은 Akka HTTP 흐름의 새로운 흐름입니다. 쓸모없는 오버 헤드를 도입하지 않았습니까? – vgkowski

관련 문제