2012-12-19 4 views
1

자바 7 글래스 피시 3.1.2Anynchronous 다중 스레드 처리 메시지

입력 메시지 등이다. 새 컨텍스트 ID가 인 메시지의 경우 새 스레드를 시작해야합니다. 이미 존재하는 contextId의 경우 이미 존재하는 스레드를 사용하십시오. 기존의 thread는 같은 contextId 순차를 가지는 메세지를 처리 ​​할 필요가 있습니다.

안녕하세요. 마지막으로 작동하지 않는 Worker 버전입니다.

@Stateless 
@LocalBean 
public class Worker { 

private static final Map<String, Future<Result>> MAP = new ConcurrentHashMap<>(); 
@EJB 
private Worker worker; 

@Asynchronous 
public void work(Message message) { 
    System.out.println(Thread.currentThread().getName() + ": A message: " + message.toString()+ " should be processed"); 
    Future<Result> sameContext = MAP.get(message.getContextId()); 
    if (sameContext != null) { 
     waitForSameContextId(message, sameContext); 
    } 
    MAP.put(message.getContextId(), worker.doWork(message)); 
} 

@Asynchronous 
public Future<Result> doWork(Message message) { 
    System.out.println(Thread.currentThread().getName() + ": Processing the message: " + message.toString()); 

    AsyncResult<Result> asyncResult = new AsyncResult<>(new Result()); 
    try { 
     Thread.sleep(15000); 
    } catch (InterruptedException ex) { 
     ex.printStackTrace(); 
    } 

    MAP.remove(message.getContextId()); //We are done removing 
    System.out.println(Thread.currentThread().getName() + ": The message: " + message.toString()+ " was processed"); 
    return asyncResult; 
} 

private void waitForSameContextId(Message message, Future<Result> result) { 
    try { 
     System.out.println(Thread.currentThread().getName() + ": message with id: " + message.toString() 
       + " is already in work, blocking Thread until it is finished"); 
     Result get = result.get(); //blocks thread 
    } catch (InterruptedException | ExecutionException ex) { 
     ex.printStackTrace(); 
     // Do some failure management 
    } 
} 

테스트 클래스 :

public class MessageReceiver { 

private static String ID = "#########"; 

@EJB 
private Worker worker; 

public void receive(Message message) { 

    worker.work(message); 
} 

@PostConstruct 
void init() { 

    receive(new Message(ID, "message 1")); 
    receive(new Message(ID, "message 2")); 
    receive(new Message(ID, "message 3")); 
... 
} 

답변

관련 문제