2010-04-21 3 views
4

Java에서 멀티 스레드 프로그래밍 & 소켓 프로그래밍이 생소했습니다. 두 스레드를 구현하는 가장 좋은 방법은 무엇인지 알고 싶습니다. 하나는 소켓 수신 용이고 다른 하나는 소켓 전송 용입니다. 내가하려는 일이 터무니없는 소리로 들린다면, pls가 왜 그런지 알려주지! 이 코드는 주로 Sun의 온라인 자습서에서 영감을 얻었습니다. 멀티 캐스트 소켓을 사용하여 멀티 캐스트 그룹과 작업 할 수 있습니다.Java : 멀티 스레딩 및 UDP 소켓 프로그래밍

class Server extends Thread 
{ 

    static protected MulticastSocket socket = null; 
    protected BufferedReader in = null; 
    public InetAddress group; 

    private static class Receive implements Runnable 
    { 

     public void run() 
     { 
      try 
      { 
       byte[] buf = new byte[256]; 
       DatagramPacket pkt = new DatagramPacket(buf,buf.length); 
       socket.receive(pkt); 
       String received = new String(pkt.getData(),0,pkt.getLength()); 
       System.out.println("From [email protected]" + received);   
       Thread.sleep(1000); 
      } 
      catch (IOException e) 
      { 
       System.out.println("Error:"+e); 
      } 
      catch (InterruptedException e) 
      { 
       System.out.println("Error:"+e); 
      } 

     } 

    } 


    public Server() throws IOException 
    { 
     super("server"); 
     socket = new MulticastSocket(4446); 
     group = InetAddress.getByName("239.231.12.3"); 
     socket.joinGroup(group); 
    } 

    public void run() 
    { 

     while(1>0) 
     { 
      try 
      { 
       byte[] buf = new byte[256]; 
       DatagramPacket pkt = new DatagramPacket(buf,buf.length);   
       //String msg = reader.readLine(); 
       String pid = ManagementFactory.getRuntimeMXBean().getName(); 
       buf = pid.getBytes(); 
       pkt = new DatagramPacket(buf,buf.length,group,4446); 
       socket.send(pkt); 
       Thread t = new Thread(new Receive()); 
       t.start(); 

       while(t.isAlive()) 
       { 
        t.join(1000); 
       } 
       sleep(1); 
      } 
      catch (IOException e) 
      { 
       System.out.println("Error:"+e); 
      } 
      catch (InterruptedException e) 
      { 
       System.out.println("Error:"+e); 
      } 

     } 
     //socket.close(); 
    } 

    public static void main(String[] args) throws IOException 
    { 
     new Server().start(); 
     //System.out.println("Hello"); 
    } 

} 
+0

최종 목표는 무엇입니까? – Xailor

+0

@Ravi, 서식을 수정했지만 클래스 이름을 편집해야합니다. 대문자로 시작해야합니다.수업 이름이 소문자로 시작하는 경우 코드를 읽는 것은 고통 스럽습니다. – Kiril

+0

@Xepoch : 최종 목표는 분산 시스템에 특정 프로토콜을 구현하는 것입니다. @Lirik : 클래스 이름을 유감스럽게 생각합니다! 나는 그들을 지금 고쳤다. – Ravi

답변

2

응용 프로그램에서 스레드를 만들고 싶지 않은 것은 어설 거리지 않습니다! 정확히 2 개의 스레드가 필요하지는 않지만 Runnable 인터페이스를 구현하는 2 개의 클래스에 대해 이야기하고 있다고 생각합니다.

Java 1.5 이후로 스레딩 API가 향상되었고 java.lang.Thread를 더 이상 사용하지 않아도됩니다. 간단히 java.util.concurrent.Executor을 만들고 Runnable 인스턴스를 제출할 수 있습니다.

Java Concurrency in Practice은 정확한 문제 (스레드 된 소켓 서버 만들기)를 사용하고 코드를 여러 번 반복하여 수행하는 가장 좋은 방법을 보여줍니다. 무료 샘플 장을 확인하십시오. 나는 여기에 코드를 복사/붙여 넣기하지 않을 것이지만 목록 6.8을 특별히보아야한다.

+0

감사 드류 (Drew) 감사합니다. 즉시 concurrentExecutor를 살펴 보겠습니다. – Ravi

+0

