2017-02-20 4 views
0

나는 다음과 같은 코드가 있습니다 어딘가에 내가 흐름에 메시지를 보내고있다 코드에Akka 스트림 + Akka HTTP를 패스 매개 변수

case class SomeClass(param1:String,param2:String,param3:String) 

    val someClassActorSource: Source[SomeClass, ActorRef] = Source 
     .actorPublisher[SomeClass](Props[SomeClassActorPublisher]) 

    val someFlow: ActorRef = Flow[SomeClass] 

     .mapAsync(3)(f=> getDocumentById(f)) 

     .map(f =>{ 
      val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test") 
      .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a) 
      ) 
      (request,request) 

     }).via(connection) 

     //Parsing Response 
     .mapAsync(3){ 
      case (Success(HttpResponse(status, _, entity, _)),request)=> 
      entity.dataBytes.runFold(ByteString(""))(_ ++ _) 
     } 
     .map(resp =>parse(resp.utf8String,?????????????)) 
     .to(Sink.someSink{....}) 
     .runWith(someClassActorSource) 

    def parse(resp:String,parseParam:String)=???? 

과 :

someflow ! SomeClass("a","b","c") 
someflow ! SomeClass("a1","b1","c1") 

내 문제를 그 방법 구문 분석은해야

그래서 첫 번째 메시지에 대한 정품 케이스 클래스에서 PARAM2 사용해야합니다

와 두 번째 메시지는

parse(response,"b1") 

그래서 질문은, 어떻게 내가 흐름에 제출하는 방법에서 매개 변수를 가져올 수있다되어야 하는가?

답변

1

은 당신의 connection 값은 연결이 튜플에 소요되는 사실을 사용할 수 있습니다 대신 단순히 튜플 두 번 request를 전달하는 당신이 입력 SomeClass에 전달할 수 있습니다

val connection = Http().cachedHostConnectionPool(...) 

를 통해 인스턴스화되고 가정하면. 이 SomeClass 인스턴스는 사용자의 Flow 값을 파싱 단계로 가져 가야합니다.

val getDocumentFlow = 
    Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map(d => d -> f)) 

그래서 난 그냥 Document을 사용하고 getDocumentById의 리턴 타입을 명시하지 않습니다 질문 :

val documentToRequest = 
    Flow[(Document, SomeClass)] map { case (document, someClass) => 
    val request = ... 

    (request, someClass) 
    } 

val parseResponse = 
    Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){ 
    case (Success(HttpResponse(status, _, entity, _)), someClass) => 
     entity 
     .dataBytes 
     .runFold(ByteString(""))(_ ++ _) 
     .map(e => e -> someClass) 
    } 

val parseEntity = Flow[(ByteString, SomeClass)] map { 
    case (entity, someClass) => parse(entity.utf8String, someClass) 
} 

이러한 흐름은 다음과 같이 사용할 수 있습니다

코드 약간 수정 질문에 설명 :

val someFlow = 
    someClassActorSource 
    .via(getDocumentFlow) 
    .via(documentToRequest) 
    .via(connection) 
    .via(parseResponse) 
    .via(parseEntity) 
    .to(Sink.someSink{...}) 
    .run() 
관련 문제