2012-05-23 3 views
5

현재 서버에 스트리밍을 계속하기 위해 일대 다 연결을 열어주는 응용 프로그램에 java.nio.channel.Selectors & SocketChannels를 사용하고 있습니다. StreamWriteWorker - SocketChannel에 쓰기 작업을 수행하고, StreamReadWorker - 버퍼에서 바이트를 읽고 내용을 파싱하고, StreamTaskDispatcher - readyOps에 대한 Selector의 선택을 수행하고 작업자 스레드에 대한 새로운 실행 파일을 전달합니다.java.nio Selectors and SocketChannel for continue 스트리밍

문제점 - 선택기의 선택 메소드에 대한 호출은 첫 번째 호출에서 값> 0 (유효한 readyOps) 만 리턴합니다. 한 번에 모든 준비 채널에서 데이터를 쓰고 보낼 수 있지만 Selector의 선택 메소드 호출은 모두 0을 반환합니다.

질문 : 모든 읽기 후에는 SocketChannel에서 close를 호출해야합니까?/쓴다 (나는 희망하지 않는다!)? 그렇지 않은 경우 SocketChannels이 읽기/쓰기 작업에 사용할 수없는 원인이 무엇입니까?

죄송합니다. 코드를 게시 할 수 없지만 도움이 될만한 명확한 설명이 되었기를 바랍니다. 나는 답변을 찾았지만 닫은 후에 SocketChannel 연결을 재사용 할 수는 없다는 것을 알았지 만, 내 채널이 가까이 있으면 안되며 서버는 EOF 스트림 결과를받지 못합니다.

나는 약간의 진전을 이루었고 json 구문 분석 오류로 인해 서버 응용 프로그램에서 쓰기 작업이 발생하지 않는다는 것을 알아 냈습니다. 이제 클라이언트 응용 프로그램 코드의 SocketChannel이 읽기 작업을 처리 한 후에 다른 쓰기 작업을 수행 할 준비가되었습니다. 나는 이것이 SocketChannels의 TCP 성질이라고 생각한다. 그러나 SocketChannel을 서버 응용 프로그램에서 다른 읽기 작업에 사용할 수 없게됩니다. SocketChannels에 대한 정상적인 동작입니까? 읽기 작업 후에 클라이언트 측에서 연결을 닫고 새 연결을 설정해야합니까? 여기

는 내가 뭘하려고 오전의 코드 샘플입니다 :

package org.stream.socket; 

import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.net.ServerSocket; 
import java.nio.ByteBuffer; 
import java.nio.CharBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.nio.charset.CodingErrorAction; 
import java.util.HashMap; 
import java.util.Iterator; 
import java.util.UUID; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 

import org.apache.commons.lang3.RandomStringUtils; 

import com.google.gson.Gson; 
import com.google.gson.JsonElement; 
import com.google.gson.JsonObject; 
import com.google.gson.JsonParser; 
import com.google.gson.JsonPrimitive; 
import com.google.gson.stream.JsonToken; 

public class ClientServerTest { 

    private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>(); 
    private ExecutorService executor = Executors.newFixedThreadPool(1); 
    private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>(); 

    private class StreamWriteTask implements Runnable { 
     private ByteBuffer buffer; 
     private SelectionKey key; 
     private Selector selector; 

     private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) { 
      this.buffer = buffer; 
      this.key = key; 
      this.selector = selector; 
     } 

     @Override 
     public void run() { 
      SocketChannel sc = (SocketChannel) key.channel(); 
      byte[] data = (byte[]) key.attachment(); 
      buffer.clear(); 
      buffer.put(data); 
      buffer.flip(); 
      int results = 0; 
      while (buffer.hasRemaining()) { 
       try { 
        results = sc.write(buffer); 
       } catch (IOException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 

       if (results == 0) { 
        buffer.compact(); 
        buffer.flip(); 
        data = new byte[buffer.remaining()]; 
        buffer.get(data); 
        key.interestOps(SelectionKey.OP_WRITE); 
        key.attach(data); 
        selector.wakeup(); 
        return; 
       } 
      } 

      key.interestOps(SelectionKey.OP_READ); 
      key.attach(null); 
      selector.wakeup(); 
     } 

    } 