__Careful !!! __ 생성 된 스레드에서 차단 작업을 수행하는 것은 완벽하지만 (잠시 동안 만 차단할 것입니다), 그렇게하는 것이 치명적일 수 있습니다 java.util.concurrent.Executor '에게 건네지는 Runnable 인스턴스 왜? Executor가 다른 스레드에서 코드를 실행하도록 보장하지 않기 때문입니다. 호출 스레드에서 코드를 실행할 수도 있습니다. 문서에서 : "_ 그러나 Executor 인터페이스는 엄격하게 실행을 비동기로 요구하지 않습니다 ._" 따라서 주 스레드를 그런 식으로 막을 수 있으며 전체 프로그램을 쉽게 잠글 수 있습니다. – Mecki

+0

구현이 중요하다는 것이 좋습니다. 동기 impl의 예제. [Spring의 SyncTaskExecutor] (http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/core/task/SyncTaskExecutor.html) – Drew

9

우선은 처음이다 : 수업은 Java Naming Conventions 당 대문자로 시작해야합니다

클래스 이름은 명사이어야한다, 혼합 된 경우 각 내부 단어가 대문자로 의 첫 번째 문자로. 클래스 이름을 간단하게 유지하고 을 (를) 기술적 인 것으로 유지하십시오. 전체 단어 사용 - 약어가 URL 또는 HTML과 같이 긴 형식보다 훨씬 더 많이 사용되는 경우가 아니면 약어 및 약어를 사용하지 마십시오 ( 약어가 URL보다 많이 사용되는 경우가 아니면 ). 둘째

: 일관된 섹션에 코드를 분해 아마도 기능이나 프로그래밍하고있는 모델의 주위에 ... 당신이 상대하고있는 몇 가지 일반적인 기능의 주위에 그 (것)들을 정리하려고합니다.

서버의 (기본) 모델은 뿐이며 것입니다. 소켓 연결을 수신합니다 ... 서버는 이러한 연결을 처리하기 위해 처리기를 사용해야하며 그게 전부입니다. 당신이 그 모델을 구축하려고하면 그것은 다음과 같이 보일 것입니다 :

class Server{ 
    private final ServerSocket serverSocket; 
    private final ExecutorService pool; 

    public Server(int port, int poolSize) throws IOException { 
     serverSocket = new ServerSocket(port); 
     pool = Executors.newFixedThreadPool(poolSize); 
    } 

    public void serve() { 
     try { 
     while(true) { 
      pool.execute(new Handler(serverSocket.accept())); 
     } 
     } catch (IOException ex) { 
     pool.shutdown(); 
     } 
    } 
    } 

    class Handler implements Runnable { 
    private final Socket socket; 
    Handler(Socket socket) { this.socket = socket; } 
    public void run() { 
     // receive the datagram packets 
    } 
} 

셋째 : 난 당신이 몇 가지 기존 예제를 살펴 것이 좋습니다 것입니다.

:
확인 라비, 코드 및 일부 일부 문제가 있습니다 부 번호 문제 :

  1. 나는 클라이언트가 Receive 클래스라고 가정하고 ... (메인 클래스를 사용하는) 별도의 프로그램으로 풀어서 서버와 여러 클라이언트를 동시에 실행해야합니다. 보내는 모든 새로운 UDP 패키지에 대해 서버에서 새로운 "클라이언트 스레드"를 생성하는 것은 불안한 생각입니다 ( 문제). 당신은 당 하나 개의 스레드를 필요로한다

    public class Client extends Thread 
    { 
        public Client(/*..*/) 
        { 
         // initialize your client 
        } 
    
        public void run() 
        { 
         while(true) 
         { 
          // receive UDP packets 
          // process the UDP packets 
         } 
        } 
    
        public static void main(String[] args) throws IOException 
        { 
         // start your client 
         new Client().start(); 
        } 
    } 
    
  2. : 당신이 당신의 클라이언트 응용 프로그램을 만들 때

  3. , 당신은 예를 들어 자신의 while 루프 (작은 문제)에 수신 코드를 실행해야한다 클라이언트 및 서버 당 하나의 스레드 (메인에는 자체 스레드가 있으므로 기술적으로는 별도의 스레드가 없기 때문에) ExecutorService이 유용하지 않을 수 있습니다.

그렇지 않으면 당신의 접근 방식은 정확 ...하지만 난 여전히는 예제의 일부를 확인하는 것이 좋습니다 것입니다.

+3

