2013-05-08 1 views
0

간단한 라우팅 응용 프로그램을 작성 중입니다. 그 생각은 x 시간 동안 지속되는 일시적인 클라이언트 연결을받는 서버 또는 원본 노드가 있다는 것입니다. 수신 된 메시지는 디코딩 된 다음 해당 메시지의 세부 사항에 따라 이미 열려있는 해당 싱크 노드 또는 클라이언트로 전송됩니다. Router 클래스는 메시지의 대상을 필터링하고 제거 할 수 있도록 모든 채널과 시도를 맵에 저장하도록 등록합니다. 일단 대상을 얻으면 실제 싱크 노드 (구성에 따라 일시적인 특성이있을 수 있음)를 선택하고 해당 채널로 데이터를 보내 응답을 기다린 다음 다시 원래의 보낸 사람에게 보내야합니다. netty를 사용하여 구현 한 것이 올바른 방향인지 먼저 알고 싶습니다. 그리고 어떤 서버에서받은 메시지를 어떻게 통과하여 클라이언트로 보내고 원래의 소스 노드에 응답 할 수 있습니까?Netty의 채널간에 데이터를 전달하는 방법은 무엇입니까?

다음은 내 소스 코드입니다. 다음과 같은 정보를 제공해야합니다. 설명에 코드 예제를 사용하십시오.

