나는 LMAX Disruptor에 대해 약간의 경험이 있으며, 나는 정말로 방해자를 사용하여 사용자 지정 액터 사서함을 구현하고 싶습니다.Disruptor를 사용하여 사용자 지정 액터 사서함을 구현하는 방법은 무엇입니까?
가이드 라인이 있습니까? 심지어 가능할까요? Akka의 배우 사서함의 한계는 무엇입니까?
나는 LMAX Disruptor에 대해 약간의 경험이 있으며, 나는 정말로 방해자를 사용하여 사용자 지정 액터 사서함을 구현하고 싶습니다.Disruptor를 사용하여 사용자 지정 액터 사서함을 구현하는 방법은 무엇입니까?
가이드 라인이 있습니까? 심지어 가능할까요? Akka의 배우 사서함의 한계는 무엇입니까?
말했듯이 here 당신은 단지 몇 가지 방법을 구현해야합니다. 물론 링 버퍼에 대한 포인터를 사용하여 직접 메시지를 쓰거나 읽어야합니다. 당신은 또한 염두에 두어야 :
장애 물질은 일반적으로 그렇게 나쁜 생각, 당신은 BalancingPool
와 함께 (내부 교란과) 하나의 라우터 배우를 사용할 수 있습니다 배우 당 하나의 교란을 사용하여, 메모리의 큰 금액을 미리 할당합니다.
다른 메시지 유형 소비를 원한다면 저널링, 수리 등을 위해 소비자를 분리해야합니다. 다른 RingBufferPointer (smthng-like) 인스턴스를 매개 변수로 전달해야합니다 (저널링의 시작 값이 동일하고 다른 메시지 유형에 대한 시작 값). 그러나 여전히 하나의 Disruptor를 사용합니다. 따라서 다른 사서함은 하나의 장애자를 나타냅니다.
메시지 생성, 추출 등에서 로우 레벨 제어가 느슨해 지므로 기본적으로 일괄 배치가 없습니다.
반지의 기록을 사용하여 실패한 배우의 상태를 복원 할 수도 있습니다 (preRestart
또는 감독자).
LMAX의 말씀 : 당신은 당신이 사용하는 것보다 약간 다르게 사용 있도록
그것은, 더 전통적인 방법 다른 방식으로 작동합니다. 예를 들어 시스템에 패턴을 적용하는 것은 모든 대기열을 마법 링 버퍼로 바꾸는 것만 큼 간단하지 않습니다. 에 대한 코드 샘플을 제공합니다. 블로그 및 기사의 개요가 늘어나고 있습니다. 작동 방식에 대해서는 으로 예상되는 기술 문서가 일부 자세히 나와 있으며, 성능 테스트에서는 http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html
그리고 here이 될 것 의사 스칼라 코드에서 짧은 대기열/교란/배우 비교
인 스럽를 사용하여 뭔가 같은 :
object MyUnboundedMailbox {
val buffer = new RingBuffer()
class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue {
// these should be implemented; queue used as example
def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
writerPointer.allocate(() => handle) //allocate one element and set, if you want different message types - you should allocate big amount of data before and block when it ends (to not interfere with another messages), so it has to be bounded queue then
}
def dequeue(): Envelope = readerPointer.poll()
def numberOfMessages: Int = writerPointer - readerPointer //should be synchronized
def hasMessages: Boolean = readerPointer == writerPointer //should be synchronized
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { }
}
trait MyUnboundedMessageQueueSemantics
}
class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType
with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
import MyUnboundedMailbox._
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue = {
val pointer = ring.newPointer
val read = pointer.copy
val write = pointer.copy
new MyMessageQueue(pointer, read, write)
}
// you may use another strategy here based on owner (you can access name and path here),
// so for example may allocate same pointers for same prefixes in the name or path
}
실패 복구 중에 변경되지 않은 MyMessageQueue.startPointer를 사용하여 메시지 로그에 액세스 할 수 있습니다. 또한 akka의 Event Sourcing을 유추 할 수도 있습니다.
UnboundedQueue 접근법을 사용하면 반지가 "끝나면"아주 오래된 미 전달 메시지를 새 버전으로 덮어 쓸 수 있으므로 여기에 here과 같은 BoundedQueue가 필요할 수 있습니다.