간단한 라우팅 응용 프로그램을 작성 중입니다. 그 생각은 x 시간 동안 지속되는 일시적인 클라이언트 연결을받는 서버 또는 원본 노드가 있다는 것입니다. 수신 된 메시지는 디코딩 된 다음 해당 메시지의 세부 사항에 따라 이미 열려있는 해당 싱크 노드 또는 클라이언트로 전송됩니다. Router 클래스는 메시지의 대상을 필터링하고 제거 할 수 있도록 모든 채널과 시도를 맵에 저장하도록 등록합니다. 일단 대상을 얻으면 실제 싱크 노드 (구성에 따라 일시적인 특성이있을 수 있음)를 선택하고 해당 채널로 데이터를 보내 응답을 기다린 다음 다시 원래의 보낸 사람에게 보내야합니다. netty를 사용하여 구현 한 것이 올바른 방향인지 먼저 알고 싶습니다. 그리고 어떤 서버에서받은 메시지를 어떻게 통과하여 클라이언트로 보내고 원래의 소스 노드에 응답 할 수 있습니까?Netty의 채널간에 데이터를 전달하는 방법은 무엇입니까?
다음은 내 소스 코드입니다. 다음과 같은 정보를 제공해야합니다. 설명에 코드 예제를 사용하십시오.
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/*
* @author Kimathi
*/
public class Service {
private Nodes nodes;
public void start(){
nodes = new Nodes();
nodes.addSourceNodes(new SourceNodes()).
addSinkNodes(new SinkNodes()).
addConfigurations(new Configurations()).
boot();
}
public void stop(){
nodes.stop();
}
public static void main(String [] args){
new Service().start();
}
}
class Nodes {
private SourceNodes sourcenodes;
private SinkNodes sinknodes ;
private Configurations configurations;
public Nodes addConfigurations(Configurations configurations){
this.configurations = configurations;
return this;
}
public Nodes addSourceNodes(SourceNodes sourcenodes){
this.sourcenodes = sourcenodes;
return this;
}
public Nodes addSinkNodes(SinkNodes sinknodes){
this.sinknodes = sinknodes;
return this;
}
public void boot(){
Router router = new Router(configurations);
sourcenodes.addPort(8000).
addPort(8001).
addPort(8002);
sourcenodes.addRouter(router);
sourcenodes.boot() ;
sinknodes.addRemoteAddress("127.0.0.1", 6000).
addRemoteAddress("127.0.0.1", 6001).
addRemoteAddress("127.0.0.1", 6002);
sinknodes.addRouter(router);
sinknodes.boot();
}
public void stop(){
sourcenodes.stop();
sinknodes.stop();
}
}
final class SourceNodes implements Bootable , Routable {
private List <Integer> ports = new ArrayList();
private ServerBootstrap serverbootstrap;
private Router router;
@Override
public void addRouter(final Router router){
this.router = router;
}
public SourceNodes addPort(int port){
this.ports.add(port);
return this;
}
@Override
public void boot(){
this.initBootStrap();
this.serverbootstrap.setOption("child.tcpNoDelay", true);
this.serverbootstrap.setOption("child.keepAlive", true);
this.serverbootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new SourceHandler(router));
}
});
for(int port:this.ports){
this.serverbootstrap.bind(new InetSocketAddress(port));
}
}
@Override
public void stop(){
this.serverbootstrap.releaseExternalResources();
}
private void initBootStrap(){
ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
this.serverbootstrap = new ServerBootstrap(factory);
}
}
final class SinkNodes implements Bootable , Routable {
private List<SinkAddress> addresses= new ArrayList();
private ClientBootstrap clientbootstrap;
private Router router;
@Override
public void addRouter(final Router router){
this.router = router;
}
public SinkNodes addRemoteAddress(String hostAddress,int port){
this.addresses.add(new SinkAddress(hostAddress,port));
return this;
}
@Override
public void boot(){
this.initBootStrap();
this.clientbootstrap.setOption("tcpNoDelay", true);
this.clientbootstrap.setOption("keepAlive", true);
this.clientbootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new SinkHandler(router));
}
});
for(SinkAddress address:this.addresses){
this.clientbootstrap.connect(new InetSocketAddress(address.hostAddress(),address.port()));
}
}
@Override
public void stop(){
this.clientbootstrap.releaseExternalResources();
}
private void initBootStrap(){
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
this.clientbootstrap = new ClientBootstrap(factory);
}
private class SinkAddress {
private final String hostAddress;
private final int port;
public SinkAddress(String hostAddress, int port) {
this.hostAddress = hostAddress;
this.port = port;
}
public String hostAddress() { return this.hostAddress; }
public int port() { return this.port; }
}
}
class SourceHandler extends SimpleChannelHandler {
private Router router;
public SourceHandler(Router router){
this.router = router;
}
@Override
public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
System.out.println("child is opened");
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("child is closed");
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("Server is opened");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.out.println(e.getCause());
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("channel received message");
}
}
class SinkHandler extends SimpleChannelHandler {
private Router router;
public SinkHandler(Router router){
this.router = router;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("Channel is connected");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.out.println(e.getCause());
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("channel received message");
}
}
final class Router {
private Configurations configurations;
private Map sourcenodes = new HashMap();
private Map Sinknodes = new HashMap();
public Router(){}
public Router(Configurations configurations){
this.configurations = configurations;
}
public synchronized boolean submitSource(ChannelHandlerContext ctx , MessageEvent e){
boolean responded = false;
return responded;
}
public synchronized boolean submitSink(ChannelHandlerContext ctx , MessageEvent e){
boolean responded = false;
return responded;
}
}
final class Configurations {
public Configurations(){}
}
interface Bootable {
public abstract void boot();
public abstract void stop();
}
interface Routable {
public abstract void addRouter(Router router);
}