2017-12-27 6 views
1

목표는 데이터베이스에서 데이터를 스트리밍하고이 데이터 청크에서 일부 계산을 수행 한 다음 (이 계산은 일부 사례 클래스의 미래를 반환 함)이 데이터를 청크 응답으로 전송하는 것입니다. 사용자에게 현재 데이터를 스트리밍하고 계산을 수행하지 않고 응답을 보낼 수 있습니다. 그러나이 계산을 수행하고 결과를 스트리밍 할 수 없습니다.Slick Streaming 데이터 변환 및 Akka Http를 사용한 청크 응답 전송

이것은 구현 한 경로입니다.

def streamingDB1 = 
path("streaming-db1") { 
    get { 
    val src = Source.fromPublisher(db.stream(getRds)) 
    complete(src) 
    } 
} 

getRds 함수는 case 클래스 (slick 사용)에 매핑 된 테이블의 행을 반환합니다. 이제 각 행을 입력으로 받아서 다른 case 클래스의 Future를 반환하는 함수 compute를 생각해보십시오. 내가 변수 SRC에이 기능을 구현하고 사용자에게이 계산 (스트림 등) 청크 응답을 보낼 수있는 방법

def compute(x: Tweet) : Future[TweetNew] = ? 

비슷 해요. 필요에 따라

val src = 
    Source.fromPublisher(db.stream(getRds)) 
     .mapAsync(parallelism = 3)(compute) 

complete(src) 

병렬 처리 수준을 조정합니다

답변

1

당신은 mapAsync을 사용하여 소스를 변환 할 수있다.

+0

이것은 작동하지 않습니다. 끝 (curl) 명령을 실행하여 끝점에 도달했습니다. 그러나 연결이 닫힙니다. – user3294786

관련 문제