2012-03-14 3 views
0

장면 :그물코 서버가 차단 될 것 같다?

나는 에코 클라이언트와 서버를 쓰고 있어요.

클라이언트는 문자열을 인코딩 및 서버로 보내 : 옮겨진되는 데이터는 문자열입니다. 서버 RECV 데이터를 디코딩 문자열, 다음, 수신 된 문자열을 인코딩 클라이언트로 보낼 수 있습니다.

상기 처리는 100,000 회 반복된다 (참고 : 연결 영구입니다.).

배설 CONTIONS : 나는 동시에 하나 개의 서버와 두 개의 클라이언트를 실행하면, 모든 것이 괜찮

는 모든 클라이언트가 100000 메시지를 수신하고 정상적으로 종료되었습니다. 내가 서버ExecutionHandler을 추가 한 다음 ONE 서버와 동시에 두 개의 클라이언트를 실행하면

는하지만, 하나의 클라이언트는 종료하지 않습니다, 그리고 네트워크 트래픽은 0이다.

난 당신이 나에게 몇 가지 제안을 줄 것이다, 지금은이 문제의 핵심을 찾아 캔트?

MY 코드 :

문자열 인코더, 문자열 디코더, 클라이언트 핸들러, 서버 핸들러, 클라이언트 메인 서버 메인.

// 디코더 =========================================== ============

import java.nio.charset.Charset; 

import org.jboss.netty.buffer.ChannelBuffer; 
import org.jboss.netty.channel.Channel; 
import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.handler.codec.frame.FrameDecoder; 

public class Dcd extends FrameDecoder { 
    public static final Charset cs = Charset.forName("utf8"); 

    @Override 
    protected Object decode(ChannelHandlerContext ctx, Channel channel, 
      ChannelBuffer buffer) throws Exception { 

     if (buffer.readableBytes() < 4) { 
      return null; 
     } 

     int headlen = 4; 
     int length = buffer.getInt(0); 
     if (buffer.readableBytes() < length + headlen) { 
      return null; 
     } 

     String ret = buffer.toString(headlen, length, cs); 
     buffer.skipBytes(length + headlen); 

     return ret; 
    } 
} 

// 인코더 ============================ =============

import org.jboss.netty.buffer.ChannelBuffer; 
import org.jboss.netty.buffer.ChannelBuffers; 
import org.jboss.netty.channel.Channel; 
import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; 

public class Ecd extends OneToOneEncoder { 
    @Override 
    protected Object encode(ChannelHandlerContext ctx, Channel channel, 
      Object msg) throws Exception { 
     if (!(msg instanceof String)) { 
      return msg; 
     } 

     byte[] data = ((String) msg).getBytes(); 

     ChannelBuffer buf = ChannelBuffers.dynamicBuffer(data.length + 4, ctx 
       .getChannel().getConfig().getBufferFactory()); 
     buf.writeInt(data.length); 
     buf.writeBytes(data); 

     return buf; 
    } 
} 

// 클라이언트 처리기 ============ =========================

import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.atomic.AtomicLong; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.channel.ChannelStateEvent; 
import org.jboss.netty.channel.Channels; 
import org.jboss.netty.channel.ExceptionEvent; 
import org.jboss.netty.channel.MessageEvent; 
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 

/** 
* Handler implementation for the echo client. It initiates the ping-pong 
* traffic between the echo client and server by sending the first message to 
* the server. 
*/ 
public class EchoClientHandler extends SimpleChannelUpstreamHandler { 

    private static final Logger logger = Logger 
      .getLogger(EchoClientHandler.class.getName()); 

    private final AtomicLong transferredBytes = new AtomicLong(); 
    private final AtomicInteger counter = new AtomicInteger(0); 
    private final AtomicLong startTime = new AtomicLong(0); 

    private String dd; 

    /** 
    * Creates a client-side handler. 
    */ 
    public EchoClientHandler(String data) { 
     dd = data; 
    } 

