1
자바 7 글래스 피시 3.1.2Anynchronous 다중 스레드 처리 메시지
입력 메시지 등이다. 새 컨텍스트 ID가 인 메시지의 경우 새 스레드를 시작해야합니다. 이미 존재하는 contextId의 경우 이미 존재하는 스레드를 사용하십시오. 기존의 thread는 같은 contextId 순차를 가지는 메세지를 처리 할 필요가 있습니다.
안녕하세요. 마지막으로 작동하지 않는 Worker 버전입니다.
@Stateless
@LocalBean
public class Worker {
private static final Map<String, Future<Result>> MAP = new ConcurrentHashMap<>();
@EJB
private Worker worker;
@Asynchronous
public void work(Message message) {
System.out.println(Thread.currentThread().getName() + ": A message: " + message.toString()+ " should be processed");
Future<Result> sameContext = MAP.get(message.getContextId());
if (sameContext != null) {
waitForSameContextId(message, sameContext);
}
MAP.put(message.getContextId(), worker.doWork(message));
}
@Asynchronous
public Future<Result> doWork(Message message) {
System.out.println(Thread.currentThread().getName() + ": Processing the message: " + message.toString());
AsyncResult<Result> asyncResult = new AsyncResult<>(new Result());
try {
Thread.sleep(15000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
MAP.remove(message.getContextId()); //We are done removing
System.out.println(Thread.currentThread().getName() + ": The message: " + message.toString()+ " was processed");
return asyncResult;
}
private void waitForSameContextId(Message message, Future<Result> result) {
try {
System.out.println(Thread.currentThread().getName() + ": message with id: " + message.toString()
+ " is already in work, blocking Thread until it is finished");
Result get = result.get(); //blocks thread
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
// Do some failure management
}
}
테스트 클래스 :
public class MessageReceiver {
private static String ID = "#########";
@EJB
private Worker worker;
public void receive(Message message) {
worker.work(message);
}
@PostConstruct
void init() {
receive(new Message(ID, "message 1"));
receive(new Message(ID, "message 2"));
receive(new Message(ID, "message 3"));
...
}