2016-07-09 4 views
1

아래의 코드가 작동하고 예정된 시간에 메시지를 보냅니다. 그러나 타이머가 예약 된 작업을 실행할 때마다 새 소켓을 여는 것은 좋은 해결책이 아닌 것 같습니다. 내가 원하는 것은 run 메서드에서 socket을 한 번만 열고 클래스의 새 인스턴스가 timer에 만들어 질 때마다 SendMessage 클래스에 액세스하는 것입니다. 그런 식으로 작동하지 않습니다, 그것은 단지 하나의 메시지를 보낸 다음 보내는 것을 중지합니다. 또한 코드 비평가 나 스레드 안전을위한 팁에 대해 기뻐할 것입니다.소켓이 한 번만 메시지를 보냅니다.

public class Client implements Runnable{ 

// Client Constructor here 

@Override 
public void run(){ 
    //SENDS ONLY ONE MESSAGE 
    pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

    Timer timer = new Timer(); 

    timer.schedule(new SendMessage(), 0, 1000/mps); 
} 

private class SendMessage extends TimerTask{ 

    private int id; 

    @Override 
    public void run() { 

     try 
      { // THIS WORKS FINE, SENDS MESSAGES AT SCHEDULED TIME      
      pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

      OutputStream outToServer = pitcherSocket.getOutputStream(); 

      DataOutputStream out = new DataOutputStream(outToServer); 

      out.writeInt(id); 

      out.flush(); 

      }catch(IOException e) 
      { 
      e.printStackTrace(); 
      } 
     } 
    } 
} 

편집 : 전체가 코드

CLIENT

import java.io.DataInputStream; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.net.InetAddress; 
import java.net.Socket; 
import java.net.UnknownHostException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.Timer; 
import java.util.TimerTask; 

public class Pitcher implements Runnable{ 

private int port; 
private int mps; 
private int size; 
private String hostname; 
private List<Integer> messageIds = Collections.synchronizedList(new  ArrayList<Integer>()); 
private Socket pitcherSocket; 

//constatns, integer is 4 bytes, long is 8 bytes 
private static final int INT_SIZE = 4; 
private static final int LONG_SIZE = 8; 


public Pitcher(int port, int mps, int size, String hostname) { 

    this.port = port; 
    this.mps = mps; 
    this.size = size; 
    this.hostname = hostname; 
} 

@Override 
public void run(){ 

    System.out.println("Pitcher running..."); 
    System.out.println(); 

    Timer timer = new Timer(); 

    timer.schedule(new SendMessage(), 0, 1000/mps); 

    timer.schedule(new DisplayStatistics(), 0, 1000/mps); 

} 

//Nested class that sends messages 
private class SendMessage extends TimerTask{ 

    private int numberOfSentMessages = 0; 
    private int id; 

    @Override 
    public void run() { 

     try {       
      pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

      OutputStream outToServer = pitcherSocket.getOutputStream(); 

      DataOutputStream out = new DataOutputStream(outToServer); 

      //send message size 
      out.writeInt(size); 

      //message id is same as number of the sent message 
      id = numberOfSentMessages + 1; 
      out.writeInt(id); 
      messageIds.add(id); 



      //get system timestamp 
      long currentTimestamp = System.currentTimeMillis(); 
      out.writeLong(currentTimestamp); 

      //fill in the rest- 
      byte[] rest = new byte[size - 2 * INT_SIZE - LONG_SIZE];  //message size(default 300 bytes) - size(4 bytes) - message id(4 bytse) - timestamp(8 bytes) 
      out.write(rest); 

      out.flush(); 

      numberOfSentMessages++; 


      InputStream inFromServer = pitcherSocket.getInputStream(); 
      DataInputStream in = new DataInputStream(inFromServer); 

      Integer catcherMessageSize = in.readInt(); 
      Integer catcherId = in.readInt(); 
      long catcherTimestamp = in.readLong(); 

      System.out.println("Sent message:  " + size + " " + id + " " + currentTimestamp + "..."); 
      System.out.println("Received message: " + catcherMessageSize + " " + catcherId + " " + catcherTimestamp + "..."); 
      System.out.println(); 

      }catch(IOException e) 
      { 
      e.printStackTrace(); 
      } 

    } 

} 

} 

서버에

import java.io.DataInputStream; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.net.InetAddress; 
import java.net.ServerSocket; 
import java.net.Socket; 
import java.net.SocketTimeoutException; 

