2010-02-24 2 views
2

ObjectInputStream: 이것이 블록을 해제하는 올바른 방법입니까? ObjectInputStream은 생성될 때 상대편으로부터 직렬화 스트림 헤더를 수신할 때까지 블록됩니다. 소켓을 사용하는 첫 번째 프로그램을 만들다가 이 사실을 알게 되었습니다. 그래서 더미 객체를 보내서 블록을 해제하고 있습니다. 코드는 다음과 같습니다.

import java.io.*;      
import java.net.*;      
import java.util.*;      

/**
 * Empty marker object. {@code Server} and {@code Client} each write one
 * instance right after creating their {@code ObjectOutputStream} and read the
 * peer's instance right after creating their {@code ObjectInputStream}.
 */
class Dummy implements Serializable {
    /**
     * Explicit serial version so the serialized form does not depend on the
     * compiler-generated default, which can differ between builds.
     */
    private static final long serialVersionUID = 1L;
}

/**
 * Serializable holder for a single {@code int} value.
 */
class X_Int implements Serializable {
    /**
     * Explicit serial version so the serialized form does not depend on the
     * compiler-generated default, which can differ between builds.
     */
    private static final long serialVersionUID = 1L;

    /** The wrapped value; defaults to 0. */
    int x;
}

/**
 * Single-shot demo server: accepts one connection on port 5879, exchanges a
 * {@link Dummy} object with the peer and then sends the current {@link Date}.
 */
class Server {
    /**
     * Serves exactly one client and returns.
     *
     * @param args unused
     * @throws Exception if accepting the connection or any stream operation fails
     */
    public static void main(String args[]) throws Exception {
        ServerSocket ss = new ServerSocket(5879);
        try {
            Socket client = ss.accept();
            try {
                ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream());
                out.writeObject(new Dummy());
                // Flush BEFORE constructing the input stream: the peer's
                // ObjectInputStream constructor blocks until it has read our
                // stream header, so the header must not sit in a buffer.
                out.flush();

                ObjectInputStream in = new ObjectInputStream(client.getInputStream());
                in.readObject(); // the peer's Dummy; its value is not used

                out.writeObject(new Date());
                out.flush();
            } finally {
                // Closing the socket also closes both of its streams.
                client.close();
            }
        } finally {
            ss.close();
        }
    }
}

/**
 * Demo client: connects to {@code localhost:5879}, exchanges a {@link Dummy}
 * object with the peer, then reads a {@link Date} and prints it.
 */
class Client {
    /**
     * Performs one exchange with the server and returns.
     *
     * @param args unused
     * @throws Exception if connecting or any stream operation fails
     */
    public static void main(String args[]) throws Exception {
        Socket server = new Socket("localhost", 5879);
        try {
            ObjectOutputStream out = new ObjectOutputStream(server.getOutputStream());
            out.writeObject(new Dummy());
            // Flush BEFORE constructing the input stream: the peer's
            // ObjectInputStream constructor blocks until it has read our
            // stream header, so the header must not sit in a buffer.
            out.flush();

            ObjectInputStream in = new ObjectInputStream(server.getInputStream());
            in.readObject(); // the peer's Dummy; its value is not used

            Date d = (Date) in.readObject();
            System.out.println(d);
        } finally {
            // Closing the socket also closes both of its streams.
            server.close();
        }
    }
}

이것이 올바른 방법입니까? 의견을 부탁드립니다.

답변

2

더 좋은 방법은 처음부터 블로킹의 원인을 없애는 것입니다. 가능하다면 양쪽 끝에서 기본 클래스 대신 다음 클래스들을 사용하십시오.

/**
 * An {@link ObjectInputStream} whose {@link #readStreamHeader()} is a no-op,
 * so constructing it reads nothing from the underlying stream.
 *
 * <p>NOTE(review): the writing side must then not emit a stream header
 * either, otherwise the header bytes would be read as object data.
 */
public class ObjInputStream extends ObjectInputStream {

    /**
     * Wraps the given stream without consuming any header bytes from it.
     *
     * @param in the stream to read serialized objects from
     * @throws IOException if the superclass constructor fails
     */
    public ObjInputStream(InputStream in) throws IOException {
        super(in);
    }

    /**
     * Deliberately reads and verifies nothing.
     *
     * @see java.io.ObjectInputStream#readStreamHeader()
     */
    @Override
    protected void readStreamHeader() throws IOException, StreamCorruptedException {
        // Intentionally empty: no header is expected on this stream.
    }
}