import java.net.InetSocketAddress; 
    import java.util.ArrayList; 
    import java.util.HashMap; 
    import java.util.List; 
    import java.util.Map; 
    import java.util.concurrent.Executors; 
    import org.jboss.netty.bootstrap.ClientBootstrap; 
    import org.jboss.netty.bootstrap.ServerBootstrap; 
    import org.jboss.netty.channel.ChannelFactory; 
    import org.jboss.netty.channel.ChannelHandlerContext; 
    import org.jboss.netty.channel.ChannelPipeline; 
    import org.jboss.netty.channel.ChannelPipelineFactory; 
    import org.jboss.netty.channel.ChannelStateEvent; 
    import org.jboss.netty.channel.Channels; 
    import org.jboss.netty.channel.ChildChannelStateEvent; 
    import org.jboss.netty.channel.ExceptionEvent; 
    import org.jboss.netty.channel.MessageEvent; 
    import org.jboss.netty.channel.SimpleChannelHandler; 
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 

    /* 
    * @author Kimathi 
    */ 

    public class Service { 

     private Nodes nodes; 

     public void start(){ 

      nodes = new Nodes(); 
      nodes.addSourceNodes(new SourceNodes()). 
        addSinkNodes(new SinkNodes()). 
        addConfigurations(new Configurations()). 
        boot(); 
     } 

     public void stop(){ 

      nodes.stop(); 
     } 

     public static void main(String [] args){ 

      new Service().start(); 
     } 

    } 

    class Nodes { 

     private SourceNodes sourcenodes; 

     private SinkNodes sinknodes ; 

     private Configurations configurations; 

     public Nodes addConfigurations(Configurations configurations){ 

      this.configurations = configurations; 

      return this; 
     } 

     public Nodes addSourceNodes(SourceNodes sourcenodes){ 

      this.sourcenodes = sourcenodes; 

      return this; 
     } 

     public Nodes addSinkNodes(SinkNodes sinknodes){ 

      this.sinknodes = sinknodes; 

      return this; 
     } 

     public void boot(){ 

      Router router = new Router(configurations); 

      sourcenodes.addPort(8000). 
         addPort(8001). 
         addPort(8002); 
      sourcenodes.addRouter(router); 
      sourcenodes.boot() ; 

      sinknodes.addRemoteAddress("127.0.0.1", 6000). 
        addRemoteAddress("127.0.0.1", 6001). 
        addRemoteAddress("127.0.0.1", 6002); 
      sinknodes.addRouter(router); 
      sinknodes.boot(); 

     } 

     public void stop(){ 

      sourcenodes.stop(); 

      sinknodes.stop(); 
     } 

    } 

    final class SourceNodes implements Bootable , Routable { 

     private List <Integer> ports = new ArrayList(); 

     private ServerBootstrap serverbootstrap; 

     private Router router; 

     @Override 
     public void addRouter(final Router router){ 

      this.router = router; 
     } 

     public SourceNodes addPort(int port){ 

      this.ports.add(port); 

      return this; 
     } 

     @Override 
     public void boot(){ 

      this.initBootStrap(); 

      this.serverbootstrap.setOption("child.tcpNoDelay", true); 
      this.serverbootstrap.setOption("child.keepAlive", true); 
      this.serverbootstrap.setPipelineFactory(new ChannelPipelineFactory() { 

       @Override 
       public ChannelPipeline getPipeline() throws Exception { 

        return Channels.pipeline(new SourceHandler(router)); 
       } 
      }); 



      for(int port:this.ports){ 
       this.serverbootstrap.bind(new InetSocketAddress(port)); 
      } 
     } 

     @Override 
     public void stop(){ 

      this.serverbootstrap.releaseExternalResources(); 

     } 

     private void initBootStrap(){ 

      ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); 

      this.serverbootstrap = new ServerBootstrap(factory); 
     } 
    } 

    final class SinkNodes implements Bootable , Routable { 

     private List<SinkAddress> addresses= new ArrayList(); 

     private ClientBootstrap clientbootstrap; 

     private Router router; 

     @Override 
     public void addRouter(final Router router){ 

      this.router = router; 

     } 

     public SinkNodes addRemoteAddress(String hostAddress,int port){ 

      this.addresses.add(new SinkAddress(hostAddress,port)); 

      return this; 
     } 

     @Override 
     public void boot(){ 

      this.initBootStrap(); 

      this.clientbootstrap.setOption("tcpNoDelay", true); 
      this.clientbootstrap.setOption("keepAlive", true); 
      this.clientbootstrap.setPipelineFactory(new ChannelPipelineFactory() { 

       @Override 
       public ChannelPipeline getPipeline() throws Exception { 

        return Channels.pipeline(new SinkHandler(router)); 
       } 
      }); 

      for(SinkAddress address:this.addresses){ 

       this.clientbootstrap.connect(new InetSocketAddress(address.hostAddress(),address.port())); 
      } 
     } 

     @Override 
     public void stop(){ 

      this.clientbootstrap.releaseExternalResources(); 
     } 

     private void initBootStrap(){ 

      ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); 

      this.clientbootstrap = new ClientBootstrap(factory); 
     } 

     private class SinkAddress { 

      private final String hostAddress; 
      private final int port; 

      public SinkAddress(String hostAddress, int port) { 
       this.hostAddress = hostAddress; 
       this.port = port; 
      } 

      public String hostAddress() { return this.hostAddress; } 
      public int port() { return this.port; } 
     } 
    } 

    class SourceHandler extends SimpleChannelHandler { 

     private Router router; 

     public SourceHandler(Router router){ 

      this.router = router; 
     } 

     @Override 
     public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { 

      System.out.println("child is opened"); 
     } 

     @Override 
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 

      System.out.println("child is closed"); 
     } 

     @Override 
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 


       System.out.println("Server is opened"); 

     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 

      System.out.println(e.getCause()); 
     } 

     @Override 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 


      System.out.println("channel received message"); 

     } 
    } 

    class SinkHandler extends SimpleChannelHandler { 

     private Router router; 

     public SinkHandler(Router router){ 

      this.router = router; 
     } 

     @Override 
     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 

      System.out.println("Channel is connected"); 
     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 

      System.out.println(e.getCause()); 
     } 

     @Override 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 

      System.out.println("channel received message"); 

     } 
    } 

    final class Router { 

     private Configurations configurations; 

     private Map sourcenodes = new HashMap(); 

     private Map Sinknodes = new HashMap(); 

     public Router(){} 

     public Router(Configurations configurations){ 

      this.configurations = configurations; 
     } 

     public synchronized boolean submitSource(ChannelHandlerContext ctx , MessageEvent e){ 

      boolean responded = false; 

      return responded; 
     } 

     public synchronized boolean submitSink(ChannelHandlerContext ctx , MessageEvent e){ 

      boolean responded = false; 

      return responded; 
     } 
    } 

    final class Configurations { 

     public Configurations(){} 
    } 

    interface Bootable { 

     public abstract void boot(); 

     public abstract void stop(); 
    } 

    interface Routable { 

     public abstract void addRouter(Router router); 
    } 

답변

0

아이디어가 합리적인 것처럼 보입니다.

소스 채널 처리기는 Channel#write(...)을 사용하여 해당 싱크 채널에 쓸 수 있으며 그 반대의 경우도 마찬가지입니다.

물론 원본 채널과 회신을 연관시키는 방법이 필요하며 가장 좋은 방법은 프로토콜의 특성에 따라 다릅니다. 가능하면 최선의 대안은 메시지의 소스 채널 ID를 싱크 채널 (물론 응답에도 물론)로 인코딩하는 것입니다.

그럴 수 없다면 어쨌든 상관 관계를 유지해야합니다. 싱크 채널 당 FIFO 큐는 응답이 보낸 요청과 쌍을 이루도록 보장되는 경우 적절할 수 있습니다.

관련 문제