2017-10-21 1 views
1

기본적으로 여기에 사용 된 코드입니다.akka 스트림 읽기 끝없는 http 스트림 backpressured 읽을 때

컬링으로 연결할 때 curl 명령어에서 모든 엔티티가 정말 빠릅니다. akka를 사용하여 동일한 동작을 에뮬레이션하려고하면 내가 가지고있는 요소를 인쇄하는 사이에 큰 멈춤이 있습니다.

스트림 벨로우즈가 어떻게 든 다시 압력을 받게되고 첫 번째 4 개의 메시지 후 나머지 1 개의 메시지는 인쇄 라인에 눈에 띄는 시간이 지나면 나타납니다.

처음 4 개의 메시지는 약 2k JSON, 마지막 하나는 no입니다. 5는 80k JSON입니다.

마지막 엔티티 (번호 5)도 가장 큰 청크이며 스트림이 완료되기 전에 인쇄되는 것처럼 보입니다. 그리고 나는 단 2 ~ 3 초 만에 달리기가 가능하다는 것이 매우 긍정적입니다.

이 스트림이 그냥 그것이 내가 https://github.com/akka/akka-http/issues/57을 가지고 있지만 어떻게 든 내 경우에 도움이 뭔가를 발견하지 못할 것에 정말 가까이 보이는이 문제를 살펴했다 최초의 4 개 요소

val awesomeHttpReq = Http().singleRequest(
    HttpRequest(
    method = GET, 
    uri = Uri("http://some-service-providing-endless-http.stream") 
) 
) 

val a = Source.fromFuture(awesomeHttpReq).flatMapConcat { 
    case HttpResponse(status, _, entity, _) => 
    // I saw some comments the back pressure might kick in 
    // because I might not be consuming the bytes here properly 
    // but this is totally in line with all the examples etc. 

    entity.withoutSizeLimit.getDataBytes.via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
} map { bytes => 
    parse(bytes decodeString StandardCharsets.UTF_8).fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json]) 
} mapConcat { items => 
    // every line that comes in from previous stage contains 
    // key elements - this I'm interested in, it's an array 
    items.asObject flatMap (_.toMap get "events") flatMap (_ asArray) getOrElse Nil 
} 

val b: Future[Vector[Json]] = a 
    .takeWithin(50 second) 
    .runWith(Sink.fold(Vector.empty[Json])((a, b) => { 

    // I'm using this to see what's going on in the stream 
    // there are significant pauses between the entities 
    // in reality the elements are available in the stream (all 5) 
    // within 2-3 seconds 
    // and this printing just has very big pause after first 4 elements 

    println(s"adding\n\n\n ${b.noSpaces}") 
    a :+ b 
    })) 

Await.result(b, 1 minute) 

을 읽은 후 중단 이유는 어떤 생각.

또한 akka http의 청크 크기를 변경해 보았지만 실제로 도움이되지 않았습니다. 여기

수신 메시지의 타이밍은 다음과 같습니다 스트림 초기화에서 :

1. 881 ms 
2. 889 ms 
3. 894 ms 
4. 898 ms 
// I don't understand why this wait time of 30 seconds in betweeen 
5. 30871 ms 

마지막 메시지는 분명 30초 어떤 아이디어가 정말 감사하겠습니다

어딘가에 중단됩니다.

업데이트 : 그것은 최초의 4 개 요소 4에서 지속적으로 나가서 5 일 30 초 동안 기다렸다되고 있다는 것을 정말 이상한 때문에, 지금 16에 기본 4에서 initial-input-buffer-size = 4을 증가하기로 결정

그것은 예상대로 작동합니다. 난 단지 위의 코드에서 배압이 시작되는 부분을 이해하지 못합니다.

업데이트 2 :

간단한 예제로 버퍼 크기가 도움이되었습니다.

entity.withoutSizeLimit.dataBytes 
    .alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8)))) 
    .via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
    .buffer(1000, OverflowStrategy.backpressure) 
    .alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8)))) 

내가 프레이밍 전에 필요한 메시지를 볼 수 있습니다 (1 단계)이 아니라 로그 (2 단계)의 뒤에 :하지만 내 진짜 문제에서 나는 계속 아주 이상한 일이있다. 그리고 그 뒤에 버퍼를 두어 밀어 넣을 충분한 공간이 있는지 확인했습니다.

"7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString 
res12: String = 
"} 
" 

을 나는 '나의 마지막 항목에 : 모든 라인은 일반적으로 어떻게 끝나는 이제

나는 새 라인 문자가 정말 뿅 단계 (1 단계)에 오지 않는 것을 발견했습니다, 이것은이다 마지막 바이트 a이 누락되었습니다. 기본적으로 새 라인이 프레이밍되지 않습니다. 그래서 모든 것이 방출되지 않습니다.

+0

흥미 롭다면, 당신은 akka-http없이 이것을 재현 할 수 있는지, 즉 소스 JSON의 일부를 파일에 덤프하고 http 요청 대신'Source.fromFile'을 사용하는지 궁금합니다. –

+0

그냥 컬에서 덤프하면 효과가 있습니다. 또한 지금은'initial-input-buffer-size = 16'을 시도했고 예상대로 작동합니다 ... 이것은 정말로 이상합니다. 배압이 어딘가에있는 것처럼 보입니다. 그러나 어디서인지 알아낼 수는 없습니다. –

+0