/**
 * An {@link ObjectOutputStream} whose {@link #writeStreamHeader()} is a
 * no-op, so constructing it writes nothing to the underlying stream.
 *
 * <p>NOTE(review): the reading side must then not expect a stream header
 * either, otherwise it would interpret object data as the header.
 */
public class ObjOutputStream extends ObjectOutputStream {

    /**
     * Wraps the given stream without emitting any header bytes to it.
     *
     * @param out the stream to write serialized objects to
     * @throws IOException if the superclass constructor fails
     */
    public ObjOutputStream(OutputStream out) throws IOException {
        super(out);
    }

    /**
     * Deliberately writes nothing.
     *
     * @see java.io.ObjectOutputStream#writeStreamHeader()
     */
    @Override
    protected void writeStreamHeader() throws IOException {
        // Intentionally empty: no header is written on this stream.
    }
}

이 클래스들은 스트림 버전 정보 등을 확인하기 위해 호출되는 메서드(스트림 헤더 읽기/쓰기)를 제거합니다.

또한 TCP를 사용하면 IP 단편화 때문에 객체가 상대방에게 '한 덩어리'로 수신되지 않을 수 있습니다. TCP는 스트림 소켓이기 때문입니다. 따라서 프레이밍을 처리하는 별도의 입출력 클래스가 필요합니다. 다행히 저는 이미 이것을 작성해 두었습니다 :)

/** 
* 
*/ 
package objtest; 

import java.io.IOException; 
import java.io.InputStream; 
import java.nio.BufferUnderflowException; 
import java.nio.ByteBuffer; 
import java.util.ArrayDeque; 
import java.util.Queue; 

import kokuks.KokuKS; 

/**
 * UnrealConceptTest - FramedInputStream.
 *
 * <p>Reassembles length-prefixed frames from bytes pushed in through
 * {@link #putByte(byte)} or {@link #putBuffer(ByteBuffer)} and exposes the
 * payload of one frame at a time through the {@link InputStream} API. Call
 * {@link #nextFrame()} to make the next completed frame readable.
 *
 * <p>Frame layout as parsed by {@link #putByte(byte)}:
 * <ol>
 *   <li>eight bytes equal to {@link #HEADER_BYTES};</li>
 *   <li>a big-endian 32-bit size. The byte that completes the header is also
 *       consumed as the most significant size byte, so only three further
 *       size bytes follow it;</li>
 *   <li>{@code size} payload bytes, of which {@link #nextFrame()} skips the
 *       first.</li>
 * </ol>
 *
 * <p>Instances keep mutable parsing state and perform no synchronization.
 *
 * @version 1.0
 */
public class FramedInputStream extends InputStream {
    public static final int INITIAL_BUFFER_SIZE = 8 << 1;

    public static final int FRAME_HEADER_1 = 0xBEEFFACE;
    public static final int FRAME_HEADER_2 = 0xFACEBEEF;

    /** The byte sequence that marks the start of a frame. */
    public static final byte[] HEADER_BYTES = new byte[4 * 2];
    /** Not used by this class. */
    protected static final byte[] CURR_HEADER_BUFF = new byte[HEADER_BYTES.length];

    static {
        ByteBuffer b = ByteBuffer.allocateDirect(8);

        b.putInt(FRAME_HEADER_1);
        b.putInt(FRAME_HEADER_2);
        b.flip();

        b.get(HEADER_BYTES, 0, 4);
        // NOTE(review): the second copy lands at index 3, not 4, so the array
        // ends up as BE EF FA FA CE BE EF 00. This is kept on purpose:
        // putByte() relies on the trailing 0x00 doubling as the high byte of
        // the frame size, so "fixing" the offset would change the wire format
        // and must be coordinated with whatever writes the frames.
        b.get(HEADER_BYTES, 3, 4);
    }

    /** Size of the frame currently being received, accumulated byte by byte. */
    protected int size = 0;
    /** Number of consecutive header bytes matched so far. */
    protected int chain = 0;
    /** True once a complete header has been seen and the frame is not yet complete. */
    protected boolean inFrame = false;
    /** True while the size field of the current frame is being read. */
    protected boolean readingSize = false;
    /** Index (0-3) of the next size byte to read. */
    protected int sizePos = 0;

    /** Not used by this class (left over from debugging). */
    protected int dbgput = 0;

