나는 에코 클라이언트와 서버를 쓰고 있어요.
클라이언트는 문자열을 인코딩 및 서버로 보내 : 옮겨진되는 데이터는 문자열입니다. 서버 RECV 데이터를 디코딩 문자열, 다음, 수신 된 문자열을 인코딩 클라이언트로 보낼 수 있습니다.
상기 처리는 100,000 회 반복된다 (참고 : 연결 영구입니다.).
배설 CONTIONS : 나는 동시에 하나 개의 서버와 두 개의 클라이언트를 실행하면, 모든 것이 괜찮
는 모든 클라이언트가 100000 메시지를 수신하고 정상적으로 종료되었습니다. 내가 서버에 ExecutionHandler을 추가 한 다음 ONE 서버와 동시에 두 개의 클라이언트를 실행하면
는하지만, 하나의 클라이언트는 종료하지 않습니다, 그리고 네트워크 트래픽은 0이다.
난 당신이 나에게 몇 가지 제안을 줄 것이다, 지금은이 문제의 핵심을 찾아 캔트?
MY 코드 :
문자열 인코더, 문자열 디코더, 클라이언트 핸들러, 서버 핸들러, 클라이언트 메인 서버 메인.
// 디코더 =========================================== ============
import java.nio.charset.Charset;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
public class Dcd extends FrameDecoder {
public static final Charset cs = Charset.forName("utf8");
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4) {
return null;
}
int headlen = 4;
int length = buffer.getInt(0);
if (buffer.readableBytes() < length + headlen) {
return null;
}
String ret = buffer.toString(headlen, length, cs);
buffer.skipBytes(length + headlen);
return ret;
}
}
// 인코더 ============================ =============
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
public class Ecd extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof String)) {
return msg;
}
byte[] data = ((String) msg).getBytes();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(data.length + 4, ctx
.getChannel().getConfig().getBufferFactory());
buf.writeInt(data.length);
buf.writeBytes(data);
return buf;
}
}
// 클라이언트 처리기 ============ =========================
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger
.getLogger(EchoClientHandler.class.getName());
private final AtomicLong transferredBytes = new AtomicLong();
private final AtomicInteger counter = new AtomicInteger(0);
private final AtomicLong startTime = new AtomicLong(0);
private String dd;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(String data) {
dd = data;
}
public long getTransferredBytes() {
return transferredBytes.get();
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
// Send the first message. Server will not send anything here
// because the firstMessage's capacity is 0.
startTime.set(System.currentTimeMillis());
Channels.write(ctx.getChannel(), dd);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
transferredBytes.addAndGet(((String) e.getMessage()).length());
int i = counter.incrementAndGet();
int N = 100000;
if (i < N) {
e.getChannel().write(e.getMessage());
} else {
ctx.getChannel().close();
System.out.println(N * 1.0
/(System.currentTimeMillis() - startTime.get()) * 1000);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}
// 클라이언트 메인 ===================== ==================
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
/**
* Sends one message when a connection is open and echoes back any received data
* to the server. Simply put, the echo client initiates the ping-pong traffic
* between the echo client and server by sending the first message to the
* server.
*/
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() {
// Configure the client.
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new Dcd(), new Ecd(),
new EchoClientHandler("abcdd"));
}
});
bootstrap.setOption("sendBufferSize", 1048576);
bootstrap.setOption("receiveBufferSize", 1048576);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("writeBufferLowWaterMark", 32 * 1024);
bootstrap.setOption("writeBufferHighWaterMark", 64 * 1024);
List<ChannelFuture> list = new ArrayList<ChannelFuture>();
for (int i = 0; i < 1; i++) {
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(
host, port));
// Wait until the connection is closed or the connection
// attempt
// fails.
list.add(future);
}
for (ChannelFuture f : list) {
f.getChannel().getCloseFuture().awaitUninterruptibly();
}
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}
private static void testOne() {
final String host = "192.168.0.102";
final int port = 8000;
new EchoClient(host, port).run();
}
public static void main(String[] args) throws Exception {
testOne();
}
}
// 서버 처리기 ===== ==========================================================================================================
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* Handler implementation for the echo server.
*/
public class EchoServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger
.getLogger(EchoServerHandler.class.getName());
private final AtomicLong transferredBytes = new AtomicLong();
public long getTransferredBytes() {
return transferredBytes.get();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
transferredBytes.addAndGet(((String) e.getMessage()).length());
Channels.write(ctx.getChannel(), e.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}
// 서버의 주요 ======================================= ================
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
/**
* Echoes back any received data from a client.
*/
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
System.out.println(Runtime.getRuntime().availableProcessors() * 2);
final ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
System.out.println("new pipe");
return Channels.pipeline(new Dcd(), new Ecd(),
executionHandler, new EchoServerHandler());
}
});
bootstrap.setOption("child.sendBufferSize", 1048576);
bootstrap.setOption("child.receiveBufferSize", 1048576);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.writeBufferLowWaterMark", 32 * 1024);
bootstrap.setOption("child.writeBufferHighWaterMark", 64 * 1024);
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
}
public static void main(String[] args) throws Exception {
int port = 8000;
new EchoServer(port).run();
}
}
를 참조하십시오? 당신은 jstack 유틸리티 또는 jvisualvm (JDK)을 통해 그것들을 얻을 수 있거나 linux/unix 박스를 사용한다면 "kill -3 JAVA_PID"신호를 보낼 수 있습니다 (http://stackoverflow.com/questions/4876274/kill-3-to- – SirVaulterScoff
메시지가 EchoServerHandler에 도달 할 수 없도록 서버의 디코더가 "Channels.fireMessageReceived (context, result, remoteAddress)"를 실행하지 못하는 것 같습니다. 그것은 잃어버린다. 나는 지금 그 이유를 찾고있다. – zhmt
fireMessageReceived 호출 중에 예외가 있습니까? 제발 그것을 게시하십시오 – SirVaulterScoff