2010-07-29 4 views
6

처음에는이 질문을 here에게 물었습니다. 그러나 제 질문은 while-true 루프가 아니라는 것을 깨달았습니다. Java에서 고성능 비동기 메시지 전달을 수행하는 적절한 방법은 무엇입니까? 내가 할 노력하고있어Java : 고성능 메시지 전달 (단일 생성자/단일 소비자)

...

내가 ~ 10,000 소비자가 자신의 개인 큐의 각 소비하는 메시지를 표시합니다. 하나씩 메시지를 생성하고 올바른 소비자 대기열에 넣는 스레드가 있습니다. 각 소비자는 대기열에 표시하고 처리 할 메시지를 확인하면서 무기한 반복합니다.

하나의 제작자가 있기 때문에이 용어는 "단일 생산자/단일 소비자"라고 생각하며 각 소비자는 자신의 개인 대기열에서만 작업합니다 (다중 소비자는 동일한 대기열에서 읽지 않습니다).

내부 Consumer.java가 :

@Override 
public void run() { 
    while (true) { 
     Message msg = messageQueue.poll(); 
     if (msg != null) { 
      ... // do something with the message 
     } 
    } 
} 

생산자가 빠른 속도로 소비자 메시지 큐 내부 메시지를두고있다 (초당 몇 만 메시지). 소비자는 가능한 한 빨리 이러한 메시지를 처리해야합니다!

참고 : while (true) { ... }은 생산자가 마지막 메시지로 보낸 KILL 메시지로 종료됩니다.

그러나 내 질문은이 메시지 전달을 디자인하는 적절한 방법에 관한 것입니다. messageQueue에 어떤 종류의 대기열을 사용해야합니까? 동기 또는 비동기 여야합니까? 메시지는 어떻게 디자인되어야합니까? while-true 루프를 사용해야합니까? 소비자가 스레드가되어야합니까? 10,000 개의 스레드가 느려지 게됩니까? 쓰레드의 대안은 무엇입니까?

그래서 Java에서 고성능 메시지 전달을 수행하는 적절한 방법은 무엇입니까?

+1

왜 10k 스레드가 있습니까? 각 스레드마다 많은 양의 대기열을 필요로하는 매우 많은 코어 나 작업이 없으면 스레드 전환에 많은 오버 헤드가 발생합니다. – Mike

+2

10k 쓰레드는 고성능에서 멀리 떨어져 있습니다. – whiskeysierra

+0

'@Mike :'10,000 개의 다른 심볼이 있으며 각 소비자가 하나의 심볼에 대한 메시지를 처리합니다. 스레드로 구현해야하는지 모르겠지만 소비자는 서로 아무 것도 공유하지 않으며 배우 ​​모델에 대한 좋은 후보자가됩니다. –

답변

5

10,000 스레드의 컨텍스트 전환 오버 헤드가 메모리 오버 헤드는 말할 것도없이 매우 높을 것이라고 나는 말할 것입니다. 기본적으로 32 비트 플랫폼에서 각 스레드는 256KB의 기본 스택 크기를 사용하므로 스택에 2.5GB가 사용됩니다. 분명히 당신은 64 비트이지만, 그렇다고하더라도 꽤 많은 양의 메모리를 말하는 것입니다. 사용 된 메모리의 양 때문에 캐시가 많이 쓰러져서 CPU가 메모리 대역폭에 의해 제한됩니다.

많은 양의 스택과 컨텍스트 스위칭 오버 헤드를 할당하는 것을 피하기 위해 너무 많은 스레드를 사용하지 않는 디자인을 찾아 볼 것입니다. 동시에 10,000 개의 스레드를 처리 할 수 ​​없습니다. 현재 하드웨어는 일반적으로 100 코어 미만입니다.

하드웨어 스레드 당 하나의 큐를 만들고 라운드 로빈 방식으로 메시지를 발송합니다. 처리 시간이 상당히 다를 경우, 일부 스레드는 더 많은 작업을하기 전에 대기열 처리를 끝내고 다른 스레드는 할당 된 작업을 처리하지 못할 위험이 있습니다. JSR-166 ForkJoin 프레임 워크에서 구현 된대로 작업 도용을 사용하면이 문제를 피할 수 있습니다.

게시자가 구독자에게 전달하는 한 가지 방법이므로 구독자가 게시 된 메시지를 변경하지 않는다는 가정하에 메시지에는 특별한 디자인이 필요하지 않습니다.

EDIT : 10,000 개의 기호가있는 경우 의견을 읽은 다음 일반 구독자 스레드 (코어 당 하나의 구독자 스레드)를 만들고 게시자로부터 메시지를 비동기 적으로 (예 : 메시지 대기열을 통해) 수신합니다.구독자는 메시지를 큐에서 가져 와서 메시지에서 심볼을 가져 와서 메시지 처리기 맵에서 찾아서 핸들러를 검색하고 핸들러를 호출하여 메시지를 동 기적으로 처리합니다. 완료되면 큐에서 다음 메시지를 가져 오는 과정을 반복합니다. 동일한 기호에 대한 메시지를 순서대로 처리해야하는 경우 (이는 내가 10,000 개의 대기열을 원했을 것으로 추측합니다) 기호를 구독자에 매핑해야합니다. 예 : 가입자가 10 명인 경우 0-999 번 심볼은 가입자 0, 1000-1999는 가입자 1 등으로 이동합니다.보다 세련된 방식은 심볼을 주파수 분포에 따라 매핑하여 각 가입자가 대략 동일한로드를 얻도록하는 것입니다. 예를 들어 트래픽의 10 %가 기호 0이면 구독자 0은 하나의 기호 만 처리하고 다른 기호는 다른 구독자 사이에 분배됩니다.