    /** Accumulates the payload of the frame currently being received. */
    protected ByteBuffer bb = ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE);
    /** Completed frames waiting to be selected by {@link #nextFrame()}. */
    protected Queue<ByteBuffer> bbq = new ArrayDeque<ByteBuffer>();
    /** The frame currently exposed through the read methods, or null. */
    protected ByteBuffer currBuff = null;

    /** Whether header detection keeps running while inside a frame. */
    protected final boolean recoverFromError;

    /**
     * Creates a framed input stream.
     *
     * @param recoverFromError if true, a header sequence seen in the middle
     *        of a frame abandons that frame and starts a new one
     */
    public FramedInputStream(boolean recoverFromError) {
        this.recoverFromError = recoverFromError;
    }

    /** Creates a framed input stream with error recovery enabled. */
    public FramedInputStream() {
        this(true);
    }

    /**
     * Grows the frame buffer so that its capacity is at least {@code min}
     * bytes. The capacity is doubled until it suffices. Existing content and
     * the current position are carried over; the limit of the new buffer is
     * its capacity.
     *
     * @param min the required capacity in bytes
     * @return true if a larger buffer was allocated, false if the current one
     *         was already big enough
     */
    protected boolean ensureFramebufferCapacity(int min) {
        if (min <= bb.capacity()) return false;

        int num = bb.capacity();
        while (num < min) {
            // Doubling again would overflow int; settle for the exact size.
            if (num > (Integer.MAX_VALUE >> 1)) {
                num = min;
                break;
            }
            num <<= 1;
        }

        ByteBuffer grown = ByteBuffer.allocateDirect(num);
        // Copy through a duplicate so the old buffer's own state is untouched.
        int pos = bb.position();
        ByteBuffer old = bb.duplicate();
        old.rewind();
        grown.put(old);
        grown.position(pos);
        bb = grown;

        if (KokuKS.DEBUG_MODE) System.out.println("modified buffer size to: " + num);

        return true;
    }

    /**
     * @return the recoverFromError
     */
    public boolean isRecoverFromError() {
        return recoverFromError;
    }

    /**
     * Reads the next byte of the current frame.
     *
     * @return the byte as a value in 0-255, or -1 if no frame is selected or
     *         the current frame is exhausted
     * @see java.io.InputStream#read()
     */
    @Override
    public int read() throws IOException {
        if (currBuff == null || !currBuff.hasRemaining()) return -1;

        // Mask to an unsigned value: returning the raw (signed) byte would
        // make 0xFF indistinguishable from the -1 end-of-stream marker.
        return currBuff.get() & 0xFF;
    }

    /**
     * Feeds every remaining byte of {@code source} to {@link #putByte(byte)}.
     * The frame buffer is sized by {@code putByte} once a frame's length is
     * known, so no capacity is reserved here.
     *
     * @param source the received bytes; fully consumed on return
     */
    public void putBuffer(ByteBuffer source) {
        while (source.hasRemaining()) {
            putByte(source.get());
        }
    }

    /**
     * @return true if at least one completed frame is queued for
     *         {@link #nextFrame()}
     */
    public boolean checkCompleteFrame() {
        return !bbq.isEmpty();
    }

    /**
     * @return the number of unread bytes in the current frame, or 0 if no
     *         frame is selected
     * @see java.io.InputStream#available()
     */
    @Override
    public int available() throws IOException {
        return currBuff != null ? currBuff.remaining() : 0;
    }

    /**
     * Reads up to {@code data.length} bytes of the current frame into
     * {@code data}, starting at index 0.
     *
     * @param data the destination array
     * @return the number of bytes copied (0 for an empty array), or -1 if no
     *         frame is selected or the current frame is exhausted
     */
    @Override
    public int read(byte[] data) {
        if (data.length == 0) return 0;

        if (currBuff == null || !currBuff.hasRemaining()) {
            return -1;
        }

        // Partial reads are allowed by the InputStream contract; hand out
        // whatever the frame still holds rather than failing.
        int count = Math.min(data.length, currBuff.remaining());
        currBuff.get(data, 0, count);

        return count;
    }

    /**
     * Makes the oldest queued frame the current one for the read methods.
     *
     * @return true if a frame was selected, false if none is queued (the
     *         current frame is then left unchanged)
     */
    public boolean nextFrame() {
        ByteBuffer frame = bbq.poll();

        if (frame == null) {
            return false;
        }

        // Queued frames are already flipped (position 0, limit = frame size)
        // and referenced nowhere else, so the buffer can be used directly.
        currBuff = frame;
        // The first stored byte is skipped, as in the original implementation.
        // NOTE(review): presumably a padding byte emitted by the writer -
        // confirm against the sending side.
        currBuff.position(1);

        return true;
    }

    /**
     * Feeds one received byte into the frame parser. Depending on the parser
     * state the byte advances header matching, contributes to the size field,
     * or is appended to the payload. A completed frame is copied into its own
     * buffer and queued for {@link #nextFrame()}.
     *
     * @param b the next byte from the transport
     */
    public void putByte(byte b) {
        if (recoverFromError || !inFrame) {
            if (b == HEADER_BYTES[chain++]) {
                if (chain >= HEADER_BYTES.length) {
                    if (KokuKS.DEBUG_MODE) System.out.println("got header!" + (inFrame ? " (recovered)" : ""));

                    // A full header was matched: start a fresh frame.
                    inFrame = true;
                    sizePos = 0;
                    size = 0;
                    readingSize = true;
                    chain = 0;

                    bb.clear();
                }
            } else {
                chain = 0;
            }
        }

        if (!inFrame) {
            return;
        }

        // Reached also by the byte that just completed the header: that byte
        // is counted as the first (most significant) size byte.
        if (readingSize) {
            size += (b & 0xFF) << ((8 * 3) - (8 * sizePos));
            sizePos++;

            if (sizePos >= 4) {
                readingSize = false;
                sizePos = 0;

                ensureFramebufferCapacity(size);
                bb.clear();
                bb.limit(size); // the frame is complete when the limit is reached
            }
            return;
        }

        bb.put(b);

        if (!bb.hasRemaining()) {
            bb.flip();

            // Hand the payload over in a buffer of its own so that bb can be
            // reused for the next frame.
            ByteBuffer frame = ByteBuffer.allocateDirect(bb.limit());
            frame.put(bb);
            frame.flip();
            bbq.offer(frame);

            inFrame = false;
            readingSize = false;
            size = 0;
            sizePos = 0;
            chain = 0;

            bb.clear();

            if (KokuKS.DEBUG_MODE) System.out.println("FIS: complete object");
        }
    }
}

