2016-09-28 1 views
5

의 내가 어떤 반복자 있다고 가정 해 봅시다 :Future [T]를 방출하는 소스를 다루는 방법?

val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...) 


그리고 그 반복자에서 소스를 구축하려는 :

val source: Source[Future[Int], NotUsed] = 
    Source.fromIterator(() => nextElemIter) 


그래서 지금 내 소스는 Future의를 방출한다.

val source: Source[Int, NotUsed] = 
    Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */) 


을 그리고 지금은 T 대신 Future[T]을 방출하는 일반적인 소스를 가지고 :이 같은 것을 선물은 Akka의 문서에서 단계 사이에 전달되는 또는 다른 곳, 그래서 대신 내가 항상 할 수있는 본 적이 없다. 그러나 이것은 해키하고 틀린 느낌입니다.

이러한 상황을 처리하기위한 적절한 방법은 무엇입니까?

+3

내가 mapAsync' '라고 생각

def isGoodInt(i : Int) : Boolean = ??? //filter def transformInt(i : Int) : Int = ??? //map def combineInts(i : Int, j : Int) : Int = ??? //reduce 

선물은 이러한 기능을 사용하는 직접적인 방법을 제공 여기 완벽하게 괜찮아. 결국이 목적을 위해 정확하게 의도 된 것입니다. 선물을 스트림으로 병합하는 것입니다. –

+1

'mapAsync (1) (identity)'는 적절한 방법입니다. – expert

+0

@expert가 수정되었습니다. –

답변

4

직접 질문하기 : 내가 설명한 목적에 맞게 mapAsync을 사용하는 것에 대해 "해킹"이 없다는 블라디미르의 의견에 동의합니다. 더 이상 직접적으로 FutureInt 값 주위의 랩핑을 푸는 방법을 생각할 수 없습니다.

동시성 메커니즘으로, 배압이 필요할 때 매우 유용합니다, 간접적으로 질문 ... 선물

스트림을 고수하는

봅니다 응답. 그러나 순수한 Future 작업은 응용 프로그램에도 그대로 적용됩니다.

Iterator[Future[Int]]이 알려진 제한된 수의 Future 값을 생성하려는 경우 Futures를 사용하여 동시성을 유지해야 할 수 있습니다.

필터링하려는지도, &을 Int 값으로 줄이고 싶다고 상상해보십시오. 당신이 제안 스트림을 사용하는 다소 간접적 인 방법과 비교

val finalVal : Future[Int] = 
    Future sequence { 
    for { 
     f <- nextElemIter.toSeq //launch all of the Futures 
     i <- f if isGoodInt(i) 
    } yield transformInt(i) 
    } map (_ reduce combineInts) 

:

val finalVal : Future[Int] = 
    Source.fromIterator(() => nextElemIter) 
     .via(Flow[Future[Int]].mapAsync(1)(identity)) 
     .via(Flow[Int].filter(isGoodInt)) 
     .via(Flow[Int].map(transformInt)) 
     .to(Sink.reduce(combineInts)) 
     .run()