2014-11-07 3 views
2

액터를 사용하여 메시지 처리 파이프 라인을 구현하려고합니다. 파이프 라인의 단계에는 읽기, 필터링, 기능 보강 및 최종적으로 데이터베이스에 저장하는 기능이 포함됩니다. 이와 유사한 뭔가 : http://sujitpal.blogspot.nl/2013/12/akka-content-ingestion-pipeline-part-i.htmlAkka 액터 파이프 라인과 혼잡 한 매장 액터

문제가 판독, 필터링 및 보강 단계 혼잡 저장 배우 불안정한 시스템을 갖는 결과 저장 단계보다 훨씬 빠르기 때문이다.

다음 옵션을 고려하고 있습니다. 저장소 배우가 처리하고 메시지를 저장할 준비가되었습니다. 이것은 좋은 선택입니까? 더 나은 제안?

답변

3

당신은 몇 가지 옵션을 고려할 수 있습니다 감사합니다 메시지의 순서는 중요하지 않습니다 경우

  • 이 - 다만 모든 저장 작업을 실행 내부에 별도의 배우 (또는 미래). 모든 데이터 저장소가 병렬로 수행 될 것입니다.이를 위해 별도의 스레드 풀을 사용하는 것이 좋습니다. 일부 메시지가 다른 사람에게 수정되거나 동일한 트랜잭션에 참여하는 경우 각 messageId/transactionId에 대해 별도의 액터를 만들어 비관적/낙관적 잠금 문제 (트랜잭션 종료 또는 제한 시간에 해당 액터를 죽이는 것을 잊지 마십시오)를 피할 수 있습니다.

  • 제한된 사서함 (back-pressure) 사용 - 이전 메시지가 아직 처리되지 않은 경우 사용자 입력에서 새 메시지를 차단합니다. 예를 들어 수신 스레드를 차단하여 메시지가 체인의 마지막 액터에서 수신 확인 될 수 있습니다. . 책임을 소스 시스템으로 옮깁니다. JMS 내구재와 함께 잘 작동합니다. 시스템은 마침내 시스템이 처리 할 때까지 JMS 브로커 측에 신뢰할 수있는 방식으로 저장합니다.

  • 나는이 유사한 접근 방식 사용하고 앞의 두

+1

첫 번째 옵션은 라우터를 사용하는 것입니다. – sourcedelica

+0

첫 번째 옵션은 내가 지금하고있는 일이라고 생각합니까? 나는. 나는 한 명의 배우를두고있다. mongo에서 전체 쓰기 잠금 문제로 인해 더 이상 가질 수 없습니다. 두 번째 옵션을 살펴 보겠습니다. – pppnnn

+0

네, 여러 가지 데이터베이스를 쓰는 경우를 제외하고 첫 번째 옵션은 mongo에서 작동하지 않습니다 (mongo는 2.2 이후 전체 서버 대신 데이터베이스 만 잠글 것입니다). 그러면 데이터베이스 당 쓰기 액터를 만들 수 있습니다. – dk14

3

결합 : Akka Work Pulling Pattern (여기에 소스 코드 : WorkPullingPattern.scala를). 은 로컬로 & 및 Akka 클러스터과 함께 작동한다는 이점이 있습니다. 또한 전체 접근 방식이 완전히 비동기식 인 이며 전혀 차단되지 않습니다..

처리 된 "개체"가 모두 메모리에 저장되지 않거나 단계 중 하나가 느린 경우 멋진 솔루션입니다. N 명의 작업자를 스폰하는 경우 N 개의 "작업"이 한 번에 처리됩니다. "단계"를 BalancingPools에도 N (또는 이하)으로 병렬 처리하는 것이 좋습니다.

당신의 처리 "파이프 라인"이 순차적인지 아닌지는 모르겠지만 만약 그렇다면 위의 + Shapeless 라이브러리를 기반으로 한 타입 안전 추상화를 개발했습니다. WorkPullingPattern과 병합되기 전에 코드를 엿볼 수 있습니다 : Pipeline.

서명 기능이 제대로 작동하는 파이프 라인을 사용하고 BalancingPools에서 생성하고 Workers를 만들고이를 작업의 스케줄링에 사용할 수있는 마스터 액터에 연결합니다.

+0

나는이 솔루션을 좋아하지만, 다른 대답이 제안한 것은 내 코드에 대한 변경이 적어 작업이 아주 잘 끝났기 때문에 대답으로 표시 할 것입니다. – pppnnn

0

새로운 AKKA 스트림 (아직 베타 버전)은 역풍을 겪고 있습니다. 이 문제를 해결하기 위해 설계되었습니다.

0

또한 배우에 수신 파이프 라인을 사용할 수 있습니다 : 당신이 전에 작은 파이프 라인 작업을 가지고

class PipelinedActor extends Actor with ReceivePipeline { 

    // Increment 
    pipelineInner { case i: Int ⇒ Inner(i + 1) } 
    // Double 
    pipelineInner { case i: Int ⇒ Inner(i * 2) } 

    def receive: Receive = { case any ⇒ println(any) } 
} 

actor ! 5 // prints 12 = (5 + 1) * 2 

http://doc.akka.io/docs/akka/2.4/contrib/receive-pipeline.html

그것은 배우하여 메시지의 처리 후/귀하의 요구에 최선을 맞는. 또한 그것은 코드를 차단하고 있지만 귀하의 경우에는 괜찮습니다, 나는 믿습니다

관련 문제