/** 
* 
*/ 
package objtest; 

import java.io.IOException; 
import java.nio.ByteBuffer; 

import koku.util.io.ByteBufferOutputStream; 

/**
 * UnrealConceptTest - FramedOutputStream.
 *
 * <p>A {@link ByteBufferOutputStream} that reserves a frame prefix (the
 * {@link #HEADER_BYTES} followed by {@link #SIZE_PAD}) ahead of the payload
 * and fills in the frame length when {@link #flip()} is called.
 *
 * @version 1.0
 * @author Chris Dennett
 */
public class FramedOutputStream extends ByteBufferOutputStream {
    public static final int FRAME_HEADER_1 = 0xBEEFFACE;
    public static final int FRAME_HEADER_2 = 0xFACEBEEF;

    /** The byte sequence written at the start of every frame. */
    public static final byte[] HEADER_BYTES = new byte[4 * 2];
    /** Not used by this class. */
    public static final byte[] CURR_HEADER_BUFF = new byte[HEADER_BYTES.length];

    /* Placeholder written right after the header; flip() later stores the
     * frame length over part of it. */
    protected static final byte[] SIZE_PAD = new byte[4];

    static {
        ByteBuffer words = ByteBuffer.allocate(8);
        words.putInt(FRAME_HEADER_1);
        words.putInt(FRAME_HEADER_2);
        words.flip();

        words.get(HEADER_BYTES, 0, 4);
        // NOTE(review): the second copy targets index 3 (not 4), leaving the
        // array as BE EF FA FA CE BE EF 00. flip() below writes the size
        // starting at index 7, i.e. over that trailing 0x00, so the offsets
        // are interdependent - do not change one without the others.
        words.get(HEADER_BYTES, 3, 4);
    }

    /**
     * Creates the stream and immediately writes the frame prefix.
     */
    public FramedOutputStream() {
        writeFramePrefix();
    }

    /**
     * Flips the buffer, stores the frame length in the prefix and returns the
     * buffer rewound to position zero.
     *
     * <p>The length is the number of bytes from index 11 up to the limit and
     * is written big-endian at indices 7-10.
     *
     * @see koku.util.io.ByteBufferOutputStream#flip()
     */
    @Override
    public ByteBuffer flip() {
        // Presumably limits the buffer to what has been written so far -
        // confirm against the superclass.
        super.flip();

        // Everything from index 11 onwards is counted as frame content.
        _buffer.position(11);
        int frameLength = _buffer.remaining();

        // Store the length over the last header byte and the first pad bytes.
        _buffer.position(7);
        putSizeAsBytes(frameLength, _buffer);

        // Expose the whole frame, prefix included, to the caller.
        _buffer.rewind();
        return _buffer;
    }