public class Catcher implements Runnable{ 

private int port; 
private String bind; 
private ServerSocket serverSocket; 

//constatns, integer is 4 bytes, long is 8 bytes 
private static final int INT_SIZE = 4; 
private static final int LONG_SIZE = 8; 

public Catcher(int port, String bind) { 

    this.port = port; 
    this.bind = bind; 
} 

@Override 
public void run() { 

    System.out.println("Catcher running..."); 
    System.out.println(); 

    try { 
     serverSocket = new ServerSocket(port, 100, InetAddress.getByName(bind)); 
    } 
    catch (IOException e1) { 
     e1.printStackTrace(); 
    } 

    while(true){ 

     try 
     {    
      Socket server = serverSocket.accept(); 

      DataInputStream in = new DataInputStream(server.getInputStream()); 

      Integer pitcherMessageSize = in.readInt(); 
      Integer pitcherId = in.readInt(); 
      long pitcherTimestamp = in.readLong(); 

      DataOutputStream out = new DataOutputStream(server.getOutputStream()); 

      //message id and size are sent back 
      out.writeInt(pitcherMessageSize); 
      out.writeInt(pitcherId); 

      //send back current time 
      long currentTimestamp = System.currentTimeMillis(); 
      out.writeLong(currentTimestamp); 

      //fill in the rest 
      byte[] rest = new byte[pitcherMessageSize - 2 * INT_SIZE - LONG_SIZE]; //message size(default 300 bytes) - size(4 bytes) - message id(4 bytes) - timestamp(8 bytes) 
      out.write(rest); 

      out.flush(); 

      System.out.println("Received message: " + pitcherMessageSize + " " + pitcherId + " " + pitcherTimestamp + "..."); 
      System.out.println("Sent message:  " + pitcherMessageSize + " " + pitcherId + " " + currentTimestamp + "..."); 
      System.out.println(); 

      //server.close(); 

     } 
     catch(SocketTimeoutException s){ 
      System.out.println("Socket timed out!"); 
      break; 
     } 
     catch(IOException e){ 
      e.printStackTrace(); 
      break; 
     } 
     } 
} 
} 
+0

예외가 표시됩니까? 전송을 중단하면 프로그램이 끝나거나 방금 매달려 있습니까? – niceman

+0

예외는 없지만 서버는 첫 번째 ID를 수신하고 계속 청취합니다. 클라이언트 타이머는 여전히 SendMessage 클래스를 실행하고 보내기를 시도하지만 nothind는 첫 번째 반복 이후에 전송됩니다. – asdf

+0

내가 보는 것으로부터, 당신은 예정대로'SendMessage'를 생성하고 실행하고 있습니다. 'SendMessage'는 그것이 생성 될 때마다'Socket'을 통해 재접속을 시도합니다. –

답변

-2

자바 소켓 클래스는 스레드로부터 안전하지 않습니다. 여러 스레드가 동일한 Socket 객체에 액세스하도록하려면 해당 동작을 동기화해야합니다. 이것은 모든 SendMessage 쓰레드에 잠금 기능을하는 공통 객체를 제공함으로써 가능합니다. 사용하려는 각 소켓 작업 (예 : 읽기 및 쓰기)에 대해 객체가 필요합니다. 그런 다음 Socket 객체를 개별 메소드로 호출하고 해당 객체를 동기화하는 모든 액션을 리팩터링합니다. 예 : 읽기 작업을 위해 SendMessage 내에 read()라는 메서드가있을 수 있습니다.이 메서드는 Socket.read를 호출하고이 메서드를 잠금 개체 주위에서 읽을 수 있도록 동기화합니다.

private class SendMessage extends TimerTask{ 

    private Object readLock; 
    private Socket socket; 

    public SendMessage(Object readLock, Socket socket) { 
     this.readLock = readLock; 
     this.socket = socket; 
    } 

    public void readFromSocket() { 
     synchronized(readLock) { 
       socket.read(); 
     } 
    } 

    @Override 
    public void run() { 
     readFromSocket(); 
     // do other stuff 
    } 

} 
+0

이 문제의 원인을 설명하지 않습니다. –

+0

여러 스레드에서 동일한 소켓 개체를 사용하는 경우에만 문제가 발생합니다. 따라서 소켓이 스레드로부터 안전하지 않기 때문에 가능성이 큽니다. 문제의 원인이라고 생각하지 않는 이유에 대해 자세히 설명해주십시오. – Soggiorno

+0

