14

시스템에는 두 가지 유형의 메시지 유형 A와 B가 있습니다. 각 메시지는 다른 구조를가집니다. 유형 A에는 int 멤버가 포함되고 유형 B에는 이중 멤버가 포함됩니다. 내 시스템은 두 가지 유형의 메시지를 여러 비즈니스 로직 스레드에 전달해야합니다. 대기 시간을 줄이는 것은 매우 중요하므로 Disruptor를 사용하여 주 스레드에서 비즈니스 논리 스레드로 메시지를 기계적으로 전달할 수 있는지 조사하고 있습니다.여러 메시지 유형으로 장애 처리자를 사용하는 방법

내 문제는 disruptor가 링 버퍼에있는 한 유형의 개체 만 허용한다는 것입니다. 이것은 disruptor가 링 버퍼의 객체를 미리 할당하기 때문에 의미가 있습니다. 그러나 Disruptor를 통해 비즈니스 로직 스레드에 두 가지 유형의 메시지를 전달하는 것도 어렵습니다.

  1. 구성 교란이 (로 How should one use Disruptor (Disruptor Pattern) to build real-world message systems?에 의해 권장) 고정 된 크기의 바이트 배열을 포함하는 객체를 사용하기 : 내가 말할 수있는 건, 내가 네 가지 옵션이 있습니다. 이 경우 주 스레드는 메시지를 중단 장치에 게시하기 전에 바이트 배열로 인코딩해야하며 각 비즈니스 논리 스레드는 수신시 바이트 배열을 개체로 다시 디코딩해야합니다. 이 설정의 단점은 비즈니스 로직 스레드가 실제로 중단 장치에서 메모리를 공유하지 않는다는 것입니다. 대신 중단 장치가 제공 한 바이트 배열에서 새 개체를 만들어서 가비지를 만듭니다. 이 설정의 장점은 모든 비즈니스 로직 스레드가 동일한 장애 요인으로부터 여러 유형의 메시지를 읽을 수 있다는 것입니다.

  2. 단일 유형의 개체를 사용하지만 개의 장애가 인을 각 개체 유형에 대해 하나씩 생성하도록 방해자를 구성하십시오. 위의 경우에는 A 유형의 객체와 B 유형의 객체에 대해 각각 두 개의 분리 된 분리자가 있습니다.이 설정의 위쪽은 기본 스레드가 객체를 바이트 배열로 인코딩 할 필요가 없으며 비즈니스 논리가 낮은 쓰레드는 장애가 (쓰레기가 생성되지 않음)에서 사용 된 것과 동일한 객체를 공유 할 수 있습니다. 이 설정의 단점은 어떻게 든 각각의 비즈니스 로직 스레드가 여러 가지 장애가의 메시지를 구독해야한다는 것입니다.

  3. 모두 메시지 A의 모든 필드를 포함하고 B.이 매우 OO 스타일의 반대 "슈퍼"개체의 단일 유형을 사용하도록 교란을 구성하지만, 옵션 # 1과 # 절충이 가능합니다 2.

  4. 개체 참조을 사용하도록 장애 조치를 구성하십시오. 그러나이 경우 객체 사전 할당 및 메모리 정렬의 성능 이점을 잃게됩니다.

이 상황에 대해 무엇을 권하고 싶습니까? 옵션 # 2가 가장 깨끗한 솔루션이라고 생각하지만 소비자가 여러 가지 혼란으로부터 오는 메시지를 기술적으로 구독 할 수 있는지 여부와 그 방법을 알지 못합니다. 누구나 옵션 # 2를 구현하는 방법에 대한 예제를 제공 할 수 있다면 크게 감사하겠습니다!

+5

마이클 바커는 스럽 Google 그룹 내 질문에 대답 게시 할 수 있습니다. 아래에서 그의 답변을 확인하십시오. https://groups.google.com/d/msg/lmax-disruptor/clUkJaFMsZg/54fKplz21MwJ –

+4

질문에 대한 답변이 여기에있는 답변으로 변환하고 해당 답변을 수락 된 것으로 표시하는 것이 좋습니다. – cic

답변

2

구성 고정 된 크기의 바이트 배열 포함하는 객체를 사용하는 장애 물질 (하나 실제 메시지 시스템을 구축 스럽 (스럽 패턴)를 사용하는 방법에 의해 권장 등?). 이 경우 메인 스레드는 메시지를메시지를 중단 코드에 게시하기 전에 바이트 배열로 인코딩해야하며 비즈니스 논리 스레드의 각 은 수신시 바이트 배열을 개체 으로 다시 디코딩해야합니다.이 설정의 단점은 비즈니스 로직 스레드 이 실제로 방해자로부터 메모리를 공유하지 않는다는 것입니다. 대신 방해자가 제공 한 바이트 배열 에서 새 오브젝트를 생성하여 (따라서 가비지를 생성하는) 입니다. 이 설정의 장점은 모든 비즈니스 논리 스레드가 동일한 장애가에서 여러 유형의 메시지를 읽을 수 있다는 것입니다. ,

