2012-02-24 2 views
2

org.fuseource.mqtt (mqtt-client-1.0-20120208.162159-18-uber)를 사용 중이고 비 차단 예제를 기반으로 Java에서 리스너를 작성했습니다.Java의 MQTT 클라이언트 - 스레드에서 내 리스너 시작하기

리스너 클래스를 다음과 같이 사용합니다. 리스너 mqList = 새 리스너 ("tcp : // localhost : 1883", "mytopic/#", "c : /test.log", true);
새 스레드 (mqList) .start();

완벽하게 작동합니다. 두 개의 인스턴스/스레드를 생성하면 충돌이 발생하고 연결/연결 끊기 메시지가 표시됩니다.

Listener mqList = new Listener("tcp://localhost:1883", "mytopic/#", "c:/test.log", true);  
new Thread(mqList).start();         

Listener mqList1 = new Listener("tcp://localhost:1883", "mytopic1/#", "c:/test1.log", true);   
new Thread(mqList1).start(); 

내 리스너 클래스는 매우 간단하고 나는이 여러 스레드에서 작동하지 않는 이유를 의아해입니다 : 여기

이 실패 사용합니다. 어떤 아이디어/힌트?

import org.fusesource.hawtbuf.Buffer; 
import org.fusesource.hawtbuf.UTF8Buffer; 
import org.fusesource.mqtt.client.*; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.concurrent.CountDownLatch; 
import java.util.logging.*; 
import java.io.*; 
import java.net.URISyntaxException; 

public class Listener implements Runnable{ 
    private static final long DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS = 5000; 
    private static final long DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS = 3600 * 3; 

    private long listenerSleepBeforeReAttemptInSeconds; 
    private long listenerMaxReAttemptDurationInSeconds;  
    private MQTT mqtt; 

    private ArrayList<Topic> topics; 
    private boolean listenerDebug; 
    private String listenerHostURI; 
    private String listenerTopic; 
    private String listenerLogFile; 
    private long listenerLastSuccessfulSubscription; 

    private Logger fLogger; 
    private String NEW_LINE = System.getProperty("line.separator"); 

    public Listener(String listenerHostURI, String listenerTopic, String logFile, boolean debug) { 
     this(listenerHostURI, listenerTopic, logFile, DEFAULT_SLEEP_BEFORE_RE_ATTEMPT_IN_SECONDS, DEFAULT_MAX_RE_ATTEMPT_DURATION_IN_SECONDS, debug); 
    } 

    public Listener(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) { 
     init(listenerHostURI, listenerTopic, logFile, listenerSleepBeforeReAttemptInSeconds, listenerMaxReAttemptDurationInSeconds, debug); 
    } 

    private void init(String listenerHostURI, String listenerTopic, String logFile, long listenerSleepBeforeReAttemptInSeconds, long listenerMaxReAttemptDurationInSeconds, boolean debug) {   
     this.listenerHostURI = listenerHostURI; 
     this.listenerTopic = listenerTopic; 
     this.listenerLogFile = logFile; 
     this.listenerSleepBeforeReAttemptInSeconds = listenerSleepBeforeReAttemptInSeconds; 
     this.listenerMaxReAttemptDurationInSeconds = listenerMaxReAttemptDurationInSeconds; 
     this.listenerDebug = debug; 
     initMQTT(); 
    } 

    private void initMQTT() { 
     mqtt = new MQTT(); 
     listenerLastSuccessfulSubscription = System.currentTimeMillis(); 

     try { 
      fLogger = Logger.getLogger("eTactica.mqtt.listener"); 
      FileHandler handler = new FileHandler(listenerLogFile); 
      fLogger.addHandler(handler); 
     } catch (IOException e) { 
      System.out.println("Logger - Failed"); 
     }      

     try { 
      mqtt.setHost(listenerHostURI); 
     } catch (URISyntaxException e) { 
      stderr("setHost failed: " + e); 
      stderr(e); 
     }  
     QoS qos = QoS.AT_MOST_ONCE; 
     topics = new ArrayList<Topic>(); 
     topics.add(new Topic(listenerTopic, qos));    
    } 

    private void stdout(String x) { 
     if (listenerDebug) { 
      fLogger.log(Level.INFO, x + NEW_LINE); 
     } 
    } 

    private void stderr(String x) { 
     if (listenerDebug) { 
      fLogger.log(Level.SEVERE, x + NEW_LINE); 
     } 
    } 

