2016-12-08 4 views
3

나는 SourceQueue 있어요. 요소를 제공 할 때 Stream을 전달하고 Sink에 도달하면이 요소를 제공하는 코드로 출력이 반환됩니다 (Sink.head은 요소를 RunnableGraph.run() 호출로 반환 함).Akka 스트림 반환 개체에서 싱크

어떻게해야합니까? 내 문제의 간단한 예는 다음과 같습니다

val source = Source.queue[String](100, OverflowStrategy.fail) 
val flow = Flow[String].map(element => s"Modified $element") 
val sink = Sink.ReturnTheStringSomehow 
val graph = source.via(flow).to(sink).run() 

val x = graph.offer("foo") 
println(x) // Output should be "Modified foo" 
val y = graph.offer("bar") 
println(y) // Output should be "Modified bar" 
val z = graph.offer("baz") 
println(z) // Output should be "Modified baz" 

편집 : 블라디미르 Matveev이 가장 좋은 대답을 제공하는이 질문에 준 예를 들어. 그러나이 솔루션은 요소가 source에 제공된 것과 동일한 순서로 sink에 들어가는 경우에만 작동합니다. 보장 할 수없는 경우 sink의 요소 순서가 다를 수 있으며 결과가 예상과 다를 수 있습니다.

답변

5

Sink.queue이라는 스트림에서 값을 가져 오는 데 이미 기존 프리미티브를 사용하는 것이 더 간단하다고 생각합니다.

val source = Source.queue[String](128, OverflowStrategy.fail) 
val flow = Flow[String].map(element => s"Modified $element") 
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1)) 

val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run() 

def getNext: String = Await.result(sinkQueue.pull(), 1.second).get 

sourceQueue.offer("foo") 
println(getNext) 

sourceQueue.offer("bar") 
println(getNext) 

sourceQueue.offer("baz") 
println(getNext) 

그것은 정확하게 당신이 원하는 무엇을 예를 들면 다음과 같습니다.

큐 싱크에 대한 inputBuffer 특성을 설정하는 것이 사용 사례에 중요 할 수도 중요하지 않을 수도 있습니다. 설정하지 않으면 버퍼의 크기가 0이되어 데이터가 스트림을 통해 흐르지 않습니다 싱크대에서 pull() 메서드를 호출 할 때까지

sinkQueue.pull()Future[Option[T]]이며, 싱크가 요소를 받으면 Some으로 완료되고 스트림이 실패하면 실패합니다. 스트림이 정상적으로 완료하면 None으로 완료됩니다. 이 특정 예에서는 Option.get을 사용하여 이것을 무시하지만이 경우를 처리하기 위해 사용자 지정 논리를 추가하려고합니다.

+0

멋지다. 내 예제와 잘 어울린다. 그러나, 내 실제 코드에서'HttpRequests'를 처리하기 위해'Stream'을 사용하고 있습니다. '흐름'은 여러 개의 하위 스트림으로 분기하고 다시 병합됩니다. 하위 스트림 중 일부는 다른 것보다 빠를 것이며 'Sink'를'대기열 '로 사용하면 요청이 올바른 응답으로 끝날 것이라고 보장 할 수 없다고 생각합니다. – RemcoW

+0

예제와 설명에 따르면 값을 소스 큐에 넣고 나중에 싱크에서 다시 가져 오는 것이 좋습니다. "입력 푸시 - 출력물 가져 오기"패턴을 따르는 한 올바른 순서로 응답을 처리 할 수 ​​있습니다. 그러나 액세스 패턴이 다르더라도 (질문에 반영 되었다면 좋을 것입니다.) 요청을 응답과 상관시키는 가장 간단한 방법은 변환 된 값과 함께 요청을 튜플에 전달하는 것입니다. –

+0

네 말이 맞아, 나는이 말을 내 설명에서 언급하지 않았다. 당신이 대답은이 질문에 대한 최선의 대답입니다. – RemcoW

1

글쎄, 당신은 당신의 정의를 살펴 경우 offer() 방법 : 당신이 할 수있는 것은 offer를 통해 스트리밍 쌍을 밀어 도우미 함수를 작성, Source.queue[(Promise[String], String)]을 만드는 것입니다 때문에 실패하지 않습니다 offer 확인 반환 무엇인지 대기열이 꽉 차있을 수 있으며 스트림 내에서 약속을 완료하고 약속의 미래를 사용하여 외부 코드의 완료 이벤트를 포착 할 수 있습니다.

내 프로젝트의 여러 곳에서 사용되는 외부 API에 속도를 조절하는 방법입니다. 형태 보증은

import scala.concurrent.Promise 
import scala.concurrent.Future 
import java.util.concurrent.ConcurrentLinkedDeque 

import akka.stream.scaladsl.{Keep, Sink, Source} 
import akka.stream.{OverflowStrategy, QueueOfferResult} 

import scala.util.Success 

private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure) 
    .toMat(Sink.foreach({ case (p, param) => 
     p.complete(Success(param.reverse)) 
    }))(Keep.left) 
    .run 

private val futureDeque = new ConcurrentLinkedDeque[Future[String]]() 

private def sendQueuedRequest(request: String): Future[String] = { 

    val p = Promise[String] 

    val offerFuture = queue.offer(p -> request) 

    def addToQueue(future: Future[String]): Future[String] = { 
    futureDeque.addLast(future) 
    future.onComplete(_ => futureDeque.remove(future)) 
    future 
    } 

    offerFuture.flatMap { 
    case QueueOfferResult.Enqueued => 
     addToQueue(p.future) 
    }.recoverWith { 
    case ex => 
     val first = futureDeque.pollFirst() 
     if (first != null) 
     addToQueue(first.flatMap(_ => sendQueuedRequest(request))) 
     else 
     sendQueuedRequest(request) 
    } 
} 

을 akka하는 허브 소스를 추가하기 전에 내 프로젝트에 모습 여기

는 내가 동기화 큐를 차단하는 병목 현상 일 수 있고, 그러나 API 호출 때문에 내 프로젝트에만에서 만든 무한정 커질 수 있습니다 실현 backpressured있는 다른 akka 스트림 나는 futureDeque에 12 개 이상의 항목이 없습니다. 상황이 다를 수 있습니다.

MergeHub.source[(Promise[String], String)]()을 만들면 재사용 할 수있는 싱크대가 생깁니다. 따라서 항목을 처리해야 할 때마다 완전한 그래프를 작성하고 실행합니다. 이 경우 대기열 요청에 해킹 된 Java 컨테이너가 필요하지 않습니다.

+0

이것은 훌륭합니다. 나는 약속의 사용을 생각하지 않았다. 그래도 도우미 기능에 대해 자세히 설명해 주시겠습니까?나는 이것이 당신이 의미하는 바를 이해하지 못합니다. – RemcoW

+0

@RemcoW 업데이트되었습니다. – expert

+0

고마워, 고마워. – RemcoW

관련 문제