+0

내 프로그램을 작성하는 방법이 있나요? 개념적으로 10,000 명의 소비자가 각각 자신의 큐에서 작업하고 있습니까? 하지만 몇 개의 대기열을 처리하는 몇 가지 스레드로 실행하고 있습니까? –

+0

제 편집을 참조하십시오. – mdma

+0

@ Mr.Burgundy 물론 많은 접근법이 있습니다. 예 : 간단한 방법으로 소비자 로직을 소비자 스레드와 관련이없는 클래스에 캡슐화하고 목록에있는 10k 개의 소비자 스레드가 올바른 스레드를 찾아 메시지의 특정 소비자에 대한 로직을 호출하도록 할 수 있습니다. – nos

0

하드웨어 및 용량에 비례하여 소비자 스레드 풀이 있습니다. 이러한 소비자 스레드는 메시지 대기열을 폴링 할 수 있습니다.

메시지를 처리하는 방법을 알고 메시지가 초기화 될 때 소비자 스레드 클래스로 프로세서를 등록하도록합니다.

1

우선, 완전한 디자인 문서를 작성하거나 다른 방법을 시도하지 않는 한 하나의 정답은 없습니다.

나는 당신의 프로세싱이 계산적으로 집중적이지 않을 것이라고 가정한다. 그렇지 않으면 당신은 동시에 10000 개의 큐를 처리 할 생각을하지 않을 것이다. 한 가지 가능한 솔루션은 CPU 당 1-2 개의 스레드를 가짐으로써 컨텍스트 스위칭을 최소화하는 것입니다. 시스템이 엄격한 실시간으로 데이터를 처리하지 않으면 각 대기열에서 지연이 더 커지지 만 전체적으로 처리량이 향상 될 수 있습니다.

예를 들어, 제작자 스레드를 자체 CPU에서 실행하고 일괄 처리 메시지를 소비자 스레드에 배치하십시오. 각 소비자 스레드는 N 개의 개인 대기열에 메시지를 배포하고 처리 단계를 수행하며 새로운 데이터 일괄 처리를 수신합니다. 다시 말하지만 지연 허용 오차에 따라 달라 지므로 처리 단계는 모든 대기열, 고정 된 수의 대기열, 시간 임계 값에 도달하지 않으면 대기열 수만큼의 처리를 의미 할 수 있습니다. 어떤 큐가 어떤 소비자 스레드에 속해 있는지 쉽게 알 수 있다면 (예 : 큐가 순차적으로 번호가 매겨진 경우 : int consumerThreadNum = queueNum & 0x03) 해시 테이블을 검색 할 때마다 느려질 수 있으므로 이점이 있습니다.

메모리 스레 싱을 최소화하려면 항상 대기열을 작성/삭제하는 것이 좋지 않으므로 스레드 당 (최대 대기열 수/코어 수) 대기열 객체를 사전 할당 할 수 있습니다. 대기열이 파괴되지 않고 완료되면 지워지고 다시 사용될 수 있습니다. gc가 너무 자주 그리고 너무 오랫동안 들어오는 것을 원하지 않습니다.

생산자가 KILL 명령을 수신 할 때까지 각 대기열에 대해 완전한 데이터 세트를 생성하거나 청크로 데이터를 전송하는 경우 알 수없는 또 다른 사항이 있습니다. 생산자가 완전한 데이터 세트를 보내는 경우 대기열 개념을 완전히 폐기하고 소비자 스레드에 도착할 때 데이터를 처리 할 수 ​​있습니다.

2

당신은 (신용 Which ThreadPool in Java should I use?로 이동)이 사용할 수 있습니다

class Main { 
    ExecutorService threadPool = Executors.newFixedThreadPool(
            Runtime.availableProcessors()*2); 

    public static void main(String[] args){ 
     Set<Consumer> consumers = getConsumers(threadPool); 
     for(Consumer consumer : consumers){ 
      threadPool.execute(consumer); 
     } 
    } 
} 

class Consumer { 
    private final ExecutorService tp; 
    private final MessageQueue messageQueue; 
    Consumer(ExecutorService tp,MessageQueue queue){ 
     this.tp = tp; 
     this.messageQueue = queue; 
    } 
    @Override 
    public void run(){ 
       Message msg = messageQueue.poll(); 

       if (msg != null) { 
        try{ 
         ... // do something with the message 
        finally{ 
         this.tp.execute(this); 
        } 
       } 
      } 
    } 
}  

이 방법을 사용하면 약간의 번거 로움과 함께 좋아 예약을 할 수 있습니다.