관련 : http://stackoverflow.com/questions/13545578/is-java-socket-multi-thread-safe – Soggiorno

0

소켓 및 SendMessage의 DataOutputStream 멤버 변수를 작성하는 것에 대해 생각해 보셨습니까? 이것은 당신에게 거친 시작을주는 몇 가지 코드입니다. 당신은 아마 내가 생각 전체 코드를 볼 수있는 후 ... 소켓 열고 현재의가 닫혀있는 경우 새로 만들 수있는 여부를 확인 같은

private class SendMessage extends TimerTask { 
    private int id = 10; 
    private Socket pitchSocket; 
    private DataOutputStream out; 

    public SendMessage(Socket socket) { 
     this.pitchSocket = socket; 
     try{ 
      out = new DataOutputStream(pitchSocket.getOutputStream()); 
     } catch(IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void run() { 
     try { 
      out.writeInt(id); 
      out.flush(); 
     } catch(IOException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
+0

여전히 동일하게 작동합니다./ – asdf

+0

내가 원래 게시물 인 것을 잘못 읽었을 수도 있습니다. 둘 이상의 SendMessenger를 만들고 있습니까? – CodeJockNYC

0

몇 가지 향상된 기능을 넣을 것 클라이언트 측보다는 서버 측에서 많이 쓰고 있다고 생각하지만 스레딩 문제는 분명히 있습니다. 서버가 단일 스레드입니다. 즉 한 번에 하나의 요청 만 처리 할 수 ​​있습니다. 다중 스레드 서버가 필요합니다. 필자는 코드를 리팩토링하여 멀티 스레드 된 Catcher 예제를 작성했습니다. Thead 클래스를 사용하여이 모든 작업을 수행합니다.이 작업은 약간 구식 일 수 있습니다. java.util.concurrent를 살펴 보길 원할 것입니다. 그들은 아마도 최신 버전을 가지고있을 것입니다.

package clientserver; 

import java.io.DataInputStream; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.net.InetAddress; 
import java.net.ServerSocket; 
import java.net.Socket; 
import java.net.SocketTimeoutException; 

public class Catcher implements Runnable{ 

private int port; 
private String bind; 
private ServerSocket serverSocket; 



public Catcher(int port, String bind) { 

    this.port = port; 
    this.bind = bind; 
} 

@Override 
public void run() { 

    System.out.println("Catcher running..."); 
    System.out.println(); 

    try { 
     serverSocket = new ServerSocket(port, 100, InetAddress.getByName(bind)); 
    } 
    catch (IOException e1) { 
     e1.printStackTrace(); 
    } 

    while(true){ 
     try 
     {    
      new Thread(new CatcherHandler(serverSocket.accept())).start(); 
      Thread.sleep(1000); 

     } 
     catch(SocketTimeoutException s){ 
      System.out.println("Socket timed out!"); 
      break; 
     } 
     catch(IOException e){ 
      e.printStackTrace(); 
      break; 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

public static void main(String[] argv){ 
    new Thread(new Catcher(8093, "localhost")).start();; 

} 
} 

class CatcherHandler implements Runnable{ 
    Socket server; 
    DataOutputStream out; 
    DataInputStream in; 

    private static final int INT_SIZE = 4; 
    private static final int LONG_SIZE = 8; 

    public CatcherHandler(Socket server) { 
     super(); 
     this.server = server; 
     try { 
      in = new DataInputStream(server.getInputStream()); 
      out = new DataOutputStream(server.getOutputStream()); 

     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 



    @Override 
    public void run() { 
     try{ 
      if(in.available() > 0){ 

       Integer pitcherMessageSize = in.readInt(); 
       Integer pitcherId = in.readInt(); 
       long pitcherTimestamp = in.readLong(); 

       //message id and size are sent back 
       out.writeInt(pitcherMessageSize); 
       out.writeInt(pitcherId); 

       //send back current time 
       long currentTimestamp = System.currentTimeMillis(); 
       out.writeLong(currentTimestamp); 

       //fill in the rest 
       byte[] rest = new byte[pitcherMessageSize - 2 * INT_SIZE - LONG_SIZE]; //message size(default 300 bytes) - size(4 bytes) - message id(4 bytes) - timestamp(8 bytes) 
       out.write(rest); 

       out.flush(); 

       System.out.println("Received message: " + pitcherMessageSize + " " + pitcherId + " " + pitcherTimestamp + "..."); 
       System.out.println("Sent message:  " + pitcherMessageSize + " " + pitcherId + " " + currentTimestamp + "..."); 
       System.out.println(); 
       Thread.sleep(1000); 

      } 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     }finally{} 
     //server.close(); 

    } 
} 

또한 클라이언트를 리팩토링하여 하나의 소켓을 사용하고 안전하게 밟을 수 있도록했습니다. 이제 SendMessage는 DataInputStream과 DataOutputSteam을 인수로 취합니다.

import java.io.DataInputStream; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.net.InetAddress; 
import java.net.Socket; 
import java.net.UnknownHostException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.Timer; 
import java.util.TimerTask; 

public class Pitcher implements Runnable{ 

private int port; 
private int mps; 
private int size; 
private String hostname; 
private List<Integer> messageIds = Collections.synchronizedList(new  ArrayList<Integer>()); 
private Socket pitcherSocket; 
private DataOutputStream out; 
private DataInputStream in; 

//constatns, integer is 4 bytes, long is 8 bytes 
private static final int INT_SIZE = 4; 
private static final int LONG_SIZE = 8; 


public Pitcher(int port, int mps, int size, String hostname) { 

    this.port = port; 
    this.mps = mps; 
    this.size = size; 
    this.hostname = hostname; 



    try { 
     this.pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 
     out = new DataOutputStream(pitcherSocket.getOutputStream()); 
     in = new DataInputStream(pitcherSocket.getInputStream()); 
    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 


} 

public static void main(String[] argv) throws Exception{ 
    for(int i = 0; i < 10; i++){ 
     new Thread(new Pitcher(8093, 1, 200, "localhost")).start(); 
     Thread.sleep(1000); 
    } 

    Thread.sleep(10000); 
} 

@Override 
public void run(){ 

    System.out.println("Pitcher running..."); 
    System.out.println(); 

    Timer timer = new Timer(); 

    timer.schedule(new SendMessage(out, in), 0, 1000); 

    //timer.schedule(new DisplayStatistics(), 0, 1000); 

} 

//Nested class that sends messages 
private class SendMessage extends TimerTask{ 

    private int numberOfSentMessages = 0; 
    private int id; 
    private DataOutputStream out; 
    private DataInputStream in; 

    public SendMessage(DataOutputStream out, DataInputStream in){ 
     this.out = out; 
     this.in = in; 
    } 

    @Override 
    public void run() { 

     try {       
      long currentTimestamp = 0L; 
      synchronized(out){ 
       //send message size 
       out.writeInt(size); 

       //message id is same as number of the sent message 
       id = numberOfSentMessages + 1; 
       out.writeInt(id); 
       messageIds.add(id); 



       //get system timestamp 
       currentTimestamp = System.currentTimeMillis(); 
       out.writeLong(currentTimestamp); 

       //fill in the rest- 
       byte[] rest = new byte[size - 2 * INT_SIZE - LONG_SIZE];  //message size(default 300 bytes) - size(4 bytes) - message id(4 bytse) - timestamp(8 bytes) 
       out.write(rest); 

       out.flush(); 
      } 
      numberOfSentMessages++; 

      long catcherTimestamp = 0L; 
      Integer catcherMessageSize; 
      Integer catcherId; 
      synchronized(in){ 
       catcherMessageSize = in.readInt(); 
       catcherId = in.readInt(); 
       catcherTimestamp = in.readLong(); 
      } 
      System.out.println("Sent message:  " + size + " " + id + " " + currentTimestamp + "..."); 
      System.out.println("Received message: " + catcherMessageSize + " " + catcherId + " " + catcherTimestamp + "..."); 
      System.out.println(); 
      Thread.sleep(1000); 

      }catch(IOException e) 
      { 
      e.printStackTrace(); 
      } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 

} 

} 
+0

대단히 감사합니다! 내일이 코드를 구현하고 모든 것이 괜찮은지 알려 드리겠습니다. 왜 thread.sleep (X)를 사용하고 있는지 말해 주실 수 있습니까? 나는 새로운 스레드입니다. – asdf

+0

예, 다른 스레드가 실행될 수 있도록 절전 모드를 사용하고 있습니다. 기본적으로 sleep 메소드를 호출하면 스케줄러가 다른 스레드를 실행할 수 있습니다. – CodeJockNYC

+0

다른 문제가 있습니다. 투수가 메시지를 보내면 캐처 System.currentTimeMillis(); 투수보다 작습니다. – asdf

관련 문제