2014-05-13 3 views
6

포트에서 데이터를 읽을 때 스톰 스파우트를 작성해야합니다. 논리적으로 가능한지 알고 싶었습니다.폭풍 : 포트에서 데이터를 읽는 데 사용되는 스파우트

마음에, 나는 하나의 주둥이와 하나의 볼트로 똑같이 설계된 간단한 토폴로지를 설계했습니다. 스파우트는 wget을 사용하여 전송 된 HTTP 요청을 수집하며 볼트는 요청을 표시합니다.

public class ProxySpout extends BaseRichSpout{ 
     //The O/P collector 
     SpoutOutputCollector sc; 
     //The socket 
     Socket clientSocket; 
     //The server socket 
     ServerSocket sc; 

     public ProxySpout(int port){ 
      this.sc=new ServerSocket(port); 
      try{ 
       clientSocket=sc.accept(); 
      }catch(IOException ex){ 
       //Handle it 
      } 
     } 

     public void nextTuple(){ 
      try{ 
       InputStream ic=clientSocket.getInputStream(); 
       byte b=new byte[8196]; 
       int len=ic.read(b); 

       sc.emit(new Values(b)); 
       ic.close(); 
      }catch(//){ 
       //Handle it 
      }finally{ 
       clientSocket.close(); 
      } 
     } 
} 

내가 너무 방법의 나머지 부분을 구현 한 다음과 같이

내 주둥이 구조이다.

java.lang.RuntimeException가 : java.io.NotSerializableException : java.net.Socket의

나는 첫 번째 요청을 보낼 때 오류가 발생하는, 토폴로지에이 전원을 켜고 실행하면

이 스파우트를 구현하는 방식에 문제가 있는지 알아야합니다. 스파우트가 포트에서 데이터를 수집 할 수 있습니까? 또는 스파우트가 프록시의 인스턴스로 작동하도록하려면?

편집 작업을 얻었다.

코드는 다음과 같습니다

public class ProxySpout extends BaseRichSpout{ 
     //The O/P collector 
     static SpoutOutputCollector _collector; 
     //The socket 
     static Socket _clientSocket; 
     static ServerSocket _serverSocket; 
     static int _port; 

     public ProxySpout(int port){ 
      _port=port; 
     } 

     public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){ 
      _collector=collector; 
      _serverSocket=new ServerSocket(_port); 
     } 

     public void nextTuple(){ 
      _clientSocket=_serverSocket.accept(); 
      InputStream incomingIS=_clientSocket.getInputStream(); 
      byte[] b=new byte[8196]; 
      int len=b.incomingIS.read(b); 
      _collector.emit(new Values(b)); 
    } 
} 

쇼의 제안 @를 당으로 요청을 청취 nextTuple() 방법에 open() 방법 _serverSocket_clientSocket 실행을 초기화했습니다.

이 하나의 성능 metrices을 몰라,하지만 그것은 단지 변수를 지정 생성자에서 .. :-)

답변

6

작동합니다. 준비 메서드에서 ServerSocket을 인스턴스화하려고 시도하고 생성자에 새로운 ...을 쓰지 마십시오. 변수의 이름을 변경하면 두 개의 sc 변수가 생깁니다.

public class ProxySpout extends BaseRichSpout{ 

    int port; 

    public ProxySpout(int port){ 
     this.port=port; 
    } 

    @Override 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     //new ServerSocket 
    } 

    @Override 
    public void nextTuple() { 

    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 

    } 
} 

당신이에서 다음 만 주둥이가 이미 배포 된 후에 호출됩니다 방법을 준비 넣어, 그래서 그것이 한 번만 주둥이의 수명마다 호출됩니다 직렬화해야하고,하지 않을 경우, 그래서 비효율적이지 않습니다.

+0

그럼, 가능할까요? 스파우트 기능을 프록시처럼 만들기 위해서? –

+4

네,하지만 작은 시간마다 nextTuple()이 호출됩니다. 스파우트가 아무것도받지 못하면 오류를 관리해야합니다. – gasparms

관련 문제