2015-01-06 2 views

답변

3

말했듯이 here 당신은 단지 몇 가지 방법을 구현해야합니다. 물론 링 버퍼에 대한 포인터를 사용하여 직접 메시지를 쓰거나 읽어야합니다. 당신은 또한 염두에 두어야 :

  • 장애 물질은 일반적으로 그렇게 나쁜 생각, 당신은 BalancingPool와 함께 (내부 교란과) 하나의 라우터 배우를 사용할 수 있습니다 배우 당 하나의 교란을 사용하여, 메모리의 큰 금액을 미리 할당합니다.

  • 다른 메시지 유형 소비를 원한다면 저널링, 수리 등을 위해 소비자를 분리해야합니다. 다른 RingBufferPointer (smthng-like) 인스턴스를 매개 변수로 전달해야합니다 (저널링의 시작 값이 동일하고 다른 메시지 유형에 대한 시작 값). 그러나 여전히 하나의 Disruptor를 사용합니다. 따라서 다른 사서함은 하나의 장애자를 나타냅니다.

  • 메시지 생성, 추출 등에서 로우 레벨 제어가 느슨해 지므로 기본적으로 일괄 배치가 없습니다.

  • 반지의 기록을 사용하여 실패한 배우의 상태를 복원 할 수도 있습니다 (preRestart 또는 감독자).

LMAX의 말씀 : 당신은 당신이 사용하는 것보다 약간 다르게 사용 있도록

그것은, 더 전통적인 방법 다른 방식으로 작동합니다. 예를 들어 시스템에 패턴을 적용하는 것은 모든 대기열을 마법 링 버퍼로 바꾸는 것만 큼 간단하지 않습니다. 에 대한 코드 샘플을 제공합니다. 블로그 및 기사의 개요가 늘어나고 있습니다. 작동 방식에 대해서는 으로 예상되는 기술 문서가 일부 자세히 나와 있으며, 성능 테스트에서는 http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html

그리고 here이 될 것 의사 스칼라 코드에서 짧은 대기열/교란/배우 비교

인 스럽를 사용하여 뭔가 같은 :

object MyUnboundedMailbox { 
    val buffer = new RingBuffer() 

    class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue { 

    // these should be implemented; queue used as example 
    def enqueue(receiver: ActorRef, handle: Envelope): Unit = { 
     writerPointer.allocate(() => handle) //allocate one element and set, if you want different message types - you should allocate big amount of data before and block when it ends (to not interfere with another messages), so it has to be bounded queue then 

    } 
    def dequeue(): Envelope = readerPointer.poll() 
    def numberOfMessages: Int = writerPointer - readerPointer //should be synchronized 
    def hasMessages: Boolean = readerPointer == writerPointer //should be synchronized 
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { } 
    } 

    trait MyUnboundedMessageQueueSemantics 

} 

class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType 
    with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] { 

    import MyUnboundedMailbox._ 
    final override def create(owner: Option[ActorRef], 
          system: Option[ActorSystem]): MessageQueue = { 

    val pointer = ring.newPointer 
    val read = pointer.copy 
    val write = pointer.copy 
    new MyMessageQueue(pointer, read, write) 
    } 
    // you may use another strategy here based on owner (you can access name and path here), 
    // so for example may allocate same pointers for same prefixes in the name or path 
} 

실패 복구 중에 변경되지 않은 MyMessageQueue.startPointer를 사용하여 메시지 로그에 액세스 할 수 있습니다. 또한 akka의 Event Sourcing을 유추 할 수도 있습니다.

UnboundedQueue 접근법을 사용하면 반지가 "끝나면"아주 오래된 미 전달 메시지를 새 버전으로 덮어 쓸 수 있으므로 여기에 here과 같은 BoundedQueue가 필요할 수 있습니다.

관련 문제