기본적으로 여기에 사용 된 코드입니다.akka 스트림 읽기 끝없는 http 스트림 backpressured 읽을 때
컬링으로 연결할 때 curl 명령어에서 모든 엔티티가 정말 빠릅니다. akka를 사용하여 동일한 동작을 에뮬레이션하려고하면 내가 가지고있는 요소를 인쇄하는 사이에 큰 멈춤이 있습니다.
스트림 벨로우즈가 어떻게 든 다시 압력을 받게되고 첫 번째 4 개의 메시지 후 나머지 1 개의 메시지는 인쇄 라인에 눈에 띄는 시간이 지나면 나타납니다.
처음 4 개의 메시지는 약 2k JSON, 마지막 하나는 no입니다. 5는 80k JSON입니다.
마지막 엔티티 (번호 5)도 가장 큰 청크이며 스트림이 완료되기 전에 인쇄되는 것처럼 보입니다. 그리고 나는 단 2 ~ 3 초 만에 달리기가 가능하다는 것이 매우 긍정적입니다.
이 스트림이 그냥 그것이 내가 https://github.com/akka/akka-http/issues/57을 가지고 있지만 어떻게 든 내 경우에 도움이 뭔가를 발견하지 못할 것에 정말 가까이 보이는이 문제를 살펴했다 최초의 4 개 요소
val awesomeHttpReq = Http().singleRequest(
HttpRequest(
method = GET,
uri = Uri("http://some-service-providing-endless-http.stream")
)
)
val a = Source.fromFuture(awesomeHttpReq).flatMapConcat {
case HttpResponse(status, _, entity, _) =>
// I saw some comments the back pressure might kick in
// because I might not be consuming the bytes here properly
// but this is totally in line with all the examples etc.
entity.withoutSizeLimit.getDataBytes.via(Framing delimiter (ByteString("\n"), Int.MaxValue))
} map { bytes =>
parse(bytes decodeString StandardCharsets.UTF_8).fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json])
} mapConcat { items =>
// every line that comes in from previous stage contains
// key elements - this I'm interested in, it's an array
items.asObject flatMap (_.toMap get "events") flatMap (_ asArray) getOrElse Nil
}
val b: Future[Vector[Json]] = a
.takeWithin(50 second)
.runWith(Sink.fold(Vector.empty[Json])((a, b) => {
// I'm using this to see what's going on in the stream
// there are significant pauses between the entities
// in reality the elements are available in the stream (all 5)
// within 2-3 seconds
// and this printing just has very big pause after first 4 elements
println(s"adding\n\n\n ${b.noSpaces}")
a :+ b
}))
Await.result(b, 1 minute)
을 읽은 후 중단 이유는 어떤 생각.
또한 akka http의 청크 크기를 변경해 보았지만 실제로 도움이되지 않았습니다. 여기
수신 메시지의 타이밍은 다음과 같습니다 스트림 초기화에서 : 는1. 881 ms
2. 889 ms
3. 894 ms
4. 898 ms
// I don't understand why this wait time of 30 seconds in betweeen
5. 30871 ms
마지막 메시지는 분명 30초 어떤 아이디어가 정말 감사하겠습니다
어딘가에 중단됩니다.
업데이트 : 그것은 최초의 4 개 요소 4에서 지속적으로 나가서 5 일 30 초 동안 기다렸다되고 있다는 것을 정말 이상한 때문에, 지금 16에 기본 4에서 initial-input-buffer-size = 4
을 증가하기로 결정
그것은 예상대로 작동합니다. 난 단지 위의 코드에서 배압이 시작되는 부분을 이해하지 못합니다.
업데이트 2 :
간단한 예제로 버퍼 크기가 도움이되었습니다.
entity.withoutSizeLimit.dataBytes
.alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8))))
.via(Framing delimiter (ByteString("\n"), Int.MaxValue))
.buffer(1000, OverflowStrategy.backpressure)
.alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8))))
내가 프레이밍 전에 필요한 메시지를 볼 수 있습니다 (1 단계)이 아니라 로그 (2 단계)의 뒤에 :하지만 내 진짜 문제에서 나는 계속 아주 이상한 일이있다. 그리고 그 뒤에 버퍼를 두어 밀어 넣을 충분한 공간이 있는지 확인했습니다.
"7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString
res12: String =
"}
"
을 나는 '나의 마지막 항목에 : 모든 라인은 일반적으로 어떻게 끝나는 이제
나는 새 라인 문자가 정말 뿅 단계 (1 단계)에 오지 않는 것을 발견했습니다, 이것은이다 마지막 바이트 a
이 누락되었습니다. 기본적으로 새 라인이 프레이밍되지 않습니다. 그래서 모든 것이 방출되지 않습니다.
흥미 롭다면, 당신은 akka-http없이 이것을 재현 할 수 있는지, 즉 소스 JSON의 일부를 파일에 덤프하고 http 요청 대신'Source.fromFile'을 사용하는지 궁금합니다. –
그냥 컬에서 덤프하면 효과가 있습니다. 또한 지금은'initial-input-buffer-size = 16'을 시도했고 예상대로 작동합니다 ... 이것은 정말로 이상합니다. 배압이 어딘가에있는 것처럼 보입니다. 그러나 어디서인지 알아낼 수는 없습니다. –
파일과 함께 스트림으로 시도했지만 여기에서와 같은 코드가 사용되었습니다. 나는이 문제에 부딪치지 않는다 : (나는 지금 약간의 미친 운전을한다. D –