2009-04-26 8 views
0

Java와 관련된 실용적인 시나리오, 소켓 프로그램에 대해 연구 중입니다. 기존 시스템과 예상 시스템은 다음과 같습니다.Java에서 멀티 스레딩을 어떻게 처리해야합니까?

기존 시스템 - 시스템은 특정 조건이 충족되는지 확인합니다. 그렇다면 보낼 메시지를 작성하여 대기열에 넣으십시오.

큐 프로세서는 별도의 스레드입니다. 주기적으로 큐에 항목이 있는지 여부를 확인합니다. 항목 (메시지)을 찾으면 메시지를 원격 호스트 (하드 코드 된)로 보내고 대기열에서 항목을 제거합니다.

예상 시스템 - 이것은 이와 비슷한 것입니다. 메시지는 특정 조건이 충족 될 때 생성되지만 모든 경우에 수신자가 동일하지 않습니다. 그래서 많은 접근법이 있습니다.

  1. 메시지를 수신자 ID와 동일한 대기열에 넣습니다. 이 경우 두 번째 스레드는 수신자를 식별하여 메시지를 보낼 수 있습니다.

  2. 여러 스레드가 있습니다. 이 경우 조건이 충족되고 수신기가 "새로 만들기"에 있으면 새 대기열을 만들고 그 대기열에 메시지를 넣습니다. 그리고 새로운 스레드는 그 큐를 처리하기 위해 초기화합니다. 다음 메시지가 동일한 수신자에게 보내지는 경우 동일한 큐에 넣고 그렇지 않으면 새 큐와 스레드를 만들어야합니다.

이제 제 2의 비트를 구현하고 싶습니다. 어떻게해야합니까? 스켈레톤으로 충분하고 대기열 등을 만드는 방법을 신경 쓸 필요가 없습니다. :)

업데이트 : 또한 접근 방법 1이이를 수행하는 가장 좋은 방법이라고 생각합니다. 나는 실을 꿰기에 몇몇 기사를 읽고 그 결정에왔다. 그러나 방법 2를 구현하는 방법을 배우는 것은 정말로 가치가 있습니다.

답변

1

우선 수신자가 많을 경우에는 한 줄짜리 및 대기열 당 하나의 수신자 접근 방식을 사용하지 않을 것입니다. 당신은 대부분의 시간을하지 않고 많은 스레드로 끝날 수 있고 나는 당신에게 성능을 해칠 수 있습니다. 다른 방법으로는 작업자 스레드의 스레드 풀을 사용하는 것, 공유 큐에서 작업을 선택하는 것, 자체 수신자 ID가있는 각 작업 및 작업 스레드가 사용할 각 수신기에 대한 소켓 연결이있는 공유 사전 등이 있습니다.

1) 새 스레드 실행 처리하기 위해 새로운 클래스를 작성하십시오

는 여전히 접근 방식을 추구하려는 경우 당신이 할 수있는 것은, 그렇게 말했다 데에

public class Worker implements Runnable { 
    private Queue<String> myQueue = new Queue<String>(); 
    public void run() 
    { 
     while (true) { 
      string messageToProcess = null; 
      synchronized (myQueue) { 
      if (!myQueue.empty()) { 
       // get your data from queue 
       messageToProcess = myQueue.pop(); 
      } 
      } 
      if (messageToProcess != null) { 
      // do your stuff 
      } 
      Thread.sleep(500); // to avoid spinning 
     } 
    } 
    public void queueMessage(String message) 
    { 
     synchronized(myQueue) { 
     myQueue.add(message); 
     } 
    } 
} 

2) 귀하 주 스레드, 메시지를 작성하고 사전 (해시 테이블)을 사용하여 수신자의 스레드가 이미 작성되었는지 확인하십시오. 있다면, 그냥 새 메시지 대기열.

while (true) { 
    String msg = getNewCreatedMessage(); // you get your messages from here 
    int id = getNewCreatedMessageId(); // you get your rec's id from here 
    Worker w = myHash(id); 
    if (w == null) { // create new Worker thread 
     w = new Worker(); 
     new Thread(w).start(); 
    } 
    w.queueMessage(msg); 
} 

행운을 빕니다 : 그렇지 않은 경우, 새로운 메시지, 새로운 스레드를 만들 해시 테이블에 넣고 대기.

편집 :이 방법으로 언급 한 BlockingQueue 브라이언을 사용하여이 솔루션을 향상시킬 수 있습니다.

2

BlockingQueue을 제안 할 수 있습니까? 디스패치 프로세스는이 큐에 쓰고 (put), 클라이언트는 스레드 세이프 (threadafe) 방식으로 가져 오거나 들여다 볼 수 있습니다. 따라서 큐 구현을 전혀 작성할 필요가 없습니다.

