2017-02-07 1 views
1

내가 그런 논리 DIFF 변환 스트림을 구현하려는 흐름. Diff를 Source 및 db에 JsValue에서 가져오고 생성 된 Diff를 더 가져오고 JsValue을 원본에서 db로 저장합니다.비동기 호출

Akka-persistence를 저장소 구현으로 생각하지만 현재 상태 만 필요하므로 키 - 값 db가 될 수 있습니다.

저는 akka-stream을 처음 접했기 때문에이 아이디어를 구현하는 가장 좋은 방법은 무엇인지 이해할 수 없습니다.

def dbQuery(id : String) : JsValue = ??? 

val queryFlow : Flow[JsValue, (JsValue,JsValue), _] = 
    Flow[JsValue] map { originalJs => 
    originalJs -> dbQuery((originalJs \ "id").as[String]) 
    } 

이 튜플 수 :

답변

1

당신의 JsValue 객체는 원래 JsValue에 걸릴 것 쿼리 Flow를 작성하고 원본 및 데이터베이스 버전의 튜플을 생성 할 수있는 "ID"필드가 가정 diffing의 Flow에 전달 :

def diffJs(original : JsValue, dbVersion : JsValue) : JsValue = ??? 

val diffFlow : Flow[(JsValue, JsValue), JsValue, _] = 
    Flow[(JsValue, JsValue)] map diffJs.tupled 

당신이 Sink와의 차이, this can represented을 계속 것 dB 있다고 말했습니다 마지막 부분 :

val dbSink : Sink[JsValue, _] = ??? 

이러한 구성 요소의 모든

은 다음 값의 소스를 기반으로 스트림을 형성하기 위해 결합 될 수있다 : 비동기 DB를 쿼리의 예를 들어

val jsSource : Source[JsValue, _] = ??? 

jsSource via queryFlow via diffFlow runWith dbSink 

this examplemapAsync을 보여주는 참조하십시오.

+0

덕분에, 나는 같은 방법으로 생각한다. 그러나 Flow와 Sink에서 비동기식 dbQuery : Future [JsValue]를 사용하는 것이 어떻겠습니까? 나는 단순한'map'을 사용할 수 없다. –

+0

@ andrey.ladniy 여러분 환영합니다. 나는 당신이 찾고있는 정확한 데모 인 stack answer에 대한 링크로 나의 대답을 업데이트했다. 만약 거기에 아무 것도 없다면이 대답을 업데이트 할 수 있습니다 ... –