    private class StreamReadTask implements Runnable { 
     private ByteBuffer buffer; 
     private SelectionKey key; 
     private Selector selector; 

     private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) { 
      this.buffer = buffer; 
      this.key = key; 
      this.selector = selector; 
     } 

     private boolean checkUUID(byte[] data) { 
      return uuidToSize.containsKey(new String(data)); 
     } 

     @Override 
     public void run() { 
      SocketChannel sc = (SocketChannel) key.channel(); 
      buffer.clear(); 
      byte[] data = (byte[]) key.attachment(); 
      if (data != null) { 
       buffer.put(data); 
      } 
      int count = 0; 
      int readAttempts = 0; 
      try { 
       while ((count = sc.read(buffer)) > 0) { 
        readAttempts++; 
       } 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 

      if (count == 0) { 
       buffer.flip(); 
       data = new byte[buffer.limit()]; 
       buffer.get(data); 
       if (checkUUID(data)) { 
        key.interestOps(SelectionKey.OP_READ); 
        key.attach(data); 
       } else { 
        System.out.println("Clinet Read - uuid ~~~~ " + new String(data)); 
        key.interestOps(SelectionKey.OP_WRITE); 
        key.attach(null); 
       } 
      } 

      if (count == -1) { 
       try { 
        sc.close(); 
       } catch (IOException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
      } 

      selector.wakeup(); 
     } 

    } 

    private class ClientWorker implements Runnable { 

     @Override 
     public void run() { 
      try { 
       Selector selector = Selector.open(); 
       SocketChannel sc = SocketChannel.open(); 
       sc.configureBlocking(false); 
       sc.connect(new InetSocketAddress("127.0.0.1", 9001)); 
       sc.register(selector, SelectionKey.OP_CONNECT); 
       ByteBuffer buffer = ByteBuffer.allocateDirect(65535); 

       while (selector.isOpen()) { 
        int count = selector.select(10); 

        if (count == 0) { 
         continue; 
        } 

        Iterator<SelectionKey> it = selector.selectedKeys().iterator(); 

        while (it.hasNext()) { 
         final SelectionKey key = it.next(); 
         it.remove(); 
         if (!key.isValid()) { 
          continue; 
         } 

         if (key.isConnectable()) { 
          sc = (SocketChannel) key.channel(); 
          if (!sc.finishConnect()) { 
           continue; 
          } 
          sc.register(selector, SelectionKey.OP_WRITE); 
         } 

         if (key.isReadable()) { 
          key.interestOps(0); 
          executor.execute(new StreamReadTask(buffer, key, selector)); 
         } 
         if (key.isWritable()) { 
          key.interestOps(0); 
          if(key.attachment() == null){ 
           key.attach(dataQueue.take()); 
          } 
          executor.execute(new StreamWriteTask(buffer, key, selector)); 
         } 
        } 
       } 
      } catch (IOException ex) { 
       // Handle Exception 
      }catch(InterruptedException ex){ 

      } 

     } 
    } 

    private class ServerWorker implements Runnable { 
     @Override 
     public void run() { 
      try { 
       Selector selector = Selector.open(); 
       ServerSocketChannel ssc = ServerSocketChannel.open(); 
       ServerSocket socket = ssc.socket(); 
       socket.bind(new InetSocketAddress(9001)); 
       ssc.configureBlocking(false); 
       ssc.register(selector, SelectionKey.OP_ACCEPT); 
       ByteBuffer buffer = ByteBuffer.allocateDirect(65535); 
       DataHandler handler = new DataHandler(); 

       while (selector.isOpen()) { 
        int count = selector.select(10); 

        if (count == 0) { 
         continue; 
        } 

        Iterator<SelectionKey> it = selector.selectedKeys().iterator(); 

        while (it.hasNext()) { 
         final SelectionKey key = it.next(); 
         it.remove(); 
         if (!key.isValid()) { 
          continue; 
         } 

         if (key.isAcceptable()) { 
          ssc = (ServerSocketChannel) key.channel(); 
          SocketChannel sc = ssc.accept(); 
          sc.configureBlocking(false); 
          sc.register(selector, SelectionKey.OP_READ); 
         } 
         if (key.isReadable()) { 
          handler.readSocket(buffer, key); 
         } 
         if (key.isWritable()) { 
          handler.writeToSocket(buffer, key); 
         } 
        } 
       } 

      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

    } 

    private class DataHandler { 

     private JsonObject parseData(StringBuilder builder) { 
      if (!builder.toString().endsWith("}")) { 
       return null; 
      } 

      JsonParser parser = new JsonParser(); 
      JsonObject obj = (JsonObject) parser.parse(builder.toString()); 
      return obj; 
     } 

     private void readSocket(ByteBuffer buffer, SelectionKey key) 
       throws IOException { 
      SocketChannel sc = (SocketChannel) key.channel(); 
      buffer.clear(); 
      int count = Integer.MAX_VALUE; 
      int readAttempts = 0; 
      try { 
       while ((count = sc.read(buffer)) > 0) { 
        readAttempts++; 
       } 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 

      if (count == 0) { 
       buffer.flip(); 
       StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key 
         .attachment() : new StringBuilder(); 
       Charset charset = Charset.forName("UTF-8"); 
       CharsetDecoder decoder = charset.newDecoder(); 
       decoder.onMalformedInput(CodingErrorAction.IGNORE); 
       System.out.println(buffer); 
       CharBuffer charBuffer = decoder.decode(buffer); 
       String content = charBuffer.toString(); 
       charBuffer = null; 
       builder.append(content);  
       System.out.println(content); 
       JsonObject obj = parseData(builder); 
       if (obj == null) { 
        key.attach(builder); 
        key.interestOps(SelectionKey.OP_READ); 
       } else { 
        System.out.println("data ~~~~~~~ " + builder.toString()); 
        JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive(); 
        key.attach(uuid.toString().getBytes()); 
        key.interestOps(SelectionKey.OP_WRITE); 
       } 
      } 

      if (count == -1) { 
       key.attach(null); 
       sc.close(); 
      } 
     } 

     private void writeToSocket(ByteBuffer buffer, SelectionKey key) 
       throws IOException { 
      SocketChannel sc = (SocketChannel) key.channel(); 
      byte[] data = (byte[]) key.attachment(); 
      buffer.clear(); 
      buffer.put(data); 
      buffer.flip(); 
      int writeAttempts = 0; 
      while (buffer.hasRemaining()) { 
       int results = sc.write(buffer); 
       writeAttempts++; 
       System.out.println("Write Attempt #" + writeAttempts); 
       if (results == 0) { 
        buffer.compact(); 
        buffer.flip(); 
        data = new byte[buffer.remaining()]; 
        buffer.get(data); 
        key.attach(data); 
        key.interestOps(SelectionKey.OP_WRITE); 
        break; 
       } 
      } 

      key.interestOps(SelectionKey.OP_READ); 
      key.attach(null); 
     } 
    } 

    public ClientServerTest() { 
     for (int index = 0; index < 1000; index++) { 
      JsonObject obj = new JsonObject(); 
      String uuid = UUID.randomUUID().toString(); 
      uuidToSize.put(uuid, uuid.length()); 
      obj.addProperty("uuid", uuid); 
      String data = RandomStringUtils.randomAlphanumeric(10000); 
      obj.addProperty("event", data); 
      dataQueue.add(obj.toString().getBytes()); 
     } 

     Thread serverWorker = new Thread(new ServerWorker()); 
     serverWorker.start(); 

     Thread clientWorker = new Thread(new ClientWorker()); 
     clientWorker.start(); 

    } 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     ClientServerTest test = new ClientServerTest(); 
     for(;;){ 

     } 
    } 

} 
+2

왜이 세 개의 스레드가 필요하다고 생각하십니까? 쓰는 쓰레드는 필요 없으며, NIO가 의도 한 선을 따라 약간의 재구성을하면 읽기 쓰레드도 제거 할 수 있습니다. 멀티 스레딩과 NIO는 실제로 섞이지 않습니다. 멀티 스레딩을 원하면 java.net을 사용하고 입출력을 차단하십시오. – EJP

+0

의견을 보내 주셔서 감사합니다. 시기 적절성이 중요한 요소이므로 3 가지 스레드로 시작했습니다. 나는 Write on Write 또는 그 반대로 쓰기를 원하지 않았고, 나는 많은 양의 데이터로 작업 할 것이다. 내가 진술 한 문제에 관해서 당신은 대응하고 있습니까? –

+0

3 개의 스레드에 동기를 부여하는 문제가 있는지 확실하지 않습니다. 당신은 * non-blocking 모드에있다. * select() 호출을 제외하고 아무것도 대기하지 않을 것이다. 그러나 select()가 0을 반환하면 아무 것도 준비되지 않습니다. 무작위로 채널을 닫으면 변경되지 않습니다. – EJP

답변

4
  1. OP_CONNECT 한 번 finishConnect()을 시도하는 것, 그것이 성공하면 OP_CONNECT 등록을 취소하고 등록 처리하는 올바른 방법 OP_READ 또는 OP_WRITE , 아마 당신이 클라이언트 인 후자. 루핑 및 비 차단 모드에서자는 것은 의미가 없습니다. finishConnect()이 false를 반환하면 OP_CONNECT이 다시 실행됩니다.

  2. !key.isAcceptable(), !key.isReadable()!key.isWriteable()의 처리는 전혀 의미가 없습니다. 키가 허용 가능한 경우 accept()으로 전화하십시오. 읽을 수 있으면 read()으로 전화하십시오. 쓰기가 가능한 경우 write()으로 전화하십시오. 그것만큼이나 간단합니다.

  3. 채널은 소켓 송신 버퍼가 가득 찬 짧은 시간을 제외하고는 거의 항상 쓰기 가능하다는 것을 알아야합니다. 그래서 쓸만한 것이 있으면 OP_WRITE에 등록하십시오. 이후에 후에 쓰기 만하면 0이됩니다. OP_WRITE이 실행되면 쓰기를 다시 시도하고 다른 0이 없으면 OP_WRITE의 등록을 취소하십시오.

  4. ByteBuffer으로는 너무 경제적입니다. 실제로 채널당 하나씩 이 필요합니다. 키 첨부 파일로 저장할 수 있으므로 필요할 때 다시 가져올 수 있습니다.그렇지 않으면 어떤 일이 일어나기도 전에 부분적인 읽기를 축적 할 방법이 없으며 쓰기를 재 시도하는 방법도 없습니다.

+0

좋아요, 그래서 OP_CONNECT가 큰 문제는 아니라고 생각하지만, 저는 그것을 바꿉니다. 직장에서 제 코드를 사용하기 전에 저는 대개 내 연결을 초기화하지만, 당신의 방식으로 시도하고 도움이되는지 확인합니다. 분명히 오타였던 두 번째 총알까지, 나는 샘플을 늦게 보내고 있었다. OP_WRITE가 처음에는 발사되기 때문에 흥미 롭다. 그러나 OP_READ가 발사 될 때까지는 발사되지 않는다. 나는 또한 바이트 []의 전체 내용을 쓰고 있는데 그것은 내 버퍼의 한계를 넘지 않는다. –

+0

마지막으로, 나를 따라 가면 부분적인 읽기 또는 쓰기를 처리 할 수 ​​없다는 것이 확실하지 않습니다. 내 코드가이 문제를 처리하고 있습니다. –

+0

내가 일을하고 나면 당신이 말한 모든 것을 이해하기 시작했습니다. 내 코드는 작동하는 방식대로 작동합니다. 양해 해 주셔서 감사합니다. 공연 및 기억 관리와 관련하여 새로운 질문을 게시 할 것이고, 나는 당신의 지혜를 기대합니다. –