이 내 선호하는 방법이 될 것입니다,하지만 난 약간, 우리 사용 사례에서 우리가 중 하나에서 수신 또는 I/O 장치의 일종으로 전송 년대 스럽 사용한 적이 거의 모든 장소를 색 따라서 우리의 기본 통화는 바이트 배열입니다. 마샬링에 대한 플라이 웨이트 접근 방식을 사용하여 객체 생성 을 둘러 볼 수 있습니다. 의 예제를 보시려면, Devoxx (https://github.com/mikeb01/ticketing)에서 제시 한 예제에서 Javolution의 Struct 및 Union 클래스를 사용했습니다. 이 이벤트 처리기에서 onEvent 호출을 반환하기 전에 개체를 완전히 처리 할 수 ​​있으면이 방법이 효과적입니다. 이벤트가 해당 지점을 초과하여 살아야하는 경우 데이터의 사본 을 작성해야합니다. 객체로 역 직렬화한다.

단일 유형의 개체를 사용하지만 각 개체 유형에 대해 하나씩 복수 도구 을 생성하도록 방해자를 구성합니다. 위의 경우 개의 분리 된 두 개의 처리기가 있습니다. 하나는 A 유형의 객체이고 다른 하나는 B 유형의 객체입니다.이 설정의 위쪽은 기본 스레드가 바이트 단위로 객체를 인코딩 할 필요가 없다는 것입니다 어레이와 비즈니스 로직 쓰레드가 적은 경우 은 장애물에서 사용 된 것과 동일한 객체를 공유 할 수 있습니다 (생성 된 쓰레기가 없음). 이 설정의 단점은 입니다. 어떻게 든 각 비즈니스 로직 스레드는 여러 개의 장애자가 보낸 메시지를 구독 할 입니다.

이 방법을 시도하지 않았 으면 여러 개의 링 버퍼에서 폴링 할 수있는 사용자 지정 EventProcessor 이 필요할 것입니다.

모두 메시지 A의 모든 필드를 포함하고 B.이 매우 OO 스타일 반대 "슈퍼"개체의 단일 유형을 사용하도록 교란을 구성하지만, 옵션 # 1 사이의 타협을 허용합니다 # 2. 개체 참조를 사용하도록 방해 기능을 구성합니다. 그러나이 경우 I 은 객체 사전 할당 및 메모리 정렬의 성능 이점을 상실합니다.

부족한 사전 할당이 허용되는 몇 가지 경우에이 작업을 수행했습니다. 괜찮아. 개체를 전달하는 경우 소비자 측에서 으로 끝내면 해당 개체를 무효화해야합니다. 우리는 "슈퍼"객체에 대해 double 디스패치 패턴을 사용하면 공정하게 구현을 유지한다는 것을 알았습니다. 이것에 대한 단점은 약간 더 길어질 것입니다. GC는 처럼 오브젝트의 직선 배열이었습니다. GC는 마크 단계에서 트래버스 할 라이브 오브젝트가 더 많습니다.

이 상황에 대해 무엇을 권하고 싶습니까?옵션 # 2가 가장 깨끗한 솔루션이라고 생각하지만 소비자가 기술적으로 어떻게 기술 할 수 있는지 또는 알지 못합니다. 여러 개의 장애가있는 메시지를 구독합니다. 옵션 2를 구현하는 방법에 대해 누구든지 예제를 제공 할 수 있다면 크게 감사하겠습니다! 당신은 데이터의 사용에 관련하여 완벽한 유연성을 원하는 경우

또 다른 옵션은, 링 버퍼를 사용하지만, 대신 시퀀서에 직접 을 이야기하고 가장 피팅 를 참조로 개체의 레이아웃을 정의하지 않는 것입니다 .

1

Ben Baumgold, 나는 당신이 지금까지 해결책을 찾았다 고 확신합니다. # 4 (또는 # 3)는 이벤트 홀더를 만들어 쉽게 구현할 수 있습니다. 객체에 대한 열거 형으로 생각하십시오. 룩업 속도를 높이려면 이벤트를 enum 유형으로 강화해야합니다. 공지 사항, 나는 원래 이벤트에 대한 참조를 홀더에 저장하고 있습니다. 복사 생성자 또는 clone()을 생성하고 링 버퍼에 삽입 할 때 이벤트를 복사하는 것이 더 적절할 수 있습니다.

예에 의해 예시 하나 :

을 //이

public enum MyEventEnum { 
EVENT_TIMER, 
EVENT_MARKETDATA; 
} 

//이 홀더입니다 이벤트

에 사용 된 열거입니다. 언제나 링 버퍼의이 인스턴스는 배열 [type.ordinal()]으로 인덱싱 된 하나의 이벤트 만 보유합니다. 왜 배열은 코드에서 분명해야합니다.

public class RingBufferEventHolder {  
private MyEventEnum; 
private EventBase array[]; 

public RingBufferEventHolder() { 
    array=new EventBase[MyEventEnum.values().length]; 
} 

// TODO: null the rest 
public void setEvent(EventBase event) { 
    type=event.getType(); 
    switch(event.getType()) { 
     case EVENT_TIMER: 
      array[MyEventEnum.EVENT_TIMER.ordinal()]=event; 
      break; 
     case EVENT_MARKETDATA: 
      array[MyEventEnum.EVENT_MARKETDATA.ordinal()]=event; 
      break; 
     default: 
      throw new RuntimeException("Unknown event type " + event); 
    } 
} 

// 이벤트

EventBase newEvent=new EventMarketData(....); 
    // prepare 
    long nextSequence = ringBuffer.next(); 
    RingBufferEventHolder holder = ringBuffer.get(nextSequence); 
    holder.setEvent(newEvent); 
    // make the event available to EventProcessors 
    ringBuffer.publish(nextSequence); 
관련 문제