파일과 함께 스트림으로 시도했지만 여기에서와 같은 코드가 사용되었습니다. 나는이 문제에 부딪치지 않는다 : (나는 지금 약간의 미친 운전을한다. D –

답변

1

여러 가지 요인이 복합되어있는 것처럼 보였으므로 몇 가지 조사를 한 후에 문제를 해결하기로 결정했습니다.전체 질문에 대한 입력 소스는 실제로 회사에서 사용하는 배경에 kafka가있는 독점 엔터프라이즈 서비스 버스입니다. https://github.com/zalando/nakadi.

위의 증상으로 인해 아마도 시스템이 문서에 따라 실행되지 않고 추가로 \n을 보내지 않을 수도 있지만 모든 줄에 줄을 넣었을 것이라고 생각 했었지만 코드에서 : https://github.com/zalando/nakadi/blob/0859645b032d19f7baa919877f72cb076f1da867/src/main/java/org/zalando/nakadi/service/EventStreamWriterString.java#L36

이를 본 후 나는이 예를 사용하여 전체를 시뮬레이션하기 위해 노력 :

build.sbt을

name := "test-framing" 

version := "0.1" 

scalaVersion := "2.12.4"  

lazy val akkaVersion = "2.5.6" 
lazy val akkaHttpVersion = "10.0.10" 

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-stream" % akkaVersion, 
    "com.typesafe.akka" %% "akka-http" % akkaHttpVersion, 
    "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion 
) 

scalacOptions in Compile ++= (scalacOptions in Compile).value :+ "-Yrangepos" 
,451,515,

* TestApp.scala - 나는

import java.nio.charset.StandardCharsets 

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Framing, Sink, Source} 
import akka.util.ByteString 

import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future} 

object TestApp extends App { 

    implicit val system = ActorSystem("MyAkkaSystem") 
    implicit val materializer = ActorMaterializer() 

    val awesomeHttpReq = Http().singleRequest(
    HttpRequest(
     method = HttpMethods.GET, 
     uri = Uri("http://localhost:9000/streaming-json") 
    ) 
) 

    val a = Source.fromFuture(awesomeHttpReq).flatMapConcat { 
    case HttpResponse(status, _, entity, _) => 
     entity.withoutSizeLimit.getDataBytes 
     .via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
    } map { bytes => 
    bytes decodeString StandardCharsets.UTF_8 
    } 

    val b: Future[Vector[String]] = a 
    .takeWithin(50 second) 
    .runWith(Sink.fold(Vector.empty[String])((a, b) => { 
     println(s"adding $b") 
     a :+ b 
    })) 

    Await.result(b, 1 minute) 

} 

가 * 내가 행동을 가지고 시뮬레이션 엔드 포인트에서 시뮬레이션 엔드 포인트 *

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.common.EntityStreamingSupport 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import akka.http.scaladsl.server.Directives 
import akka.stream.scaladsl.{Flow, Source} 
import akka.stream.{ActorMaterializer, ThrottleMode} 
import akka.util.ByteString 
import spray.json._ 

import scala.concurrent.duration._ 
import scala.io.StdIn 

object TestApp2 extends App { 

    implicit val system = ActorSystem("MyAkkaSystem") 
    implicit val materializer = ActorMaterializer() 

    implicit val executionContext = system.dispatcher 

    case class SomeData(name: String) 

    trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { 
    implicit val someFormat = jsonFormat1(SomeData) 
    } 

    val start = ByteString.empty 
    val sep = ByteString("\n") 
    val end = ByteString.empty 

    implicit val jsonStreamingSupport = EntityStreamingSupport 
    .json() 
    .withFramingRenderer(Flow[ByteString].intersperse(sep)) 

    object MyJsonService extends Directives with JsonSupport { 

    def streamingJsonRoute = 
     path("streaming-json") { 
     get { 
      val sourceOfNumbers = Source(1 to 1000000) 

      val sourceOfDetailedMessages = 
      sourceOfNumbers 
       .map(num => SomeData(s"Hello $num")) 
       .throttle(elements = 5, 
         per = 30 second, 
         maximumBurst = 6, 
         mode = ThrottleMode.Shaping) 

      complete(sourceOfDetailedMessages) 
     } 
     } 
    } 

    val bindingFuture = 
    Http().bindAndHandle(MyJsonService.streamingJsonRoute, "localhost", 9000) 

    println(s"Server online at http://localhost:9000/\nPress RETURN to stop...") 
    StdIn.readLine() // let it run until user presses return 
    bindingFuture 
    .flatMap(_.unbind()) // trigger unbinding from the port 
    .onComplete(_ => system.terminate()) // and shutdown when done 

} 

예상대로 * 내 코드에 문제가 있었다 그래서 akka에는 아무런 문제가 없습니다.

여러 라이브러리 + nakadi를 가져올 때 여전히 문제가있을 수 있지만 이것은 거위 사냥입니다. 결국 batch_flush_timeout을 낮은 값으로 낮추면 서버는 실제로 다음 이벤트를 파이프 라인으로 보내므로 파이프에서 마지막으로 메시지가 처리 될 수 있도록 내 응용 프로그램 계층에 푸시됩니다. 그것.

기본적으로이 텍스트는 어떻게 든 하나의 단일 바이트가 프레이밍에 들어 가지 않기 때문에 발생하지만 다시 지난 며칠간 akka 스트림에 대해 많이 배웠습니다.