    public long getTransferredBytes() { 
     return transferredBytes.get(); 
    } 

    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { 
     // Send the first message. Server will not send anything here 
     // because the firstMessage's capacity is 0. 
     startTime.set(System.currentTimeMillis()); 

     Channels.write(ctx.getChannel(), dd); 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { 
     // Send back the received message to the remote peer. 
     transferredBytes.addAndGet(((String) e.getMessage()).length()); 
     int i = counter.incrementAndGet(); 
     int N = 100000; 
     if (i < N) { 
      e.getChannel().write(e.getMessage()); 
     } else { 
      ctx.getChannel().close(); 
      System.out.println(N * 1.0 
        /(System.currentTimeMillis() - startTime.get()) * 1000); 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { 
     // Close the connection when an exception is raised. 
     logger.log(Level.WARNING, "Unexpected exception from downstream.", 
       e.getCause()); 
     e.getChannel().close(); 
    } 
} 

// 클라이언트 메인 ===================== ==================

import java.net.InetSocketAddress; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.Executors; 

import org.jboss.netty.bootstrap.ClientBootstrap; 
import org.jboss.netty.channel.ChannelFuture; 
import org.jboss.netty.channel.ChannelPipeline; 
import org.jboss.netty.channel.ChannelPipelineFactory; 
import org.jboss.netty.channel.Channels; 
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 

/** 
* Sends one message when a connection is open and echoes back any received data 
* to the server. Simply put, the echo client initiates the ping-pong traffic 
* between the echo client and server by sending the first message to the 
* server. 
*/ 
public class EchoClient { 

    private final String host; 
    private final int port; 

    public EchoClient(String host, int port) { 
     this.host = host; 
     this.port = port; 
    } 

    public void run() { 
     // Configure the client. 
     final ClientBootstrap bootstrap = new ClientBootstrap(
       new NioClientSocketChannelFactory(
         Executors.newCachedThreadPool(), 
         Executors.newCachedThreadPool())); 

     // Set up the pipeline factory. 
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
      public ChannelPipeline getPipeline() throws Exception { 
       return Channels.pipeline(new Dcd(), new Ecd(), 
         new EchoClientHandler("abcdd")); 
      } 
     }); 

     bootstrap.setOption("sendBufferSize", 1048576); 
     bootstrap.setOption("receiveBufferSize", 1048576); 
     bootstrap.setOption("tcpNoDelay", true); 
     bootstrap.setOption("writeBufferLowWaterMark", 32 * 1024); 
     bootstrap.setOption("writeBufferHighWaterMark", 64 * 1024); 

     List<ChannelFuture> list = new ArrayList<ChannelFuture>(); 
     for (int i = 0; i < 1; i++) { 
      // Start the connection attempt. 
      ChannelFuture future = bootstrap.connect(new InetSocketAddress(
        host, port)); 
      // Wait until the connection is closed or the connection 
      // attempt 
      // fails. 
      list.add(future); 
     } 

     for (ChannelFuture f : list) { 
      f.getChannel().getCloseFuture().awaitUninterruptibly(); 
     } 

     // Shut down thread pools to exit. 
     bootstrap.releaseExternalResources(); 
    } 

    private static void testOne() { 
     final String host = "192.168.0.102"; 
     final int port = 8000; 

     new EchoClient(host, port).run(); 
    } 

    public static void main(String[] args) throws Exception { 
     testOne(); 
    } 
} 

// 서버 처리기 ===== ==========================================================================================================

import java.util.concurrent.atomic.AtomicLong; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.channel.Channels; 
import org.jboss.netty.channel.ExceptionEvent; 
import org.jboss.netty.channel.MessageEvent; 
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 

/** 
* Handler implementation for the echo server. 
*/ 
public class EchoServerHandler extends SimpleChannelUpstreamHandler { 

    private static final Logger logger = Logger 
      .getLogger(EchoServerHandler.class.getName()); 

    private final AtomicLong transferredBytes = new AtomicLong(); 

    public long getTransferredBytes() { 
     return transferredBytes.get(); 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { 
     // Send back the received message to the remote peer. 
     transferredBytes.addAndGet(((String) e.getMessage()).length()); 
     Channels.write(ctx.getChannel(), e.getMessage()); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { 
     // Close the connection when an exception is raised. 
     logger.log(Level.WARNING, "Unexpected exception from downstream.", 
       e.getCause()); 
     e.getChannel().close(); 
    } 
} 

// 서버의 주요 ======================================= ================

import java.net.InetSocketAddress; 
import java.util.concurrent.Executors; 

import org.jboss.netty.bootstrap.ServerBootstrap; 
import org.jboss.netty.channel.ChannelPipeline; 
import org.jboss.netty.channel.ChannelPipelineFactory; 
import org.jboss.netty.channel.Channels; 
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 
import org.jboss.netty.handler.execution.ExecutionHandler; 
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; 

/** 
* Echoes back any received data from a client. 
*/ 
public class EchoServer { 

    private final int port; 

    public EchoServer(int port) { 
     this.port = port; 
    } 

    public void run() { 
     // Configure the server. 
     ServerBootstrap bootstrap = new ServerBootstrap(
       new NioServerSocketChannelFactory(
         Executors.newCachedThreadPool(), 
         Executors.newCachedThreadPool())); 

     System.out.println(Runtime.getRuntime().availableProcessors() * 2); 

     final ExecutionHandler executionHandler = new ExecutionHandler(
       new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); 

     // Set up the pipeline factory. 
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
      public ChannelPipeline getPipeline() throws Exception { 
       System.out.println("new pipe"); 
       return Channels.pipeline(new Dcd(), new Ecd(), 
         executionHandler, new EchoServerHandler()); 
      } 
     }); 

     bootstrap.setOption("child.sendBufferSize", 1048576); 
     bootstrap.setOption("child.receiveBufferSize", 1048576); 
     bootstrap.setOption("child.tcpNoDelay", true); 
     bootstrap.setOption("child.writeBufferLowWaterMark", 32 * 1024); 
     bootstrap.setOption("child.writeBufferHighWaterMark", 64 * 1024); 

     // Bind and start to accept incoming connections. 
     bootstrap.bind(new InetSocketAddress(port)); 
    } 

