2017-10-26 1 views
0

Akka Streams를 사용하여 서버에 요청을 동시에 보내고 각 요청을 원본 컨텍스트 (이 예제에서는 Int)와 연결하려고합니다. 이것은 내가 특히akka 스트림의 미래 변화하기

val createRequestFlow: Flow[(String, String), (HttpRequest, Int), _] = Flow.fromFunction[(String, String), (HttpRequest, Int)]((mkRequest _).tupled) 
val sendRequestFlow: Flow[(HttpRequest, Int), (HttpResponse, Int), _] = Flow[(HttpRequest, Int)].mapAsyncUnordered(32)((sendRequest _).tupled) 
val handleResponseFlow: Flow[(HttpResponse, Int), String, _] = Flow[(HttpResponse, Int)].map[String]((getStatusString _).tupled) 

val handler = createRequestFlow via sendRequestFlow via handleResponseFlow 

toghether 넣어 한 흐름이다, 나는 Future[(HttpResponse, Int)]을 반환하는 방법을 찾기 위해 노력하고있어. 현재,이

def sendRequest(request: HttpRequest, ctx: Int): Future[(HttpResponse, Int)] = { 
    Http().singleRequest(request).map(r => (r,ctx)) 
    } 

을하고 있어요하지만 난이 Executor에 필요하다는 사실이 그것을 할 수있는 또 다른 (더 좋은) 방법이 있음을 나타냅니다 것으로 알고 있습니다.

미리 감사드립니다.

+1

올바른 경로에있어'sendRequest'를 암시 적 ExecutionContext로 만듭니다. –

답변

1

나는 더 좋은 방법이 있다고 생각하지 않습니다. Akka는 표준 스칼라 Futures을 사용하며 디자인 상 거의 모든 작업을 수행하려면 ExecutionContext이 필요합니다. 실제로이 간단한 map에 다른 스레드를 사용하고 싶지 않다면, Akka가 내부에서 사용하는 것과 유사한 sameThreadExecutionContext을 만들 수 있습니다 (akka.dispatch.ExecutionContexts.sameThreadExecutionContext 참조). 따라서 map은 기본 Http 응답을 처리하는 동일한 스레드에서 수행됩니다 , 더 복잡한 것에는 사용하지 마십시오 (GitHub #19043의 토론 참조).