글쎄, 당신은 당신의 정의를 살펴 경우 offer()
방법 : 당신이 할 수있는 것은 offer
를 통해 스트리밍 쌍을 밀어 도우미 함수를 작성, Source.queue[(Promise[String], String)]
을 만드는 것입니다 때문에 실패하지 않습니다 offer
확인 반환 무엇인지 대기열이 꽉 차있을 수 있으며 스트림 내에서 약속을 완료하고 약속의 미래를 사용하여 외부 코드의 완료 이벤트를 포착 할 수 있습니다.
내 프로젝트의 여러 곳에서 사용되는 외부 API에 속도를 조절하는 방법입니다. 형태 보증은
import scala.concurrent.Promise
import scala.concurrent.Future
import java.util.concurrent.ConcurrentLinkedDeque
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import scala.util.Success
private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure)
.toMat(Sink.foreach({ case (p, param) =>
p.complete(Success(param.reverse))
}))(Keep.left)
.run
private val futureDeque = new ConcurrentLinkedDeque[Future[String]]()
private def sendQueuedRequest(request: String): Future[String] = {
val p = Promise[String]
val offerFuture = queue.offer(p -> request)
def addToQueue(future: Future[String]): Future[String] = {
futureDeque.addLast(future)
future.onComplete(_ => futureDeque.remove(future))
future
}
offerFuture.flatMap {
case QueueOfferResult.Enqueued =>
addToQueue(p.future)
}.recoverWith {
case ex =>
val first = futureDeque.pollFirst()
if (first != null)
addToQueue(first.flatMap(_ => sendQueuedRequest(request)))
else
sendQueuedRequest(request)
}
}
을 akka하는 허브 소스를 추가하기 전에 내 프로젝트에 모습 여기
는 내가 동기화 큐를 차단하는 병목 현상 일 수 있고, 그러나 API 호출 때문에 내 프로젝트에만에서 만든 무한정 커질 수 있습니다 실현 backpressured있는 다른 akka 스트림 나는 futureDeque
에 12 개 이상의 항목이 없습니다. 상황이 다를 수 있습니다.
MergeHub.source[(Promise[String], String)]()
을 만들면 재사용 할 수있는 싱크대가 생깁니다. 따라서 항목을 처리해야 할 때마다 완전한 그래프를 작성하고 실행합니다. 이 경우 대기열 요청에 해킹 된 Java 컨테이너가 필요하지 않습니다.
멋지다. 내 예제와 잘 어울린다. 그러나, 내 실제 코드에서'HttpRequests'를 처리하기 위해'Stream'을 사용하고 있습니다. '흐름'은 여러 개의 하위 스트림으로 분기하고 다시 병합됩니다. 하위 스트림 중 일부는 다른 것보다 빠를 것이며 'Sink'를'대기열 '로 사용하면 요청이 올바른 응답으로 끝날 것이라고 보장 할 수 없다고 생각합니다. – RemcoW
예제와 설명에 따르면 값을 소스 큐에 넣고 나중에 싱크에서 다시 가져 오는 것이 좋습니다. "입력 푸시 - 출력물 가져 오기"패턴을 따르는 한 올바른 순서로 응답을 처리 할 수 있습니다. 그러나 액세스 패턴이 다르더라도 (질문에 반영 되었다면 좋을 것입니다.) 요청을 응답과 상관시키는 가장 간단한 방법은 변환 된 값과 함께 요청을 튜플에 전달하는 것입니다. –
네 말이 맞아, 나는이 말을 내 설명에서 언급하지 않았다. 당신이 대답은이 질문에 대한 최선의 대답입니다. – RemcoW