0
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
.getName()));
}
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).text();
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format("%s received %s", ctx.channel(), request));
}
Message msg = new Message(ctx.channel(), request);
ReadQueueHandler.getInstance().addMessageToProcess(msg);
}
public class ReadQueueHandler implements Runnable {
private static int POOL_SIZE = 3;
private static ReadQueueHandler instance;
private final BlockingQueue<Message> messageQueue;
private final ExecutorService threadPool;
private final int threadPoolSize;
private final boolean isActive;
private ReadQueueHandler() {
this.threadPoolSize = POOL_SIZE;
this.threadPool = Executors.newFixedThreadPool(threadPoolSize);
this.messageQueue = new LinkedBlockingQueue<Message>();
isActive = true;
initThreadPool();
}
private void initThreadPool() {
for (int i = 0; i < this.threadPoolSize; i++) {
this.threadPool.execute(this);
}
}
/**
* Add message to read queue
*
* @param message
* - adding message
*/
public void addMessageToProcess(Message message) {
if (message != null) {
this.messageQueue.add(message);
}
}
@Override
public void run() {
while (isActive) {
Message message = null;
try {
message = this.messageQueue.take();
} catch (InterruptedException e) {
System.out.println("Exceptio " + e);
/*
* TODO Add logging
*/
}
if (message != null) {
Channel channel = message.getChannel();
channel.write(new TextWebSocketFrame("Message handled "));
}
}
}
public static ReadQueueHandler getInstance() {
if (instance == null) {
instance = new ReadQueueHandler();
}
return instance;
}
}의 Netty Channel.write 내가 대신 큐에 데이터를 추가하는 Channel.write ("뭔가")를 실행하면, 모든 잘 작동하고 클라이언트가 데이터를 얻을 수
작동하지 않습니다. 그러나 Channel.write ("")가 다른 스레드에서 실행되면 데이터가 가져 오지 않습니다. 이유가 무엇일까요? 채널 쓰기는 다른 스레드에서 실행할 수 없습니까?