다른 메시지 유형을 포함하는 하나의 대기열이있는 경우 각 클라이언트에 대해 일부 첨단 유형 메커니즘을 구현해야합니다. 즉, 대기열의 헤드를 확인하고 그 헤더 만 취해야합니다. 효과적으로 작동하기 위해서는 소비자가시기 적절하고 강력한 방식으로 필요한 데이터를 추출해야합니다.

메시지/소비자 유형별로 하나의 대기열/스레드가 있으면 더 쉽고 안정적입니다. 큐가 제네릭의 사용을 만들 수 있으며, take()이 차단되는 것을

while (!done) { 
    Object item = queue.take(); 
    // process item 
} 

참고 :

클라이언트 구현은,에 반복해야합니다.

물론 여러 소비자가 서로 다른 유형의 메시지를 사용하는 경우 space-based architecture을 고려할 수 있습니다. 여기에는 대기열 (FIFO) 특성이 없지만 여러 소비자가 매우 쉽게 사용할 수 있습니다.

2

엔드 시스템이 많고 가끔씩 또는 몇 개의 엔드 시스템과 각각의 메시지에 자주 메시지를 보내야하는지 약간 무게가 나가야합니다.

엔드 시스템이 많은 경우 말 그대로 엔드 시스템 당 하나의 스레드를 사용하는 것이 실제로 모든 컴퓨터에 지속적으로 메시지를 스트리밍하지 않는 한 정상보다 약간 소리가납니다. 특정 경계 사이에서만 성장할 수있는 스레드 풀을 제안 할 것입니다. 이렇게하려면 ThreadPoolExecutor를 사용할 수 있습니다. 당신은 몇 가지 끝 기계가있는 경우, 당신은 기계마다의 BlockingQueue을 가질 수

Executor msgExec = new ThreadPoolExecutor(...); 

public void sendMessage(final String machineId, byte[] message) { 
    msgExec.execute(new Runnable() { 
    public void run() { 
     sendMessageNow(machineId, message); 
    } 
    }); 
} 

private void sendMessageNow(String machineId, byte[] message) { 
    // open connection to machine and send message, thinking 
    // about the case of two simultaneous messages to a machine, 
    // and whether you want to cache connections. 
} 

및 스레드 : 당신이 메시지를 게시 할 필요가있을 때, 당신은 실제로 메시지를 보내드립니다 집행자에 실행 가능한 제출 다음 메시지를 기다리는 블로킹 대기열 당. 이 경우, 패턴은 (테스트되지 않은 오프 최고 수준의 머리 일요일 아침 코드를 조심) 다음과 같다 :

이 경우
ConcurrentHashMap<String,BockingQueue> queuePerMachine; 

public void sendMessage(String machineId, byte[] message) { 
    BockingQueue<Message> q = queuePerMachine.get(machineId); 
    if (q == null) { 
    q = new BockingQueue<Message>(); 
    BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q); 
    if (prev != null) { 
     q = prev; 
    } else { 
     (new QueueProessor(q)).start(); 
    } 
    } 
    q.put(new Message(message)); 
} 

private class QueueProessor extends Thread { 
    private final BockingQueue<Message> q; 
    QueueProessor(BockingQueue<Message> q) { 
    this.q = q; 
    } 
    public void run() { 
    Socket s = null; 
    for (;;) { 
     boolean needTimeOut = (s != null); 
     Message m = needTimeOut ? 
     q.poll(60000, TimeUnit.MILLISECOND) : 
     q.take(); 
     if (m == null) { 
     if (s != null) 
      // close s and null 
     } else { 
     if (s == null) { 
      // open s 
     } 
     // send message down s 
     } 
    } 
    // add appropriate error handling and finally 
    } 
} 

해당 시스템에 대한 메시지를 60 초 내에 도착하지 않는 경우, 우리는 연결을 닫습니다 .

대신 JMS를 사용해야합니까? 글쎄, 당신은이 소리가 당신에게 복잡하게 들리는 지 짐작해야합니다. 내 개인적 감각은 특수한 틀을 정당화하는 것은 충분히 복잡하지 않다는 것입니다. 그러나 의견이 다를 것이라고 확신합니다.

P. 현실에서는 이제 이것을 보았습니다. 아마도 스레드 객체 내에 큐를 놓고 컴퓨터 ID -> 스레드 객체를 매핑하는 것입니다. 어쨌든, 당신은 아이디어를 얻습니다.

2

java.util.concurrent를 정렬의 실제 "엔진"으로 사용하는 in-vm JMS 구현 인 SomnifugiJMS을 사용해 볼 수 있습니다.

아마도 다소 과장 될 것이지만 추가 프로그래밍없이 응용 프로그램을 배포 할 수 있습니다 (적용 가능한 경우). ActiveMQ과 같은 다른 JMS 구현을 플러그인하면 완료됩니다.

관련 문제