    /**
     * Resets the underlying buffer and writes a fresh frame prefix.
     *
     * @see koku.util.io.ByteBufferOutputStream#reset()
     */
    @Override
    public void reset() {
        super.reset();
        writeFramePrefix();
    }

    /**
     * Writes the header followed by the size placeholder. A failure is only
     * reported on standard output.
     */
    private void writeFramePrefix() {
        try {
            write(HEADER_BYTES);
            write(SIZE_PAD);
        } catch (IOException e) {
            System.out.println("Couldn't write header padding!");
        }
    }

    /**
     * Writes {@code size} into {@code bb} at its current position as four
     * bytes, most significant byte first.
     *
     * @param size the value to encode
     * @param bb the destination buffer; its position advances by four
     */
    public static void putSizeAsBytes(int size, ByteBuffer bb) {
        for (int shift = 8 * 3; shift >= 0; shift -= 8) {
            // The narrowing cast keeps exactly the low eight bits.
            bb.put((byte) (size >>> shift));
        }
    }
}

BBOS(ByteBufferOutputStream) 코드:

// 
// $Id: ByteBufferOutputStream.java 5829 2009-06-20 21:09:34Z mdb $ 
// 
// Narya library - tools for developing networked games 
// Copyright (C) 2002-2009 Three Rings Design, Inc., All Rights Reserved 
// http://www.threerings.net/code/narya/ 
// 
// This library is free software; you can redistribute it and/or modify it 
// under the terms of the GNU Lesser General Public License as published 
// by the Free Software Foundation; either version 2.1 of the License, or 
// (at your option) any later version. 
// 
// This library is distributed in the hope that it will be useful, 
// but WITHOUT ANY WARRANTY; without even the implied warranty of 
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
// Lesser General Public License for more details. 
// 
// You should have received a copy of the GNU Lesser General Public 
// License along with this library; if not, write to the Free Software 
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 

package misc; 

import java.io.OutputStream; 
import java.nio.BufferOverflowException; 
import java.nio.ByteBuffer; 

import org.apache.mina.core.buffer.IoBuffer; 

/**
 * An {@link OutputStream} that collects everything written to it in an
 * {@link IoBuffer}, asking the buffer to expand whenever a write overflows it.
 */
public class ByteBufferOutputStream extends OutputStream
{
    /** The default initial size of the internal buffer. */
    protected static final int INITIAL_BUFFER_SIZE = 32;

    /** The buffer in which we store our frame data. */
    protected IoBuffer _buffer;

    /**
     * Creates a stream backed by a buffer of {@link #INITIAL_BUFFER_SIZE} bytes.
     */
    public ByteBufferOutputStream()
    {
        _buffer = IoBuffer.allocate(INITIAL_BUFFER_SIZE);
    }

    /**
     * Returns the underlying buffer itself (not a copy).
     */
    public IoBuffer getBuffer()
    {
        return _buffer;
    }

    /**
     * Flips the underlying buffer and returns it. Call {@link #reset} before
     * writing to the stream again.
     */
    public IoBuffer flip()
    {
        return _buffer.flip();
    }

    /**
     * Clears the underlying buffer.
     */
    public void reset()
    {
        _buffer.clear();
    }

    /**
     * Appends the low-order byte of {@code b}, expanding the buffer by one
     * byte and retrying if the first attempt overflows.
     */
    @Override
    public void write (int b)
    {
        byte value = (byte)b;
        try {
            _buffer.put(value);
        } catch (BufferOverflowException overflow) {
            expand(1);
            _buffer.put(value);
        }
    }

    /**
     * Appends {@code len} bytes of {@code b} starting at {@code off},
     * expanding the buffer and retrying if the first attempt overflows.
     *
     * @throws IndexOutOfBoundsException if {@code off} and {@code len} do not
     *         describe a valid range of {@code b}
     */
    @Override
    public void write (byte[] b, int off, int len)
    {
        // Validate the range; the checks run in the same order as before so
        // the same exception surfaces for any given bad input.
        int end = off + len;
        if (off < 0 || off > b.length || len < 0 || end > b.length || end < 0) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return;
        }

        try {
            _buffer.put(b, off, len);
        } catch (BufferOverflowException overflow) {
            expand(len);
            _buffer.put(b, off, len);
        }
    }

    /**
     * Asks the underlying buffer to make room for {@code needed} more bytes.
     */
    protected final void expand (int needed)
    {
        _buffer.expand(needed);
    }
}
0

객체 입력 스트림(ObjectInputStream)을 생성하기 전에 출력 스트림을 flush() 하기만 하면 됩니다. 더미 객체를 보낼 필요가 없습니다.

관련 문제