    public static void main(String[] args) throws Exception { 
     int port = 8000; 
     new EchoServer(port).run(); 
    } 
} 
+0

를 참조하십시오? 당신은 jstack 유틸리티 또는 jvisualvm (JDK)을 통해 그것들을 얻을 수 있거나 linux/unix 박스를 사용한다면 "kill -3 JAVA_PID"신호를 보낼 수 있습니다 (http://stackoverflow.com/questions/4876274/kill-3-to- – SirVaulterScoff

+0

메시지가 EchoServerHandler에 도달 할 수 없도록 서버의 디코더가 "Channels.fireMessageReceived (context, result, remoteAddress)"를 실행하지 못하는 것 같습니다. 그것은 잃어버린다. 나는 지금 그 이유를 찾고있다. – zhmt

+0

fireMessageReceived 호출 중에 예외가 있습니까? 제발 그것을 게시하십시오 – SirVaulterScoff

답변

1

나는 지금 그 이유를 발견했다. 그러나 그것은 열심히 노력하지만 즐거움으로 가득 차있다.

ExecutionHandler를 추가하면 메시지가 Runnable 작업으로 래핑되어 ChildExecutor에서 실행됩니다. 중요한 점은 다음과 같습니다. Executor가 거의 종료 될 때 ChildExecutor에 태스크를 추가 한 다음 ChildExecutor에서 태스크를 무시합니다.

3 줄 코드와 몇 가지 덧글을 추가했는데, 최종 코드는 아래처럼 보입니다. 이제 작성자에게 메일을 보내야합니까?:

private final class ChildExecutor implements Executor, Runnable { 
    private final Queue<Runnable> tasks = QueueFactory 
      .createQueue(Runnable.class); 
    private final AtomicBoolean isRunning = new AtomicBoolean(); 

    public void execute(Runnable command) { 
     // TODO: What todo if the add return false ? 
     tasks.add(command); 

     if (!isRunning.get()) { 
      doUnorderedExecute(this); 
     } else { 
     } 
    } 

    public void run() { 
     // check if its already running by using CAS. If so just return 
     // here. So in the worst case the thread 
     // is executed and do nothing 
     boolean acquired = false; 
     if (isRunning.compareAndSet(false, true)) { 
      acquired = true; 
      try { 
       Thread thread = Thread.currentThread(); 
       for (;;) { 
        final Runnable task = tasks.poll(); 
        // if the task is null we should exit the loop 
        if (task == null) { 
         break; 
        } 

        boolean ran = false; 
        beforeExecute(thread, task); 
        try { 
         task.run(); 
         ran = true; 
         onAfterExecute(task, null); 
        } catch (RuntimeException e) { 
         if (!ran) { 
          onAfterExecute(task, e); 
         } 
         throw e; 
        } 
       } 
       //TODO NOTE (I added): between here and "isRunning.set(false)",some new tasks maybe added. 
      } finally { 
       // set it back to not running 
       isRunning.set(false); 
      } 
     } 

     //TODO NOTE (I added): Do the remaining works. 
     if (acquired && !isRunning.get() && tasks.peek() != null) { 
      doUnorderedExecute(this); 
     } 
    } 
} 
1

이것은 버그이며 3.4.0.Alpha2에서 수정 될 예정입니다.

은 교수형 때 교수형 클라이언트/서버의 스레드 스택을 게시하시기 바랍니다 수 https://github.com/netty/netty/issues/234

관련 문제