2016-11-22 4 views
2

방금 ​​reactivecouchbase 비동기 데이터베이스 드라이버를 사용하기 시작했지만 기본적인 디자인 문제가 발생합니다. 전통적인 접근 방법에서는 연결 수를 제한하여 데이터베이스에 가하는 압력을 제한합니다. 그러나 비동기 드라이버를 사용하면 데이터베이스를 새로운 쿼리로 늪에 잠길 수 있습니까?akka-streams 맵에서 비동기 드라이버를 사용하는 방법 mapAsync

이것은 중요한 예가 다음과 같습니다.

두 가지 다른 방법으로 데이터베이스를 호출 할 수 있습니다.

내 기능은 DB 호출 :

asyncCallDB: Future[DBResponse] 
blockingCallDB: DBResponse 

지금 나는 두 개의 서로 다른 기능을 사용할 수있는 스트림을 통해 DB를 호출을 매핑 할 : 이제

Flow.map() 
Flow.mapAsync(numberOfConcurrentCalls)() 

내 질문을하는 방법을 당신에게 것 데이터베이스를 호출하기 위해 선택하십시오 :

Flow.map(blockingCallDB) //One call at a time with back preassure 
Flow.map(asyncCallDB) //Unlimited calls floods db no back pressure? 

Flow.mapAsync(numberOfConcurrentCalls)(blockingCallDB) //Up to numberOfConcurrentCalls at the same time with back pressure 
Flow.mapAsync(numberOfConcurrentCalls)(asyncCallDB) //Unlimited calls floods db no back pressure? 

나는 여기에 부족한 부분이 있으며 아래에 있다고 생각합니다. 이런 종류의 사퇴를 명심하십시오.

+1

[맵과 mapAsync의 차이점] (0120-385-333) –

+0

'Flow.mapAsync'를 함수는'Future [DBResponse]'대신'DBResponse'를 반환합니까? [Akka API doc] (http://doc.akka.io/api/akka/2.4/index.html#[email protected]%5BT%5D (병렬 처리 : Int) (f : 출력 => scala.concurrent.Future % 5BT % 5D) : FlowOps.this.Repr % 5BT % 5D) 이것은 불가능합니다. –

답변

3

ReactiveCouchbase는 Couchbase 서버와의 통신에 AsyncHttpClient을 사용합니다. see in the source code은 동시 연결 수를 제한하는 setMaximumConnectionsTotal을 호출합니다. 실제 값은 couchbase.http.maxTotalConnections에서 구성한 값에 따라 다릅니다.

작성한 CouchbaseBucket마다 하나씩 AsyncHttpClient이 있습니다. 따라서 각 CouchbaseBucket에 대해 최대 maxTotalConnections 개의 연결이 있습니다. Couchbase documentation on N1QL REST API에서

: 요청이 시작 에서 문을 실행하면, 결과를 다시 클라이언트로 스트리밍되도록

REST API는 종료를 기적을 실행할 때 문이 종료 실행 .

실제로 각 동시 버킷의 동시 쿼리 수는 maxTotalConnections 개로 제한됩니다.

따라서 DB의 배압은 항상 어떤 식 으로든 제한됩니다. maxTotalConnections을 음수가 아닌 값으로 설정했거나 제한된 RAM 또는 파일 설명자의 수로 인해 클라이언트가 더 많은 연결을 만들 수 없기 때문에 발생합니다.

그러나 너무 많은 클라이언트를 만들면 클라이언트가 메모리가 부족해질 수 있습니다. 이러한 경우가 있다고 생각할 때마다 in this answer과 같이 mapAsync을 사용하거나 "Buffers and working with rate" (Akka documentation)에서 언급 한 다른 기술 중 하나를 사용해야합니다.

good description of mapAsync in the Akka documentation있다 : Future를 결과를 반환하는 함수에

패스 들어오는 요소. 미래가 도착하면 결과가 다운 스트림으로 전달됩니다.n 개의 요소까지

Flow.mapAsync 자체는 아무것도 실행되지 않습니다 것을 명심 ... 동시에 처리 할 수있는, 그냥 다음, 당신이 SourceSink 사이의 연결을 가지고 Flowrun 반환 . Akka Quick Start Guide은 이것을 매우 이해하기 쉬운 방식으로 설명합니다.

+0

네,하지만지도 비동기는 차단 요청을 올바르게 사용하는 경우에만 의미가 있습니까? – user3139545

+0

@ user3139545 음, 스트림을 실행해야합니다. 맞습니까? 그래서 당신은 당신의 흐름에 어떤 종류의 싱크대를 연결해야합니다. 싱크가 처리 될 때까지 싱크가 처리되지 않습니다. 'Flow.mapAsync'는 그 자체로는 아무 것도하지 않습니다. 단지 다른 Flow ([doc] (http://doc.akka.io/api/akka/2.4/index.html#akka.stream.scaladsl .Flow @ mapAsync [T] (병렬 : Int) (f : 출력 => scala.concurrent.Future [T]) : FlowOps.this.Repr [T])). –

+0

쿼리가 실제로 동기화되어 있다고 언급 한 Couchbase 문서 조각을 발견했습니다. [ "REST API는 동 기적으로 실행되므로 요청의 명령문 실행이 시작되면 결과가 클라이언트로 다시 스트리밍되고 명령문 실행이 끝날 때 종료됩니다 "] (http://developer.couchbase.com/documentation/server/4.5/n1ql/n1ql-rest-api/index.html) –

관련 문제