http://www.developer.com/ java/ent/article.php/3645111/Java-5s-BlockingQueue.htm - Doug Lea의 '단순한 서버' –

+0

@ 존 아, 네 ... 존, 고마워요. – Kiril

+0

+1 좋은 답변 !!!!! –

0

스레드 2 개가 정상입니다. 한 독자는 다른 작가입니다. UDP를 사용하면 새로운 처리기 스레드를 생성해서는 안된다는 점을 기억하십시오. (처리하는 데 오랜 시간이 걸리지 않는다면) 처리 대기열에 들어오는 메시지를 던지는 것이 좋습니다. send와 동일하게, 들어오는 Queue에서 UDP 전송을 차단하는 send 스레드를 갖습니다.

1

그것은 이클립스의 역사는 다시 그 덕분에 :) 하루에도 작동하는 좋은 일, 나는 라비 모두에게 작업 예제 및 Lirik 누설에 대한 그의 대답을 줄 수 있어요.

이 누출을 일으키는 원인이 무엇인지 알 수 없지만, 충분히 오래 방치하면 OutOfMemoryError에 실패합니다.

둘째, 내 UDP 서버의 작동 기본적인 예를 들어 라비에 대한 주석 작업 코드를 떠났다. 타임 아웃은 방화벽이 수신기의 종료 시간 (30 초)을 얼마나 오래 테스트하는지 테스트하는 것이 었습니다. 수영장에서 아무 것도 제거하면 갈 수 있습니다. 그래서 여기

는 작동하지만, UDP 서버 스레드 내 예를 들어 버전의 유출이다.

public class TestServer { 

private static Integer TIMEOUT = 30; 
private final static int MAX_BUFFER_SIZE = 8192; 
private final static int MAX_LISTENER_THREADS = 5; 
private final static SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-dd-MM HH:mm:ss.SSSZ"); 

private int mPort; 
private DatagramSocket mSocket; 

// You can remove this for a working version 
private ExecutorService mPool; 

public TestServer(int port) { 
    mPort = port; 
    try { 
     mSocket = new DatagramSocket(mPort); 
     mSocket.setReceiveBufferSize(MAX_BUFFER_SIZE); 
     mSocket.setSendBufferSize(MAX_BUFFER_SIZE); 
     mSocket.setSoTimeout(0); 

     // You can uncomment this for a working version 
     //for (int i = 0; i < MAX_LISTENER_THREADS; i++) { 
     // new Thread(new Listener(mSocket)).start(); 
     //} 

     // You can remove this for a working version 
     mPool = Executors.newFixedThreadPool(MAX_LISTENER_THREADS); 

    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

// You can remove this for a working version 
public void start() { 
    try { 
     try { 
      while (true) { 
       mPool.execute(new Listener(mSocket)); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } finally { 
     mPool.shutdown(); 
    } 
} 

private class Listener implements Runnable { 

    private final DatagramSocket socket; 

    public Listener(DatagramSocket serverSocket) { 
     socket = serverSocket; 
    } 

    private String readLn(DatagramPacket packet) throws IOException { 
     socket.receive(packet); 
     return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(packet.getData())), MAX_BUFFER_SIZE).readLine(); 
    } 

    private void writeLn(DatagramPacket packet, String string) throws IOException { 
     packet.setData(string.concat("\r\n").getBytes()); 
     socket.send(packet); 
    } 

    @Override 
    public void run() { 
     DatagramPacket packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE); 
     String s; 
     while (true) { 
      try { 
       packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE); 
       s = readLn(packet); 
       System.out.println(DateFormat.format(new Date()) + " Received: " + s); 
       Thread.sleep(TIMEOUT * 1000); 
       writeLn(packet, s); 
       System.out.println(DateFormat.format(new Date()) + " Sent: " + s); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

public static void main(String[] args) { 
    if (args.length == 1) { 
     try { 
      TIMEOUT = Integer.parseInt(args[0]); 
     } catch (Exception e) { 
      TIMEOUT = 30; 
     } 
    } 
    System.out.println(DateFormat.format(new Date()) + " Timeout: " + TIMEOUT); 
    //new TestServer(4444); 
    new TestServer(4444).start(); 
} 
} 

btw. @Lirik, Eclipse에서이 동작을 처음 목격했습니다. 그 후 명령 줄에서 테스트했습니다. 그리고 다시, 나는 그것이 무엇을 일으키는 지 전혀 모른다;) 미안 ...