    private void stderr(Throwable e) { 
     if (listenerDebug) {    
      StringWriter sw = new StringWriter(); 
      PrintWriter pw = new PrintWriter(sw); 
      e.printStackTrace(pw); 

      fLogger.log(Level.SEVERE, sw.toString() + NEW_LINE); 
     } 
    } 

    private void subscriptionSuccessful() { 
     listenerLastSuccessfulSubscription = System.currentTimeMillis();  
    }  

    private boolean tryToListen() {    
     return ((System.currentTimeMillis() - listenerLastSuccessfulSubscription) < listenerMaxReAttemptDurationInSeconds * 1000); 
    } 

    private void sleepBeforeReAttempt() throws InterruptedException {  
     stdout(String.format(("Listener stopped, re-attempt in %s seconds."), listenerSleepBeforeReAttemptInSeconds)); 
     Thread.sleep(listenerSleepBeforeReAttemptInSeconds); 
    } 

    private void listenerReAttemptsOver() { 
     stdout(String.format(("Listener stopped since reattempts have failed for %s seconds."), listenerMaxReAttemptDurationInSeconds));   
    } 

    private void listen() { 
     final CallbackConnection connection = mqtt.callbackConnection(); 
     final CountDownLatch done = new CountDownLatch(1); 



     /* Runtime.getRuntime().addShutdownHook(new Thread(){ 
      @Override 
      public void run() { 
       setName("MQTT client shutdown"); 
       stderr("Disconnecting the client."); 

       connection.getDispatchQueue().execute(new Runnable() { 
        public void run() { 
         connection.disconnect(new Callback<Void>() { 
          public void onSuccess(Void value) { 
           stdout("Disconnecting onSuccess."); 
           done.countDown(); 
          } 
          public void onFailure(Throwable value) { 
           stderr("Disconnecting onFailure: " + value); 
           stderr(value); 
           done.countDown(); 
          } 
         }); 
        } 
       }); 
      } 
     }); 
     */ 

     connection.listener(new org.fusesource.mqtt.client.Listener() { 

      public void onConnected() { 
       stdout("Listener onConnected");     
      } 

      public void onDisconnected() { 
       stdout("Listener onDisconnected"); 
      } 

      public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) { 
       stdout(topic + " --> " + body.toString());      
       ack.run(); 
      } 

      public void onFailure(Throwable value) { 
       stdout("Listener onFailure: " + value);        
       stderr(value); 
       done.countDown(); 
      } 
     }); 

     connection.resume(); 

     connection.connect(new Callback<Void>() { 
      public void onFailure(Throwable value) { 
       stderr("Connect onFailure...: " + value);       
       stderr(value); 
       done.countDown();     
      } 

      public void onSuccess(Void value) { 
       final Topic[] ta = topics.toArray(new Topic[topics.size()]); 
       connection.subscribe(ta, new Callback<byte[]>() { 
        public void onSuccess(byte[] value) { 
         for (int i = 0; i < value.length; i++) { 
          stdout("Subscribed to Topic: " + ta[i].name() + " with QoS: " + QoS.values()[value[i]]); 
         } 
         subscriptionSuccessful(); 
        } 
        public void onFailure(Throwable value) { 
         stderr("Subscribe failed: " + value);       
         stderr(value); 
         done.countDown(); 
        } 
       }); 
      } 
     }); 

     try { 
      done.await(); 
     } catch (Exception e) { 
      stderr(e); 
     } 
    } 

    @Override 
    public void run() { 
     while (tryToListen()) { 
      initMQTT(); 
      listen(); 
      try { 
       sleepBeforeReAttempt(); 
      } catch (InterruptedException e) { 
       stderr("Sleep failed:" + e); 
       stderr(e); 
      } 
     } 

     listenerReAttemptsOver();  
    } 

} 

답변

0

TCP 포트는 하나의 수신기를 가질 수 있습니다

여기 내 클래스 정의입니다. "tcp : // localhost : 1883"의 숫자는 각 수신기마다 고유해야합니다. 어딘가, 아마도 (이 특정 API에 익숙하지 않다면) 포트 번호를 가진 클라이언트를 시작했을 것입니다. 번호는 클라이언트와 서버 사이에서 일치해야합니다.

+0

이 응용 프로그램은 클라이언트이며 서버를 수신 대기 중입니다. 여러 클라이언트/스레드가이 방법으로 연결할 수 있어야합니다. 동일한 워크 스테이션의 별도의 DOS 창에서 mosquitto_sub를 실행할 수 있습니다. – Gummi

관련 문제